概述

在上一篇文章中介绍了 raft 读请求源码走读,本文继续结合 etcd-raft 源码介绍 raft 写请求流程。

需要说明的是,本文使用的是单节点集群环境,后续会补充多节点集群环境加以介绍。

写请求流程

客户端写入请求

客户端通过 PUT 请求写入键值对请求:

curl -L http://127.0.0.1:12380/my-key -XPUT -d hello

该请求被 httpKVAPI 处理:

 1func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {  
 2    key := r.RequestURI  
 3    defer r.Body.Close()  
 4    switch r.Method {  
 5    case http.MethodPut:                // 匹配 PUT 方法,处理写请求
 6       v, err := io.ReadAll(r.Body)  
 7       if err != nil {  
 8          log.Printf("Failed to read on PUT (%v)\n", err)  
 9          http.Error(w, "Failed on PUT", http.StatusBadRequest)  
10          return  
11       }  
12  
13       h.store.Propose(key, string(v))  // 调用 h.store.Propose 提交该请求
14	   w.WriteHeader(http.StatusNoContent)
15    }
16    ...
17}
18
19func (s *kvstore) Propose(k string, v string) {  
20    var buf strings.Builder  
21    if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {  
22       log.Fatal(err)  
23    }  
24    s.proposeC <- buf.String()  
25}

httpKVAPI.ServeHTTP 的重点在于 httpKVAPI.store.Propose 方法。该方法的作用是将写请求键值对编码后写入 kvstore.proposeC 通道。

那么谁在消费 proposeC 呢?

应用层消费 proposeC

应用层 raftNode 消费客户端写入 proposeC 的写请求。

首先查看 raftNode 的启动流程及整体结构。

raftNode 启动流程

main.gonewRaftNode 会创建应用层 RaftNode 节点:

 1func main() {
 2	...
 3	proposeC := make(chan string)  
 4	defer close(proposeC)  
 5	confChangeC := make(chan raftpb.ConfChange)  
 6	defer close(confChangeC)
 7	
 8	var kvs *kvstore  
 9	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }  
10	
11	// 创建 RaftNode 节点,并且将客户端写入的 proposeC 作为参数传递给 newRaftNode
12	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
13	...
14}
15
16func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,  
17    confChangeC <-chan raftpb.ConfChange,  
18) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {  
19    commitC := make(chan *commit)  
20    errorC := make(chan error)  
21  
22    rc := &raftNode{  
23       proposeC:    proposeC,       // proposeC 传入 raftNode
24       ... 
25    }  
26    go rc.startRaft()  
27    return commitC, errorC, rc.snapshotterReady  
28}

创建 raftNode 后,调用 raftNode.startRaft 方法启动一个协程异步运行 startRaft 方法:

1func (rc *raftNode) startRaft() {  
2    ...
3    go rc.serveRaft()        // 启动协程监听 raft api
4    go rc.serveChannels()    // 启动协程监听 raftNode 的通道
5}

重点介绍 raft.serveChannels 方法:

 1func (rc *raftNode) serveChannels() {  
 2    ... 
 3  
 4    // send proposals over raft  
 5    go func() {  
 6       confChangeCount := uint64(0)  
 7  
 8       for rc.proposeC != nil && rc.confChangeC != nil {  
 9          select {  
10          case prop, ok := <-rc.proposeC:      // 在这里消费 proposeC 的写请求键值对
11             if !ok {  
12                rc.proposeC = nil  
13             } else {  
14                // blocks until accepted by raft state machine  
15                rc.node.Propose(context.TODO(), []byte(prop))  
16             }    
17          }  
18       }  
19       // client closed channel; shutdown raft if not already  
20       close(rc.stopc)  
21    }()  
22  
23    // for-select 事件循环 
24    for {  
25       select {  
26       case <-ticker.C:  
27          rc.node.Tick()  
28  
29       // store raft entries to wal, then publish over commit channel  
30       case rd := <-rc.node.Ready():  
31          ...
32       }  
33    }  
34}

serveChannels 中应用层 raftNodeproposeC 中消费写请求。

如果 proposeC 关闭将 proposeC 设为 nil 不允许写入。 如果正常读取写请求,则进入 raftNode.node.Propose 处理。

raftNode 组合 node 对象,node 是算法层的入口,通过提供的一系列方法暴露给应用层调用。Propose 即为暴露的其中一个方法。

算法层处理写请求

算法层初始化流程

为了介绍逻辑的完整性,这里有必要在进入 raftNode.startRaft 方法查看算法层 node 是如何创建的:

 1func (rc *raftNode) startRaft() {
 2	...
 3
 4	c := &raft.Config{  
 5	    ID:                        uint64(rc.id),  
 6	    ElectionTick:              10,  
 7	    HeartbeatTick:             1,  
 8	    Storage:                   rc.raftStorage,  
 9	    MaxSizePerMsg:             1024 * 1024,  
10	    MaxInflightMsgs:           256,  
11	    MaxUncommittedEntriesSize: 1 << 30,  
12	}  
13	  
14	if oldwal || rc.join {  
15	    rc.node = raft.RestartNode(c)  
16	} else {  
17	    // 首次创建 raftNode.node
18	    rc.node = raft.StartNode(c, rpeers)  
19	}
20	...
21}
22
23func StartNode(c *Config, peers []Peer) Node {  
24    n := setupNode(c, peers)  
25    go n.run()  
26    return n  
27}	

raft.StartNode(或 raft.RestartNode) 创建 raftNode.node 对象。实际是异步启了一个协程运行 node.run 方法。

node.Propose 处理写请求提交

上一节创建 node后,应用层调用 raftNode.node.Propose 方法提交写请求到算法层:

 1func (n *node) Propose(ctx context.Context, data []byte) error {  
 2	// 将写请求数据封装为 Entry,将 Entry 封装为 Message
 3    return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})  
 4}
 5
 6func (n *node) stepWait(ctx context.Context, m pb.Message) error {  
 7	// 等待提交写请求处理完成
 8    return n.stepWithWaitOption(ctx, m, true)  
 9}
10
11func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {  
12    ...
13    ch := n.propc  
14    pm := msgWithResult{m: m}    // 将 Message 封装为 msgWithResult
15    if wait {  
16       pm.result = make(chan error, 1)  
17    }  
18    select {  
19    case ch <- pm:               // 将 msgWithResult 写入 node.propc 通道
20       if !wait {  
21          return nil  
22       }  
23    case <-ctx.Done():  
24       return ctx.Err()  
25    case <-n.done:  
26       return ErrStopped  
27    }  
28    select {  
29    case err := <-pm.result:     // 等待 msgWithResult.result 返回结果
30       if err != nil {  
31          return err  
32       }  
33    ...
34    }  
35    return nil  
36}

算法层 node.Propose 将数据封装为 msgWithResult 发给 node.propc 通道,并且监听 msgWithResult.result 通道等待返回结果。

那么 msgWithResult 的数据是在哪里消费的呢?

消费 msgWithResult

算法层初始化流程 介绍了创建 node 对象后会异步启动 node.run 协程,这里继续看协程内部在做什么:

 1func (n *node) run() {  
 2    var propc chan msgWithResult  
 3    var readyc chan Ready  
 4    var advancec chan struct{}  
 5    var rd Ready  
 6  
 7    r := n.rn.raft  
 8    lead := None  
 9  
10    for {  
11       ... 
12       if lead != r.lead {  
13          if r.hasLeader() {  
14             if lead == None {  
15                r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)  
16             } else {  
17                r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)  
18             }  
19             // 如果集群有 leader 则将 node.propc 赋给 propc
20             // 意味着只在有 leader 的情况下处理写请求提交
21             propc = n.propc  
22          } else {  
23             r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)  
24             // 如果集群无 leader 将 propc 设为 nil
25             // 意味着无 leader 时不会处理写请求提交
26             propc = nil  
27          }  
28          lead = r.lead  
29       }  
30  
31       select {      
32       case pm := <-propc:     // 从 node.propc 中读取 msgWithResult 对象
33          m := pm.m            // 获取消息
34          m.From = r.id        // 消息是在同一个 raft 节点处理的,将 m.From 设为当前 raft 节点的 id
35          err := r.Step(m)     // 调用 raft.Step 处理消息
36          if pm.result != nil {  
37             pm.result <- err  
38             close(pm.result)  
39          }  
40       ... 
41       }  
42    }  
43}

