本文主要是论文翻译

Raft

Raft本质是一种共识算法,数据集需要分片分散到多个机器上,为了容忍机器故障,通常会将分片冗余多份到多个机器上,每个冗余都是一个副本

为了保证副本间的数据一致性,引入了共识算法

Raft是一个主流的共识算法, 另一个是Paxos

Paxos 很难准确理解 并且 很难正确实现

raft是一个管理复制式日志的共识算法 最终结果和Paxos等价 但结构不同, 比Paxos更好理解,也更易于构建实际系统 Raft 在设计上将共识拆分为 leader election、log replication、safety 等几个模块, 为了减少状态数量,要求有更强的一致性 引入了新的集群变更, 利用重叠大多数特性来保证安全

复制式状态机

若干节点上的状态机计算同一状态的相同副本

即使其中一些节点挂掉了,系统整体仍然能继续工作

使用场景

复制式状态机用于解决分布式系统的一些容错问题

状态机架构

复制式状态机通常用replicated log 也就是复制式日志实现

共识算法管理着客户端发来的状态机命令的一个日志副本,状态机以完全相同的顺序执行日志中的命令,产生完全相同的输出

  • 每个Server保存着一个由命令序列组成的log,状态机按照顺序执行
  • 保持 replicated log的一致性是共识算法的职责 某个节点上的共识算法从客户端接收命令,然后写入log; 同时与其他节点共识模块通信, 保证即使某些机器挂掉,每个replicated log最终也以相同的顺序包含相同的请求
  • 命令被复制到其他节点后,每台机器都按顺序执行,从客户端来看,这些节点形成高度可靠的单个状态机

共识算法的典型特征

  1. 在任何non-Byzantine条件下都能保证安全 (包括网络延迟,丢包,乱序等)
  2. 只要多数节点能工作,彼此之间以及和客户端之间能通信,那么系统就是完全可用的(5台机器挂2台,依旧可以使用)
  3. 不依赖时序(timing)来保证日志的一致性: 在最坏情况下,时钟不准和极大的消息延迟都会导致可用性问题,因此避免依赖时序来保证一致性
  4. 一条命令收到集群大多数节点的响应,命令就算完成了 少量响应慢的机器不影响整体的系统性能

Paxos有什么问题

Paxos

  1. 首先定义了一个协议,让单个决议达成一致, 例如单条log entry 这个子集被称为单决议Paxos
  2. 通过组合协议的多个实例,提供决议序列的功能 例如一个log file,称为多决议Paxos
  3. 同时保证satety and liveness, 支持集群扩缩容节点

缺点 理解特别困难

  1. 单决议Paxos比较dense和subtle, 它分为了没有简单直观解释的两个阶段,很难直观解释为什么single-decree协议是工作的
  2. 多决议Paxos的组合规则显著增加了额外的复杂性和模糊性

没有考虑真实系统的实现

  1. 没有一个普遍接受的多决议 Paxos算法
  2. 多决议靠单决议组合只会增加复杂度
  3. Paxos的核心是一个对称点对点模型, 最终出于性能考虑建议了一个弱领导力模型; 在一个简化的,只需要做出单个决定的世界里,这样是有意义的, 但是实际系统不满足这个前提, 要做出多个决策,还是选举出一个leader 再由leader来协调决策更加简单和快速

这些问题导致实现系统与Paxos算法本身差异非常大

  • 每种实现都是以 Paxos 为起点,但在实现过程中遇到各种困难,最后开发出的是一个与最初设想迥异的架构
  • 非常花时间,而且很容易出错,加剧了 Paxos 的理解难度
  • Paxos 的公式对于定理证明来说可能不错,但实际实现与公式是如此不同,导致证明并没有多少意义

"==Paxos 的算法描述和真实需求之间存在一个巨大鸿沟,...... 最终的系统其实将建立在一个没有经过证明的协议之上=="

面向可理解性的设计

设计目标

  1. 必须为构建真实系统提供完整基础,能显著降低系统开发者需要的设计工作
  2. 必须在所有情况下保证安全,在典型场景下确保可用
  3. 对于常见操作必须高效
  4. 必须确保可理解性,最重要的目标,也是最大的挑战

可理解性的评估

  1. 问题分解: 将问题尽可能分解为独立的可解决,可解释和可理解的模块(将 leader election、log replication、safety、membership changes 拆分开来)
  2. 简化状态空间: 减少需要考虑的状态数量, 使系统更加清晰,尽量消除非确定性 (具体的来说,log允许有空洞,raft会避免各机器log不一致) 虽然在大多是情况下极力避免非确定性,但是在某些场景下,它会提高可理解性

