< Back

cubefs“源”理解读_ Master源码解读之raft

2023-08-29Huocheng Wu

前言

模块master在cubefs里面一共有超过2.8万行代码,如果加上raft的代码,大约有3万行。这是一个庞大的模块,我们只能一一进行拆分解读。在一系列的文章中,我们将逐个详细介绍master的子功能,希望对读者能够有所帮助。

本文我们重点讲述Raft相关的知识和代码。master模块使用的multi-raft协议来管理meta/data partition的。我们先介绍一下Raft的基本概念,然后再说明multi-raft。

Raft

Raft 是一种更为简单方便易于理解的分布式算法,主要解决了分布式中的一致性问题。相比传统的 Paxos 算法,Raft 将大量的计算问题分解成为了一些简单的相对独立的子问题,并有着和 Multi-Paxos 同样的性能。Raft算法把一致性问题分成了选主,日志复制和脑裂情景等。

简单来说,Raft就是一个选主算法,它定义角色:Leader,Candidate,Follower,并且提出Leader Election(选举),Term(任期),Election Timeout(选举超时)这些概念。同时它还包括了日志复制和脑裂情景。

Raft要求系统在任意时刻最多只有一个Leader,正常工作期间只有Leader和Followers。

这三个角色之间是可以转化的,它们的转化图如下:

图片

当Leader选出后,就开始接收客户端的请求。leader节点采用了一定的算法来实现日志的同步和安全性。Raft的日志保存方式如下[2]:

图片

Raft算法还包括了日志压缩,成员变更等内容。我们就不再详细描述。对于使用Raft的初学者来说,了解到Raft可以自动选主,以及保存日志就足够了。

Raft的详细介绍网络上面有很多资料[1],里面描述地非常详细。在这里我们只是简单地介绍一下里面的一些概念,方便我们展开下文的描述。

multi-raft

简单来说,Multi-Raft是在整个系统中,把所管理的数据按照一定的方式切片,每一个切片的数据都有自己的副本,这些副本之间的数据使用Raft来保证数据的一致性,在全局来看整个系统中同时存在多个Raft-Group,如图所示[3]:

图片

使用方式

在这里,我们介绍一下第三方tiglabs/raft库函数的使用方式。有了这个基础,我们就可以深入分析master模块中的相关代码。

raft.StateMachine的定义接口:

Apply(command []byte, index uint64) (resp interface{}, err error)
ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
Snapshot() (proto.Snapshot, error)
ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error)
HandleFatalEvent(err *raft.FatalError)
HandleLeaderChange(leader uint64)

这些接口都是在tiglabs/raft里面进行调用。 master模块实现了一个raft.StateMachine的结构体MetadataFsm。这个放在文件master/metadata_fsm.go里面。其中Apply的函数把数据更新到rocksDB里面。

在createRaftServer函数里面,先是创建了一个m.raftStore(RaftStore的实例),然后再创建m.fsm(raft.StateMachine的实例),最后调用m.raftStore.CreatePartition,传入参数raftstore.PartitionConfig(其中的SM就是m.fsm)。这样就把rocksDB和raft关联起来。具体代码如下:

// 创建raft store中的一个partition.
func (s *raftStore) CreatePartition(cfg *PartitionConfig) (p Partition, err error) {
    // 略.
    rc := &raft.RaftConfig{
    // 这里就是m.fsm
        StateMachine: cfg.SM,
        Applied:      cfg.Applied,
    }
    if err = s.raftServer.CreateRaft(rc); err != nil {
        return
    }
    p = newPartition(cfg, s.raftServer, walPath)
    return
}

其实现的流程图如下: 图片

初始化raft

raft相关的成员嵌入到Server的实现方式,具体的代码流程如下:

图片

结构体关系

在整个master模块中,最主要的Raft实现部分是Partition这部分定义的接口。

Partition是一组raft store partition的操作函数。Partition是RaftSore中multi-raft的分片实现。它在同一个raft服务的实例和系统资源的基础上,同时管理多组raft组。应该来说,对于这些多组raft来说,主控节点和成员节点的变动应该是一致的。

而Cluster则是保存了全部的cluster层信息。

具体的图解说明如下:

