概述
在上一篇文章中介绍了 raft 读请求源码走读,本文继续结合 etcd-raft 源码介绍 raft 写请求流程。
需要说明的是,本文使用的是单节点集群环境,后续会补充多节点集群环境加以介绍。
写请求流程
客户端写入请求
客户端通过 PUT 请求写入键值对请求:
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello
该请求被 httpKVAPI 处理:
1func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2 key := r.RequestURI
3 defer r.Body.Close()
4 switch r.Method {
5 case http.MethodPut: // 匹配 PUT 方法,处理写请求
6 v, err := io.ReadAll(r.Body)
7 if err != nil {
8 log.Printf("Failed to read on PUT (%v)\n", err)
9 http.Error(w, "Failed on PUT", http.StatusBadRequest)
10 return
11 }
12
13 h.store.Propose(key, string(v)) // 调用 h.store.Propose 提交该请求
14 w.WriteHeader(http.StatusNoContent)
15 }
16 ...
17}
18
19func (s *kvstore) Propose(k string, v string) {
20 var buf strings.Builder
21 if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
22 log.Fatal(err)
23 }
24 s.proposeC <- buf.String()
25}
httpKVAPI.ServeHTTP 的重点在于 httpKVAPI.store.Propose 方法。该方法的作用是将写请求键值对编码后写入 kvstore.proposeC 通道。
那么谁在消费 proposeC 呢?
应用层消费 proposeC
应用层 raftNode 消费客户端写入 proposeC 的写请求。
首先查看 raftNode 的启动流程及整体结构。
raftNode 启动流程
在 main.go 中 newRaftNode 会创建应用层 RaftNode 节点:
1func main() {
2 ...
3 proposeC := make(chan string)
4 defer close(proposeC)
5 confChangeC := make(chan raftpb.ConfChange)
6 defer close(confChangeC)
7
8 var kvs *kvstore
9 getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
10
11 // 创建 RaftNode 节点,并且将客户端写入的 proposeC 作为参数传递给 newRaftNode
12 commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
13 ...
14}
15
16func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
17 confChangeC <-chan raftpb.ConfChange,
18) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {
19 commitC := make(chan *commit)
20 errorC := make(chan error)
21
22 rc := &raftNode{
23 proposeC: proposeC, // proposeC 传入 raftNode
24 ...
25 }
26 go rc.startRaft()
27 return commitC, errorC, rc.snapshotterReady
28}
创建 raftNode 后,调用 raftNode.startRaft 方法启动一个协程异步运行 startRaft 方法:
1func (rc *raftNode) startRaft() {
2 ...
3 go rc.serveRaft() // 启动协程监听 raft api
4 go rc.serveChannels() // 启动协程监听 raftNode 的通道
5}
重点介绍 raft.serveChannels 方法:
1func (rc *raftNode) serveChannels() {
2 ...
3
4 // send proposals over raft
5 go func() {
6 confChangeCount := uint64(0)
7
8 for rc.proposeC != nil && rc.confChangeC != nil {
9 select {
10 case prop, ok := <-rc.proposeC: // 在这里消费 proposeC 的写请求键值对
11 if !ok {
12 rc.proposeC = nil
13 } else {
14 // blocks until accepted by raft state machine
15 rc.node.Propose(context.TODO(), []byte(prop))
16 }
17 }
18 }
19 // client closed channel; shutdown raft if not already
20 close(rc.stopc)
21 }()
22
23 // for-select 事件循环
24 for {
25 select {
26 case <-ticker.C:
27 rc.node.Tick()
28
29 // store raft entries to wal, then publish over commit channel
30 case rd := <-rc.node.Ready():
31 ...
32 }
33 }
34}
在 serveChannels 中应用层 raftNode 从 proposeC 中消费写请求。
如果 proposeC 关闭将 proposeC 设为 nil 不允许写入。
如果正常读取写请求,则进入 raftNode.node.Propose 处理。
raftNode 组合 node 对象,node 是算法层的入口,通过提供的一系列方法暴露给应用层调用。Propose 即为暴露的其中一个方法。
算法层处理写请求
算法层初始化流程
为了介绍逻辑的完整性,这里有必要在进入 raftNode.startRaft 方法查看算法层 node 是如何创建的:
1func (rc *raftNode) startRaft() {
2 ...
3
4 c := &raft.Config{
5 ID: uint64(rc.id),
6 ElectionTick: 10,
7 HeartbeatTick: 1,
8 Storage: rc.raftStorage,
9 MaxSizePerMsg: 1024 * 1024,
10 MaxInflightMsgs: 256,
11 MaxUncommittedEntriesSize: 1 << 30,
12 }
13
14 if oldwal || rc.join {
15 rc.node = raft.RestartNode(c)
16 } else {
17 // 首次创建 raftNode.node
18 rc.node = raft.StartNode(c, rpeers)
19 }
20 ...
21}
22
23func StartNode(c *Config, peers []Peer) Node {
24 n := setupNode(c, peers)
25 go n.run()
26 return n
27}
在 raft.StartNode(或 raft.RestartNode) 创建 raftNode.node 对象。实际是异步启了一个协程运行 node.run 方法。
node.Propose 处理写请求提交
上一节创建 node后,应用层调用 raftNode.node.Propose 方法提交写请求到算法层:
1func (n *node) Propose(ctx context.Context, data []byte) error {
2 // 将写请求数据封装为 Entry,将 Entry 封装为 Message
3 return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
4}
5
6func (n *node) stepWait(ctx context.Context, m pb.Message) error {
7 // 等待提交写请求处理完成
8 return n.stepWithWaitOption(ctx, m, true)
9}
10
11func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
12 ...
13 ch := n.propc
14 pm := msgWithResult{m: m} // 将 Message 封装为 msgWithResult
15 if wait {
16 pm.result = make(chan error, 1)
17 }
18 select {
19 case ch <- pm: // 将 msgWithResult 写入 node.propc 通道
20 if !wait {
21 return nil
22 }
23 case <-ctx.Done():
24 return ctx.Err()
25 case <-n.done:
26 return ErrStopped
27 }
28 select {
29 case err := <-pm.result: // 等待 msgWithResult.result 返回结果
30 if err != nil {
31 return err
32 }
33 ...
34 }
35 return nil
36}
算法层 node.Propose 将数据封装为 msgWithResult 发给 node.propc 通道,并且监听 msgWithResult.result 通道等待返回结果。
那么 msgWithResult 的数据是在哪里消费的呢?
消费 msgWithResult
在 算法层初始化流程 介绍了创建 node 对象后会异步启动 node.run 协程,这里继续看协程内部在做什么:
1func (n *node) run() {
2 var propc chan msgWithResult
3 var readyc chan Ready
4 var advancec chan struct{}
5 var rd Ready
6
7 r := n.rn.raft
8 lead := None
9
10 for {
11 ...
12 if lead != r.lead {
13 if r.hasLeader() {
14 if lead == None {
15 r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
16 } else {
17 r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
18 }
19 // 如果集群有 leader 则将 node.propc 赋给 propc
20 // 意味着只在有 leader 的情况下处理写请求提交
21 propc = n.propc
22 } else {
23 r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
24 // 如果集群无 leader 将 propc 设为 nil
25 // 意味着无 leader 时不会处理写请求提交
26 propc = nil
27 }
28 lead = r.lead
29 }
30
31 select {
32 case pm := <-propc: // 从 node.propc 中读取 msgWithResult 对象
33 m := pm.m // 获取消息
34 m.From = r.id // 消息是在同一个 raft 节点处理的,将 m.From 设为当前 raft 节点的 id
35 err := r.Step(m) // 调用 raft.Step 处理消息
36 if pm.result != nil {
37 pm.result <- err
38 close(pm.result)
39 }
40 ...
41 }
42 }
43}
node.run 消费 node.propc,并且调用 raft.Step 处理消费的消息:
1func (r *raft) Step(m pb.Message) error {
2 switch {
3 case m.Term == 0: // 消息的 Term 为 0,表示该消息为本地消息
4 // local message
5 case m.Term > r.Term:
6 ...
7 case m.Term < r.Term:
8 ...
9 }
10
11 switch m.Type {
12 case pb.MsgHup:
13 ...
14 case pb.MsgStorageAppendResp:
15 ...
16 case pb.MsgStorageApplyResp:
17 ...
18 case pb.MsgVote, pb.MsgPreVote:
19 ...
20 default:
21 err := r.step(r, m) // 这里的消息类型为 MsgProp 会进到默认分支
22 if err != nil {
23 return err
24 }
25 }
26 return nil
27}
消息的类型为 MsgProp 会进入到 raft.step 方法处理。raft.step 是一个指针函数指向不同角色的 step 处理函数。
根据不同角色有如下几种处理函数。
candidate
候选者 candidate 处理 MsgProp 消息:
1func stepCandidate(r *raft, m pb.Message) error {
2 switch m.Type {
3 case pb.MsgProp:
4 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
5 return ErrProposalDropped
6 ...
7}
候选者并不能直接处理 MsgProp 写请求的提交消息,如果收到将直接丢弃。
follower
1func stepFollower(r *raft, m pb.Message) error {
2 switch m.Type {
3 case pb.MsgProp:
4 if r.lead == None {
5 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
6 return ErrProposalDropped
7 } else if r.disableProposalForwarding {
8 r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
9 return ErrProposalDropped
10 }
11 m.To = r.lead
12 r.send(m)
13 case pb.MsgApp:
14 ...
15 case pb.MsgHeartbeat:
16 ...
17 case pb.MsgSnap:
18 ...
19 case pb.MsgTransferLeader:
20 ...
21 case pb.MsgTimeoutNow:
22 ...
23 }
24 return nil
25}
跟随者 follower 收到 MsgProp 写请求提交消息,会判断当前集群有无 leader。如果有,则调用 raft.send 转发该消息给 leader 处理。如果没有,则丢弃该消息。
leader
1func stepLeader(r *raft, m pb.Message) error {
2 switch m.Type {
3 case pb.MsgBeat:
4 ....
5 case pb.MsgCheckQuorum:
6 ...
7 case pb.MsgProp:
8 if len(m.Entries) == 0 {
9 r.logger.Panicf("%x stepped empty MsgProp", r.id)
10 }
11 if r.trk.Progress[r.id] == nil {
12 return ErrProposalDropped
13 }
14 if r.leadTransferee != None {
15 r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
16 return ErrProposalDropped
17 }
18
19 for i := range m.Entries {
20 e := &m.Entries[i]
21 var cc pb.ConfChangeI
22 // Entry 类型为 EntryNormal,不匹配
23 if e.Type == pb.EntryConfChange {
24 ...
25 } else if e.Type == pb.EntryConfChangeV2 {
26 ...
27 }
28 if cc != nil {
29 ...
30 }
31 }
32
33 // 调用 raft.appendEntry 处理 msgWithResult 的Entry
34 if !r.appendEntry(m.Entries...) {
35 return ErrProposalDropped
36 }
37
38 // 广播给 follower 节点处理
39 r.bcastAppend()
40 return nil
stepLeader 匹配 MsgProp 消息,并且根据消息中的 Entry.Type 执行不同的处理逻辑。对于 EntryNormal 的 Entry 将调用 raft.appendEntry 处理。
1func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
2 // 调用 raft.raftLog.lastIndex 获取最近的 index
3 li := r.raftLog.lastIndex()
4
5 // 更新 Entry 的 Term 和 Index
6 for i := range es {
7 es[i].Term = r.Term
8 es[i].Index = li + 1 + uint64(i)
9 }
10
11 // 将 Entry 添加到 raft.raftLog
12 // 这里两阶段提交的第一阶段添加的是 raft.raftLog.unstable
13 li = r.raftLog.append(es...)
14
15 // 获取最新的 last index 并组成 MsgAppResp 的消息发送
16 r.send(pb.Message{To: r.id, Type: pb.MsgAppResp, Index: li})
17 return true
18}
19
20func (l *raftLog) lastIndex() uint64 {
21 // 先从 unstable 获取最近的 index
22 if i, ok := l.unstable.maybeLastIndex(); ok {
23 return i
24 }
25
26 // 如果 unstable 获取不到最近的 index 就从 raftLog.storage 获取最近的 index
27 i, err := l.storage.LastIndex()
28 if err != nil {
29 panic(err) // TODO(bdarnell)
30 }
31 return i
32}
33
34func (l *raftLog) append(ents ...pb.Entry) uint64 {
35 // 如果 Entry 无数据,则返回
36 if len(ents) == 0 {
37 return l.lastIndex()
38 }
39
40 // 如果最新的 Entry index 小于 raftLog.committed 触发 panic
41 // 正常情况应该是 raftLog.committed 小于插入的 Entry index
42 if after := ents[0].Index - 1; after < l.committed {
43 l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
44 }
45
46 // 将 Entry 插入到 raftLog.unstable
47 l.unstable.truncateAndAppend(ents)
48 return l.lastIndex()
49}
在 raft.appendEntry 将 Entry 存入 raftLog.unstable 后调用 raft.send 发送类型为 MsgAppResp 的消息:
1func (r *raft) send(m pb.Message) {
2 if m.From == None {
3 m.From = r.id
4 }
5
6 if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
7 ...
8 }
9
10 // m.Type 为 MsgAppResp 匹配当前分支
11 if m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVoteResp {
12 r.msgsAfterAppend = append(r.msgsAfterAppend, m)
13 ...
14}
raft.send 方法处理 MsgAppResp 类型的消息是往 raft.msgsAfterAppend 数据中存入该消息。
存入 raft.msgsAfterAppend 后 node.run 将执行结果传递到 msgWithResult.result 通道内,该结果会被 node.Propose 消费:
1func (n *node) run() {
2 ...
3 select {
4 case pm := <-propc:
5 m := pm.m
6 m.From = r.id
7 err := r.Step(m)
8 if pm.result != nil {
9 pm.result <- err
10 close(pm.result)
11 }
12 ...
13 }
14 ...
15}
16
17func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
18 ...
19 select {
20 case err := <-pm.result: // 消费 pm.result 并返回
21 if err != nil {
22 return err
23 }
24 case <-ctx.Done():
25 return ctx.Err()
26 case <-n.done:
27 return ErrStopped
28 }
29 return nil
30}
ready 消息
通过上一节的描述 node.Propose 的交互就完成了。node.Propose 作为算法层的入口将 Propose 请求封装成消息发送给 node.run 处理,node.run 将消息(Entry)添加到 raftLog.unstable 中暂存,然后将请求数据封装为 MsgAppResp 类型的消息添加到 raft.msgsAfterAppend 数组。
本小节继续看这里为什么要添加到 raft.msgsAfterAppend 数组。
进入 node.run :
1// node.run 是一个不退出循环
2func (n *node) run() {
3 var propc chan msgWithResult
4 var readyc chan Ready
5 var advancec chan struct{}
6 var rd Ready
7
8 r := n.rn.raft
9
10 lead := None
11
12 for {
13 // 进入 node.rn.HasReady
14 if advancec == nil && n.rn.HasReady() {
15 rd = n.rn.readyWithoutAccept()
16 readyc = n.readyc
17 }
18 ...
19 }
20 ...
21}
22
23func (rn *RawNode) HasReady() bool {
24 ...
25 // 通过 raft.msgAfterAppend 判断是否 ready
26 if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 {
27 return true
28 }
29 ...
30 return false
31}
可以看到,RawNode.HasReady 通过 raft.msgAfterAppend 判断是否 ready。并且,这里的 advancec 通道并未创建,为 nil。此时,程序会走到 node.rn.readyWithoutAccept:
1func (rn *RawNode) readyWithoutAccept() Ready {
2 r := rn.raft
3
4 rd := Ready{
5 // 获取 raft.raftLog.unstable 中的 Entry
6 Entries: r.raftLog.nextUnstableEnts(),
7 CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
8 Messages: r.msgs,
9 }
10
11 ...
12 return rd
13}
node.rn.readyWithoutAccept 主要是构造 Ready 对象,接着在 node.run 中将该对象发到 readyc 通道:
1func (n *node) run() {
2 ...
3
4 for {
5 select {
6 case readyc <- rd: // 将 ready 发送到 readyc 通道
7 n.rn.acceptReady(rd)
8 if !n.rn.asyncStorageWrites {
9 advancec = n.advancec // 这里 node.rn.asyncStorageWrites 为 false,会走到这里
10 } else {
11 rd = Ready{}
12 }
13 readyc = nil // 将 readyc 通道关闭,只读不写
在发送 ready 到 readyc 后 node.run 会继续往下处理。此时,readyc 通道另一端也在消费 ready。
我们先往下走,看发完 ready 后 node.run 做了什么?
进入 n.rn.acceptReady:
1func (rn *RawNode) acceptReady(rd Ready) {
2 ...
3 // 这里异步写为 false
4 if !rn.asyncStorageWrites {
5 ...
6 // needStorageAppendRespMsg 判断 raftLog.unstable 是否有 Entry
7 // 如果有调用 newStorageAppendRespMsg 组消息
8 if needStorageAppendRespMsg(rn.raft, rd) {
9 m := newStorageAppendRespMsg(rn.raft, rd)
10 rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)
11 }
12
13 // needStorageApplyRespMsg 判断 ready.CommittedEntries 是否有 Entry
14 // 这里在这个阶段无 CommittedEntries,先跳过
15 if needStorageApplyRespMsg(rd) {
16 m := newStorageApplyRespMsg(rn.raft, rd.CommittedEntries)
17 rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)
18 }
19 }
20 ...
21}
22
23// newStorageAppendRespMsg 组类型为 MsgStorageAppendResp 的消息
24func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message {
25 m := pb.Message{
26 Type: pb.MsgStorageAppendResp,
27 To: r.id,
28 From: LocalAppendThread,
29 Term: r.Term,
30 }
31 // 判断 raftLog.unstable 是否有 Entry
32 if r.raftLog.hasNextOrInProgressUnstableEnts() {
33 last := r.raftLog.lastEntryID()
34 m.Index = last.index
35 m.LogTerm = last.term
36 }
37 ...
38 return m
39}
基本上发完 ready 后,node.run 会组消息类型为 MsgStorageAppendResp 的消息,并添加到 rn.stepsOnAdvance 数组。
这里留了两个问题:
- 前面发完 ready 后,哪个组件消费 ready?
MsgStorageAppendResp消息发往数组后,在哪里处理的呢?
首先看第一个问题,发完 ready 后哪个组件消费 ready?
应用层处理 ready
应用层的 raftNode.serveChannels 会监听 readyc 通道,消费 ready:
1func (rc *raftNode) serveChannels() {
2 ...
3 // event loop on raft state machine updates
4 for {
5 select {
6 // store raft entries to wal, then publish over commit channel
7 case rd := <-rc.node.Ready():
8 // 首先 raftNode 调用预写日志 WAL 将 rd 的状态和数据写入到磁盘日志
9 // 这里写入不成功咋办?好像没有写入不成功的处理?
10 rc.wal.Save(rd.HardState, rd.Entries)
11
12 // 将 ready.Entries 添加到 raftNode.raftStorage
13 rc.raftStorage.Append(rd.Entries)
14
15 // 调用 raftNode.node.Advance 方法往 advancec 通道发消息
16 rc.node.Advance()
17 }
18 }
19}
raftNode 处理 ready 后,此时的状态是预写日志存储 Entries,raftStorage 添加 Entries。最后往 advancec 通道写数据通知算法层。
可以想到算法层的 node.run 会监听 advancec 通道并处理,我们直接进入此处逻辑:
1func (n *node) run() {
2 ...
3 for {
4 select {
5 case <-advancec:
6 // 重点看 node.rn.Advance
7 n.rn.Advance(rd)
8 rd = Ready{}
9 advancec = nil
10 }
11 }
12}
13
14func (rn *RawNode) Advance(_ Ready) {
15 // 在这里对数组做处理,从前面可知此时数组存的是 MsgStorageAppendResp 类型的消息
16 for i, m := range rn.stepsOnAdvance {
17 // 调用 raft.Step 状态机处理消息
18 _ = rn.raft.Step(m)
19 rn.stepsOnAdvance[i] = pb.Message{}
20 }
21 rn.stepsOnAdvance = rn.stepsOnAdvance[:0]
22}
23
24func (r *raft) Step(m pb.Message) error {
25 ...
26 switch m.Type {
27 case pb.MsgStorageAppendResp:
28 if m.Index != 0 {
29 // 进入 raft.raftLog.stableTo 将 raftLog.unstable 的 Entry 清掉
30 // 该 Entry 已经存储到 WAL 和 raftStorage 中了
31 r.raftLog.stableTo(entryID{term: m.LogTerm, index: m.Index})
32 }
33 ...
34 }
35 return nil
36}
raftNode 处理完 ready 消息后,会往 advancec 发消息,通知算法层 ready 消息处理完毕。算法层根据消息类型做不同处理,这里将 raftLog.unstable 中的 Entry 清理掉(这个 Entry 已经存在 raftStorage 和 WAL 中了)。
到这里还没有结束。接下来继续进入 node.run 循环,看算法层和应用层是如何进行第二轮交互的。
算法层和应用层交互
1func (n *node) run() {
2 ...
3 for {
4 // 在第一轮算法层-应用层交互后,算法层将 advancec 设为 nil
5 // 进入 node.rn.HasReady 判断是否可以组 ready
6 if advancec == nil && n.rn.HasReady() {
7 // 和前面第一轮交互类似,这里是组 ready 消息,后续将 ready 发往 readyc 通道
8 rd = n.rn.readyWithoutAccept()
9 readyc = n.readyc
10 }
11}
12
13func (rn *RawNode) HasReady() bool {
14 // 这个条件分支将被选中此时的 hardState 和 rn.prevHardState 不一致
15 // hardState 的 Commit 大于 rn.prevHardState,意味着需要 Commit Entry
16 if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
17 return true
18 }
19}
20
21func (rn *RawNode) readyWithoutAccept() Ready {
22 r := rn.raft
23
24 // 不同于第一轮交互,这里的 CommittedEntries 是要 Commit 给客户端的存储的 Entry
25 rd := Ready{
26 Entries: r.raftLog.nextUnstableEnts(),
27 CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
28 Messages: r.msgs,
29 }
30 ...
31}
类似于第一轮算法层-应用层交互,这里算法层组 ready 发往应用层。不同的是这里的 Entry 是 CommittedEntry。
算法层发完 ready 后会继续进入 node.run 循环:
1func (n *node) run() {
2 for {
3 select {
4 case readyc <- rd:
5 n.rn.acceptReady(rd)
6 if !n.rn.asyncStorageWrites {
7 advancec = n.advancec
8 } else {
9 rd = Ready{}
10 }
11 readyc = nil
12 }
13 }
14}
15
16func (rn *RawNode) acceptReady(rd Ready) {
17 if !rn.asyncStorageWrites {
18 // 类似于第一轮算法层-应用层交互,这里组消息类型为 MsgStorageApplyResp 的消息加到 rn.stepsOnAdvance,后续算法层收到 advance 信号后会处理
19 if needStorageApplyRespMsg(rd) {
20 m := newStorageApplyRespMsg(rn.raft, rd.CommittedEntries)
21 rn.stepsOnAdvance = append(rn.stepsOnAdvance, m)
22 }
23 }
24 ...
25}
继续看应用层是如何处理 ready 的:
1func (rc *raftNode) serveChannels() {
2 for {
3 select {
4 case rd := <-rc.node.Ready():
5 // 重点在 raftNode.publishEntries
6 applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
7 if !ok {
8 rc.stop()
9 return
10 }
11 rc.node.Advance()
12 }
13}
14
15func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {
16 data := make([]string, 0, len(ents))
17 for i := range ents {
18 switch ents[i].Type {
19 case raftpb.EntryNormal:
20 // 读取 Commit Entry 中的数据存入 data
21 if len(ents[i].Data) == 0 {
22 // ignore empty messages
23 break
24 }
25 s := string(ents[i].Data)
26 data = append(data, s)
27 }
28 }
29
30 var applyDoneC chan struct{}
31
32 if len(data) > 0 {
33 applyDoneC = make(chan struct{}, 1)
34 select {
35 // 将数据和 applyDoneC 通道发给 raftNode.commitC
36 case rc.commitC <- &commit{data, applyDoneC}:
37 case <-rc.stopc:
38 return nil, false
39 }
40 }
41
42 // commit 后更新 raftNode.appliedIndex
43 rc.appliedIndex = ents[len(ents)-1].Index
44
45 return applyDoneC, true
46}
应用层收到 ready 后,组 commit 发往 raftNode.commitC 通道。通道的另一头是客户端在消费:
1func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
2 for commit := range commitC {
3 if commit == nil {
4 ...
5 continue
6 }
7
8 for _, data := range commit.data {
9 var dataKv kv
10 dec := gob.NewDecoder(bytes.NewBufferString(data))
11 if err := dec.Decode(&dataKv); err != nil {
12 log.Fatalf("raftexample: could not decode message (%v)", err)
13 }
14 // 将数据(键值对)存储到 kvstore.kvStore 中
15 s.mu.Lock()
16 s.kvStore[dataKv.Key] = dataKv.Val
17 s.mu.Unlock()
18 }
19 // 写入成功后,关闭 commit.applyDoneC 通知应用层
20 close(commit.applyDoneC)
21 }
22 if err, ok := <-errorC; ok {
23 log.Fatal(err)
24 }
25}
客户端会监听 raftNode.commitC 通道,并将通道内的数据编码存储到 kvStore 中。
至此,写入的流程基本结束了。后续就是应用层和算法层的收尾工作。应用层发往 commit 后,发信号到 advancec,通知算法层 commit 已完成,算法层在做相应处理,这里和第一轮交互有点重复,就不赘述了。
小结
可以看到 raft 的写流程是比较复杂的,涉及客户端/应用层/算法层和多个通道的交互,这里只介绍了个大概,部分比较重要的内容如多节点通信等并未介绍,后续会着重开始走读这方面的内容以及学习 raft 的论文加深理解。
这里给出交互流程图如下:
