cubefs“源”理解读_ Master源码解读之raft
前言
模块master在cubefs里面一共有超过2.8万行代码,如果加上raft的代码,大约有3万行。这是一个庞大的模块,我们只能一一进行拆分解读。在一系列的文章中,我们将逐个详细介绍master的子功能,希望对读者能够有所帮助。
本文我们重点讲述Raft相关的知识和代码。master模块使用的multi-raft协议来管理meta/data partition的。我们先介绍一下Raft的基本概念,然后再说明multi-raft。
Raft
Raft 是一种更为简单方便易于理解的分布式算法,主要解决了分布式中的一致性问题。相比传统的 Paxos 算法,Raft 将大量的计算问题分解成为了一些简单的相对独立的子问题,并有着和 Multi-Paxos 同样的性能。Raft算法把一致性问题分成了选主,日志复制和脑裂情景等。
简单来说,Raft就是一个选主算法,它定义角色:Leader,Candidate,Follower,并且提出Leader Election(选举),Term(任期),Election Timeout(选举超时)这些概念。同时它还包括了日志复制和脑裂情景。
Raft要求系统在任意时刻最多只有一个Leader,正常工作期间只有Leader和Followers。
这三个角色之间是可以转化的,它们的转化图如下:
当Leader选出后,就开始接收客户端的请求。leader节点采用了一定的算法来实现日志的同步和安全性。Raft的日志保存方式如下[2]:
Raft算法还包括了日志压缩,成员变更等内容。我们就不再详细描述。对于使用Raft的初学者来说,了解到Raft可以自动选主,以及保存日志就足够了。
Raft的详细介绍网络上面有很多资料[1],里面描述地非常详细。在这里我们只是简单地介绍一下里面的一些概念,方便我们展开下文的描述。
multi-raft
简单来说,Multi-Raft是在整个系统中,把所管理的数据按照一定的方式切片,每一个切片的数据都有自己的副本,这些副本之间的数据使用Raft来保证数据的一致性,在全局来看整个系统中同时存在多个Raft-Group,如图所示[3]:
使用方式
在这里,我们介绍一下第三方tiglabs/raft库函数的使用方式。有了这个基础,我们就可以深入分析master模块中的相关代码。
raft.StateMachine的定义接口:
Apply(command []byte, index uint64) (resp interface{}, err error)
ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
Snapshot() (proto.Snapshot, error)
ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error)
HandleFatalEvent(err *raft.FatalError)
HandleLeaderChange(leader uint64)
这些接口都是在tiglabs/raft里面进行调用。 master模块实现了一个raft.StateMachine的结构体MetadataFsm。这个放在文件master/metadata_fsm.go里面。其中Apply的函数把数据更新到rocksDB里面。
在createRaftServer函数里面,先是创建了一个m.raftStore(RaftStore的实例),然后再创建m.fsm(raft.StateMachine的实例),最后调用m.raftStore.CreatePartition,传入参数raftstore.PartitionConfig(其中的SM就是m.fsm)。这样就把rocksDB和raft关联起来。具体代码如下:
// 创建raft store中的一个partition.
func (s *raftStore) CreatePartition(cfg *PartitionConfig) (p Partition, err error) {
// 略.
rc := &raft.RaftConfig{
// 这里就是m.fsm
StateMachine: cfg.SM,
Applied: cfg.Applied,
}
if err = s.raftServer.CreateRaft(rc); err != nil {
return
}
p = newPartition(cfg, s.raftServer, walPath)
return
}
其实现的流程图如下: 
初始化raft
raft相关的成员嵌入到Server的实现方式,具体的代码流程如下:

结构体关系
在整个master模块中,最主要的Raft实现部分是Partition这部分定义的接口。
Partition是一组raft store partition的操作函数。Partition是RaftSore中multi-raft的分片实现。它在同一个raft服务的实例和系统资源的基础上,同时管理多组raft组。应该来说,对于这些多组raft来说,主控节点和成员节点的变动应该是一致的。
而Cluster则是保存了全部的cluster层信息。
具体的图解说明如下:

raft store
这部分的代码模块在raftstore里面,和master目录同一层级。总的代码行数只有1千多行,是在第三方代码tiglabs/raft的基础上面封装的。github.com/tiglabs/raft就是一个实现了multi-raft的函数库。它可以在一个raft服务器的实例里面,同时管理多个raft复制组。RaftStore里面有一个主要的概念是Partition,是multi-raft的分片。下面我们详细了解一下raft模块提供的功能。RaftStore的代码是在tiglabs/raft的基础上面开发的,我们只分析到RaftStore这层源码。
代码实现
从start函数的m.createRaftServer(cfg)启动了raft服务。函数createRaftServer主要是创建一个raft结构体,启动状态机,创建分区。
func (m *Server) createRaftServer(cfg *config.Config) (err error) {
// 初始化raft变量
if m.raftStore, err = raftstore.NewRaftStore(raftCfg, cfg); err != nil {
return errors.Trace(err, "NewRaftStore failed! id[%v] walPath[%v]", m.id, m.walDir)
}
// 设置状态机和外部的挂钩函数,这个是持久化保存raft数据的写法。
m.initFsm()
// 在raft store中创建分区的操作接口
if m.partition, err = m.raftStore.CreatePartition(partitionCfg); err != nil {
return errors.Trace(err, "CreatePartition failed")
}
return
}
结构体
首先是定义RaftStore配置的结构体Config,结构体里面比较重要的变量有:
type Config struct {
// 节点编号
NodeID uint64
// 心跳和选举超时时间
TickInterval int
// 选举过期时间。如果在这个时间段没有收到任何leader发送过来的消息,就重新选举
ElectionTick int
}
分区partitions的配置信息中比较重要的变量有:
type PartitionConfig struct {
// peers所在的IP地址组
Peers []PeerAddress
// 状态机
SM PartitionFsm
}
raftStore结构体中比较重要的变量:
type raftStore struct {
nodeID uint64 // 节点ID
resolver NodeResolver // node地址解析和管理
raftConfig *raft.Config // 第三方代码库函数的配置
raftServer *raft.RaftServer // 第三方代码raft的结构体
}
Partition接口
这是一组操作raft store partition的必要函数。这部分的写法是定义一组操作raft的interface接口Partition,然后在partition结构体的成员函数里面调用tiglabs/raft实现具体的功能。这些函数接口都是和tiglabs/raft提供的接口基本一致。
type Partition interface {
// 向raft日志提交命令.
Submit(cmd []byte) (resp interface{}, err error)
// 向raft日志提交成员改变的事件和信息.
ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (resp interface{}, err error)
// 把raft partition从raft server里面移除,并且关闭.
Stop() error
// 停止并且删除partition.
Delete() error
// 返回当前raft的状态信息.
Status() (status *PartitionStatus)
// 给出当前组leader的ID和term。
LeaderTerm() (leaderID, term uint64)
// 如果该节点是组的leader就返回true,否则false.
IsRaftLeader() bool
// 给出当前raft日志的applied index值.
AppliedIndex() uint64
// 给出当前raft日志的committed index值
CommittedIndex() uint64
// 截断raft日志
Truncate(index uint64)
// 触发选举,尝试变成leader
TryToLeader(nodeID uint64) error
// 检查当前组是否活跃的节点数目大于组内节点的一半。只有大于一半才是上线。
IsOfflinePeer() bool
}
这些函数都是封装了成员变量raft所实现的第三方接口函数。第三方接口包括:
ChangeMember // 提交成员改变事件和信息到日志
RemoveRaft // 从raft组中删除并且关闭该partition
TryToLeader // 发起选举
Status // 查询当前节点的状态
LeaderTerm // 查询当前leader的节点ID和term
IsLeader // 判断一个节点是不是leader
AppliedIndex // 查询当前日志的applied index
CommittedIndex // 查询当前日志的committed index
Submit // 发送命令到日志
Truncate // 截断raft日志
这些函数接口都是实现了raft协议的模块接口函数,也是使用raft的必要接口。简单来说,就是一些发起选举,查询选举状态,查询leader节点,添加删除成员,修改日志等操作。所以我们说raft协议主要就是包括了选举和日志。
RaftStore接口
这部分定义raft store的接口函数。这部分也是一组接口函数,具体实现是在结构体raftStore的成员函数里面。函数实现的功能说明在如下面的代码注释里面。
type RaftStore interface {
// 在raft store里面创建一个新的Partition
CreatePartition(cfg *PartitionConfig) (Partition, error)
// 停止raft store服务
Stop()
// 返回raft的配置信息
RaftConfig() *raft.Config
// 查询raft服务的状态
RaftStatus(raftID uint64) (raftStatus *raft.Status)
// 节点地址管理的函数,包括添加节点地址,删除节点地址
NodeManager
// 返回raftServer的指针,这样就可以在状态机里面操作raft接口。
RaftServer() *raft.RaftServer
}
这部分raftStore.raftServer同partition.raft一样,都是raft.RaftServer的指针。只是raftStore.raftServer是整个服务的,而partition.raft只是单独partition的操作。 这部分代码里面其余的代码都是次要的,我们就不再累述。
状态机
从Raft的角色切换图中可以看到,集群里面节点的角色就是一个有限状态图。所以Raft的代码实现里面必然有一个状态机。为了能够和外层调用者的代码进行交互,tiglabs/raft设计了一个状态结构体raft.StateMachine,然后把m.fsm赋予它。m.fsm的类型是MetadataFsm。MetadataFsm实现raft.StateMachine的一组接口函数,从而在第三方代码里面进行调用的。
在master这部分代码里面,一共有3个结构体用到了状态机,分别是:Server,User,Cluster,这3个结构体实际指向的fsm都是相同的结构体。
下面我们详述MetadataFsm的实现。结构体MetadataFsm是用来表示metadata partition的有限状态机。其重要的成员变量如下:
type MetadataFsm struct {
// 持久化保存raft的日志信息
store *raftstore.RocksDBStore
}
成员函数
状态机结构体MetadataFsm的下面函数都是raft.StateMachine的接口实现:
func (*MetadataFsm).Apply(command []byte, index uint64) (resp interface{}, err error)
func (*MetadataFsm).ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
func (*MetadataFsm).ApplySnapshot(peers []proto.Peer, iterator proto.SnapIterator) (err error)
func (*MetadataFsm).HandleFatalEvent(err *raft.FatalError)
func (*MetadataFsm).HandleLeaderChange(leader uint64)
func (*MetadataFsm).Snapshot() (proto.Snapshot, error)
这部分的函数都是raft.StateMachine的接口实现。raft.StateMachine提供的这些接口则是用来持久化保存数据以及实现数据的快照功能。所以大家在查看源码的时候,可以发现这部分代码在metadata_fsm.go中的实现,最后都是保存数据到rocksDB里面。
初始化
后面4个成员变量都是函数,是外面模块注册进去的函数。这些成员变量函数都是通过register函数进行注册的。在文件master/server.go的函数initFsm里面,进行了状态机的初始化:
func (m *Server) initFsm() {
// 注册成员函数
// 从rocksDB获得applied的值,赋予结构体里面的成员applied
m.fsm.restore()
}
运行方式
下面用一张图来简要地描述一下这个状态机是如何工作的。

