概述
raft-example 提供了一个简化版的 KV 存储,本文围绕 raft-example 对读请求进行源码走读。
源码版本为 etcd release-3.6。
raftexample
程序结构
raftexample 程序结构如下所示:
1➜ raftexample git:(release-3.6) ✗ tree
2.
3├── Procfile
4├── README.md
5├── doc.go
6├── httpapi.go
7├── kvstore.go
8├── kvstore_test.go
9├── listener.go
10├── main.go
11├── raft.go
12├── raft_test.go
13└── raftexample_test.go
etcd raft 作为 raft 库只实现 raft 算法层的内容,对于节点通信,键值存储等都不涉及,需要用户自己提供。本文只介绍 raft 算法和存储相关内容,对节点通信等不做介绍。
启动 raftexample
进入 main.go 查看 raftexample 的启动流程:
1func main() {
2 // 初始化启动参数
3 cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
4 id := flag.Int("id", 1, "node ID")
5 kvport := flag.Int("port", 9121, "key-value server port")
6 join := flag.Bool("join", false, "join an existing cluster")
7 flag.Parse()
8
9 // 初始化 proposeC 和 confChangeC 通道
10 proposeC := make(chan string)
11 defer close(proposeC)
12 confChangeC := make(chan raftpb.ConfChange)
13 defer close(confChangeC)
14
15 // 初始化 kvstore,kvstore 负责存储 KV
16 var kvs *kvstore
17 getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
18 // 创建 raftNode
19 commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
20
21 // 创建 KVStore
22 kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
23
24 // 作为客户端监听读写请求
25 serveHTTPKVAPI(kvs, *kvport, confChangeC, errorC)
26}
启动流程中,重点在于 proposeC,confChangeC 和 commitC 通道。在 newRaftNode 函数创建 raft 应用层节点。外部(客户端层)通过 proposeC 和 confChangeC 发送消息给 raft 应用层,客户端通过 commitC 接收消息。
在 newKVStore 函数中创建 KV 存储:
1type kvstore struct {
2 proposeC chan<- string // channel for proposing updates
3 mu sync.RWMutex
4 kvStore map[string]string // current committed key-value pairs
5 snapshotter *snap.Snapshotter
6}
raftexample 使用 map 存储键值对。
serveHTTPKVAPI 会创建 http handler 用来处理读写等请求。
httpKVAPI.ServeHTTP 中有多个请求处理分支,这里仅以 http.MethodGet 方法为例分析读请求是如何处理的:
1func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2 key := r.RequestURI
3 defer r.Body.Close()
4 switch r.Method {
5 case http.MethodGet:
6 if v, ok := h.store.Lookup(key); ok {
7 w.Write([]byte(v))
8 } else {
9 http.Error(w, "Failed to GET", http.StatusNotFound)
10 }
11 default:
12 w.Header().Set("Allow", http.MethodPut)
13 w.Header().Add("Allow", http.MethodGet)
14 w.Header().Add("Allow", http.MethodPost)
15 w.Header().Add("Allow", http.MethodDelete)
16 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
17 }
18}
读请求处理流程
进入 h.store.Lookup查看读请求是如何处理的:
1func (s *kvstore) Lookup(key string) (string, bool) {
2 s.mu.RLock()
3 defer s.mu.RUnlock()
4 v, ok := s.kvStore[key]
5 return v, ok
6}
非常简单,读数据直接从 kvStore 中取数据即可。