尤其是,随机化方式会引入非确定性,但他们通过"以一种方式处理所有情况",让状态空间变得更小 用随机化来简化leader选举

raft共识算法

Raft实现共识的机制是

  1. 共同选举出一个leader
  2. 给予这个leader管理replicated log的完全职责
  3. leader接受来自客户端的log entry 然后复制给其他节点,并在安全时,告诉这些节点将这些entries应用到他们的状态机

只有一个leader的设计简化了replicated log的管理

  • leader能决定将新的entry放到log里的什么位置,而不询问其他机器
  • 数据流也是从leader到其他节点的简单单向方式 leader可能会挂掉 或者失联, 这种情况下会选举一个新的leader

基于以上leader机制, raft将共识算法分解为三个相对独立的子问题

  1. leader election
  2. log replication
  3. Safety

Raft基础

状态机

在任意时刻,每个节点处于以下三种状态之一: leader, follower, candidate

  • 正确情况下,集群中有且仅有一个leader,剩下全是follower
  • follower都是被动的,不会主动发出请求,只会响应candidate和leader的请求
  • leader负责处理所有客户端请求, 如果一个客户端向follower发起请求,后者把他重定向到leader
  • candidate是一个特殊状态,选举新的leader会用到 Raft状态机

如果一个follower在一段时间内收不到leader的请求,会变成一个candidate然后发起一轮选举

获得大多数选票的candidate成为新的leader

任期 term

时间被划分为长度不固定的任期

  • 每个任期都是从选举开始的, 此时多个candidate 都试图成为leader
  • 某个candidate赢得选举后,成为该任期内的leader. raft保证了在任意一个任期内,最多只会有一个leader
  • 有时选举会失败,这种情况下该任期就没有leader, 很快要开始新一轮选举
  • 不同节点上看到的任期转换时刻可能不同

任期是一个逻辑时钟,用来让各节点检测过期信息

每个节点都记录了当前任期号 currentTerm

节点通信的时候带上currentTerm信息

如果一个节点发现自己的任期号小于其他节点的,立刻更新自己的

如果一个candidate或leader发现自己任期过期,立刻切换成follower状态

如果一个节点收到过期任期编号请求,拒绝请求

节点中的通信 RPC

节点之间通过RPC通信,基础的共识算法只需要两个RPC

==RequestVote====AppendEntries==

Leader选举

心跳和选举触发流程

Raft使用heartbeat机制来触发leader选举

  • 节点启动是follower状态,主要能持续从leader或candidate收到RPC请求,就一直保持在follower状态
  • Leaders定期发送心跳给所有的follower,以保持它的权威
  • 如果一个follower在election timeout时间段都没有收到通信,就认为已经没有合法leader了,发起一次选举

选举流程

对于一个follower

  • 首先增大自己当前的Term
  • 切换到candidate状态
  • 然后选举自己作为leader,同时并发地向集群其他节点发送requestVote RPC
  • 然后它将处于candidate状态,直到发生以下三种情况
    • 该follower赢得此次选举,成为leader
    • 另一个节点赢得此次选举,成为leader
    • 选举超时,没有产生有效leader

获胜的条件

如果一个candidate获得了集群大多数节点针对同一任期的投票,那么就赢得了这个任期的选举

投票的标准是先到先得

一个candidate成为leader就给其他节点发送心跳,防止新的选举产生

在等待投票期间,一个candidate可能从其他服务器收到一个AppendEntries声称自己是leader, 如果这个leader的Term

  • 大于等于这个candidate的currentTerm,就认为leader合法
  • 小于这个candidate的currentTerm; 拒绝这个RPC, 仍然保留在candidate 第三种可能的结果是,candidate既没有赢得也没有输掉这次选举,如果没有额外的预防措施,这种投票分裂的情况可能会无限持续下去

避免无限循环放投票分裂

随机选举超时

从一些固定时长(例如100ms 150ms 200ms)中随机选择一个选举超时时间,让节点的超时时刻比较分散(redis避免缓存雪崩也可以这样),在大多数情况下同一时刻只有一台超时,这台超时的节点会在其他节点超时之前赢得选举(因为它的Term更大)

Leader向其他节点复制log

复制流程

选出一个leader后,开始服务客户端请求

  • 每个客户端请求都包含一条命令 由replicated state machine执行
  • leader把这个命令追加到自己的log,然后并发地通过AppendEntries复制给其他节点
  • 复制成功后,leader才会把这个entry应用到自己的状态机,然后把结果返回给客户端
  • 如果follower挂掉了或者很慢,或者发生了丢包, leader会无限重试AppendEntries,直到所有的follower最终都存储了所有的log entry