图片

raft store

这部分的代码模块在raftstore里面,和master目录同一层级。总的代码行数只有1千多行,是在第三方代码tiglabs/raft的基础上面封装的。github.com/tiglabs/raft就是一个实现了multi-raft的函数库。它可以在一个raft服务器的实例里面,同时管理多个raft复制组。RaftStore里面有一个主要的概念是Partition,是multi-raft的分片。下面我们详细了解一下raft模块提供的功能。RaftStore的代码是在tiglabs/raft的基础上面开发的,我们只分析到RaftStore这层源码。

代码实现

从start函数的m.createRaftServer(cfg)启动了raft服务。函数createRaftServer主要是创建一个raft结构体,启动状态机,创建分区。

func (m *Server) createRaftServer(cfg *config.Config) (err error) {
// 初始化raft变量
    if m.raftStore, err = raftstore.NewRaftStore(raftCfg, cfg); err != nil {
        return errors.Trace(err, "NewRaftStore failed! id[%v] walPath[%v]", m.id, m.walDir)
    }
// 设置状态机和外部的挂钩函数,这个是持久化保存raft数据的写法。
    m.initFsm()
// 在raft store中创建分区的操作接口
    if m.partition, err = m.raftStore.CreatePartition(partitionCfg); err != nil {
        return errors.Trace(err, "CreatePartition failed")
    }
    return
}

结构体

首先是定义RaftStore配置的结构体Config,结构体里面比较重要的变量有:

type Config struct {
    // 节点编号
    NodeID            uint64
    // 心跳和选举超时时间
    TickInterval int
    // 选举过期时间。如果在这个时间段没有收到任何leader发送过来的消息,就重新选举
    ElectionTick int
}

分区partitions的配置信息中比较重要的变量有:

type PartitionConfig struct {
 // peers所在的IP地址组
    Peers   []PeerAddress
 // 状态机
    SM      PartitionFsm
}

raftStore结构体中比较重要的变量:

type raftStore struct {
    nodeID     uint64 // 节点ID
    resolver   NodeResolver // node地址解析和管理
    raftConfig *raft.Config // 第三方代码库函数的配置
    raftServer *raft.RaftServer // 第三方代码raft的结构体
}

Partition接口

这是一组操作raft store partition的必要函数。这部分的写法是定义一组操作raft的interface接口Partition,然后在partition结构体的成员函数里面调用tiglabs/raft实现具体的功能。这些函数接口都是和tiglabs/raft提供的接口基本一致。

type Partition interface {
    // 向raft日志提交命令.
    Submit(cmd []byte) (resp interface{}, err error)

    // 向raft日志提交成员改变的事件和信息.
    ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (resp interface{}, err error)

    // 把raft partition从raft server里面移除,并且关闭.
    Stop() error

    // 停止并且删除partition.
    Delete() error

    // 返回当前raft的状态信息.
    Status() (status *PartitionStatus)

    // 给出当前组leader的ID和term。
    LeaderTerm() (leaderID, term uint64)

    // 如果该节点是组的leader就返回true,否则false.
    IsRaftLeader() bool

    // 给出当前raft日志的applied index值.
    AppliedIndex() uint64

    // 给出当前raft日志的committed index值
    CommittedIndex() uint64

    // 截断raft日志
    Truncate(index uint64)

    // 触发选举,尝试变成leader
    TryToLeader(nodeID uint64) error

    // 检查当前组是否活跃的节点数目大于组内节点的一半。只有大于一半才是上线。
    IsOfflinePeer() bool
}

这些函数都是封装了成员变量raft所实现的第三方接口函数。第三方接口包括:

ChangeMember // 提交成员改变事件和信息到日志
RemoveRaft // 从raft组中删除并且关闭该partition
TryToLeader // 发起选举
Status // 查询当前节点的状态
LeaderTerm // 查询当前leader的节点ID和term
IsLeader // 判断一个节点是不是leader
AppliedIndex // 查询当前日志的applied index
CommittedIndex // 查询当前日志的committed index
Submit // 发送命令到日志
Truncate // 截断raft日志