node.run 消费 node.propc,并且调用 raft.Step 处理消费的消息:

 1func (r *raft) Step(m pb.Message) error {    
 2    switch {  
 3    case m.Term == 0:  // 消息的 Term 为 0,表示该消息为本地消息
 4       // local message  
 5    case m.Term > r.Term:  
 6	   ...
 7    case m.Term < r.Term:  
 8       ...
 9    }  
10  
11    switch m.Type {  
12    case pb.MsgHup:  
13       ...
14    case pb.MsgStorageAppendResp:  
15       ...
16    case pb.MsgStorageApplyResp:  
17       ... 
18    case pb.MsgVote, pb.MsgPreVote:  
19       ... 
20    default:  
21       err := r.step(r, m)  // 这里的消息类型为 MsgProp 会进到默认分支
22       if err != nil {  
23          return err  
24       }  
25    }  
26    return nil  
27}

消息的类型为 MsgProp 会进入到 raft.step 方法处理。raft.step 是一个指针函数指向不同角色的 step 处理函数。

根据不同角色有如下几种处理函数。

candidate

候选者 candidate 处理 MsgProp 消息:

1func stepCandidate(r *raft, m pb.Message) error {  
2    switch m.Type {  
3    case pb.MsgProp:  
4       r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)  
5       return ErrProposalDropped
6    ...
7}

候选者并不能直接处理 MsgProp 写请求的提交消息,如果收到将直接丢弃。

follower

 1func stepFollower(r *raft, m pb.Message) error {  
 2    switch m.Type {  
 3    case pb.MsgProp:  
 4       if r.lead == None {  
 5          r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)  
 6          return ErrProposalDropped  
 7       } else if r.disableProposalForwarding {  
 8          r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)  
 9          return ErrProposalDropped  
10       }  
11       m.To = r.lead  
12       r.send(m)  
13    case pb.MsgApp:  
14       ... 
15    case pb.MsgHeartbeat:  
16       ... 
17    case pb.MsgSnap:  
18       ...  
19    case pb.MsgTransferLeader:  
20       ...  
21    case pb.MsgTimeoutNow:  
22       ...
23    }  
24    return nil  
25}

跟随者 follower 收到 MsgProp 写请求提交消息,会判断当前集群有无 leader。如果有,则调用 raft.send 转发该消息给 leader 处理。如果没有,则丢弃该消息。