log文件组织结构

  1. Log 由log entry组成,每个entry都是顺序编号的, 这个索引标识了该entry在log中的位置
  2. 每个entry包含了
    1. leader创建该entry都任期
    2. 需要执行的命令
  3. 当一条entry被安全地应用到状态机后,就认为这个entry已经提交了

Commit的定义

raft保证已提交的记录都是持久的,并且最终会被所有可用的状态机执行

  • 只要创建这个entry都leader把它成功复制到大多数节点,这个entry就算是提交了
  • 这也提交了leader log中所有前面的entry,包括之前那些由其他leader创建的entry

Log matching特性

raft这种log机制的设计是为了保证各节点log的高度一致性

如果不同log里,两个entry有完全相同的index和Term,那么

  1. 这两个entry一定包含相同的命令; 这源于 leader在任意给定Term和log index的情况下,最多只会创建一个entry,并且在log中的位置永远不会发生变化
  2. 这两个log中,从该index往前所有entry都分别相同 这点事还通过AppendEntries中简单的一致性检查来保证的
    1. AppendEntries请求中,leader会带上log前一个紧邻的entry都index和Term信息
    2. 如果follower log中以相同的index位置没有entry 或者有entry都Term不同, follower就拒接新的entry

Log不一致的场景

leader挂掉, 还没有将其log中的entry复制到其他节点就挂掉了

这些不一致会导致一系列复杂的leader和follower crash

a,b 是丢失记录

c,d是有额外的提交记录

e,f是以上两种情况

log丢失或多出来的记录可能会跨多个Term

f可能是从Term2成为leader,然后向log添加一些entry,但是还没来得及提交就挂掉了,重启后成为Term3的leader,然后添加一些entry到自己log,在提交Term2 & 3期间的entry挂掉了,之后又连续挂了几个Term

避免log不一致

处理不一致的方式是强制follower复制一份leader的log 这意味着有冲突的entry会被强制覆盖

解决冲突的流程

  • 找到leader和follower的最后一个共同认可的entry
  • 将follower log中从这条entry开始往后的Entries全部删掉
  • 将 leader log这条记录开始往后的所有Entries同步给follower

整个过程都发生在AppendEntries RPC的一致性检查

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/server.go#L939

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
    if req.Term < s.currentTerm
        return _, false

    if req.Term == s.currentTerm {
        if s.state == Candidate  // step-down to follower when it is a candidate
            s.setState(Follower)
        s.leader = req.LeaderName
    } else {
        s.updateCurrentTerm(req.Term, req.LeaderName)
    }

    // Reject if log doesn't contain a matching previous entry.
    if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
        return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
    }

    s.log.appendEntries(req.Entries)      // Append entries to the log.
    s.log.setCommitIndex(req.CommitIndex) // Commit up to the commit index.

    // once the server appended and committed all the log entries from the leader
    return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/log.go#L399
// Truncates the log to the given index and term. This only works if the log
// at the index has not been committed.
func (l *Log) truncate(index uint64, term uint64) error {
    if index < l.commitIndex // Do not allow committed entries to be truncated.
        return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.commitIndex, index, term)

    if index > l.startIndex + len(l.entries) // Do not truncate past end of entries.
        return fmt.Errorf("raft.Log: Entry index does not exist (MAX=%v): (IDX=%v, TERM=%v)", len(l.entries), index, term)

    // If we're truncating everything then just clear the entries.
    if index == l.startIndex {
        l.file.Truncate(0)
        l.file.Seek(0, os.SEEK_SET)
        l.entries = []*LogEntry{}
    } else {
        // Do not truncate if the entry at index does not have the matching term.
        entry := l.entries[index-l.startIndex-1]
        if len(l.entries) > 0 && entry.Term != term
            return fmt.Errorf("raft.Log: Entry at index does not have matching term (%v): (IDX=%v, TERM=%v)", entry.Term, index, term)

        // Otherwise truncate up to the desired entry.
        if index < l.startIndex+uint64(len(l.entries)) {
            position := l.entries[index-l.startIndex].Position
            l.file.Truncate(position)
            l.file.Seek(position, os.SEEK_SET)
            l.entries = l.entries[0 : index-l.startIndex]
        }
    }

    return nil
}
  • Leader为每个follower维护了后者下一个要使用的log entry index,即nextIndex[followerID]变量
  • 一个节点成为leader时,会把整个nextIndex都初始化为自己的log文件下一个index
  • 如果一个follower log和leader的不一致,一致性检查会失败,从而拒接这个请求,leader收到拒接后,将减小nextIndex,然后重试这个请求,直到某条成功,这个时候就成功匹配
  • 之后follower log删掉index之后的所有entry,然后开始同步leader的entry 在之后的整个Term里 都保持一致