图中我们只画出了操作:Apply,ApplyMemberChange,HandleLeaderChange。实际上还有快照的操作Snapshot和ApplySnapshot,以及出错的事件HandleFatalEvent。后面3个我们就不再累述。
保存数据
这里我们重点解读一下状态机中用来持久化保存数据的函数Apply。

这个函数的重点是操作rocksDb来实现数据的持久化保存。
挂钩函数
下面我们解读一下注册的成员函数的具体功能。这部分代码在文件master/master_manager.go里面,这4个函数都是结构体Server的成员函数。
handleLeaderChange
这个函数就是当Raft的leader节点发生变化的时候,第三方的代码通知外层调用者的方式。它通过leader参数告知当前Raft集群的主控ID。然后外层调用者就可以进行上层的配置操作。

代码的具体实现如下里面添加的注释说明。其中AddrDatabase是一个存储host地址的字典,具体是解析配置文件里面的peers值。
func (m *Server) handleLeaderChange(leader uint64) {
//…… 略 ……
m.leaderInfo.addr = AddrDatabase[leader]
//设置proxy重定向到新的leader地址
m.reverseProxy = m.newReverseProxy()
// 新的leader是本节点
if m.id == leader {
// 地址有变动,比如第一次启动。
if oldLeaderAddr != m.leaderInfo.addr {
m.cluster.checkPersistClusterValue() //检查cluster值是否持久化保存
m.loadMetadata() // 加载保存的metadata到内存里面
m.metaReady = true //启动HTTP服务时,在registerAPIMiddleware中使用到的判断条件
}
m.cluster.checkDataNodeHeartbeat() //创建检查DataNode心跳的任务,添加到AdminTaskManager的TaskMap里面
m.cluster.checkMetaNodeHeartbeat() //同上一样,创建MetaNode心跳任务
m.cluster.followerReadManager.reSet() //清空followerReadManager的成员
} else {
// 新的leader是其它节点
m.clearMetadata() //清空元数据
m.metaReady = false
//检查addr是否在MasterClient里面,如果没有就添加进去。并且设置leaderAddr为该值
m.cluster.masterClient.AddNode(m.leaderInfo.addr)
//设置leaderAddr为addr
m.cluster.masterClient.SetLeader(m.leaderInfo.addr)
}
}
简单来说,这个函数实现的功能就是当master节点发生改变时,重新设置重定向Proxy服务。如果本节点是master,则初始化元数据的值,建立检查心跳的任务。如果不是,则清除内存中的元数据,并且更新masterClient里面的leaderAddr。
handlePeerChange
这个函数主要是当一个raft组添加或者删除一个节点时,触发上层调用者的操作。具体函数实现如同下面注释。
func (m *Server) handlePeerChange(confChange *proto.ConfChange) (err error) {
switch confChange.Type {
case proto.ConfAddNode:
//…… 略 ……
// 往nodeResolver的字典里面添加心跳和复制的记录
m.raftStore.AddNodeWithPort(confChange.Peer.ID, arr[0], int(m.config.heartbeatPort), int(m.config.replicaPort))
AddrDatabase[confChange.Peer.ID] = string(confChange.Context)
case proto.ConfRemoveNode:
// 从nodeResolver里面删除键值为confChange.Peer.ID的记录
m.raftStore.DeleteNode(confChange.Peer.ID)
//…… 略 ……
handleApplySnapshot
这个是Raft进行快照操作的回调函数。
func (m *Server) handleApplySnapshot() {
// 从rocksDB里面读取applied的值,并且赋于状态机Server.fsm
m.fsm.restore()
// 从rocksDB里面取回值,赋予Server.cluster.idAlloc
m.restoreIDAlloc()
return
}
这个函数的主要功能就是把保存在rocksDB里面的记录读取出来,赋予Server.fsm和cluster.idAlloc这两个变量,从而实现快照的功能。
handleRaftUserCmd
这个回调函数是实现设置主控节点的LimiterInfo信息更新,目前也只支持这个功能。别的操作命令都不支持。
func (m *Server) handleRaftUserCmd(opt uint32, key string, cmdMap map[string][]byte) (err error) {
switch opt {
case opSyncPutFollowerApiLimiterInfo, opSyncPutApiLimiterInfo:
if m.cluster != nil && !m.partition.IsRaftLeader() {
// 更新Server.cluster.apiLimiter里面的字典limiterInfos
m.cluster.apiLimiter.updateLimiterInfoFromLeader(cmdMap[key])
}
//…… 略 ……
}
Raft命令
这部分raft命令的格式是定义在master/metadata_fsm_op.go文件里面的,也只有master模块里面的代码才能正确解析和运行这个命令的格式。
在master模块里面定义一个raft命令的格式,并且附带了3个成员函数。
type RaftCmd struct {
Op uint32 `json:"op"`
K string `json:"k"`
V []byte `json:"v"`
}
这个结构体的操作函数有3个:
func (*RaftCmd).Marshal() ([]byte, error)
func (*RaftCmd).Unmarshal(data []byte) (err error)
func (*RaftCmd).setOpType()
这些命令涉及的操作有好多个,我们就只用其中一个进行代码说明。
提交命令
这些提交命令到raft里面的流程是大同小异,我们就用其中一个来说明一下代码的流程。
同步提交cluster信息的函数是syncPutCluster,我们用这个来说明一下RaftCmd的使用流程。这条命令的操作码是opSyncPutCluster,键值是#c#name。后面的name是Cluster.Name的值。
func (c *Cluster) syncPutCluster() (err error) {
// 组织一条RaftCmd格式的命令,并且从结构体Cluster里面取出信息,生成命令的内容。
metadata := new(RaftCmd)
// 提交命令
return c.submit(metadata)
}
可以看出,这个函数就是组装一条RaftCmd命令,然后复制一下Cluster里面的值,最后调用submit函数提交。 而submit函数则是再次封装一下,然后调用partition.Submit发送命令。
func (c *Cluster) submit(metadata *RaftCmd) (err error) {
// 往partition提交命令,发送到raft组里面。
if _, err = c.partition.Submit(cmd); err != nil {
}
partition.Submit则是提交命令数据给raft日志。再深入进去就是raft的第三方软件包代码。
命令集
因为这部分代码的实现基本一致,只是封装的命令操作码和内容不同,所以我们不再一一详述。在这里,我们只是列出入口函数,操作码,键值格式,功能说明等。
| 函数 | 操作码 | 键值格式 |
|---|---|---|
| syncPutCluster | opSyncPutCluster | #c#name |
| syncAddNodeSet | opSyncAddNodeSet | #s#id |
这部分代码定义的命令格式不少,我们只是说明一下格式,就不再一一列举。有兴趣进一步了解的同学可以自行翻阅这部分代码。代码在文件master/metadata_fsm_op.go里面。
示例
在这里,我们通过走读一个应用raft的代码流程,来说明如何使用这个功能。这个例子的重点是raft的代码流程,而不是完整的创建卷的代码实现。这部分的流程图如下:

- 向master发起创建卷的HTTP请求。其中主要的实现就是调用c.doCreateVol(req)创建卷。
- 在doCreateVol中,调用c.syncAddVol(vol)发起添加卷。这个就是和raft相关的代码部分。
- 函数syncAddVol接着调用c.syncPutVolInfo。函数syncPutVolInfo创建一个RaftCmd结构体,key值是#vol#+ vol.ID,value值是volValue结构体,op code是opSyncAddVol(值是4)。
- 函数syncPutVolInfo调用c.submit(metadata)提交日志。函数submit把RaftCmd结构体转换为[]byte字节(json格式的转换)。函数submit调用c.partition.Submit(cmd)向raft集群提交日志。
- 接口Partition的函数Submit先是判断当前节点是否是leader,如果是,就调用p.raft.Submit(p.id, cmd)提交日志。结构体RaftServer的成员函数Submit向raft集群提交日志。
- 接下来就是第三方代码tiglabs/raft的实现。这里我们略过不提。感兴趣的读者可以自行阅读这部分代码。我们需要重点提到的是:在第三方代码中,会调用状态机MetadataFsm的实现函数Apply。这个函数就是我们一开始创建raft集群时配置的参数。
- 函数Apply把[]byte字节转换为RaftCmd结构体。因为op code是opSyncAddVol,所以是调用mf.store.BatchPut(cmdMap, true)把key和value这对值存放入rocksDB。 到这里,这部分raft的代码流程就走完了。后面是一些另外创建卷的内容。我们就不在这里继续描述。
结尾
在上面的章节中,我们介绍了master模块的raft。我们只是大概说明一下里面的代码实现,如果读者有兴趣了解,可以进一步阅读源码。后续我们还会继续解读别的代码部分。欢迎大家指正文中的错误。
参考文章
[1] https://raft.github.io/raft.pdf