leader

 1func stepLeader(r *raft, m pb.Message) error {    
 2    switch m.Type {  
 3    case pb.MsgBeat:  
 4       .... 
 5    case pb.MsgCheckQuorum:  
 6       ... 
 7    case pb.MsgProp:  
 8       if len(m.Entries) == 0 {  
 9          r.logger.Panicf("%x stepped empty MsgProp", r.id)  
10       }  
11       if r.trk.Progress[r.id] == nil {  
12          return ErrProposalDropped  
13       }  
14       if r.leadTransferee != None {  
15          r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)  
16          return ErrProposalDropped  
17       }  
18  
19       for i := range m.Entries {  
20          e := &m.Entries[i]  
21          var cc pb.ConfChangeI  
22          // Entry 类型为 EntryNormal,不匹配
23          if e.Type == pb.EntryConfChange {  
24             ... 
25          } else if e.Type == pb.EntryConfChangeV2 {  
26             ...
27          }  
28          if cc != nil {  
29             ... 
30          }  
31       }  
32  
33	   // 调用 raft.appendEntry 处理 msgWithResult 的Entry
34       if !r.appendEntry(m.Entries...) {  
35          return ErrProposalDropped  
36       }  
37       
38       // 广播给 follower 节点处理
39       r.bcastAppend()  
40       return nil

stepLeader 匹配 MsgProp 消息,并且根据消息中的 Entry.Type 执行不同的处理逻辑。对于 EntryNormal 的 Entry 将调用 raft.appendEntry 处理。

 1func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {  
 2	// 调用 raft.raftLog.lastIndex 获取最近的 index
 3    li := r.raftLog.lastIndex()  
 4    
 5    // 更新 Entry 的 Term 和 Index
 6    for i := range es {  
 7       es[i].Term = r.Term  
 8       es[i].Index = li + 1 + uint64(i)  
 9    }
10    
11    // 将 Entry 添加到 raft.raftLog
12    // 这里两阶段提交的第一阶段添加的是 raft.raftLog.unstable
13    li = r.raftLog.append(es...)  
14    
15    // 获取最新的 last index 并组成 MsgAppResp 的消息发送
16    r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li})  
17    return true  
18}
19
20func (l *raftLog) lastIndex() uint64 {  
21	// 先从 unstable 获取最近的 index
22    if i, ok := l.unstable.maybeLastIndex(); ok {  
23       return i  
24    }  
25    
26    // 如果 unstable 获取不到最近的 index 就从 raftLog.storage 获取最近的 index
27    i, err := l.storage.LastIndex()  
28    if err != nil {  
29       panic(err) // TODO(bdarnell)  
30    }  
31    return i  
32}
33
34func (l *raftLog) append(ents ...pb.Entry) uint64 {  
35	// 如果 Entry 无数据,则返回
36    if len(ents) == 0 {  
37       return l.lastIndex()  
38    }  
39    
40    // 如果最新的 Entry index 小于 raftLog.committed 触发 panic
41    // 正常情况应该是 raftLog.committed 小于插入的 Entry index
42    if after := ents[0].Index - 1; after < l.committed {  
43       l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)  
44    }  
45    
46    // 将 Entry 插入到 raftLog.unstable
47    l.unstable.truncateAndAppend(ents)  
48    return l.lastIndex()  
49}

raft.appendEntry 将 Entry 存入 raftLog.unstable 后调用 raft.send 发送类型为 MsgAppResp 的消息:

 1func (r *raft) send(m pb.Message) {
 2	if m.From == None {  
 3	    m.From = r.id  
 4	}
 5	
 6	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {  
 7	    ...
 8	}
 9	
10	// m.Type 为 MsgAppResp 匹配当前分支
11	if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVoteResp {
12	r.msgsAfterAppend = append(r.msgsAfterAppend, m)  
13	...
14}

raft.send 方法处理 MsgAppResp 类型的消息是往 raft.msgsAfterAppend 数据中存入该消息。

存入 raft.msgsAfterAppendnode.run 将执行结果传递到 msgWithResult.result 通道内,该结果会被 node.Propose 消费:

 1func (n *node) run() {
 2	...
 3	select {  
 4	case pm := <-propc:  
 5	    m := pm.m  
 6	    m.From = r.id  
 7	    err := r.Step(m)  
 8	    if pm.result != nil {  
 9	       pm.result <- err  
10	       close(pm.result)  
11	    }
12	    ...
13	}
14	...
15}
16
17func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {  
18    ... 
19    select {  
20    case err := <-pm.result:  // 消费 pm.result 并返回
21       if err != nil {  
22          return err  
23       }  
24    case <-ctx.Done():  
25       return ctx.Err()  
26    case <-n.done:  
27       return ErrStopped  
28    }  
29    return nil  
30}

ready 消息

通过上一节的描述 node.Propose 的交互就完成了。node.Propose 作为算法层的入口将 Propose 请求封装成消息发送给 node.run 处理,node.run 将消息(Entry)添加到 raftLog.unstable 中暂存,然后将请求数据封装为 MsgAppResp 类型的消息添加到 raft.msgsAfterAppend 数组。

本小节继续看这里为什么要添加到 raft.msgsAfterAppend 数组。

进入 node.run

 1// node.run 是一个不退出循环
 2func (n *node) run() {  
 3    var propc chan msgWithResult  
 4    var readyc chan Ready  
 5    var advancec chan struct{}  
 6    var rd Ready  
 7  
 8    r := n.rn.raft  
 9  
10    lead := None  
11  
12    for {  
13	   // 进入 node.rn.HasReady
14       if advancec == nil && n.rn.HasReady() {  
15          rd = n.rn.readyWithoutAccept()  
16          readyc = n.readyc  
17       }
18       ...
19    }
20    ...
21}
22
23func (rn *RawNode) HasReady() bool {  
24    ...
25    // 通过 raft.msgAfterAppend 判断是否 ready
26    if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 {  
27       return true  
28    }  
29    ... 
30    return false  
31}

可以看到,RawNode.HasReady 通过 raft.msgAfterAppend 判断是否 ready。并且,这里的 advancec 通道并未创建,为 nil。此时,程序会走到 node.rn.readyWithoutAccept

 1func (rn *RawNode) readyWithoutAccept() Ready {  
 2    r := rn.raft  
 3  
 4    rd := Ready{  
 5	   // 获取 raft.raftLog.unstable 中的 Entry
 6       Entries:          r.raftLog.nextUnstableEnts(),  
 7       CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),  
 8       Messages:         r.msgs,  
 9    }  
10    
11    ... 
12    return rd  
13}