优化

当拒接一个AppendEntries请求的时候,follower可以将冲突entry都Term以及log中相同Term的最小index包含在响应里,让leader直接跳过这个Term中的所有冲突记录

但是实际中故障很少发生,不太可能有很多不一致的记录

新leader上台后无需任何特殊操作来恢复log一致性,leader永远不会覆盖或者删除自己log中的记录

  • 只要集群的大多数节点都是健康的,Raft 就能接受、复制和应用新的 log entry
  • 正常情况下**==只需一轮 RPC==** 就能将一个新 entry 复制到集群的大多数节点
  • ==个别比较慢的 follower 不影响集群性能==

安全

为了解决这个问题, 给leader election添加了一个限制条件,确保任何Term内的leader都包含了前面所有Term内提交的entry

包含所有已提交entry都节点才能被选为leader

除非前面所有Term内的entry都已经在某个节点上了,否则这个节点不能被选为leader, 这意味着无需从non-leader 节点向leader同步数据了 也就是log Entries只会从leader流向follower

  • 首先 除非log中已经包含了集群的所有已提交entries, 否则一个candidate不能被选举成leader
  • 其次,还活着的节点中,至少有一个节点保存了集群的所有已提交Entries(因为覆盖大多数节点的entry才算提交成功)
  • 那么 只要一个candidate的log和大多数节点都至少不落后,就持有了集群的所有已提交记录

判断哪个log更加新,依据的是最后一个entry都index和Term

  • 如果Term不同,Term更新的胜出
  • 如果Term相同,index更大的胜出

当前任期+副本数量过半,entry才能提交

如果一个entry已经存储到集群的大多数节点上了,leader任务这个entry提交成功了, 如果leader在这个entry提交之前挂了(就是没有同步到大多数节点), 那么下一个leader将承担这个entry的同步和提交任务

但是有一些新问题, 即使某个entry已经已经存储到大多数节点,也可能被新leader覆盖掉

  • 时刻a, S1是当前leader,把index=2的entry复制到了S2
  • 时刻b, S1挂了,S5被选举成了新的leader,任期Term为3, S5在index=2的位置接受了一个新entry
  • 时刻c, S5挂了, S1被选举成了下一任leader,同步挂掉之前的那个entryindex=2,然后成功同步到了大部分节点上,但是还没有提交

if线1 时刻d, S1又挂掉了, S5被选举成了下一个leader, 这种情况下index=2,term=2的entry会被index=2,term=3覆盖掉

if线2 时刻e, S1在挂掉之前把index=3,term=4的一条新纪录复制到了大部分节点上,这种情况下即使S1挂掉了,S5还是无法赢得选举

问题在于 判断是否要提交的唯一标准是已同步的副本数量 :超过半数 就认为提交,没有考虑到任期信息 就是上面的if线1的情况

为了避免if线1, raft做了一个限制 : 只有提交当前任期内的记录的时候,才能使用这种计算副本数量的方式

raft的Entries保留了原始Term信息,Term是不会随着时间或者log文件改变的,所以判断entry就更加容易了

安全性的证明

要证明的是一旦某条日志 entry 被提交(commit),那它在所有将来出现的 leader 的日志中一定存在

假设任期T内的leader,在他的任期内提交了一个entry,但是这个没有被后面任期的leader存储,没有存储这个entry都最小任期U, U > T

  1. leader U 当选成leader的时候,这个entry一定不在log中,因为leader不会删除或者覆盖Entries
  2. leader T 已经把这个entry同步到大部分节点,leaderU收到了大部分节点的投票,因此,至少一个节点 既接受了leaderT的entry同步,又投票给了leaderU
  3. 这个投票者一定是在投票给leaderU之前接受的这个entry,不然的话他会拒接来自leaderT的同步(它当前的Term比T大)
  4. 这个投票者在投给leaderU之前还存着这个entry,每个后面都leader都包含这个记录
  5. 这个投票者将选票给了leaderU,说明leaderU的log至少与该投票者是一样新的 两个矛盾如下:
    1. 如果投票者和leaderU的最后一个log Term一样,那么leaderU的log至少与投票者的log一样长,这个时候他包含了投票者log里的所有记录(这与前面假设投票者有,而U里没有矛盾) 因此,leaderU最后一个log Term必须比该投票者大, 还必须比T大
    2. 最早创建了leaderU的最后一个entry的leader,一定在他的log里包含了这个已提交的entry, 那么根据log matching property, leaderU的log里也一定包含了这个已提交的entry,导出矛盾