这些函数接口都是实现了raft协议的模块接口函数,也是使用raft的必要接口。简单来说,就是一些发起选举,查询选举状态,查询leader节点,添加删除成员,修改日志等操作。所以我们说raft协议主要就是包括了选举和日志。

RaftStore接口

这部分定义raft store的接口函数。这部分也是一组接口函数,具体实现是在结构体raftStore的成员函数里面。函数实现的功能说明在如下面的代码注释里面。

type RaftStore interface {
    // 在raft store里面创建一个新的Partition
    CreatePartition(cfg *PartitionConfig) (Partition, error)

    // 停止raft store服务
    Stop()

    // 返回raft的配置信息
    RaftConfig() *raft.Config

    // 查询raft服务的状态
    RaftStatus(raftID uint64) (raftStatus *raft.Status)

    // 节点地址管理的函数,包括添加节点地址,删除节点地址
    NodeManager

    // 返回raftServer的指针,这样就可以在状态机里面操作raft接口。
    RaftServer() *raft.RaftServer
}

这部分raftStore.raftServer同partition.raft一样,都是raft.RaftServer的指针。只是raftStore.raftServer是整个服务的,而partition.raft只是单独partition的操作。 这部分代码里面其余的代码都是次要的,我们就不再累述。

状态机

从Raft的角色切换图中可以看到,集群里面节点的角色就是一个有限状态图。所以Raft的代码实现里面必然有一个状态机。为了能够和外层调用者的代码进行交互,tiglabs/raft设计了一个状态结构体raft.StateMachine,然后把m.fsm赋予它。m.fsm的类型是MetadataFsm。MetadataFsm实现raft.StateMachine的一组接口函数,从而在第三方代码里面进行调用的。

在master这部分代码里面,一共有3个结构体用到了状态机,分别是:Server,User,Cluster,这3个结构体实际指向的fsm都是相同的结构体。

下面我们详述MetadataFsm的实现。结构体MetadataFsm是用来表示metadata partition的有限状态机。其重要的成员变量如下:

type MetadataFsm struct {
    // 持久化保存raft的日志信息
    store               *raftstore.RocksDBStore
}

成员函数

状态机结构体MetadataFsm的下面函数都是raft.StateMachine的接口实现:

func (*MetadataFsm).Apply(command []byte, index uint64) (resp interface{}, err error)
func (*MetadataFsm).ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
func (*MetadataFsm).ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error)
func (*MetadataFsm).HandleFatalEvent(err *raft.FatalError)
func (*MetadataFsm).HandleLeaderChange(leader uint64)
func (*MetadataFsm).Snapshot() (proto.Snapshot, error)

这部分的函数都是raft.StateMachine的接口实现。raft.StateMachine提供的这些接口则是用来持久化保存数据以及实现数据的快照功能。所以大家在查看源码的时候,可以发现这部分代码在metadata_fsm.go中的实现,最后都是保存数据到rocksDB里面。

初始化

后面4个成员变量都是函数,是外面模块注册进去的函数。这些成员变量函数都是通过register函数进行注册的。在文件master/server.go的函数initFsm里面,进行了状态机的初始化:

func (m *Server) initFsm() {
    // 注册成员函数
    // 从rocksDB获得applied的值,赋予结构体里面的成员applied
    m.fsm.restore()
}

运行方式

下面用一张图来简要地描述一下这个状态机是如何工作的。

图片

图中我们只画出了操作:Apply,ApplyMemberChange,HandleLeaderChange。实际上还有快照的操作Snapshot和ApplySnapshot,以及出错的事件HandleFatalEvent。后面3个我们就不再累述。

保存数据

这里我们重点解读一下状态机中用来持久化保存数据的函数Apply。

图片

这个函数的重点是操作rocksDb来实现数据的持久化保存。

挂钩函数

下面我们解读一下注册的成员函数的具体功能。这部分代码在文件master/master_manager.go里面,这4个函数都是结构体Server的成员函数。

handleLeaderChange

这个函数就是当Raft的leader节点发生变化的时候,第三方的代码通知外层调用者的方式。它通过leader参数告知当前Raft集群的主控ID。然后外层调用者就可以进行上层的配置操作。

图片

代码的具体实现如下里面添加的注释说明。其中AddrDatabase是一个存储host地址的字典,具体是解析配置文件里面的peers值。

