实现项目请阅读raft论文以及相关课程,本文只做简单记录,无章法无逻辑
Project1
实现一个存储引擎的原始kv服务处理 (Put/ Del/ Get/ Scan)
瞅一眼接口
Write
用switch case类型断言选择操作
// Write applies a batch of modifications (Put/Delete) to the underlying
// badger store, dispatching on the concrete type of each Modify.
// It stops at the first failing modification and returns a wrapped error;
// an empty batch is a no-op.
func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
	if len(batch) == 0 {
		return nil
	}
	for _, m := range batch {
		key, val, cf := m.Key(), m.Value(), m.Cf()
		var err error
		switch data := m.Data.(type) {
		case storage.Put:
			err = engine_util.PutCF(s.engine.Kv, cf, key, val)
		case storage.Delete:
			err = engine_util.DeleteCF(s.engine.Kv, cf, key)
		default:
			return fmt.Errorf("unsupported modify type: %T", data)
		}
		if err != nil {
			// %q instead of %s: keys are arbitrary bytes, and the quoted
			// form keeps the message readable (and shows empty keys).
			return fmt.Errorf("failed to execute modification on key %q: %w", key, err)
		}
	}
	return nil
}
Reader
要实现三个函数
GetCF(cf string, key []byte) ([]byte, error)
IterCF(cf string) engine_util.DBIterator
Close()
// StandAloneStorageReader implements storage.StorageReader on top of a
// single badger read-only transaction, so every read observes one
// consistent snapshot of the store.
type StandAloneStorageReader struct {
	// KvTxn is the badger transaction all reads go through.
	// It must be released via Close (which calls Discard).
	KvTxn *badger.Txn
}
// NewStandAloneStorageReader wraps the given badger transaction in a
// StandAloneStorageReader. Passing a nil transaction yields a nil reader.
func NewStandAloneStorageReader(txn *badger.Txn) *StandAloneStorageReader {
	if txn != nil {
		return &StandAloneStorageReader{KvTxn: txn}
	}
	return nil
}
// Reader opens a read-only badger transaction (NewTransaction(false))
// and returns a snapshot reader over it. The caller must Close the
// returned reader to release the transaction.
func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
	return NewStandAloneStorageReader(s.engine.Kv.NewTransaction(false)), nil
}
// GetCF reads the value of key in column family cf from this reader's
// snapshot. A missing key is reported as (nil, nil) rather than as an
// error, per the StorageReader contract used by the raw-KV handlers.
func (s *StandAloneStorageReader) GetCF(cf string, key []byte) ([]byte, error) {
	value, lookupErr := engine_util.GetCFFromTxn(s.KvTxn, cf, key)
	switch {
	case lookupErr == badger.ErrKeyNotFound:
		// Absence is not an error for callers.
		return nil, nil
	default:
		return value, lookupErr
	}
}
// IterCF returns an iterator over all keys of column family cf within
// this reader's snapshot transaction.
func (s *StandAloneStorageReader) IterCF(cf string) engine_util.DBIterator {
	iter := engine_util.NewCFIterator(cf, s.KvTxn)
	return iter
}
// Close releases the reader by discarding its badger transaction.
// The reader must not be used after Close.
func (s *StandAloneStorageReader) Close() {
	s.KvTxn.Discard()
}
之后是四个方法
// RawScan reads up to req.Limit key/value pairs from column family
// req.Cf, starting at req.StartKey, and returns them in key order.
func (server *Server) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		return nil, err
	}
	// The original never closed the reader, leaking its read transaction.
	defer reader.Close()

	kvs := make([]*kvrpcpb.KvPair, 0, req.Limit)
	iter := reader.IterCF(req.Cf)
	defer iter.Close()

	// Count down a local quota instead of mutating the request message;
	// hoisting the check into the loop condition also skips the iterator
	// work entirely when Limit is 0.
	remaining := req.Limit
	for iter.Seek(req.StartKey); iter.Valid() && remaining > 0; iter.Next() {
		item := iter.Item()
		val, err := item.Value()
		if err != nil {
			return nil, err
		}
		kvs = append(kvs, &kvrpcpb.KvPair{
			Key:   item.Key(),
			Value: val,
		})
		remaining--
	}
	return &kvrpcpb.RawScanResponse{Kvs: kvs}, nil
}
project2
- 实现基本的Raft算法
- 在Raft之上构建容错KVServer
- 添加raftlog GC和快照
在此之前先学习一下raft,重构到隔壁Raft论文阅读去了
// RaftLog manage the log entries, its struct look like:
//
// snapshot/first.....applied....committed....stabled.....last
// --------|------------------------------------------------|
// log entries
//
// for simplify the RaftLog implement should manage all log entries
// that not truncated
type RaftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage
	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	applied uint64
	// log entries with index <= stabled are persisted to storage.
	// It is used to record the logs that are not persisted by storage yet.
	// Everytime handling `Ready`, the unstabled logs will be included.
	stabled uint64
	// all entries that have not yet compact.
	entries []pb.Entry
	// the incoming unstable snapshot, if any.
	// (used in part 2C)
	pendingSnapshot *pb.Snapshot
	// Your Data Here (2A).
	// dummyIndex caches storage.FirstIndex() as of newLog — i.e. the
	// index of the first entry the log still keeps (everything below it
	// has been compacted into the snapshot).
	dummyIndex uint64
}
之后初始化,把日志恢复到刚刚提交并且应用最新快照的状态
// newLog returns a log using the given storage, recovering the log to
// the state of the latest snapshot and persisted hard state.
func newLog(storage Storage) *RaftLog {
	// Your Code Here (2A).
	// NOTE(review): this is the unimplemented skeleton as handed out by
	// the assignment; it intentionally returns nil.
	return nil
}
// newLog returns a log using the given storage. It recovers the
// in-memory log to the state of the latest snapshot: entries holds
// everything storage still keeps, committed comes from the persisted
// HardState, applied sits just below the first kept index, and stabled
// equals storage's last index (everything in storage is stable).
func newLog(storage Storage) *RaftLog {
	// The original discarded all four errors with `_`; on a failed
	// FirstIndex, firstIndex would be 0 and `firstIndex - 1` below
	// underflows to MaxUint64. newLog cannot return an error, so an
	// unreadable storage at startup is fatal (etcd-raft convention).
	firstIndex, err := storage.FirstIndex()
	if err != nil {
		panic(err)
	}
	lastIndex, err := storage.LastIndex()
	if err != nil {
		panic(err)
	}
	// Entries takes a half-open range: [firstIndex, lastIndex+1).
	entries, err := storage.Entries(firstIndex, lastIndex+1)
	if err != nil {
		panic(err)
	}
	hardState, _, err := storage.InitialState()
	if err != nil {
		panic(err)
	}
	return &RaftLog{
		storage:         storage,
		committed:       hardState.Commit,
		applied:         firstIndex - 1,
		stabled:         lastIndex,
		entries:         entries,
		pendingSnapshot: nil,
		dummyIndex:      firstIndex,
	}
}
之后的几个函数按照意思来写,不是很难
raft驱动 raft的时钟是一个逻辑时钟,上层不断调用Tick()来模拟时间递增
Msg
// MessageType values handled by Step. Note the numbering skips 10 —
// presumably a value reserved/removed in the proto definition; confirm
// against eraftpb before relying on contiguity.
const (
	// 'MessageType_MsgHup' is a local message used for election. If an election timeout happened,
	// the node should pass 'MessageType_MsgHup' to its Step method and start a new election.
	MessageType_MsgHup MessageType = 0
	// 'MessageType_MsgBeat' is a local message that signals the leader to send a heartbeat
	// of the 'MessageType_MsgHeartbeat' type to its followers.
	MessageType_MsgBeat MessageType = 1
	// 'MessageType_MsgPropose' is a local message that proposes to append data to the leader's log entries.
	MessageType_MsgPropose MessageType = 2
	// 'MessageType_MsgAppend' contains log entries to replicate.
	MessageType_MsgAppend MessageType = 3
	// 'MessageType_MsgAppendResponse' is response to log replication request('MessageType_MsgAppend').
	MessageType_MsgAppendResponse MessageType = 4
	// 'MessageType_MsgRequestVote' requests votes for election.
	MessageType_MsgRequestVote MessageType = 5
	// 'MessageType_MsgRequestVoteResponse' contains responses from voting request.
	MessageType_MsgRequestVoteResponse MessageType = 6
	// 'MessageType_MsgSnapshot' requests to install a snapshot message.
	MessageType_MsgSnapshot MessageType = 7
	// 'MessageType_MsgHeartbeat' sends heartbeat from leader to its followers.
	MessageType_MsgHeartbeat MessageType = 8
	// 'MessageType_MsgHeartbeatResponse' is a response to 'MessageType_MsgHeartbeat'.
	MessageType_MsgHeartbeatResponse MessageType = 9
	// 'MessageType_MsgTransferLeader' requests the leader to transfer its leadership.
	MessageType_MsgTransferLeader MessageType = 11
	// 'MessageType_MsgTimeoutNow' send from the leader to the leadership transfer target, to let
	// the transfer target timeout immediately and start a new election.
	MessageType_MsgTimeoutNow MessageType = 12
)
MsgHup
local, 用于请求节点开始选举,字段:MsgType
MsgBeat
local 告知leader该发送心跳了,字段:MsgType
MsgPropose
local 用于上层propose条目,只有leader能处理 字段:
- MsgType
- Entries: 要propose的条目
- To: 发送的节点
其他角色收到直接返回ErrProposalDropped; leader收到后
- 判断r.leadTransferee是否为None,不是就返回Err;
- 把m.Entries追加到自己的entries;
- 向其他节点发送追加日志RPC,集群同步;
- 如果集群里只有自己,直接更新committedIndex
MsgAppend
日志复制 字段
- MsgType
- To
- From
- Term
- LogTerm: 要发送的条目的前一条的Term
- Index: 要发送的条目的前一个条目的Index
- Entries: 要发送的日志条目
- Commit: 当前节点的committedIndex Leader发送:
- 前置判断: 如果要发送的Index已经被压缩了,转为发送快照,否则往下
- 当Leader收到MsgPropose, 给其他所有节点发送MsgAppend
- 当MsgAppend被接收者拒绝,Leader调整next,重新进行前置判断,如果无需快照, 则按照新的next重新发送MsgAppend
follower和candidate
- 判断Msg的Term是否大于等于自己的Term,若小于直接拒绝
- 判断LogIndex是否大于等于自己的最后一条日志索引,大于说明节点漏消息了
- 判断LogTerm是否和自己最后一条日志任期相等,不相等说明有冲突
- 如果漏消息或者有冲突,要缩小prevLogIndex, 再去匹配前一个log,直到匹配成功,把entry追加或覆盖到本节点(可以找到冲突日志的任期,把返回的index设置成上一个任期的最后一个idx位置)
- 拒绝后Leader收到对应响应,会把Msg的idx取出来,设置成下一次的LogIndex,然后发送日志复制
- 如果没有冲突,说明位置匹配对了,如果follower节点后面有日志,直接截断,追加Msg传来的日志
- 如果进行截断操作,要更新stabled
- 更新committedIndex,
min(leader.committed, m.Index+len(m.entries))
- 同意接受,回复response
MsgAppendResponse
对上个的响应
- MsgType
- Term
- To
- Reject
- From
- Index:
r.RaftLog.LastIndex
用于Leader更快地更新next
不管接受还是拒绝,都要发送
- 只有Leader会处理这个消息
- 如果被拒绝了,查看任期,比自己大可能出现网络分区,自己变成follower
- 否则就是prevlog冲突了,调整完再次发送
- 没有被拒绝就成功了,更新match和next
- 更新commit索引
MsgRequestVote
当节点开始选举并成为 Candidate 时,立刻向其他所有节点发送 MsgRequestVote
接受处理
- 判断Term是否大于自己,是的话变成follower
- 如果votedFor不为空,或者不等于candidate,说明已经投过票,拒绝
- candidate的日志至少和自己一样新, 否则拒绝
同意票数过半就Leader
MsgRequestVoteResponse
节点收到 MsgRequestVote 时,会将结果通过 MsgRequestVoteResponse 发送给 Candidate;
- 只有candidate处理这个msg
- 根据
m.Reject
更新v.vote[m.From]
,记录投票结果 - 算出同意和拒绝票数
- 如果同意过半,成为Leader
- 拒绝过半成为follower
MsgSnapshot
MsgHeartbeat
Commit: util.RaftInvalidIndex
每当 Leader 的 heartbeatTimeout 达到时,就会给其余所有节点发送 MsgHeartbeat
判断msg的Term是否大于等于自己 重置选举倒计时 发送response
MsgHeartbeatResponse
当节点收到 MsgHeartbeat 时,会相应地回复 MsgHeartbeatResponse 给 leader
- 只有 Leader 会处理 MsgHeartbeatResponse,其余角色直接忽略
- 通过 m.Commit 判断节点是否落后了,如果是,则进行日志追加
MsgTransferLeader
MsgTimeoutNow
Raft驱动
计时器
每调用一次,增加节点的选举计时;如果是 leader,则增加心跳计时