因此 大于T的所有Terms里的leader,一定包含Term T内提交的所有Entries,Log Matching Property 保证了未来的 leaders 也会包含间接提交的记录

Follower/candidate故障

无限重试 + 请求幂等

如果一个节点在完成了RPC之后且在响应之前挂掉了,它在重启之后会收到一个完全相同的RPC

Raft RPC是幂等的,这不会导致问题,如果一个follower收到了一个AppendEntries请求,而他的log中已经包含了待Append的entry,那么会直接胡烈这个请求

时序和可用性

raft的一个设计要求是安全性(无冲突)绝对不能依赖时序

另一方面 可用性对时序的依赖是不可避免的

raft里最依赖时序的就是leader选举部分,只要系统满足如下时序条件,Raft就能选出和保持一个稳定的leader

broadcastTime ≪ electionTimeout ≪ MTBF

  • broadcastTime:一个节点并发给其他节点发送请求并收到响应的平均时间
  • electionTimeout: 定义的选举超时时间
  • MTBF:单个节点的平均故障时间

广播耗时要比选举超时低一个数量级,这样leader才能可靠地发送心跳消息给follower,避免他们发起新的选举

选举超时要比MTBF低几个数量级,这样系统才能稳步前进,当leader挂掉了,系统大概会经历一个election timeout时间段的不可用

raft 一般要求接收方将请求持久化到稳定存储

broadcastTime 可能需要 ==0.5ms ~ 20ms==

electionTimeout 通常选择 ==10ms ~ 500ms==

==MTBF 是几个月或更长时间==,因此很容易满足时序要求

Raft核心

不同节点的状态参数

所有节点上的持久状态: 处理客户端请求的时候,需要先更新这些持久状态(存储在稳定介质上) 再响应请求

  1. currentTerm: 该节点已知的当前任期. 节点启动时初始化为0,然后单调递增
  2. votedFor:投票给谁, 如果没有就是none
  3. log[]: log entries, 索引从1开始,每个entry包含了状态机命令和leader收到这个entry时候的term信息

所有节点上的易失状态: 选举之后重新初始化

  1. commitIndex: 最后提交的entry都index,初始化为0,之后单调递增
  2. lastAppliedIndex:最后应用到状态机的index;初始化为0

Leader节点上的易失状态, 选举后重新初始化

  1. nextIndex[], 为每个节点分别维护的编号,下次replicate entry的时候用,初始化为leader_last_log_index + 1
  2. matchIndex[], 为每个节点分别维护的编号,表示已知的,复制成功的最大index,初始化为0 单调递增

AppendEntries RPC

用途: 由leader发起,用于 replicate log entries,也用作心跳

参数

  • term: leader的任期编号
  • leaderId: follower重定向客户端使用
  • prevLogIndex: 上一个 log entry的index
  • prevLogTerm: prevLogIndex entry的Term
  • entries[]: 需要追加到log的新entry (如果是heartbeat, 那么数组为空)
  • leaderCommit: leader的CommitIndex

返回结果

  • term: currentTerm
  • success: 如果follower包含了匹配 prevLogIndex和PrevLogTerm的entry,则返回true
// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/log.go#L467

func (l *Log) appendEntries(entries []*protobuf.LogEntry) error {
    startPosition, _ := l.file.Seek(0, os.SEEK_CUR) // 定位到起始写入位置

    for i := range entries { // Append each entry util hit an error.
        logEntry := &LogEntry{
            log:       l,             // 日志文件
            Position:  startPosition, // 起始写入位置
            pb:        entries[i],    // 待写入 log entry
        }

        size = l.writeEntry(logEntry, w)
        startPosition += size
    }

    return nil
}

func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
    if len(l.entries) > 0 {
        lastEntry := l.entries[len(l.entries)-1] // 上一个已经写入日志的 entry

        if entry.Term < lastEntry.Term           // 待写入 entry 所带的任期号不能小于前一 entry 所带的任期号
            return -1, Errorf("raft.Log: Cannot append entry with earlier term")
        if entry.Term == lastEntry.Term && entry.Index <= lastEntry.Index // 写入位置必须在前一个 entry 之后
            return -1, Errorf("raft.Log: Cannot append entry with earlier index in the same term")
    }

    size := entry.Encode(w) // 写到持久存储,然后就可以 append 到 entries list 了
    l.entries.append(entry)

    return int64(size), nil
}

RequestVote RPC

用途: 由candidate发起,用于收集投票