node.rn.readyWithoutAccept 主要是构造 Ready 对象,接着在 node.run 中将该对象发到 readyc 通道:

 1func (n *node) run() {  
 2    ...
 3    
 4    for {
 5	    select {
 6	    case readyc <- rd:  // 将 ready 发送到 readyc 通道
 7	    n.rn.acceptReady(rd)  
 8	    if !n.rn.asyncStorageWrites {  
 9	       advancec = n.advancec  // 这里 node.rn.asyncStorageWrites 为 false,会走到这里
10	    } else {  
11	       rd = Ready{}  
12	    }  
13	    readyc = nil        // 将 readyc 通道关闭,只读不写

在发送 readyreadycnode.run 会继续往下处理。此时,readyc 通道另一端也在消费 ready。

我们先往下走,看发完 readynode.run 做了什么?

进入 n.rn.acceptReady:

 1func (rn *RawNode) acceptReady(rd Ready) {
 2	...
 3	// 这里异步写为 false
 4	if !rn.asyncStorageWrites {
 5		... 
 6		// needStorageAppendRespMsg 判断 raftLog.unstable 是否有 Entry
 7		// 如果有调用 newStorageAppendRespMsg 组消息
 8	    if needStorageAppendRespMsg(rn.raft, rd) {  
 9	       m := newStorageAppendRespMsg(rn.raft, rd)  
10	       rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)  
11	    }  
12	    
13	    // needStorageApplyRespMsg 判断 ready.CommittedEntries 是否有 Entry
14	    // 这里在这个阶段无 CommittedEntries,先跳过
15	    if needStorageApplyRespMsg(rd) {  
16	       m := newStorageApplyRespMsg(rn.raft, rd.CommittedEntries)  
17	       rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)  
18	    }  
19	}
20	...
21}
22
23// newStorageAppendRespMsg 组类型为 MsgStorageAppendResp 的消息
24func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message {  
25    m := pb.Message{  
26       Type: pb.MsgStorageAppendResp,  
27       To:   r.id,  
28       From: LocalAppendThread,   
29       Term: r.Term,  
30    }
31    // 判断 raftLog.unstable 是否有 Entry
32	if r.raftLog.hasNextOrInProgressUnstableEnts() { 
33       last := r.raftLog.lastEntryID()  
34       m.Index = last.index        
35       m.LogTerm = last.term       
36    }  
37    ... 
38    return m  
39}

基本上发完 ready 后,node.run 会组消息类型为 MsgStorageAppendResp 的消息,并添加到 rn.stepsOnAdvance 数组。

这里留了两个问题:

  1. 前面发完 ready 后,哪个组件消费 ready?
  2. MsgStorageAppendResp 消息发往数组后,在哪里处理的呢?

首先看第一个问题,发完 ready 后哪个组件消费 ready?

应用层处理 ready

应用层的 raftNode.serveChannels 会监听 readyc 通道,消费 ready

 1func (rc *raftNode) serveChannels() {
 2	...
 3    // event loop on raft state machine updates  
 4    for {  
 5       select {  
 6       // store raft entries to wal, then publish over commit channel  
 7       case rd := <-rc.node.Ready():  
 8	      // 首先 raftNode 调用预写日志 WAL 将 rd 的状态和数据写入到磁盘日志
 9	      // 这里写入不成功咋办?好像没有写入不成功的处理?
10          rc.wal.Save(rd.HardState, rd.Entries)  
11          
12          // 将 ready.Entries 添加到 raftNode.raftStorage
13          rc.raftStorage.Append(rd.Entries)  
14           
15          // 调用 raftNode.node.Advance 方法往 advancec 通道发消息
16          rc.node.Advance()  
17       }  
18    }  
19}

raftNode 处理 ready 后,此时的状态是预写日志存储 Entries,raftStorage 添加 Entries。最后往 advancec 通道写数据通知算法层。

可以想到算法层的 node.run 会监听 advancec 通道并处理,我们直接进入此处逻辑:

 1func (n *node) run() {
 2	...
 3	for {
 4		select {
 5		case <-advancec:  
 6			// 重点看 node.rn.Advance
 7		    n.rn.Advance(rd)  
 8		    rd = Ready{}  
 9		    advancec = nil
10		}
11	}
12}
13
14func (rn *RawNode) Advance(_ Ready) {  
15    // 在这里对数组做处理,从前面可知此时数组存的是 MsgStorageAppendResp 类型的消息
16    for i, m := range rn.stepsOnAdvance {  
17	   // 调用 raft.Step 状态机处理消息
18       _ = rn.raft.Step(m)  
19       rn.stepsOnAdvance[i] = pb.Message{}  
20    }  
21    rn.stepsOnAdvance = rn.stepsOnAdvance[:0]  
22}
23
24func (r *raft) Step(m pb.Message) error {
25	...
26	switch m.Type {
27	case pb.MsgStorageAppendResp:  
28	    if m.Index != 0 {  
29		   // 进入 raft.raftLog.stableTo 将 raftLog.unstable 的 Entry 清掉
30		   // 该 Entry 已经存储到 WAL 和 raftStorage 中了
31	       r.raftLog.stableTo(entryID{term: m.LogTerm, index: m.Index})  
32	    }  
33	...
34	}
35	return nil
36}

raftNode 处理完 ready 消息后,会往 advancec 发消息,通知算法层 ready 消息处理完毕。算法层根据消息类型做不同处理,这里将 raftLog.unstable 中的 Entry 清理掉(这个 Entry 已经存在 raftStorageWAL 中了)。

到这里还没有结束。接下来继续进入 node.run 循环,看算法层和应用层是如何进行第二轮交互的。

算法层和应用层交互

 1func (n *node) run() {
 2	...
 3	for {  
 4		// 在第一轮算法层-应用层交互后,算法层将 advancec 设为 nil
 5		// 进入 node.rn.HasReady 判断是否可以组 ready
 6	    if advancec == nil && n.rn.HasReady() {
 7		    // 和前面第一轮交互类似,这里是组 ready 消息,后续将 ready 发往 readyc 通道
 8	        rd = n.rn.readyWithoutAccept()  
 9		    readyc = n.readyc  
10		}
11}
12
13func (rn *RawNode) HasReady() bool {
14	// 这个条件分支将被选中此时的 hardState 和 rn.prevHardState 不一致
15	// hardState 的 Commit 大于 rn.prevHardState,意味着需要 Commit Entry
16	if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {  
17	    return true  
18	}
19}
20
21func (rn *RawNode) readyWithoutAccept() Ready {  
22    r := rn.raft  
23  
24	// 不同于第一轮交互,这里的 CommittedEntries 是要 Commit 给客户端的存储的 Entry 
25    rd := Ready{  
26       Entries:          r.raftLog.nextUnstableEnts(),  
27       CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),  
28       Messages:         r.msgs,  
29    }
30    ...
31}

类似于第一轮算法层-应用层交互,这里算法层组 ready 发往应用层。不同的是这里的 EntryCommittedEntry

算法层发完 ready 后会继续进入 node.run 循环:

 1func (n *node) run() {
 2	for {
 3		select {
 4		case readyc <- rd:
 5			n.rn.acceptReady(rd)  
 6			if !n.rn.asyncStorageWrites {  
 7			    advancec = n.advancec  
 8			} else {  
 9			    rd = Ready{}  
10			}  
11			readyc = nil
12		}
13	}
14}
15	
16func (rn *RawNode) acceptReady(rd Ready) {
17	if !rn.asyncStorageWrites {
18		// 类似于第一轮算法层-应用层交互,这里组消息类型为 MsgStorageApplyResp 的消息加到 rn.stepsOnAdvance,后续算法层收到 advance 信号后会处理
19		if needStorageApplyRespMsg(rd) {  
20	    m := newStorageApplyRespMsg(rn.raft, rd.CommittedEntries)  
21	    rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)  
22	    }
23	}
24	...
25}