func (m *Server) handleLeaderChange(leader uint64) {
    //…… 略 …… 
    m.leaderInfo.addr = AddrDatabase[leader]
    //设置proxy重定向到新的leader地址
    m.reverseProxy = m.newReverseProxy()

    // 新的leader是本节点
    if m.id == leader {
        // 地址有变动,比如第一次启动。
        if oldLeaderAddr != m.leaderInfo.addr {
            m.cluster.checkPersistClusterValue() //检查cluster值是否持久化保存
            m.loadMetadata() // 加载保存的metadata到内存里面
            m.metaReady = true //启动HTTP服务时,在registerAPIMiddleware中使用到的判断条件
        }
        m.cluster.checkDataNodeHeartbeat() //创建检查DataNode心跳的任务,添加到AdminTaskManager的TaskMap里面
        m.cluster.checkMetaNodeHeartbeat() //同上一样,创建MetaNode心跳任务
        m.cluster.followerReadManager.reSet() //清空followerReadManager的成员
    } else {
    // 新的leader是其它节点
        m.clearMetadata() //清空元数据
        m.metaReady = false
        //检查addr是否在MasterClient里面,如果没有就添加进去。并且设置leaderAddr为该值
        m.cluster.masterClient.AddNode(m.leaderInfo.addr)
        //设置leaderAddr为addr
        m.cluster.masterClient.SetLeader(m.leaderInfo.addr)
    }
}

简单来说,这个函数实现的功能就是当master节点发生改变时,重新设置重定向Proxy服务。如果本节点是master,则初始化元数据的值,建立检查心跳的任务。如果不是,则清除内存中的元数据,并且更新masterClient里面的leaderAddr。

handlePeerChange

这个函数主要是当一个raft组添加或者删除一个节点时,触发上层调用者的操作。具体函数实现如同下面注释。