参数:

  • term
  • candidateId
  • lastLogIndex
  • lastLogTerm 返回结果
  • term
  • voteGranted
// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/server.go#L1071

func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
    if _, ok := s.peers[req.CandidateName]; !ok // Candidate 节点不在本集群,直接 deny
        return _, false

    if req.Term < s.Term   // 请求来自更早的任期(old term),直接拒绝
        return _, false

    if req.Term > s.Term { // 看到了比本节点还要新的任期号(term number),update 到本节点
        s.updateCurrentTerm(req.Term, "")
    } else if s.votedFor != "" && s.votedFor != req.CandidateName { // 当前节点已经投给了其他 candidate
        return _, false
    }

    lastIndex, lastTerm := s.log.lastInfo()
    if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm // 如果 candidate 的 log 比我们的要老,则不投给它
        return _, false

    // 投票给该 candidate,然后重置本节点的 election timeout
    s.votedFor = req.CandidateName
    return newRequestVoteResponse(s.currentTerm, true), true
}

Follower处理循环

对来自candidate和leader的RPC请求做出响应

如果知道election timeout都没从当前leader收到AppendEntries rpc,也没有投票给某个candidate,则进入candidate状态

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/server.go#L664

func (s *server) followerLoop() {
    for s.State() == Follower {
        select {
        case e := <-s.c:
            switch req := e.target.(type) {
            case JoinCommand:
                //If no log entries exist and a self-join command is issued then immediately become leader and commit entry.
                if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
                    s.setState(Leader)
                    s.processCommand(req, e)
                }
            case *AppendEntriesRequest:
                // If heartbeats get too close to the election timeout then send an event.
                if elapsedTime > electionTimeout*ElectionTimeoutThresholdPercent {
                    s.DispatchEvent(ElectionTimeoutThresholdEventType)
                }
                s.processAppendEntriesRequest(req)
            case *RequestVoteRequest:
                s.processRequestVoteRequest(req)
            case *SnapshotRequest:
                s.processSnapshotRequest(req)
            }

        case <-timeoutChan:
            s.setState(Candidate)
        }

        timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
    }
}

Candidate处理循环

转成candidate角色后,立刻开始选举

  1. 增大currentTerm
  2. 投票给自己
  3. 重置选举定时器
  4. 发送requestVote RPC给其他节点

如果收到了大多数节点的赞成票,就成为leader

如果从新leader收到了AppendEntries RPC, 就转入follower角色

如果election timeout,就再次开始选举

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/server.go#L730

// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
    prevLeader := s.leader
    s.leader = ""

    lastLogIndex, lastLogTerm := s.log.lastInfo()
    doVote := true
    votesGranted := 0

    for s.State() == Candidate {
        if doVote {
            s.currentTerm++      // Increment current term, vote for self.
            s.votedFor = s.name

            // Send RequestVote RPCs to all other servers.
            respChan = make(chan *RequestVoteResponse, len(s.peers))
            for _, peer := range s.peers {
                 sendVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm, respChan)
            }

            // Wait for either:
            //   * Votes received from majority of servers: become leader
            //   * AppendEntries RPC received from new leader: step down.
            //   * Election timeout elapses without election resolution: increment term, start new election
            //   * Discover higher term: step down (§5.1)
            votesGranted = 1
            timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
            doVote = false
        }

        // If we received enough votes then stop waiting for more votes.
        if votesGranted == s.QuorumSize() {
            s.setState(Leader)
            return
        }

        // Collect votes from peers.
        select {
        case resp := <-respChan:
            if success := s.processVoteResponse(resp); success
                votesGranted++

        case e := <-s.c:
            var err error
            switch req := e.target.(type) {
            case Command:
                err = NotLeaderError
            case *AppendEntriesRequest:
                s.processAppendEntriesRequest(req)
            case *RequestVoteRequest:
                s.processRequestVoteRequest(req)
            }

            // Callback to event.
            e.c <- err

        case <-timeoutChan:
            doVote = true
        }
    }
}

Leader处理循环

  • 定期发送心跳给其他节点,防止timeout
  • 从客户端接收请求,把entry追加到local log,等entry应用到状态机后再发送响应
  • 对于follower i,如果lastLogIndex >= nextIndex[i],将从nextIndex[i]开始的所有log entry发送给节点i
    • 成功更新nextIndexmatchIndex
    • 失败 nextIndex[i]--,然后重试
  • 如果存在N满足N > commitIndex matchIndex[i]>=N对大多数i成立,log[N].term = currentTerm: 设置CommitIndex = N
// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/server.go#L811

