
Understanding cubefs source code | The RAFT of master

2023-08-29 Huocheng Wu

Introduction

The "master" module in CubeFS consists of over 28,000 lines of code; including the Raft code, it amounts to approximately 30,000 lines. Because the module is this large, we will break it down and examine it piece by piece. In a series of articles, we will explain the sub-functions of the master module in detail, hoping to help readers understand them.

In this article, we will primarily focus on Raft-related knowledge and code. The master module utilizes the multi-Raft protocol to manage meta/data partitions. We will first introduce the basic concepts of Raft and then explain multi-Raft in further detail.

Raft

Raft is a consensus algorithm for distributed systems, designed to be simpler to understand and easier to implement than traditional algorithms such as Paxos. It breaks the complex problem of distributed consensus into relatively independent sub-problems while maintaining performance similar to that of Multi-Paxos. The Raft algorithm focuses on leader election, log replication, and handling split-brain scenarios.

In simple terms, Raft is a leader-based consensus algorithm that defines three roles: Leader, Candidate, and Follower. It introduces concepts such as leader election, terms, and election timeouts, and it also covers log replication and the handling of split-brain scenarios.

Raft guarantees that there is at most one Leader per term. During normal operation, only the Leader and the Followers are active.

The three roles can transition into one another, as shown in the following diagram:

[Figure: Raft role transition diagram]
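To make the diagram concrete, here is a minimal Go sketch of the role transitions. The Role and Event types and the next function are hypothetical names for illustration only; real implementations such as tiglabs/raft track terms, votes, and timers rather than a single event value.

```go
package main

import "fmt"

// Role is one of the three Raft roles.
type Role int

const (
	Follower Role = iota
	Candidate
	Leader
)

func (r Role) String() string {
	return [...]string{"Follower", "Candidate", "Leader"}[r]
}

// Event is a simplified trigger for a role change (hypothetical).
type Event int

const (
	ElectionTimeout Event = iota // no message from a leader before the timer fired
	WonMajority                  // a candidate received votes from a majority
	SawHigherTerm                // a message carried a term larger than ours
)

// next returns the role after an event, following the transitions in the Raft paper.
func next(r Role, e Event) Role {
	switch e {
	case SawHigherTerm:
		return Follower // every role steps down when it sees a higher term
	case ElectionTimeout:
		if r != Leader {
			return Candidate // start, or restart, an election
		}
	case WonMajority:
		if r == Candidate {
			return Leader
		}
	}
	return r // the event does not affect this role
}

func main() {
	r := Follower
	for _, e := range []Event{ElectionTimeout, WonMajority, SawHigherTerm} {
		r = next(r, e)
		fmt.Println(r) // Candidate, then Leader, then Follower
	}
}
```

The sketch deliberately leaves out the case where a Candidate learns that another node already won the election; in Raft that also sends it back to Follower.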

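Once a Leader is elected, it starts to receive client requests. The Leader appends each request to its log, replicates the entry to the Followers, and treats the entry as committed once a majority of nodes have stored it; this is how Raft keeps the logs synchronized and safe. The log storage mechanism in Raft is shown below [2]:

[Figure: Raft log storage mechanism]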
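The majority rule for committing can be illustrated with a small calculation. In the sketch below (quorumIndex is a hypothetical name), the leader sorts the match indexes of all replicas, including its own, in descending order and takes the one at position n/2; that index is stored on at least a majority of nodes. The Raft paper adds a condition this sketch omits: the leader only advances the commit index this way for an entry from its current term.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the largest log index stored on a majority of
// replicas, given every replica's match index (the leader's own last
// index included). matchIndexes must not be empty.
func quorumIndex(matchIndexes []uint64) uint64 {
	s := append([]uint64(nil), matchIndexes...) // copy so the caller's slice is not reordered
	sort.Slice(s, func(i, j int) bool { return s[i] > s[j] })
	// Sorted in descending order, s[len(s)/2] is held by at least len(s)/2+1 replicas.
	return s[len(s)/2]
}

func main() {
	// Leader at index 9; four followers have replicated up to 9, 7, 5 and 3.
	fmt.Println(quorumIndex([]uint64{9, 9, 7, 5, 3})) // 7
}
```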

Raft algorithm also includes features such as log compaction and membership changes, but we won't go into detail about them here. For beginners using Raft, understanding that Raft can automatically elect a leader and ensure log persistence is sufficient.

Numerous online resources, such as [1], explain the Raft algorithm in depth. Here we only briefly introduce the concepts needed for the discussion in the following sections.

multi-raft

In simple terms, Multi-Raft slices the data managed by the whole system in a specific way. Each slice has its own replicas, and Raft keeps these replicas consistent. Globally, the system runs multiple Raft groups at the same time, as illustrated below [3]:

[Figure: multiple Raft groups in a Multi-Raft system]
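As a rough illustration, the sketch below assumes range-based slicing by a numeric key (only one of several possible schemes) and routes each key to the Raft group that owns its range. The Group type and the route function are hypothetical. Note that nodes n2 and n3 hold replicas of both groups, which is the typical situation in a Multi-Raft deployment: one node participates in many Raft groups.

```go
package main

import "fmt"

// Group is one Raft group: it owns a slice of the data and has its own replicas.
type Group struct {
	ID       uint64
	Start    uint64   // first key (inclusive) owned by this group
	End      uint64   // last key (inclusive) owned by this group
	Replicas []string // addresses of the nodes holding a replica
}

// route returns the group responsible for key, or nil if no group covers it.
func route(groups []Group, key uint64) *Group {
	for i := range groups {
		if key >= groups[i].Start && key <= groups[i].End {
			return &groups[i]
		}
	}
	return nil
}

func main() {
	groups := []Group{
		{ID: 1, Start: 0, End: 999, Replicas: []string{"n1", "n2", "n3"}},
		{ID: 2, Start: 1000, End: 1999, Replicas: []string{"n2", "n3", "n4"}},
	}
	g := route(groups, 1500)
	fmt.Println(g.ID, g.Replicas) // 2 [n2 n3 n4]
}
```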

How to use raft

Here, we will introduce the usage of the third-party tiglabs/raft library functions. With this foundation, we can delve into analyzing the relevant code in the master module.

The raft.StateMachine interface consists of the following methods:

Apply(command []byte, index uint64) (resp interface{}, err error)
ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
Snapshot() (proto.Snapshot, error)
ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error)
HandleFatalEvent(err *raft.FatalError)
HandleLeaderChange(leader uint64)

These methods are callbacks: the application implements them, and the tiglabs/raft library calls them internally.
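The callback relationship can be shown with a simplified stand-in. The StateMachine interface below keeps only two of the six methods, and kvFsm and deliver are hypothetical names; deliver plays the library's role of handing committed entries to Apply in log order, while the application code only implements the callbacks.

```go
package main

import (
	"fmt"
	"strings"
)

// StateMachine is a trimmed-down stand-in for raft.StateMachine:
// only two of its six methods are kept.
type StateMachine interface {
	Apply(command []byte, index uint64) (interface{}, error)
	HandleLeaderChange(leader uint64)
}

// kvFsm is a toy state machine for "key=value" commands.
type kvFsm struct {
	data    map[string]string
	applied uint64 // highest log index applied so far
	leader  uint64
}

func (f *kvFsm) Apply(command []byte, index uint64) (interface{}, error) {
	if index <= f.applied {
		return nil, nil // already applied, e.g. during log replay; ignore
	}
	parts := strings.SplitN(string(command), "=", 2)
	if len(parts) != 2 {
		return nil, fmt.Errorf("malformed command %q at index %d", command, index)
	}
	f.data[parts[0]] = parts[1]
	f.applied = index
	return nil, nil
}

func (f *kvFsm) HandleLeaderChange(leader uint64) { f.leader = leader }

// deliver plays the library's role: it hands committed entries to the
// state machine in log order, starting at firstIndex.
func deliver(sm StateMachine, firstIndex uint64, entries [][]byte) {
	for i, e := range entries {
		sm.Apply(e, firstIndex+uint64(i)) // errors ignored in this sketch
	}
}

func main() {
	fsm := &kvFsm{data: map[string]string{}}
	deliver(fsm, 1, [][]byte{[]byte("a=1"), []byte("b=2"), []byte("a=3")})
	fsm.HandleLeaderChange(2)
	fmt.Println(fsm.data["a"], fsm.applied, fsm.leader) // 3 3 2
}
```

Tracking the applied index inside the state machine, as MetadataFsm does with its applied value, is what lets a restarted node skip entries it has already persisted.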

The master module implements a struct called MetadataFsm, which implements raft.StateMachine. The implementation is located in master/metadata_fsm.go, and its Apply function writes data to RocksDB.

The createRaftServer function performs the following steps. First, it creates m.raftStore (an instance of RaftStore). Then it creates m.fsm (an implementation of raft.StateMachine). Finally, it calls m.raftStore.CreatePartition with a raftstore.PartitionConfig whose SM field is set to m.fsm. This establishes the association between RocksDB and Raft. The code is as follows:

// CreatePartition creates a partition in the raft store.
func (s *raftStore) CreatePartition(cfg *PartitionConfig) (p Partition, err error) {
    // ... omitted (walPath is computed here) ...
    rc := &raft.RaftConfig{
        // This is m.fsm
        StateMachine: cfg.SM,
        Applied:      cfg.Applied,
    }
    if err = s.raftServer.CreateRaft(rc); err != nil {
        return
    }
    p = newPartition(cfg, s.raftServer, walPath)
    return
}

The flow chart is shown below:

[Figure: flow chart of createRaftServer]

Initialize raft

The following diagram shows the code flow that embeds the Raft-related members into the Server implementation:

[Figure: initializing Raft in Server]

Structural Relationships

In the entire master module, the primary implementation of Raft is defined in the Partition interface.

The Partition interface is a set of functions for operating on raft store partitions. A Partition is one shard of the multi-raft implementation in RaftStore: many Raft groups are managed at the same time on top of a single Raft server instance and shared system resources. For each of these Raft groups, changes of the leader node and of the member nodes must be kept consistent across its replicas.

On the other hand, Cluster stores all the information related to the cluster layer.

The relationships are illustrated below:

[Figure: structural relationships]

raft store

This code module is located in the raftstore directory, at the same level as the master directory, and contains just over 1,000 lines of code. It is a wrapper built on top of the third-party library github.com/tiglabs/raft, which implements multi-raft: it can manage multiple Raft replication groups within a single Raft server instance. RaftStore introduces a key concept called Partition, which represents one shard of multi-raft. Below we look at the features RaftStore provides; our analysis covers only the RaftStore layer, not the tiglabs/raft code underneath it.

Code Implementation

The start function starts the Raft service by calling m.createRaftServer(cfg). The main purpose of createRaftServer is to create the RaftStore, initialize the state machine, and create the partition.

func (m *Server) createRaftServer(cfg *config.Config) (err error) {
    // ... omitted: build raftCfg from cfg ...
    // Initialize the raft store
    if m.raftStore, err = raftstore.NewRaftStore(raftCfg, cfg); err != nil {
        return errors.Trace(err, "NewRaftStore failed! id[%v] walPath[%v]", m.id, m.walDir)
    }
    // Set up the state machine and register the external hooks; this is how Raft data is persisted
    m.initFsm()
    // ... omitted: build partitionCfg, with SM set to m.fsm ...
    // Create the partition, the operation interface of the Raft store
    if m.partition, err = m.raftStore.CreatePartition(partitionCfg); err != nil {
        return errors.Trace(err, "CreatePartition failed")
    }
    return
}

Structures

First, we have the Config structure that defines the configuration for RaftStore. The important variables inside the structure are as follows:

type Config struct {
    // Node ID
    NodeID       uint64
    // Tick interval used for heartbeat and election-timeout checks
    TickInterval int
    // Election timeout: if no message is received from the leader within this period, a new election is started.
    ElectionTick int
}

In PartitionConfig, the configuration of a partition, the important variables are as follows:

type PartitionConfig struct {
    // Addresses of the peers
    Peers []PeerAddress
    // State machine
    SM    PartitionFsm
}

For the raftStore structure, the important variables are as follows:

type raftStore struct {
    nodeID     uint64 // Node ID
    resolver   NodeResolver // Node address resolution and management
    raftConfig *raft.Config // Configuration for third-party library functions
    raftServer *raft.RaftServer // Structure for the third-party Raft library
}

Partition Interface

This is a set of necessary functions for operating on the Raft store partition. The approach is to define an interface Partition that represents the operations on the Raft and then implement the specific functionality in the member functions of the partition structure by calling the tiglabs/raft library. These function interfaces are similar to the ones provided by the tiglabs/raft library.

type Partition interface {
    // Submit a command to the Raft log.
    Submit(cmd []byte) (resp interface{}, err error)

    // Submit a member change event and information to the Raft log.
    ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (resp interface{}, err error)

    // Remove the Raft partition from the Raft server and close it.
    Stop() error

    // Stop and delete the partition.
    Delete() error

    // Return the current Raft status information.
    Status() (status *PartitionStatus)

    // Get the ID and term of the current group leader.
    LeaderTerm() (leaderID, term uint64)

    // Check if the current node is the leader of the group.
    IsRaftLeader() bool

    // Get the applied index value of the current Raft log.
    AppliedIndex() uint64

    // Get the committed index value of the current Raft log.
    CommittedIndex() uint64

    // Truncate the Raft log.
    Truncate(index uint64)

    // Trigger an election and attempt to become the leader.
    TryToLeader(nodeID uint64) error

    // Check whether more than half of the peers in the current group are active; the group is considered online only when a majority is active.
    IsOfflinePeer() bool
}

These functions wrap the corresponding methods of the raft library. The third-party library interface includes:

ChangeMember // Submit member change events and information to the log.
RemoveRaft // Remove the partition from the Raft server and close it.
TryToLeader // Initiate an election.
Status // Query the current node's status.
LeaderTerm // Get the ID and term of the current leader node.
IsLeader // Determine if a node is the leader.
AppliedIndex // Query the applied index of the current log.
CommittedIndex // Query the committed index of the current log.
Submit // Send a command to the log.
Truncate // Truncate the Raft log.

These interfaces expose the core operations of the Raft protocol that a user needs: initiating elections, querying the election status and the leader node, adding and removing members, and submitting or truncating log entries. In other words, using Raft mainly comes down to working with elections and the log.

RaftStore Interface

This section defines the interface functions of the Raft store. These functions are implemented in the member functions of the raftStore structure. The functionality of the functions is explained in the code comments below.

type RaftStore interface {
    // Create a new partition in the Raft store.
    CreatePartition(cfg *PartitionConfig) (Partition, error)

    // Stop the Raft store service.
    Stop()

    // Get the Raft configuration information.
    RaftConfig() *raft.Config

    // Check the status of the Raft service.
    RaftStatus(raftID uint64) (raftStatus *raft.Status)

    // Node address management functions, including adding and removing node addresses.
    NodeManager

    // Return a pointer to the RaftServer, allowing access to the Raft interface within the state machine.
    RaftServer() *raft.RaftServer
}

raftStore.raftServer and partition.raft both point to the same raft.RaftServer instance (CreatePartition passes s.raftServer to newPartition). The difference lies in how they are used: raftStore.raftServer represents the entire service, while partition.raft is used together with the partition's ID to operate on that individual partition.

The rest of the code in this section is relatively minor, and we won't go into further detail.

State Machine

Raft follows the replicated state machine model: every node applies the same committed log entries, in the same order, to its own state machine. To let the outer caller plug in its own logic, tiglabs/raft defines the raft.StateMachine interface, and the master passes m.fsm to it as the implementation. The type of m.fsm is MetadataFsm, which implements the interface functions of raft.StateMachine so that the third-party code can call them.

In the master part of the code, there are three structures that use the state machine: Server, User, and Cluster. All three structures actually point to the same fsm structure.

Next, we will explain the implementation of MetadataFsm. The MetadataFsm structure is used to represent the finite state machine of the metadata partition. The important member variable is as follows:

type MetadataFsm struct {
    // Persistent storage for storing Raft log information
    store *raftstore.RocksDBStore
}

Member Functions

The following functions in the MetadataFsm state machine structure are implementations of the raft.StateMachine interface:

func (*MetadataFsm).Apply(command []byte, index uint64) (resp interface{}, err error)
func (*MetadataFsm).ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
func (*MetadataFsm).ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error)
func (*MetadataFsm).HandleFatalEvent(err *raft.FatalError)
func (*MetadataFsm).HandleLeaderChange(leader uint64)
func (*MetadataFsm).Snapshot() (proto.Snapshot, error)

These functions are implementations of the raft.StateMachine interface. The interfaces provided by raft.StateMachine are used for persistently storing data and implementing data snapshot functionality. Therefore, when examining the source code, you will find that this part of the code is implemented in metadata_fsm.go and ultimately stores data in RocksDB.

Initialization

MetadataFsm also has four member variables that hold functions registered by external modules (the hook functions described later in this article); they are set through register functions. The state machine is initialized in the initFsm function in the master/server.go file as follows:

func (m *Server) initFsm() {
    // ... omitted: register the hook functions ...
    // Retrieve the `applied` value from RocksDB and assign it to the `applied` member variable in the structure
    m.fsm.restore()
}
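The register-and-forward pattern can be sketched as follows. All type and method names are hypothetical and only model the idea: the state machine stores function values, the Server registers its own methods in initFsm, and the callbacks invoked by the Raft library forward events to those functions without the state machine depending on Server.

```go
package main

import "fmt"

// Hook types (hypothetical names modeled on the handlers described in the text).
type leaderChangeHandler func(leader uint64)
type snapshotHandler func()

// fsm stores the hooks; the Raft-facing callbacks only forward to them.
type fsm struct {
	onLeaderChange leaderChangeHandler
	onSnapshot     snapshotHandler
}

func (f *fsm) registerLeaderChangeHandler(h leaderChangeHandler) { f.onLeaderChange = h }
func (f *fsm) registerSnapshotHandler(h snapshotHandler)         { f.onSnapshot = h }

// HandleLeaderChange is what the Raft library would call.
func (f *fsm) HandleLeaderChange(leader uint64) {
	if f.onLeaderChange != nil {
		f.onLeaderChange(leader)
	}
}

// ApplySnapshot is simplified: it only notifies the registered hook.
func (f *fsm) ApplySnapshot() {
	if f.onSnapshot != nil {
		f.onSnapshot()
	}
}

// server stands in for the master's Server.
type server struct {
	id       uint64
	leader   uint64
	restored bool
	fsm      *fsm
}

func (s *server) handleLeaderChange(leader uint64) { s.leader = leader }
func (s *server) handleApplySnapshot()             { s.restored = true }

// initFsm creates the state machine and registers the server's methods as hooks.
func (s *server) initFsm() {
	s.fsm = &fsm{}
	s.fsm.registerLeaderChangeHandler(s.handleLeaderChange)
	s.fsm.registerSnapshotHandler(s.handleApplySnapshot)
}

func main() {
	s := &server{id: 1}
	s.initFsm()
	s.fsm.HandleLeaderChange(1) // simulate the library reporting a new leader
	s.fsm.ApplySnapshot()
	fmt.Println(s.leader == s.id, s.restored) // true true
}
```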

Execution

The following diagram provides a brief description of how this state machine works.

[Figure: how the MetadataFsm state machine works]

In the diagram, we have only depicted the operations: Apply, ApplyMemberChange, and HandleLeaderChange. In reality, there are also operations for snapshot (Snapshot and ApplySnapshot) and handling error events (HandleFatalEvent). We won't go into further detail on these three operations.

Data Persistence

Here, we will focus on understanding the Apply function, which is used in the state machine to persistently store data.

[Figure: flow of MetadataFsm.Apply]

The focus of this function is to operate on RocksDB to achieve persistent data storage.

Hook Functions

Now let's examine the specific functionalities of the registered member functions. This section of code is located in the file master/master_manager.go, and these four functions are member functions of the Server structure.

handleLeaderChange

This function notifies the outer caller when the leader of the Raft group changes. The leader parameter carries the ID of the new leader, and the external caller can then perform higher-level configuration operations based on it.

[Figure: handleLeaderChange flow]

The specific implementation of the code, along with added comments, is as follows. In this code snippet, AddrDatabase is a dictionary that stores host addresses, and it is populated by parsing the peers value from the configuration file.

func (m *Server) handleLeaderChange(leader uint64) {
    // ... omitted ...
    m.leaderInfo.addr = AddrDatabase[leader]
    // Set proxy to redirect to the new leader address
    m.reverseProxy = m.newReverseProxy()

    // If the new leader is the current node
    if m.id == leader {
        // Address has changed, such as during the initial startup
        if oldLeaderAddr != m.leaderInfo.addr {
            m.cluster.checkPersistClusterValue() // Check if the cluster value is persistently stored
            m.loadMetadata() // Load persisted metadata into memory
            m.metaReady = true // Used as a condition in registerAPIMiddleware when starting the HTTP service
        }
        m.cluster.checkDataNodeHeartbeat() // Create a task to check DataNode heartbeats and add it to TaskMap in AdminTaskManager
        m.cluster.checkMetaNodeHeartbeat() // Same as above, create MetaNode heartbeat task
        m.cluster.followerReadManager.reSet() // Clear the members of followerReadManager
    } else {
        // If the new leader is another node
        m.clearMetadata() // Clear metadata
        m.metaReady = false
        // Check if addr is in MasterClient, add it if not, and set leaderAddr to that value
        m.cluster.masterClient.AddNode(m.leaderInfo.addr)
        // Set leaderAddr to addr
        m.cluster.masterClient.SetLeader(m.leaderInfo.addr)
    }
}

In short, when the leader changes, the function resets the redirect proxy. If the current node is the new leader, it loads the metadata, creates the heartbeat-check tasks, and performs related initialization. Otherwise, it clears the in-memory metadata and updates the leader address in masterClient.

handlePeerChange

This function mainly triggers operations for the upper-level caller when a node is added or removed from a Raft group. The specific implementation of the function is as follows, along with added comments.

func (m *Server) handlePeerChange(confChange *proto.ConfChange) (err error) {
    switch confChange.Type {
    case proto.ConfAddNode:
        // ... omitted ...
        // Add heartbeat and replication records to the nodeResolver dictionary
        m.raftStore.AddNodeWithPort(confChange.Peer.ID, arr[0], int(m.config.heartbeatPort), int(m.config.replicaPort))
        AddrDatabase[confChange.Peer.ID] = string(confChange.Context)
    case proto.ConfRemoveNode:
        // Delete the record with the key confChange.Peer.ID from nodeResolver
        m.raftStore.DeleteNode(confChange.Peer.ID)
        // ... omitted ...
    }
    return
}

handleApplySnapshot

This is the callback function for Raft's snapshot operation.

func (m *Server) handleApplySnapshot() {
    // Read the "applied" value from RocksDB and assign it to the Server's state machine (Server.fsm)
    m.fsm.restore()
    // Retrieve the value from RocksDB and assign it to Server.cluster.idAlloc
    m.restoreIDAlloc()
    return
}

The main functionality of this function is to retrieve the records stored in RocksDB and assign them to the variables Server.fsm and Server.cluster.idAlloc. This allows for the implementation of the snapshot feature.

handleRaftUserCmd

This callback synchronizes API limiter information (LimiterInfo) from the leader to the other master nodes: as the code shows, the update is applied only when the current node is not the Raft leader. Currently, this is the only supported operation; other commands are not handled.

func (m *Server) handleRaftUserCmd(opt uint32, key string, cmdMap map[string][]byte) (err error) {
    switch opt {
    case opSyncPutFollowerApiLimiterInfo, opSyncPutApiLimiterInfo:
        if m.cluster != nil && !m.partition.IsRaftLeader() {
            // Update the dictionary `limiterInfos` in `Server.cluster.apiLimiter`
            m.cluster.apiLimiter.updateLimiterInfoFromLeader(cmdMap[key])
        }
        // ... omitted ...
    }
    return
}

Raft Commands

The format of these Raft commands is defined in the file master/metadata_fsm_op.go, and only the code within the master module can correctly parse and execute these command formats.

Within the master module, a format for a Raft command is defined, along with three member functions.

type RaftCmd struct {
    Op uint32 `json:"op"`
    K  string `json:"k"`
    V  []byte `json:"v"`
}

This structure has three operation functions:

func (*RaftCmd).Marshal() ([]byte, error)
func (*RaftCmd).Unmarshal(data []byte) (err error)
func (*RaftCmd).setOpType()

These commands involve multiple operations, but we will only use one of them to explain the code.

Submit Command

The process of submitting these commands to Raft is similar for all of them. Let's use one of them, syncPutCluster, to explain how RaftCmd is used. The operation code for this command is opSyncPutCluster, and the key is #c# followed by the cluster name (the value of Cluster.Name).

func (c *Cluster) syncPutCluster() (err error) {
    // Construct a RaftCmd whose content is taken from the Cluster structure.
    metadata := new(RaftCmd)
    // ... omitted: set Op to opSyncPutCluster, K to "#c#" + c.Name, and V from the Cluster fields ...
    // Submit the command
    return c.submit(metadata)
}

As you can see, this function assembles a RaftCmd command, copies the values from the Cluster structure, and finally calls the submit function to submit the command. The submit function further encapsulates the command and calls partition.Submit to send the command.

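func (c *Cluster) submit(metadata *RaftCmd) (err error) {
    // Serialize the RaftCmd to JSON bytes.
    cmd, err := metadata.Marshal()
    if err != nil {
        return
    }
    // Submit the command to the partition, sending it to the Raft group.
    if _, err = c.partition.Submit(cmd); err != nil {
        // ... omitted: error handling ...
        return
    }
    return
}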

partition.Submit is responsible for submitting the command data to the Raft log. Below this point, the work is done by the third-party tiglabs/raft package.

Command Set

Because these functions are implemented in essentially the same way, differing only in the operation codes and content they encapsulate, we will not go through each one. Here, we only list the entry functions, operation codes, and key formats.

Function          Operation Code      Key Format
syncPutCluster    opSyncPutCluster    #c#name
syncAddNodeSet    opSyncAddNodeSet    #s#id

This code defines many more command formats; we have only explained the general format rather than listing them one by one. Interested readers can refer to this part of the code in master/metadata_fsm_op.go.
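The key formats follow a simple prefix-plus-identifier pattern. The helpers below are hypothetical; the prefixes come from the table and, for #vol#, from the volume-creation example in this article. One plausible benefit of this layout is that all records of one type can be located with a prefix scan, which keysWithPrefix imitates over a plain slice.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Key prefixes; helper names are hypothetical.
const (
	clusterPrefix = "#c#"
	nodeSetPrefix = "#s#"
	volPrefix     = "#vol#"
)

func clusterKey(name string) string { return clusterPrefix + name }
func nodeSetKey(id uint64) string   { return nodeSetPrefix + strconv.FormatUint(id, 10) }
func volKey(id uint64) string       { return volPrefix + strconv.FormatUint(id, 10) }

// keysWithPrefix imitates a prefix scan over the stored keys.
func keysWithPrefix(keys []string, prefix string) []string {
	var out []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{clusterKey("mycluster"), nodeSetKey(1), volKey(1), volKey(2)}
	fmt.Println(keys)                            // [#c#mycluster #s#1 #vol#1 #vol#2]
	fmt.Println(keysWithPrefix(keys, volPrefix)) // [#vol#1 #vol#2]
}
```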

Example

Here, we will walk through the code flow of an application using Raft to illustrate how to use this functionality. The focus of this example is the code flow of Raft, rather than the complete implementation of volume creation. The flowchart for this part is as follows:

[Figure: flow chart of volume creation through Raft]

  1. Send an HTTP request to the master to create a volume. The main implementation is to call c.doCreateVol(req) to create the volume.
  2. In doCreateVol, c.syncAddVol(vol) is called to initiate the addition of the volume. This is the code part related to Raft.
  3. The syncAddVol function then calls c.syncPutVolInfo. The syncPutVolInfo function creates a RaftCmd structure, where the key is #vol# + vol.ID, the value is a volValue structure, and the op code is opSyncAddVol (with a value of 4).
  4. The syncPutVolInfo function calls c.submit(metadata) to submit the log. The submit function serializes the RaftCmd structure into JSON-encoded []byte data and calls c.partition.Submit(cmd) to submit the log to the Raft cluster.
  5. The Submit function of the Partition interface first checks if the current node is the leader. If it is, it calls p.raft.Submit(p.id, cmd) to submit the log. The Submit function of the RaftServer structure submits the log to the Raft cluster.
  6. Next is the implementation of the third-party code tiglabs/raft. We won't go into detail here. Interested readers can read this part of the code on their own. What we need to emphasize is that in the third-party code, the implementation function Apply of the state machine MetadataFsm is called. This function is the parameter we configured when creating the Raft cluster initially.
  7. The Apply function deserializes the []byte data back into a RaftCmd structure. Since the op code is opSyncAddVol, it calls mf.store.BatchPut(cmdMap, true) to store the key-value pair in RocksDB. This completes the Raft part of the flow; the remaining steps of volume creation are outside the scope of this article.
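Steps 3 to 7 can be condensed into a single runnable sketch. Everything here is a simplified stand-in: a map replaces RocksDB, fakePartition applies each entry as soon as it is submitted instead of replicating it through tiglabs/raft, and the hypothetical vol type replaces volValue. Only the op code value 4 and the #vol# key prefix come from the steps above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
)

const opSyncAddVol uint32 = 4 // op code value given in step 3

type RaftCmd struct {
	Op uint32 `json:"op"`
	K  string `json:"k"`
	V  []byte `json:"v"`
}

// metaFsm stands in for MetadataFsm; a map replaces RocksDB.
type metaFsm struct {
	store   map[string][]byte
	applied uint64
}

// Apply decodes the command and persists it (step 7).
func (f *metaFsm) Apply(command []byte, index uint64) (interface{}, error) {
	cmd := new(RaftCmd)
	if err := json.Unmarshal(command, cmd); err != nil {
		return nil, err
	}
	switch cmd.Op {
	case opSyncAddVol:
		f.store[cmd.K] = cmd.V // stands in for mf.store.BatchPut(cmdMap, true)
	default:
		return nil, fmt.Errorf("unsupported op %d", cmd.Op)
	}
	f.applied = index
	return nil, nil
}

// fakePartition imitates Partition.Submit (step 5): leader check, then the
// entry is "committed" and applied at once instead of being replicated.
type fakePartition struct {
	leader bool
	index  uint64
	sm     *metaFsm
}

func (p *fakePartition) Submit(cmd []byte) (interface{}, error) {
	if !p.leader {
		return nil, errors.New("not raft leader")
	}
	p.index++
	return p.sm.Apply(cmd, p.index)
}

// vol is a hypothetical stand-in for the volValue structure.
type vol struct {
	ID   uint64
	Name string
}

// syncPutVolInfo builds the RaftCmd (step 3) and hands it to submit.
func syncPutVolInfo(p *fakePartition, v vol) error {
	value, err := json.Marshal(v)
	if err != nil {
		return err
	}
	return submit(p, &RaftCmd{Op: opSyncAddVol, K: "#vol#" + strconv.FormatUint(v.ID, 10), V: value})
}

// submit serializes the command to JSON and submits it (step 4).
func submit(p *fakePartition, metadata *RaftCmd) error {
	cmd, err := json.Marshal(metadata)
	if err != nil {
		return err
	}
	_, err = p.Submit(cmd)
	return err
}

func main() {
	p := &fakePartition{leader: true, sm: &metaFsm{store: map[string][]byte{}}}
	if err := syncPutVolInfo(p, vol{ID: 1, Name: "demo"}); err != nil {
		fmt.Println("submit failed:", err)
		return
	}
	fmt.Println(string(p.sm.store["#vol#1"])) // {"ID":1,"Name":"demo"}
}
```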

Conclusion

In the previous sections, we introduced the Raft implementation in the master module. We provided a brief overview of the code implementation, and if readers are interested, they can further explore the source code. In the future, we will continue to explain other parts of the code. Feel free to point out any errors in the text.

Reference article

[1] https://raft.github.io/raft.pdf

[2] https://zhuanlan.zhihu.com/p/32052223

[3] https://zhuanlan.zhihu.com/p/33047950