实现项目前请先阅读 Raft 论文以及相关课程;本文只做简单记录,内容比较零散,没有系统整理。

Project1

基于存储引擎实现原始(Raw)KV 服务的处理函数(Put / Delete / Get / Scan)。

瞅一眼接口

Write

用 switch 类型断言(type switch)区分 Put / Delete 操作:

// Write applies every modification in batch to the KV engine, in order.
// Put entries are written with engine_util.PutCF and Delete entries with
// engine_util.DeleteCF; any other payload type is rejected with an error.
//
// Modifications are applied one at a time, so the batch is NOT atomic: if a
// later modification fails, the earlier ones have already been written.
// An empty batch is a no-op. ctx is not used.
func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
	for _, m := range batch {
		key, val, cf := m.Key(), m.Value(), m.Cf()

		var err error
		switch data := m.Data.(type) {
		case storage.Put:
			err = engine_util.PutCF(s.engine.Kv, cf, key, val)
		case storage.Delete:
			err = engine_util.DeleteCF(s.engine.Kv, cf, key)
		default:
			return fmt.Errorf("unsupported modify type: %T", data)
		}

		if err != nil {
			// Keys are arbitrary bytes; %q escapes non-printable bytes so the
			// message stays readable instead of emitting raw binary.
			return fmt.Errorf("writing key %q to cf %q: %w", key, cf, err)
		}
	}
	return nil
}

Reader

要实现 StorageReader 接口的三个方法:

	GetCF(cf string, key []byte) ([]byte, error)
	IterCF(cf string) engine_util.DBIterator
	Close()
// StandAloneStorageReader serves reads from one badger transaction:
// GetCF and IterCF read through KvTxn, and Close discards it.
type StandAloneStorageReader struct {
	// KvTxn is the transaction every read goes through.
	KvTxn *badger.Txn
}

// NewStandAloneStorageReader wraps txn in a StandAloneStorageReader.
// A nil txn produces a nil reader.
func NewStandAloneStorageReader(txn *badger.Txn) *StandAloneStorageReader {
	var reader *StandAloneStorageReader
	if txn != nil {
		reader = &StandAloneStorageReader{KvTxn: txn}
	}
	return reader
}
// Reader opens a transaction on the KV engine via NewTransaction(false)
// (a read-only transaction in badger) and wraps it in a
// StandAloneStorageReader. The caller must Close the returned reader to
// discard the transaction. ctx is not used.
func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
	txn := s.engine.Kv.NewTransaction(false)
	r := NewStandAloneStorageReader(txn)
	if r == nil {
		// NewStandAloneStorageReader returns a nil *StandAloneStorageReader
		// for a nil txn. Returning that directly would produce a non-nil
		// interface holding a nil pointer (with a nil error), so fail loudly.
		return nil, fmt.Errorf("standalone storage: could not open read transaction")
	}
	return r, nil
}

// GetCF looks up key in column family cf within the reader's transaction.
// A missing key is not treated as an error: it is reported as (nil, nil).
// Any other lookup error is returned unchanged.
func (s *StandAloneStorageReader) GetCF(cf string, key []byte) ([]byte, error) {
	value, lookupErr := engine_util.GetCFFromTxn(s.KvTxn, cf, key)
	if lookupErr != badger.ErrKeyNotFound {
		return value, lookupErr
	}
	// Key absent: callers distinguish "not found" by a nil value.
	return nil, nil
}

// IterCF returns an iterator over column family cf, backed by the reader's
// transaction.
func (s *StandAloneStorageReader) IterCF(cf string) engine_util.DBIterator {
	txn := s.KvTxn
	return engine_util.NewCFIterator(cf, txn)
}

// Close discards the reader's underlying transaction.
func (s *StandAloneStorageReader) Close() {
	txn := s.KvTxn
	txn.Discard()
}

之后是 Server 上的四个 Raw 方法(分别对应 Put / Delete / Get / Scan),这里只贴出 RawScan:

// RawScan returns up to req.Limit key/value pairs from column family req.Cf,
// in iterator order, starting where the iterator lands after
// Seek(req.StartKey). A zero limit yields an empty result.
//
// The request is not modified. The storage reader and the iterator are both
// closed before returning (iterator first, then reader).
func (server *Server) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
	reader, err := server.storage.Reader(req.Context)
	if err != nil {
		return nil, err
	}
	// Release the underlying transaction; without this every scan leaked one.
	defer reader.Close()

	// req.Limit is client-controlled: cap the up-front allocation so a huge
	// limit cannot force a huge allocation. append grows the slice if needed.
	const maxPrealloc = 1024
	capHint := maxPrealloc
	if req.Limit < maxPrealloc {
		capHint = int(req.Limit)
	}
	kvs := make([]*kvrpcpb.KvPair, 0, capHint)

	iter := reader.IterCF(req.Cf)
	defer iter.Close() // deferred after reader.Close, so it runs first

	// Count down a local copy rather than decrementing req.Limit, so the
	// caller's request object is left untouched.
	remaining := req.Limit
	for iter.Seek(req.StartKey); iter.Valid() && remaining > 0; iter.Next() {
		item := iter.Item()
		// Key()/Value() may alias buffers owned by the iterator/transaction
		// (badger documents them as valid only while the item is valid), and
		// the transaction is discarded on return, so copy both out.
		val, err := item.ValueCopy(nil)
		if err != nil {
			return nil, err
		}
		kvs = append(kvs, &kvrpcpb.KvPair{
			Key:   item.KeyCopy(nil),
			Value: val,
		})
		remaining--
	}

	return &kvrpcpb.RawScanResponse{Kvs: kvs}, nil
}

project2

  • 实现基本的Raft算法
  • 在Raft之上构建容错KVServer
  • 添加raftlog GC和快照

在此之前需要先学习 Raft,相关笔记已整理到另一篇《Raft 论文阅读》中。

// RaftLog manages the log entries; its layout looks like:
//
//  snapshot/first.....applied....committed....stabled.....last
//  --------|------------------------------------------------|
//                            log entries
//
// For simplicity, the RaftLog implementation should manage all log entries
// that have not been truncated.
type RaftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	// (The commitIndex: index of the highest entry known to be committed.)
	committed uint64

	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	// (The appliedIndex: index of the highest entry applied to the state machine.)
	applied uint64

	// log entries with index <= stabled are persisted to storage.
	// It is used to record the logs that are not persisted by storage yet.
	// Everytime handling `Ready`, the unstabled logs will be included.
	// (stabled holds the highest index already persisted to storage.)
	stabled uint64

	// all entries that have not yet compact.
	// (All entries that have not been compacted yet.)
	entries []pb.Entry

	// the incoming unstable snapshot, if any.
	// Used in part 2C.
	pendingSnapshot *pb.Snapshot

	// Your Data Here (2A).
	// dummyIndex is set by newLog to storage.FirstIndex(), i.e. the log index
	// of entries[0] when the log is built.
	// NOTE(review): presumably used to translate a log index into an offset
	// in entries (offset = index - dummyIndex) — confirm against the rest of
	// the implementation, including after compaction.
	dummyIndex uint64
}

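之后实现初始化函数 newLog:从 storage 恢复日志状态。entries 取 storage 中 [FirstIndex, LastIndex] 的全部条目,committed 取自 HardState.Commit,applied 设为 FirstIndex-1(即最近一次快照所在的位置),stabled 设为 LastIndex(storage 中的条目都已持久化)。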

// newLog (course skeleton, part 2A): the unimplemented template stub, kept
// for reference. It always returns nil. Go does not allow newLog to be
// declared twice in one package, so only one of this stub and the
// implemented version can exist in the real source file.
func newLog(storage Storage) *RaftLog {
	// Your Code Here (2A).
	return nil
}
// newLog builds a RaftLog from the state already persisted in storage:
//   - entries holds storage's entries with indexes in [FirstIndex, LastIndex];
//   - committed is taken from the persisted HardState;
//   - applied starts at FirstIndex-1, just before the first entry storage holds;
//   - stabled is LastIndex, since everything read from storage is persisted;
//   - dummyIndex records FirstIndex, the log index of entries[0].
//
// It panics if storage cannot report its bounds, its initial state, or its
// entries. Continuing with zero values would silently corrupt the log: for
// example, if a failing FirstIndex() returned 0, applied = firstIndex-1 would
// wrap around to the maximum uint64.
func newLog(storage Storage) *RaftLog {
	firstIndex, err := storage.FirstIndex()
	if err != nil {
		panic(err)
	}
	lastIndex, err := storage.LastIndex()
	if err != nil {
		panic(err)
	}
	hardState, _, err := storage.InitialState()
	if err != nil {
		panic(err)
	}

	// Skip the Entries call for an empty range (firstIndex > lastIndex): there
	// is nothing to load, and an implementation may report an error for it
	// that must not be treated as fatal.
	var entries []pb.Entry
	if firstIndex <= lastIndex {
		entries, err = storage.Entries(firstIndex, lastIndex+1)
		if err != nil {
			panic(err)
		}
	}

	return &RaftLog{
		storage:         storage,
		committed:       hardState.Commit,
		applied:         firstIndex - 1,
		stabled:         lastIndex,
		entries:         entries,
		pendingSnapshot: nil,
		dummyIndex:      firstIndex,
	}
}

之后的几个函数按照意思来写,不是很难

Raft 驱动:Raft 的时钟是逻辑时钟,上层通过不断调用 Tick() 来推进时间。

Msg

// MessageType values used by the raft module. "Local" messages are fed to a
// node's own Step method rather than exchanged between peers.
// Note: the value 10 is not assigned in this block.
const (
	// 'MessageType_MsgHup' is a local message used for election. If an election timeout happened,
	// the node should pass 'MessageType_MsgHup' to its Step method and start a new election.
	MessageType_MsgHup MessageType = 0
	// 'MessageType_MsgBeat' is a local message that signals the leader to send a heartbeat
	// of the 'MessageType_MsgHeartbeat' type to its followers.
	MessageType_MsgBeat MessageType = 1
	// 'MessageType_MsgPropose' is a local message that proposes to append data to the leader's log entries.
	MessageType_MsgPropose MessageType = 2
	// 'MessageType_MsgAppend' contains log entries to replicate.
	MessageType_MsgAppend MessageType = 3
	// 'MessageType_MsgAppendResponse' is response to log replication request('MessageType_MsgAppend').
	MessageType_MsgAppendResponse MessageType = 4
	// 'MessageType_MsgRequestVote' requests votes for election.
	MessageType_MsgRequestVote MessageType = 5
	// 'MessageType_MsgRequestVoteResponse' contains responses from voting request.
	MessageType_MsgRequestVoteResponse MessageType = 6
	// 'MessageType_MsgSnapshot' requests to install a snapshot message.
	MessageType_MsgSnapshot MessageType = 7
	// 'MessageType_MsgHeartbeat' sends heartbeat from leader to its followers.
	MessageType_MsgHeartbeat MessageType = 8
	// 'MessageType_MsgHeartbeatResponse' is a response to 'MessageType_MsgHeartbeat'.
	MessageType_MsgHeartbeatResponse MessageType = 9
	// 'MessageType_MsgTransferLeader' requests the leader to transfer its leadership.
	MessageType_MsgTransferLeader MessageType = 11
	// 'MessageType_MsgTimeoutNow' is sent from the leader to the leadership transfer target, to let
	// the transfer target time out immediately and start a new election.
	MessageType_MsgTimeoutNow MessageType = 12
)

MsgHup

local, 用于请求节点开始选举,字段:MsgType

MsgBeat

local 告知leader该发送心跳了,字段:MsgType

MsgPropose

local 用于上层propose条目,只有leader能处理 字段:

  • MsgType
  • Entries: 要propose的条目
  • To: 发送的节点

其他角色收到直接返回ErrProposalDropped; leader收到后

  • 判断r.leadTransferee是否为None,不是就返回Err;
  • 把m.Entries追加到自己的entries;
  • 向其他节点发送追加日志RPC,集群同步;
  • 如果集群里只有自己,直接更新committedIndex

MsgAppend

日志复制 字段

  • MsgType
  • To
  • From
  • Term
  • LogTerm: 要发送的条目的前一条的Term
  • Index: 要发送的条目的前一个条目的Index
  • Entries: 要发送的日志条目
  • Commit: Leader 当前的 committedIndex

Leader 发送:
  • 前置判断: 如果要发送的Index已经被压缩了,转为发送快照,否则往下
  • 当Leader收到MsgPropose, 给其他所有节点发送MsgAppend
  • 当MsgAppend被接收者拒绝,Leader调整next,重新进行前置判断,如果无需快照, 则按照新的next重新发送MsgAppend

follower和candidate

  • 判断Msg的Term是否大于等于自己的Term,若小于直接拒绝
  • 判断 m.Index 是否大于自己最后一条日志的索引,若大于,说明本节点缺少日志(漏了消息)
  • 判断 m.LogTerm 是否等于自己在 m.Index 位置上那条日志的任期(而不是最后一条日志的任期),不相等说明有冲突
  • 如果漏消息或者有冲突,要缩小prevLogIndex, 再去匹配前一个log,直到匹配成功,把entry追加或覆盖到本节点(可以找到冲突日志的任期,把返回的index设置成上一个任期的最后一个idx位置)
  • 拒绝后Leader收到对应响应,会把Msg的idx取出来,设置成下一次的LogIndex,然后发送日志复制
  • 如果没有冲突,说明位置匹配对了,如果follower节点后面有日志,直接截断,追加Msg传来的日志
  • 如果进行了截断操作,要把 stabled 回退到截断点之前:stabled = min(stabled, 截断位置 - 1)
  • 更新 committedIndex:若 m.Commit 大于自己的 committedIndex,则设为 min(m.Commit, m.Index + len(m.Entries)),即不超过本次追加的最后一条日志
  • 同意接受,回复response

MsgAppendResponse

对上个的响应

  • MsgType
  • Term
  • To
  • Reject
  • From
  • Index: r.RaftLog.LastIndex用于Leader更快地更新next

不管接受还是拒绝,都要发送

  • 只有Leader会处理这个消息
  • 如果被拒绝了,查看任期,比自己大可能出现网络分区,自己变成follower
  • 否则就是prevlog冲突了,调整完再次发送
  • 没有被拒绝就成功了,更新match和next
  • 更新commit索引

MsgRequestVote

当节点开始选举并成为 Candidate 时,立刻向其他所有节点发送 MsgRequestVote

接受处理

  • 判断Term是否大于自己,是的话变成follower
  • 如果本任期已经投过票(Vote 不为空)且投的不是这个 candidate,拒绝
  • candidate的日志至少和自己一样新, 否则拒绝

同意票数过半即成为 Leader。

MsgRequestVoteResponse

节点收到 MsgRequestVote 时,会将结果通过 MsgRequestVoteResponse 发送给 Candidate;

  • 只有candidate处理这个msg
  • 根据 m.Reject 记录 m.From 的投票结果(Reject 为 false 表示同意)
  • 算出同意和拒绝票数
  • 如果同意过半,成为Leader
  • 拒绝过半成为follower

MsgSnapshot

MsgHeartbeat

字段 Commit: util.RaftInvalidIndex

每当 Leader 的 heartbeatTimeout 到期时,就会给其余所有节点发送 MsgHeartbeat。

接收方处理:判断 msg 的 Term 是否大于等于自己;若是,重置选举倒计时,并回复 MsgHeartbeatResponse。

MsgHeartbeatResponse

当 Follower / Candidate 收到 Leader 发来的 MsgHeartbeat 时,会回复 MsgHeartbeatResponse。

  • 只有 Leader 会处理 MsgHeartbeatResponse,其余角色直接忽略
  • 通过 m.Commit 判断节点是否落后了,如果是,则进行日志追加

MsgTransferLeader

MsgTimeoutNow

Raft驱动

计时器

每调用一次 Tick(),逻辑时钟前进一格:
非 Leader 节点增加选举计时,超过选举超时后发起选举(MsgHup);
Leader 增加心跳计时,达到 heartbeatTimeout 时向其他节点发送心跳(MsgBeat)。