func (s *server) leaderLoop() {
    logIndex, _ := s.log.lastInfo()

    // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
    for _, peer := range s.peers {
        peer.setPrevLogIndex(logIndex)
        peer.startHeartbeat() // 定期发送心跳
    }

    // Commit a NOP after the server becomes leader.
    // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server."
    s.Do(NOPCommand{})

    // Begin to collect response from followers
    for s.State() == Leader {
        select {
        case e := <-s.c:
            switch req := e.target.(type) {
            case Command:
                s.processCommand(req, e)
                continue
            case *AppendEntriesRequest:
                s.processAppendEntriesRequest(req)
            case *AppendEntriesResponse:
                s.processAppendEntriesResponse(req)
            case *RequestVoteRequest:
                s.processRequestVoteRequest(req)
            }
        }
    }

    s.syncedPeer = nil
}

五大特性

  1. ==Election Safety==(选举安全):在任意 term(任期)内,==最多只会有一个== leader 被选出来。
  2. ==Leader Append-Only==(只追加):leader 从不覆盖或删除它的日志中的 entry;只会追加(append)。
  3. ==Log Matching==(日志匹配):如果两个日志包含了 ==index 和 term 完全相同的 entry==, 那从这个 index 往前的那些 entry  也都是完全相同的
  4. ==Leader Completeness==:如果一个 entry 在某个 term 被提交,那它将出现在所有 term 更大的 leaders 的 log 中
  5. ==State Machine Safety==(状态机安全):如果一个节点在特定 index 应用了一个 entry 到它的状态机,那其他节点不会在相同 idnex 应用另一个不同的 entry。

集群节点数量变化

之前都是假设集群配置不变

但是实际场景里,有时候需要添加或者删除节点

增删节点可能导致集群分裂

本质问题就是避免在增删节点期间同时出现两个及以上leader

遗憾的是,不管用什么方式,这个过程都是不安全的, 无法在同一时刻原子地切换所有节点

例如 节点数量从3 -> 5,这个过程是不安全的,因为不同节点的切换发生在不同时刻,箭头指向的时刻,集群分裂成了两个大多数,分别用老的配置和新的配置,各自选出来一个leader

解决方法

两阶段方式

在raft里

  1. 集群先切换到一个联合共识的事务型配置
  2. 一旦联合共识提交,系统切换到新配置

联合共识 combines both old and new config

  • 不管是新配置还是老配置,log Entries都会被复制到其他所有节点
  • 不管是新配置还是老配置,任何节点都可能成为leader
  • 不管是新配置还是老配置,选举或提交都需要大多数节点同意 联合共识使得每个节点在任意时刻切换配置,而不会牺牲安全性; 而且集群变更期间,仍然能够服务客户端请求

虚线代表配置已经创建还未提交,实线代表已经提交

当leader收到一个配置变更请求,会把请求作为一个log entry存储,用作联合共识,然后用log replication机制同步到其他节点

任何一个节点把这个新配置应用到自己的log后,用这个配置来做未来所有决策,这意味着leader会使用这个联合共识的规则来决定何时提交

如果leader挂了,使用old或联合配置的节点可能会选出一个新的leader,取决于获胜的candidate是否收到了联合共识,但不管在什么情况下,new都无法做出单边决策

联合共识提交后,除非有其他节点的同意,否则old和new都无法做出决定,并且leader completeness property保证了只有有联合共识的节点才能被选举成leader,这样leader就可以安全地创建log entry描述new配置并且同步到其他节点

三个问题

新节点状态为空,需要一段时间同步log

新节点加入集群,log是空白的,因此需要一段时间来追赶到最新状态,这段时间是无法提交新的entry的 包括联合共识

为了避免这个问题, raft引入了一个特殊的用在配置变更前的阶段,这个阶段 新节点是以非投票成员加入集群的 leader会同步entry给他们,但是不参与投票,计算集群节点数量的时候也不考虑他们,等新节点赶上其他节点状态后,才开始配置变更过程

从集群中移除leader

这种情况下,leader在提交了new之后就卸任,变成了follower

这意味着在leader提交new期间,有一个时间窗口,他管理着一个不包括自己的集群;他同步log Entries给其他节点,但是自己不能被算作大多数,leader切换发生在new提交之后,这是新配置能独立工作的最早时刻 在这个时刻之前,有可能只有old中某个节点才能被选为leader

被移除的节点不断超时 触发选举

被移出的节点是指不在new配置里的节点,这些节点不会收到leader心跳,于是会timeout 触发新的选举

他们带有的Term会更大,迫使leader回退成follower,这样会把新leader赶下台,之后重复这个过程

为了避免这个情况,在大多数节点知道已经有一个leader的情况下,会忽略投票RPC