继续看应用层是如何处理 ready 的:

 1func (rc *raftNode) serveChannels() {
 2	for {  
 3	    select {
 4	    case rd := <-rc.node.Ready():
 5	    // 重点在 raftNode.publishEntries
 6	    applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))  
 7		if !ok {  
 8		    rc.stop()  
 9		    return  
10		} 
11		rc.node.Advance()
12	} 
13}
14
15func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {
16	data := make([]string, 0, len(ents))  
17	for i := range ents {  
18	    switch ents[i].Type {  
19	    case raftpb.EntryNormal:  
20		   // 读取 Commit Entry 中的数据存入 data 
21	       if len(ents[i].Data) == 0 {  
22	          // ignore empty messages  
23	          break  
24	       }  
25	       s := string(ents[i].Data)  
26	       data = append(data, s) 
27		}
28	}
29	
30	var applyDoneC chan struct{}  
31	  
32	if len(data) > 0 {  
33	    applyDoneC = make(chan struct{}, 1)  
34	    select {  
35	    // 将数据和 applyDoneC 通道发给 raftNode.commitC
36	    case rc.commitC <- &commit{data, applyDoneC}:  
37	    case <-rc.stopc:  
38	       return nil, false  
39	    }  
40	}  
41	  
42	// commit 后更新 raftNode.appliedIndex 
43	rc.appliedIndex = ents[len(ents)-1].Index  
44	  
45	return applyDoneC, true	
46}          

