Raft Intro
Raft is a consensus algorithm. It's used in distributed systems to reliably and consistently replicate state between multiple nodes using an append-only log. When Raft nodes start up, they first vote to elect a leader. Once a leader is elected, all application state writes go to the leader and the other nodes become followers. The leader sends every write to the followers for replication, and once a majority of the nodes have saved the write to their logs it is considered committed and the leader responds to the writing client with success.
Raft guarantees that the log is always consistent among the leader and followers. It is also designed for fault tolerance: nodes may crash or restart (even the leader), and the system may experience network partitions. The leader maintains contact with the followers using periodic heartbeats. If there is a problem communicating with the leader, the followers stop receiving these heartbeats and will elect a new leader.
Raft was designed to be easy to understand and implement. It's broken down into subproblems that function more or less independently: leader election, log replication, and safety. It also includes a joint consensus mechanism for membership changes that allows it to continue operating as nodes are added and removed.
In this short book we will take a deep dive into how Raft works under the hood. We'll look at each subcomponent of Raft and try to explain it as clearly as possible with words, diagrams, and Golang source code.
Proposed Outline
- 2 Consensus, what is it good for
  - Agreement on distributed state
  - Availability
  - Fault tolerance
- 3 The Goals of Raft
  - Easier to teach, easier to understand, easier to implement (than Paxos)
  - As performant (as Paxos)
  - Reasonably Fault Tolerant
- 4 How to Build a Raft (The Distinct Parts)
  - The Roles
    - Candidate
      - Send RV RPCs to peers
      - Become leader or become follower
    - Leader
      - Get client requests
      - Send AE to followers
      - Commit
      - Respond to client requests
      - Also, respond to RV, maybe step down and become follower
    - Follower
      - Wait for AE RPCs from Leader and respond to RV
  - Leader Election
  - Log Replication
  - Safety
  - Membership Changes w/ Joint Consensus
- 5 Leader Election Deep Dive
  - RequestVote RPC
  - Election Timeout
  - Handling RequestVote
  - Handling RequestVote Response
  - Leader Failures
- 6 Log Replication Deep Dive
  - AppendEntries RPC
  - Handling AppendEntries
  - Handling AppendEntries Response
  - Retrying after Failures
- 7 Safety Deep Dive
  - The State Machine Safety Property
- 8 Membership Changes Deep Dive
  - Joint Consensus
- 9 Raft in the Real World
  - Hashicorp Raft Library
  - Consul
  - RabbitMQ Quorum Queues
  - MySQL Orchestrator
Consensus
Consensus algorithms are a key component of large-scale reliable distributed systems. They allow a group of machines to act as a logical unit and continue to operate in the presence of failures. The point of consensus is for all of the machines to agree on a shared state. For this they maintain a replicated append-only log. The group can continue to operate in the presence of faults and network partitions as long as a majority of the nodes can still communicate. You typically want an odd number of nodes so that the group isn't divisible into two equally sized groups.
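To make the majority arithmetic concrete, here's a tiny Go sketch (the quorum helper is purely illustrative):

// quorum returns the smallest majority of an n-node cluster.
func quorum(n int) int { return n/2 + 1 }

// quorum(3) == 2: a 3-node cluster tolerates 1 failure
// quorum(4) == 3: a 4-node cluster still tolerates only 1 failure
// quorum(5) == 3: a 5-node cluster tolerates 2 failures

Note that the even-sized cluster buys no extra failure tolerance over the next smaller odd size, which is why odd sizes are preferred.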
You often see consensus algorithms in play when you have a control plane that needs fault tolerance. For example, Nomad and Consul servers, Vault servers, and MySQL Orchestrator all use Raft. You also see them in the data plane when you need reliable replication of state, e.g. RabbitMQ quorum queues and MySQL group replication (which uses a Paxos variant, but the point stands).
The Main Components of Raft
Leader Election
Raft uses a strong form of leadership to simplify the problem of maintaining the replicated log. A leader is elected using randomized timeouts, both at startup and whenever the current leader becomes unreachable. At any given time a node is in one of three states: leader, follower, or candidate. Under nominal conditions there is one leader and all other servers are followers. Followers are passive: they simply respond to requests from leaders and candidates. The leader handles all requests from clients; if a follower receives a client request, it just forwards it to the leader.
The leader sends periodic heartbeats to its followers to let them know it is still up and functioning. If a follower doesn't hear from the leader within a certain time period, called the election timeout, it enters the third state, candidate. In this state it considers itself a candidate to become the leader and asks the other members of the cluster to vote for it. If it receives votes from a majority of the cluster (including itself), it transitions to the leader state, begins serving requests from clients, and starts sending heartbeats to the other cluster members.
Each election starts a new term. The elected leader serves requests for the entire duration of the term, and during that time it is the only leader. Some elections result in a split vote; in that case the term ends with no leader, and a new election (and a new term) begins. Because of the randomized timeouts, repeated split votes are very unlikely.
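As a rough sketch of the moving parts described above, here is how the three states and the randomized timeout might be expressed in Go. The names and the 150-300ms window are illustrative choices (the window follows the paper's suggestion), not anything prescribed by Raft:

package raft

import (
    "math/rand"
    "time"
)

// The three states a node can be in at any given time.
type serverState int

const (
    follower serverState = iota
    candidate
    leader
)

// electionTimeout picks a fresh randomized timeout. The randomization is what
// makes repeated split votes unlikely: two followers rarely time out together.
func electionTimeout() time.Duration {
    return time.Duration(150+rand.Intn(150)) * time.Millisecond
}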
Log Replication
Once a leader is elected, it appends each write it receives to its own log and then replicates the write to the other nodes. The followers append the write to their own logs after performing some consistency checks and respond to the leader. When the leader sees that the log entry has been replicated to a majority of the cluster, it considers the entry committed and applies it to its state machine.
The followers run several checks to make sure their log is consistent with the leader's log. If any of those checks fail, the leader, which tracks the point up to which the log has been replicated on each follower, finds the last point where the follower's log was consistent with its own and resends log entries from that point on. This repairs the follower's log so it is consistent with the leader's.
Safety
Membership Changes
Leader Election
Raft's leader election process is designed to be simple and efficient. The leader is responsible for managing the replication of log entries to followers. If the leader fails, a new leader must be elected before log replication can continue. The leader election process is triggered by a follower detecting that it has not received a heartbeat from the leader in a certain amount of time. This timeout is known as the election timeout. When a follower times out, it starts a new election by transitioning to the candidate state and sending RequestVote RPCs to other servers in the cluster. If the candidate receives votes from a majority of the cluster, it becomes the leader.
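Figure 2 of the Raft paper specifies the arguments and results of this RPC. In Go they might look like the following (the type and field names here are illustrative, modeled on the paper):

// RequestVote RPC arguments, per figure 2 of the Raft paper
type RequestVote struct {
    Term         int // candidate's term
    CandidateId  int // candidate requesting the vote
    LastLogIndex int // index of the candidate's last log entry
    LastLogTerm  int // term of the candidate's last log entry
}

// RequestVote RPC results
type RequestVoteReply struct {
    Term        int  // the receiver's currentTerm, so a stale candidate can update itself
    VoteGranted bool // true means the candidate received this node's vote
}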
Log Replication
The main operation of Raft's log replication is the AppendEntries RPC call. This call is made by the leader to each peer. It's also used as a heartbeat as described in the Leader Election section. Typically the heartbeats are AppendEntries calls with no log entries. However, when the leader is retrying failed AppendEntries calls, it will include log entries in the heartbeat messages to followers that are behind.
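The arguments of this RPC also come straight from figure 2 of the paper. The AppendEntries struct below matches the fields used by the code later in this chapter; the reply type's name is a guess based on the reply fields used below:

// AppendEntries RPC arguments, per figure 2 of the Raft paper
type AppendEntries struct {
    Term         int         // leader's term
    LeaderId     int         // so followers can redirect clients
    PrevLogIndex int         // index of the log entry immediately preceding the new ones
    PrevLogTerm  int         // term of the PrevLogIndex entry
    Entries      []*logEntry // entries to store (empty for a pure heartbeat)
    LeaderCommit int         // leader's commitIndex
}

// AppendEntries RPC results
type AppendEntriesReply struct {
    Term    int  // the receiver's currentTerm, so a stale leader can step down
    Success bool // true if the follower had an entry matching PrevLogIndex and PrevLogTerm
}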
The high-level happy-path flow of log replication is as follows:
- The leader receives a write request from a client.
- The leader appends the new entry to its log.
- The leader sends AppendEntries RPCs to all followers to replicate the new entry.
- Each follower processes the AppendEntries RPC and appends the new entry to its log if the new entry is consistent with its log. It then responds to the leader letting it know if the entry was successfully replicated.
- Once the leader has received enough successful responses to know that the log entries have been replicated to a majority of the cluster, it considers the entry committed and applies it to its state machine. The commit index is updated.
- The leader responds to the client with the result of the write request.
- On the next AppendEntries RPC followers see the updated commit index and apply the entry to their state machines.
This of course ignores the many failure scenarios that can occur. Raft is designed to handle these failures and ensure that the log remains consistent across all cluster members. If a log entry (or a batch of entries) cannot be replicated to a majority, Raft ensures those entries are never committed and the client never receives a success response. Typically, the leader will give up retrying after some timeout has elapsed, at which point it sends a failure response back to the client. We'll cover the non-happy-path scenarios in the following sections.
Log Replication Implementation Detail
Let's go through the log replication process in more detail and look at how it's implemented in the Hashicorp Raft library, a popular implementation written in Go. One of the authors of Raft, Diego Ongaro, was involved in the development of the library. We'll compare my implementation from MIT's distributed systems course (6.5840) against the real implementation by Hashicorp. This is mainly to help me understand and verify the correctness of my implementation, but hopefully it also serves as a good deep dive into how the replication works.
Leader Appends to its Log and sends RPCs to Followers
The client sending a write to the leader isn't very interesting, so let's start at step 2, "the leader appends the new entry to its log". In my 6.5840 code this is pretty straightforward: we create a new log entry holding the command that will be applied to the state machine (if committed) and the current term, and append it to the log.
logent := &logEntry{
    Command: command,
    Term:    rf.currentTerm,
}
// Append the log entry
rf.log = append(rf.log, logent)
My log is just a simple slice of log entries:
log []*logEntry
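The logEntry type itself isn't shown above; given the fields used, it's presumably something like this (with the command stored as an opaque interface{}):

type logEntry struct {
    Command interface{} // the client command to apply to the state machine once committed
    Term    int         // the term in which the entry was created on the leader
}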
TODO: Hashicorp equivalent
Next is step 3, "the leader sends AppendEntries RPCs to all followers". How does that look in my MIT lab code?
// Send AppendEntries RPCs to all other servers to replicate the log
for idx := range rf.peers {
    if idx == rf.me {
        continue // don't send AppendEntries RPC to self
    }
    peerIdx := idx // copy the loop variable so the goroutine below captures a stable value
    // If last log index ≥ nextIndex for a follower: send AppendEntries RPC with log entries
    // starting at nextIndex. Log indices are 1-based (as in the paper), so the entry with
    // 1-based index i lives at slice index i-1.
    if len(rf.log) >= rf.nextIndex[peerIdx] {
        prevLogIndex := rf.nextIndex[peerIdx] - 1
        prevLogTerm := 0
        if prevLogIndex > 0 {
            prevLogTerm = rf.log[prevLogIndex-1].Term
        }
        entries := &AppendEntries{
            Term:         rf.currentTerm,
            LeaderId:     rf.me,
            PrevLogIndex: prevLogIndex,
            PrevLogTerm:  prevLogTerm,
            Entries:      rf.log[rf.nextIndex[peerIdx]-1:],
            LeaderCommit: rf.commitIndex,
        }
        go func() {
            rf.appendEntriesAndHandleResponse(peerIdx, entries)
        }()
    }
}
The AppendEntries RPC includes the leader's current term, its id, the index of the log entry immediately preceding the new ones for the given peer, the term of that preceding entry, the actual entries to append (starting at the next index we are tracking for the peer), and the leader's commit index. This is all specified in figure 2 of the Raft paper.
TODO: Hashicorp
The Followers Handle and Respond to AppendEntries
Okay, the next step is for the followers to process the AppendEntries RPC. This is where things get more complicated. If the AppendEntries has a term number less than the follower's current term, the follower immediately replies false and the AppendEntries fails; the leader should never be behind its followers. (Conversely, if the RPC's term is greater than the follower's current term, the follower updates its own term, per §5.1; that handling is elided below.)
// Initialize reply
reply.Term = rf.currentTerm
reply.Success = false
...
// 1. Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
    DPrintf("Server %d: AppendEntries RPC reply sent to server %d. Term %d < currentTerm %d", rf.me, args.LeaderId, args.Term, rf.currentTerm)
    return // Failed AppendEntries
}
Next the follower needs to ensure that the log entries the leader sent are consistent with its own log. To do that it first checks that the term of the preceding log entry matches.
// Note: Even heartbeats can contain log entries because they are used to retry failed appends
// (e.g. the follower's log is inconsistent with the leader's). The leader will have decremented
// nextIndex for the follower that failed to append.
if len(args.Entries) > 0 {
    // 2. Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
    prevLogSliceIndex := args.PrevLogIndex - 1
    if prevLogSliceIndex >= len(rf.log) {
        // Our log is too short to contain an entry at prevLogIndex; reply false so the
        // leader backs up nextIndex and retries with earlier entries
        DPrintf("Server %d: Index %d out of bounds, sliceIndex: %d", rf.me, args.PrevLogIndex, prevLogSliceIndex)
        return // Failed AppendEntries
    }
    // prevLogSliceIndex == -1 means the new entries start at the very beginning of the log,
    // so there is no preceding entry to check
    if prevLogSliceIndex >= 0 {
        matchingPrevIndexLogEntry := rf.log[prevLogSliceIndex]
        if matchingPrevIndexLogEntry.Term != args.PrevLogTerm {
            DPrintf("Server %d: AppendEntries RPC reply sent to server %d. Log doesn't contain an entry at prevLogIndex %d whose term matches prevLogTerm %d", rf.me, args.LeaderId, args.PrevLogIndex, args.PrevLogTerm)
            return // Failed AppendEntries
        }
    }
    ...
}
If the preceding entry's term matches, we can proceed to check that the rest of the log is consistent. If there's a mismatch, the follower must delete the conflicting entry and everything that follows it.
if len(args.Entries) > 0 {
    ...
    // 3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)
    for i := prevLogSliceIndex + 1; i < len(rf.log); i++ {
        // i - prevLogSliceIndex - 1 gives the corresponding index into args.Entries
        entryIdx := i - prevLogSliceIndex - 1
        if entryIdx >= len(args.Entries) {
            break // our log extends past the new entries; nothing left to compare against
        }
        if rf.log[i].Term != args.Entries[entryIdx].Term {
            rf.log = truncateLog(rf.log, i)
            break
        }
    }
    ...
}
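truncateLog is a small helper that isn't shown here. A minimal version, assuming it simply keeps the first i entries, might copy the retained prefix so the dropped tail doesn't stay pinned in the old backing array:

func truncateLog(log []*logEntry, i int) []*logEntry {
    // Keep slice indices [0, i) and drop everything from i onward.
    return append([]*logEntry(nil), log[:i]...)
}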
Once we've verified that the log is consistent with the leader's log up to this point, we can append the new log entries.
if len(args.Entries) > 0 {
    ...
    // 4. Append any new entries not already in the log
    for i, entry := range args.Entries {
        logIndex := args.PrevLogIndex + 1 + i // 1-based index of this entry
        if logIndex-1 >= len(rf.log) {
            rf.log = append(rf.log, entry)
        }
    }
    ...
}
Now the log is appended. The final check the follower does when responding to each AppendEntries is to advance its commit index based on the leader's commit index. After updating its term to match the leader's term, the follower can reply success.
if len(args.Entries) > 0 {
    ...
}
// 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) (§5.3)
// The min() ensures that the follower's commitIndex is updated safely, preventing it from committing
// entries that it hasn't received yet. The leader's commitIndex could be beyond the follower's last
// log index; in that case we clamp commitIndex to the follower's last log index.
if args.LeaderCommit > rf.commitIndex {
    rf.commitIndex = min(args.LeaderCommit, len(rf.log))
    rf.applyCommittedEntries()
}
reply.Term = rf.currentTerm
reply.Success = true // Successful AppendEntries
The Leader Handles the Responses
Now back on the leader we have to handle the responses from the followers.
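We'll walk through this in detail next, but as a preview, here is a rough sketch of what appendEntriesAndHandleResponse might do with each reply. The mutex, state field, matchIndex bookkeeping, and sendAppendEntries helper are assumptions about the surrounding code, following the rules in figure 2 of the paper:

func (rf *Raft) appendEntriesAndHandleResponse(peerIdx int, args *AppendEntries) {
    reply := &AppendEntriesReply{}
    if !rf.sendAppendEntries(peerIdx, args, reply) {
        return // the RPC never arrived; a later heartbeat will retry
    }
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if reply.Term > rf.currentTerm {
        // A newer term exists, so we are a stale leader and must step down (§5.1)
        rf.currentTerm = reply.Term
        rf.state = follower
        return
    }
    if reply.Success {
        // The follower accepted the entries; advance our bookkeeping for it
        rf.matchIndex[peerIdx] = args.PrevLogIndex + len(args.Entries)
        rf.nextIndex[peerIdx] = rf.matchIndex[peerIdx] + 1
        // If an entry from the current term is now replicated on a majority,
        // the leader can advance commitIndex (§5.3, §5.4)
    } else {
        // Consistency check failed: back up one entry and retry from there
        if rf.nextIndex[peerIdx] > 1 {
            rf.nextIndex[peerIdx]--
        }
    }
}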