只要leader能向其他集群正常发送心跳,就不会被更大的Term的candidate赶下台

日志压缩

snapshot

快照

对当前的整个日志都做一次快照,写到持久存储上,然后把已经做过快照的日志清空

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/server.go#L865

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/server.go#L1188
func (s *server) TakeSnapshot() error {
    lastIndex, lastTerm := s.log.commitInfo()

    // check if there is log has been committed since the last snapshot.
    if lastIndex == s.log.startIndex {
        return nil
    }

    // Attach snapshot to pending snapshot and save it to disk.
    s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, s.SnapshotPath(lastIndex, lastTerm)}

    state := s.stateMachine.Save()

    // Clone the list of peers.
    peers := make([]*Peer, 0, len(s.peers)+1)
    for _, peer := range s.peers {
        peers = append(peers, peer.clone())
    }
    peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})

    // Attach snapshot to pending snapshot and save it to disk.
    s.pendingSnapshot.Peers = peers
    s.pendingSnapshot.State = state
    s.saveSnapshot()

    // We keep some log entries after the snapshot.
    // We do not want to send the whole snapshot to the slightly slow machines
    if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
        compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
        compactTerm := s.log.getEntry(compactIndex).Term()
        s.log.compact(compactIndex, compactTerm)
    }

    return nil
}

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/log.go#L567
// compact the log before index (including index)
func (l *Log) compact(index uint64, term uint64) error {
    var entries []*LogEntry

    // the index may be greater than the current index if we just recovery from on snapshot
    if index >= l.internalCurrentIndex() {
        entries = make([]*LogEntry, 0)
    } else {
        entries = l.entries[index-l.startIndex:] // get all log entries after index
    }

    // create a new log file and add all the entries
    new_file_path := l.path + ".new"
    file := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
    for _, entry := range entries {
        position, _ := l.file.Seek(0, os.SEEK_CUR)
        entry.Position = position

        entry.Encode(file)
    }
    file.Sync()

    old_file := l.file
    os.Rename(new_file_path, l.path) // rename the new log file
    l.file = file
    old_file.Close() // close the old log file

    // compaction the in memory log
    l.entries = entries
    l.startIndex = index
    l.startTerm = term
    return nil
}

增量压缩

  • 首先选择一块已经积累了一些已经删除和已经覆盖写入的对象区域
  • 以更紧凑的方式重写还活着的对象

客户端交互

寻找leader

客户端启动的时候,随机选择一个raft节点进行连接

  • 如果是leader直接处理请求
  • 如果不是leader,拒接请求并把leader信息告诉客户端
// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/append_entries.go#L11

// The request sent to a server to append entries to the log.
type AppendEntriesRequest struct {
    Term         uint64
    PrevLogIndex uint64
    PrevLogTerm  uint64
    CommitIndex  uint64
    LeaderName   string
    Entries      []*protobuf.LogEntry
}

如果leader挂了,客户端请求会超时,然后随机选择 再次连接

可线性化语义

每个操作看起来都是立即执行,精确执行一次

raft可能多次执行同一条命令,例如 在提交来自客户端的log entry但是没来得及响应就挂了,客户端重试命令导致二次执行

解决方案

  • 客户端每个命令分配唯一的顺序编号
  • 状态机为每个客户端记录最后的已执行命令顺序,放入响应之中
  • 状态机如果发现某个序号的命令已经执行过,就会直接返回,不会再执行一遍
// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/append_entries.go#L21

// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
    pb     *protobuf.AppendEntriesResponse
    peer   string
    append bool
}

// https://github.com/etcd-io/etcd/blob/release-0.4/third_party/github.com/goraft/raft/protobuf/append_entries_responses.pb.go#L35
type AppendEntriesResponse struct {
    Term             *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"`
    Index            *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"`
    CommitIndex      *uint64 `protobuf:"varint,3,req" json:"CommitIndex,omitempty"`
    Success          *bool   `protobuf:"varint,4,req" json:"Success,omitempty"`
    XXX_unrecognized []byte  `json:"-"`
}

只读操作

只读操作无需向log文件写入任何内容,但是如果没有额外措施,可能有返回过期数据的风险,因为处理这次请求的leader可能已经被取代

可线性化读可以 确保不会返回过期数据,引入两个措施

  1. leader必须有哪些entry已经被提交的最新消息
  2. leader在处理只读请求之前,必须能检查自己是否被剥夺了leader

raft的处理方式是,在处理只读请求之前,让leader与集群的大多数节点交换心跳信息 或者leader可以依靠心跳机制提供一种租约形式(但是这需要依赖时序)