func (m *Server) handlePeerChange(confChange *proto.ConfChange) (err error) {
    switch confChange.Type {
    case proto.ConfAddNode:
//…… 略 …… 
        // 往nodeResolver的字典里面添加心跳和复制的记录
        m.raftStore.AddNodeWithPort(confChange.Peer.ID, arr[0], int(m.config.heartbeatPort), int(m.config.replicaPort))
        AddrDatabase[confChange.Peer.ID] = string(confChange.Context)
    case proto.ConfRemoveNode:
        // 从nodeResolver里面删除键值为confChange.Peer.ID的记录
        m.raftStore.DeleteNode(confChange.Peer.ID)
//…… 略 …… 

handleApplySnapshot

这个是Raft进行快照操作的回调函数。

func (m *Server) handleApplySnapshot() {
    // 从rocksDB里面读取applied的值,并且赋于状态机Server.fsm
    m.fsm.restore()
    // 从rocksDB里面取回值,赋予Server.cluster.idAlloc
    m.restoreIDAlloc()
    return
}

这个函数的主要功能就是把保存在rocksDB里面的记录读取出来,赋予Server.fsm和cluster.idAlloc这两个变量,从而实现快照的功能。

handleRaftUserCmd

这个回调函数是实现设置主控节点的LimiterInfo信息更新,目前也只支持这个功能。别的操作命令都不支持。

func (m *Server) handleRaftUserCmd(opt uint32, key string, cmdMap map[string][]byte) (err error) {
    switch opt {
    case opSyncPutFollowerApiLimiterInfo, opSyncPutApiLimiterInfo:
        if m.cluster != nil && !m.partition.IsRaftLeader() {
            // 更新Server.cluster.apiLimiter里面的字典limiterInfos
            m.cluster.apiLimiter.updateLimiterInfoFromLeader(cmdMap[key])
        }
//…… 略 ……
}

Raft命令

这部分raft命令的格式是定义在master/metadata_fsm_op.go文件里面的,也只有master模块里面的代码才能正确解析和运行这个命令的格式。

在master模块里面定义一个raft命令的格式,并且附带了3个成员函数。

type RaftCmd struct {
    Op uint32 `json:"op"`
    K  string `json:"k"`
    V  []byte `json:"v"`
}

这个结构体的操作函数有3个:

func (*RaftCmd).Marshal() ([]byte, error)
func (*RaftCmd).Unmarshal(data []byte) (err error)
func (*RaftCmd).setOpType()

这些命令涉及的操作有好多个,我们就只用其中一个进行代码说明。

提交命令

这些提交命令到raft里面的流程是大同小异,我们就用其中一个来说明一下代码的流程。

同步提交cluster信息的函数是syncPutCluster,我们用这个来说明一下RaftCmd的使用流程。这条命令的操作码是opSyncPutCluster,键值是#c#name。后面的name是Cluster.Name的值。

func (c *Cluster) syncPutCluster() (err error) {
// 组织一条RaftCmd格式的命令,并且从结构体Cluster里面取出信息,生成命令的内容。
    metadata := new(RaftCmd)
// 提交命令
    return c.submit(metadata)
}

可以看出,这个函数就是组装一条RaftCmd命令,然后复制一下Cluster里面的值,最后调用submit函数提交。 而submit函数则是再次封装一下,然后调用partition.Submit发送命令。

func (c *Cluster) submit(metadata *RaftCmd) (err error) {
// 往partition提交命令,发送到raft组里面。
    if _, err = c.partition.Submit(cmd); err != nil {
}

partition.Submit则是提交命令数据给raft日志。再深入进去就是raft的第三方软件包代码。

命令集

因为这部分代码的实现基本一致,只是封装的命令操作码和内容不同,所以我们不再一一详述。在这里,我们只是列出入口函数,操作码,键值格式,功能说明等。

函数操作码键值格式
syncPutClusteropSyncPutCluster#c#name
syncAddNodeSetopSyncAddNodeSet#s#id

这部分代码定义的命令格式不少,我们只是说明一下格式,就不再一一列举。有兴趣进一步了解的同学可以自行翻阅这部分代码。代码在文件master/metadata_fsm_op.go里面。

示例

在这里,我们通过走读一个应用raft的代码流程,来说明如何使用这个功能。这个例子的重点是raft的代码流程,而不是完整的创建卷的代码实现。这部分的流程图如下:

图片

  1. 向master发起创建卷的HTTP请求。其中主要的实现就是调用c.doCreateVol(req)创建卷。
  2. 在doCreateVol中,调用c.syncAddVol(vol)发起添加卷。这个就是和raft相关的代码部分。
  3. 函数syncAddVol接着调用c.syncPutVolInfo。函数syncPutVolInfo创建一个RaftCmd结构体,key值是#vol#+ vol.ID,value值是volValue结构体,op code是opSyncAddVol(值是4)。
  4. 函数syncPutVolInfo调用c.submit(metadata)提交日志。函数submit把RaftCmd结构体转换为[]byte字节(json格式的转换)。函数submit调用c.partition.Submit(cmd)向raft集群提交日志。
  5. 接口Partition的函数Submit先是判断当前节点是否是leader,如果是,就调用p.raft.Submit(p.id, cmd)提交日志。结构体RaftServer的成员函数Submit向raft集群提交日志。
  6. 接下来就是第三方代码tiglabs/raft的实现。这里我们略过不提。感兴趣的读者可以自行阅读这部分代码。我们需要重点提到的是:在第三方代码中,会调用状态机MetadataFsm的实现函数Apply。这个函数就是我们一开始创建raft集群时配置的参数。
  7. 函数Apply把[]byte字节转换为RaftCmd结构体。因为op code是opSyncAddVol,所以是调用mf.store.BatchPut(cmdMap, true)把key和value这对值存放入rocksDB。 到这里,这部分raft的代码流程就走完了。后面是一些另外创建卷的内容。我们就不在这里继续描述。

结尾

在上面的章节中,我们介绍了master模块的raft。我们只是大概说明一下里面的代码实现,如果读者有兴趣了解,可以进一步阅读源码。后续我们还会继续解读别的代码部分。欢迎大家指正文中的错误。

参考文章

[1] https://open in new windowraft.github.io/raft.pdfopen in new window

[2] https://zhuanlan.zhihu.com/p/32052223open in new window

[3] https://zhuanlan.zhihu.com/p/33047950open in new window