概述

raft-example 提供了一个简化版的 KV 存储,本文围绕 raft-example 对读请求进行源码走读。

源码版本为 etcd release-3.6

raftexample

程序结构

raftexample 程序结构如下所示:

 1  raftexample git:(release-3.6)  tree
 2.
 3├── Procfile
 4├── README.md
 5├── doc.go
 6├── httpapi.go
 7├── kvstore.go
 8├── kvstore_test.go
 9├── listener.go
10├── main.go
11├── raft.go
12├── raft_test.go
13└── raftexample_test.go

etcd raft 作为 raft 库只实现 raft 算法层的内容,对于节点通信,键值存储等都不涉及,需要用户自己提供。本文只介绍 raft 算法和存储相关内容,对节点通信等不做介绍。

启动 raftexample

进入 main.go 查看 raftexample 的启动流程:

 1func main() {  
 2	// 初始化启动参数
 3    cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")  
 4    id := flag.Int("id", 1, "node ID")  
 5    kvport := flag.Int("port", 9121, "key-value server port")  
 6    join := flag.Bool("join", false, "join an existing cluster")  
 7    flag.Parse()  
 8  
 9	// 初始化 proposeC 和 confChangeC 通道
10    proposeC := make(chan string)  
11    defer close(proposeC)  
12    confChangeC := make(chan raftpb.ConfChange)  
13    defer close(confChangeC)  
14  
15	// 初始化 kvstore,kvstore 负责存储 KV
16    var kvs *kvstore  
17    getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }  
18    // 创建 raftNode
19    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)  
20  
21	// 创建 KVStore
22    kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)  
23  
24    // 作为客户端监听读写请求 
25    serveHTTPKVAPI(kvs, *kvport, confChangeC, errorC)  
26}

启动流程中,重点在于 proposeCconfChangeCcommitC 通道。在 newRaftNode 函数创建 raft 应用层节点。外部(客户端层)通过 proposeCconfChangeC 发送消息给 raft 应用层,客户端通过 commitC 接收消息。

newKVStore 函数中创建 KV 存储:

1type kvstore struct {  
2    proposeC    chan<- string // channel for proposing updates  
3    mu          sync.RWMutex  
4    kvStore     map[string]string // current committed key-value pairs  
5    snapshotter *snap.Snapshotter  
6}

raftexample 使用 map 存储键值对。

serveHTTPKVAPI 会创建 http handler 用来处理读写等请求。

httpKVAPI.ServeHTTP 中有多个请求处理分支,这里仅以 http.MethodGet 方法为例分析读请求是如何处理的:

 1func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {  
 2    key := r.RequestURI  
 3    defer r.Body.Close()  
 4    switch r.Method {
 5	case http.MethodGet:  
 6	    if v, ok := h.store.Lookup(key); ok {  
 7	       w.Write([]byte(v))  
 8	    } else {  
 9	       http.Error(w, "Failed to GET", http.StatusNotFound)  
10	    }
11    default:  
12       w.Header().Set("Allow", http.MethodPut)  
13       w.Header().Add("Allow", http.MethodGet)  
14       w.Header().Add("Allow", http.MethodPost)  
15       w.Header().Add("Allow", http.MethodDelete)  
16       http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)  
17    }  
18}

读请求处理流程

进入 h.store.Lookup查看读请求是如何处理的:

1func (s *kvstore) Lookup(key string) (string, bool) {  
2    s.mu.RLock()  
3    defer s.mu.RUnlock()  
4    v, ok := s.kvStore[key]  
5    return v, ok  
6}

非常简单,读数据直接从 kvStore 中取数据即可。