应用层收到 ready 后,组 commit 发往 raftNode.commitC 通道。通道的另一头是客户端在消费:

 1func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {  
 2    for commit := range commitC {  
 3       if commit == nil {  
 4          ... 
 5          continue  
 6       }  
 7  
 8       for _, data := range commit.data {  
 9          var dataKv kv  
10          dec := gob.NewDecoder(bytes.NewBufferString(data))  
11          if err := dec.Decode(&dataKv); err != nil {  
12             log.Fatalf("raftexample: could not decode message (%v)", err)  
13          }  
14          // 将数据(键值对)存储到 kvstore.kvStore 中
15          s.mu.Lock()  
16          s.kvStore[dataKv.Key] = dataKv.Val  
17          s.mu.Unlock()  
18       }  
19       // 写入成功后,关闭 commit.applyDoneC 通知应用层
20       close(commit.applyDoneC)  
21    }  
22    if err, ok := <-errorC; ok {  
23       log.Fatal(err)  
24    }  
25}

客户端会监听 raftNode.commitC 通道,并将通道内的数据编码存储到 kvStore 中。

至此,写入的流程基本结束了。后续就是应用层和算法层的收尾工作。应用层发往 commit 后,发信号到 advancec,通知算法层 commit 已完成,算法层在做相应处理,这里和第一轮交互有点重复,就不赘述了。

小结

可以看到 raft 的写流程是比较复杂的,涉及客户端/应用层/算法层和多个通道的交互,这里只介绍了个大概,部分比较重要的内容如多节点通信等并未介绍,后续会着重开始走读这方面的内容以及学习 raft 的论文加深理解。

这里给出交互流程图如下:

raft 写流程图

参考资料