MIT 6.5840 Notes: Lab 3B - Raft Log Replication

Introduction

In the last post, we implemented Raft's leader election mechanism. In this post, we implement log replication.

Log replication is a critical component of the Raft consensus algorithm. The leader copies each log entry to its followers, and an entry counts as committed once it is stored on a majority of servers. This keeps the state machines on all servers consistent.

How Log Replication Works

For example, consider a K/V server cluster built on the Raft consensus protocol. When a client sends a put request to the K/V server, the server creates a log entry containing the request and forwards it to the Raft leader. The leader then replicates the log entry to its followers. Once the log entry is replicated to a majority of servers (more than half), the leader commits the log entry and applies the put request to store the key-value pair in the K/V server.

Handling Leader Crashes

If the leader crashes before the log entry is committed, there are three possible scenarios:

  • A majority of servers have the log entry. In this case, a new leader that holds the log entry will be elected, and log replication continues seamlessly.
  • A majority of servers do not have the log entry, but a new leader that holds it is elected. The new leader replicates the log entry to the remaining followers, ensuring consistency.
  • A majority of servers do not have the log entry, and a new leader without it is elected. In this situation, some followers may have the log entry while others do not. Followers that have it will truncate it once the new leader overwrites that position, so the entry is never committed. This is not an issue: the client never received a success reply, so it knows the operation may have failed and can retry.

When the old leader eventually comes back online, it becomes a follower and replicates the log entries from the new leader to catch up with the current state of the system.

The key takeaway is that log replication ensures that log entries are consistently replicated across a majority of servers, maintaining the integrity and consistency of the system state.

Handling Network Partitions

Consider a Raft cluster with five servers: S1, S2, S3, S4, and S5. The leader is S1, with S2, S3, S4, and S5 as followers. S1 and S2 are in the same network segment (rack), while S3, S4, and S5 are in another. Suddenly, the switch connecting the two segments fails, causing a network partition. The servers are split into two groups: [S1, S2] and [S3, S4, S5].

In this situation, S3, S4, and S5 will elect a new leader, such as S3, while S2 will still believe S1 is the leader. Log entries sent to S1 will not be committed because a majority of servers do not receive them. However, log entries sent to S3 will be committed.

Now suppose the switch is fixed and the partition heals, but S3 goes offline at the same time. S1 still thinks it is the leader. After sending AppendEntries RPCs to the other servers, S1 learns from their replies that its term is stale. It steps down to the follower role, and a new leader election follows. The new leader cannot be S1 or S2, because their logs lack the entries committed under S3; it will be S4 or S5. S1 and S2 then truncate any conflicting log entries and synchronize with the current system state.

If the servers are partitioned into three or more groups, with no group holding a majority (e.g., [S1, S2], [S3, S4], [S5]), and S5 is the old leader, log entries sent to S5 will not be committed. The other groups will repeatedly attempt to elect a leader but will fail. Once the network partition is healed, S5 will first become a follower, because the other servers have moved on to higher terms. It might be elected leader again or not; either way, the log entries sent to S5 during the partition were never committed, so they can be safely truncated if they conflict.
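
The partition examples above come down to one check: a group can elect a leader or commit entries only if it contains more than half of the cluster. Here is a minimal sketch of that check; the name `hasQuorum` is made up for illustration and is not part of the lab code.

```go
package main

import "fmt"

// hasQuorum reports whether a partition of groupSize servers holds a strict
// majority of a cluster with clusterSize servers. (Hypothetical helper.)
func hasQuorum(groupSize, clusterSize int) bool {
	return groupSize > clusterSize/2
}

func main() {
	// Two-way split: [S1, S2] and [S3, S4, S5].
	fmt.Println(hasQuorum(2, 5), hasQuorum(3, 5)) // false true

	// Three-way split: [S1, S2], [S3, S4], [S5]. No group can make progress.
	fmt.Println(hasQuorum(2, 5), hasQuorum(2, 5), hasQuorum(1, 5)) // false false false
}
```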

Voting to Avoid Unnecessary Log Truncation

Imagine a more complex scenario: after a series of network partitions and leader crashes, all issues are eventually resolved. However, due to many rounds of leader elections, each server now has different uncommitted log entries. The challenge is to minimize the number of log entries that need to be truncated to restore consistency.

Before voting in a leader election, a follower must check if the candidate’s log is at least as up-to-date as its own log. The follower uses the following rules:

  1. Compare Terms: The follower first compares the term of the candidate's last log entry with the term of its own last log entry. If the candidate's last term is higher, the candidate's log is more up-to-date; if it is lower, the candidate's log is behind.

  2. Compare Log Indexes: If the last terms are the same, the follower then compares the last log index. If the candidate's last log index is equal to or greater than the follower's, the candidate's log is at least as up-to-date, and the follower may vote for it.

  3. Rejecting Candidates: If the candidate's log is not up-to-date (i.e., a lower last term, or the same last term with a smaller last log index), the follower rejects the candidate.

By following these rules, a candidate can win only if its log is at least as up-to-date as the logs of a majority of servers, which guarantees that the new leader holds every committed entry. This approach also helps avoid unnecessary log truncation and lets the system converge to a consistent state more efficiently.
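
The three rules above can be tried out on plain integers. The following is a small sketch; the function name `candidateIsUpToDate` and its parameters are illustrative and not taken from the lab code.

```go
package main

import "fmt"

// candidateIsUpToDate reports whether a candidate's log, described by the
// term and index of its last entry, is at least as up-to-date as the voter's.
func candidateIsUpToDate(candTerm, candIndex, myTerm, myIndex int) bool {
	if candTerm != myTerm {
		return candTerm > myTerm // rule 1: the higher last term wins
	}
	return candIndex >= myIndex // rule 2: same term, the longer log wins
}

func main() {
	fmt.Println(candidateIsUpToDate(3, 2, 2, 9)) // true: higher last term beats a longer log
	fmt.Println(candidateIsUpToDate(2, 9, 3, 2)) // false: lower last term
	fmt.Println(candidateIsUpToDate(3, 5, 3, 5)) // true: identical logs
	fmt.Println(candidateIsUpToDate(3, 4, 3, 5)) // false: same term, shorter log
}
```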

Summary

In summary, a Raft cluster with 2F + 1 servers can tolerate up to F server failures and still function correctly. If more than F servers fail, the remaining servers no longer form a majority, so the cluster cannot elect a leader or commit new entries until enough servers come back.
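
As a quick arithmetic check of the 2F + 1 rule, the sketch below prints the quorum size and the number of tolerated failures for a few cluster sizes. The helper names are made up for this example.

```go
package main

import "fmt"

// quorumSize is the smallest number of servers that forms a strict majority.
func quorumSize(n int) int { return n/2 + 1 }

// toleratedFailures is how many servers may fail while a quorum survives.
func toleratedFailures(n int) int { return n - quorumSize(n) }

func main() {
	for _, n := range []int{3, 4, 5, 7} {
		fmt.Printf("servers=%d quorum=%d tolerated failures=%d\n",
			n, quorumSize(n), toleratedFailures(n))
	}
}
```

Note that going from 3 to 4 servers does not raise the number of tolerated failures, which is why clusters usually have an odd size.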

Implementing Log Replication

States

First, we must add log entries to the Raft state. We define a LogEntry struct to represent a log entry. Each log entry contains a term and a command. The term is the term in which the log entry was created, and the command is supplied by the application layer.

commitIndex is the index of the highest log entry known to be committed. lastApplied is the index of the highest log entry applied to the application state machine.

nextIndex[i] is the index of the next log entry to send to server i. matchIndex[i] is the index of the highest log entry known to be replicated on server i. Only the leader maintains these two arrays.

type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *tester.Persister   // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    applyCh chan raftapi.ApplyMsg

    // Persistent state
    currentTerm int
    voteFor     int
    logs        []LogEntry

    // Volatile state
    commitIndex int
    lastApplied int

    // Volatile state on leaders
    nextIndex  []int
    matchIndex []int

    // Other state
    state        State
    recvCh       chan struct{}
    toLeaderCh   chan struct{}
    toFollowerCh chan struct{}
    grantVoteCh  chan struct{}
}

type LogEntry struct {
    Term    int
    Command interface{}
}
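
The snippets in this post call getLastIndex and getLastTerm without showing them, and they index rf.logs[nextIndex-1] directly. A plausible reading, which is an assumption here and not something stated above, is that logs[0] holds a dummy entry so that real entries start at index 1. The sketch below shows the helpers under that assumption, plus the usual reinitialization of nextIndex and matchIndex when a server becomes leader. The type `raftLog` and the method `becomeLeader` are hypothetical stand-ins.

```go
package main

import "fmt"

type LogEntry struct {
	Term    int
	Command interface{}
}

// raftLog is a hypothetical stand-in for the log-related fields of Raft.
type raftLog struct {
	logs       []LogEntry
	nextIndex  []int
	matchIndex []int
}

// newRaftLog starts with a dummy entry at index 0 (assumed), so the first
// real entry lives at index 1 and logs[prevIndex] is always valid.
func newRaftLog() *raftLog {
	return &raftLog{logs: []LogEntry{{Term: 0}}}
}

func (l *raftLog) getLastIndex() int { return len(l.logs) - 1 }
func (l *raftLog) getLastTerm() int  { return l.logs[len(l.logs)-1].Term }

// becomeLeader resets the leader-only arrays: nextIndex starts just past the
// leader's last entry, matchIndex starts at 0 (nothing known to be replicated).
func (l *raftLog) becomeLeader(numPeers int) {
	l.nextIndex = make([]int, numPeers)
	l.matchIndex = make([]int, numPeers)
	for i := range l.nextIndex {
		l.nextIndex[i] = l.getLastIndex() + 1
	}
}

func main() {
	l := newRaftLog()
	l.logs = append(l.logs,
		LogEntry{Term: 1, Command: "put x=1"},
		LogEntry{Term: 2, Command: "put y=2"})
	l.becomeLeader(3)
	fmt.Println(l.getLastIndex(), l.getLastTerm(), l.nextIndex, l.matchIndex)
	// prints: 2 2 [3 3 3] [0 0 0]
}
```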

AppendEntries RPC

Implementing the AppendEntries RPC is a bit involved. The leader sends AppendEntries RPCs to followers to replicate log entries. Each follower responds with success or failure, depending on whether the entries can be appended to its log. We must handle conflicting log entries and ensure that the system converges to a consistent state.

type AppendEntriesArgs struct {
    Term         int
    LeaderId     int
    PrevLogIndex int
    PrevLogTerm  int
    Entries      []LogEntry
    LeaderCommit int
}

type AppendEntriesReply struct {
    Term    int
    Success bool

    // for optimization (not in paper)
    ConflictIndex int
    ConflictTerm  int
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    reply.Term = rf.currentTerm
    reply.Success = false
    reply.ConflictTerm = -1
    reply.ConflictIndex = -1

    if args.Term < rf.currentTerm {
        return
    }

    if args.Term > rf.currentTerm {
        rf.toFollower(args.Term)
    }

    sendToChannel(rf.recvCh)

    // ... handle append entries
}

func (rf *Raft) broadcastAppendEntries() {
    for i := range rf.peers {
        if i == rf.me {
            continue
        }

        go func(i int) {

            rf.mu.Lock()

            args := &AppendEntriesArgs{
                Term:         rf.currentTerm,
                LeaderId:     rf.me,
                PrevLogIndex: rf.nextIndex[i] - 1,
                PrevLogTerm:  rf.logs[rf.nextIndex[i]-1].Term,
                Entries:      rf.logs[rf.nextIndex[i]:],
                LeaderCommit: rf.commitIndex,
            }
            rf.mu.Unlock()

            reply := &AppendEntriesReply{}
            if ok := rf.sendAppendEntries(i, args, reply); !ok {
                return
            }

            // ... handle reply
        }(i)
    }
}

The leader periodically sends AppendEntries RPCs to each follower. Each request carries the leader's term and ID, the index and term of the entry just before the new ones, the log entries to replicate, and the leader's commit index. The receiver first compares terms: if the request's term is lower than its own current term, it rejects the request; if it is higher, the receiver adopts that term and transitions to the follower state.
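
The handler above calls sendToChannel(rf.recvCh) while holding rf.mu, but the helper itself is not shown in this post. A reasonable guess, and it is only a guess, is that it performs a non-blocking send so that an RPC handler never stalls when nobody is reading. A minimal sketch under that assumption:

```go
package main

import "fmt"

// sendToChannel is a hypothetical version of the helper used above: it
// signals ch if possible and silently drops the signal otherwise.
func sendToChannel(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default: // a signal is already pending (or nobody is listening); skip
	}
}

func main() {
	ch := make(chan struct{}, 1) // capacity 1 is an assumption of this sketch
	sendToChannel(ch)
	sendToChannel(ch)    // a plain `ch <- struct{}{}` would block here
	fmt.Println(len(ch)) // prints 1
}
```

Dropping a duplicate signal is harmless for this use, since the receiver only needs to know that at least one heartbeat arrived.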

Handle Conflicting Log Entries

As an optimization, we can add two fields to the AppendEntriesReply struct: ConflictIndex and ConflictTerm. When the follower rejects a request, it reports either the index just past the end of its log (if its log is too short), or the term of its own entry at PrevLogIndex together with the first index it holds for that term. The leader uses this hint to move nextIndex back over a whole term at once instead of one entry per RPC. This is easier to understand and implement than a binary search.

If the follower's log contains entries that conflict with the leader's, the follower truncates the conflicting entries and appends the leader's entries.

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    // ... handle term

    lastIndex := rf.getLastIndex()

    // follower's log is shorter than leader's log
    if args.PrevLogIndex > lastIndex {
        reply.ConflictIndex = lastIndex + 1
        return
    }

    // check previous log term
    if prevLogTerm := rf.logs[args.PrevLogIndex].Term; prevLogTerm != args.PrevLogTerm {
        reply.ConflictTerm = prevLogTerm

        for i := args.PrevLogIndex - 1; i >= 0; i-- { // find all conflict logs in prevLogTerm
            if rf.logs[i].Term != prevLogTerm {
                reply.ConflictIndex = i + 1
                break
            }
        }

        return
    }

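    // skip entries the follower already has; stop at the first conflict
    i := args.PrevLogIndex + 1 // position in the follower's log
    j := 0                     // position in args.Entries
    for ; i <= lastIndex && j < len(args.Entries); i, j = i+1, j+1 {
        if rf.logs[i].Term != args.Entries[j].Term { // conflict
            break
        }
    }
    // Truncate only when there is something new to write. A stale or
    // reordered RPC whose entries are all present must not shorten the log.
    if j < len(args.Entries) {
        rf.logs = append(rf.logs[:i], args.Entries[j:]...) // remove conflict logs and append new logs
    }

    reply.Success = true // success append logs

    // ... update commitIndex (capped at args.PrevLogIndex + len(args.Entries))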
}
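
To see which hint the follower returns, it helps to run the two rejection checks on a log reduced to its terms. The sketch below mirrors the logic of the handler above; the function `conflictHint` is illustrative, and terms[0] is assumed to be a dummy entry with term 0.

```go
package main

import "fmt"

// conflictHint applies the follower-side consistency check to a log given
// only by its terms. ok is true when the entry at prevLogIndex matches.
func conflictHint(terms []int, prevLogIndex, prevLogTerm int) (ok bool, conflictIndex, conflictTerm int) {
	lastIndex := len(terms) - 1
	if prevLogIndex > lastIndex { // follower's log is too short
		return false, lastIndex + 1, -1
	}
	if t := terms[prevLogIndex]; t != prevLogTerm {
		first := prevLogIndex // walk back to the first entry of term t
		for first > 0 && terms[first-1] == t {
			first--
		}
		return false, first, t
	}
	return true, -1, -1
}

func main() {
	follower := []int{0, 1, 1, 2, 2, 2} // indexes 0..5

	fmt.Println(conflictHint(follower, 7, 3)) // false 6 -1: log too short
	fmt.Println(conflictHint(follower, 5, 3)) // false 3 2: term 2 starts at index 3
	fmt.Println(conflictHint(follower, 2, 1)) // true -1 -1: entry matches
}
```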

The leader updates its bookkeeping after receiving the AppendEntries reply. On success, it advances matchIndex to the last entry it sent and sets nextIndex to the entry after it. If the follower's log is shorter than the leader expected, the leader moves nextIndex back to ConflictIndex, which is the follower's log length. If the follower reports a term conflict, the leader looks for its own last log entry in ConflictTerm and retries from that index; if it has no entry in that term, it falls back to ConflictIndex.

func (rf *Raft) broadcastAppendEntries() {
    for i := range rf.peers {
        if i == rf.me {
            continue
        }

        go func(i int) {
            // .. send AppendEntries RPC

            rf.mu.Lock()
            defer rf.mu.Unlock()
            // not leader or old term
            if rf.state != State_Leader || args.Term != rf.currentTerm || reply.Term < rf.currentTerm {
                return
            }

            if reply.Term > rf.currentTerm {
                rf.toFollower(reply.Term)
                return
            }

            if reply.Success { // success append logs
                newMatchIndex := args.PrevLogIndex + len(args.Entries)
                rf.matchIndex[i] = max(rf.matchIndex[i], newMatchIndex)
                rf.nextIndex[i] = rf.matchIndex[i] + 1
            } else if reply.ConflictTerm < 0 { // follower's log is shorter than leader's log
                // Only nextIndex moves on a rejection. matchIndex records what is
                // known to be replicated, so a failed probe must not change it.
                rf.nextIndex[i] = reply.ConflictIndex
            } else { // term conflict in reply.ConflictTerm
                // look for the leader's last entry in ConflictTerm
                newNextIndex := rf.getLastIndex()
                for ; newNextIndex > 0; newNextIndex-- {
                    if rf.logs[newNextIndex].Term == reply.ConflictTerm {
                        break
                    }
                }

                if newNextIndex > 0 {
                    rf.nextIndex[i] = newNextIndex
                } else { // leader has no entry in ConflictTerm
                    rf.nextIndex[i] = reply.ConflictIndex
                }
            }

            // ... update commitIndex
        }(i)
    }
}
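
The leader-side backoff can be checked the same way on a log reduced to its terms. The sketch below follows the rule used in the code above (retry from the leader's last entry in ConflictTerm, else from ConflictIndex); `nextIndexAfterReject` is a made-up name and index 0 is assumed to be a dummy entry.

```go
package main

import "fmt"

// nextIndexAfterReject returns the nextIndex a leader would pick after a
// rejected AppendEntries, given the follower's conflict hint.
func nextIndexAfterReject(leaderTerms []int, conflictIndex, conflictTerm int) int {
	if conflictTerm < 0 {
		return conflictIndex // follower's log is too short
	}
	for i := len(leaderTerms) - 1; i > 0; i-- {
		if leaderTerms[i] == conflictTerm {
			return i // leader's last entry in the conflicting term
		}
	}
	return conflictIndex // leader has no entry in that term
}

func main() {
	leader := []int{0, 1, 1, 3, 3, 3} // indexes 0..5

	fmt.Println(nextIndexAfterReject(leader, 6, -1)) // 6: follower log too short
	fmt.Println(nextIndexAfterReject(leader, 3, 2))  // 3: leader has no term-2 entries
	fmt.Println(nextIndexAfterReject(leader, 1, 1))  // 2: leader's last term-1 entry
}
```

Each rejection skips at least one whole term, so the leader needs at most one round trip per conflicting term rather than one per entry.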

Commit Log Entries

After the leader receives a reply from a follower, it tries to advance commitIndex. It looks for the highest index n that a majority of servers (counting itself) have replicated according to matchIndex, and sets commitIndex to n.

func (rf *Raft) broadcastAppendEntries() {
    for i := range rf.peers {
        if i == rf.me {
            continue
        }

        go func(i int) {
            // ... send AppendEntries RPC
            // ... handle reply

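            // update commitIndex
            for n := rf.getLastIndex(); n > rf.commitIndex; n-- {
                // Only entries from the leader's current term are committed by
                // counting replicas (paper section 5.4.2). Terms never increase
                // toward lower indexes, so we can stop at the first older entry.
                if rf.logs[n].Term != rf.currentTerm {
                    break
                }
                count := 1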
                for j := range rf.peers {
                    if j != rf.me && rf.matchIndex[j] >= n {
                        count++
                    }
                }

                if count > len(rf.peers)/2 {
                    rf.commitIndex = n
                    break
                }
            }
        }(i)
    }
}
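
An alternative to the counting loop, shown here only as a sketch and not as what the code above does, is to sort a copy of matchIndex and read off the majority position. It assumes the leader's own slot in matchIndex holds its last log index, and that the log is given by its terms with a dummy entry at index 0. Both function names are illustrative.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityMatchIndex returns the highest index stored on a majority of
// servers. matchIndex must include the leader's own last log index.
func majorityMatchIndex(matchIndex []int) int {
	sorted := append([]int(nil), matchIndex...)
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	return sorted[len(sorted)/2] // len/2+1 servers are at or above this index
}

// advanceCommit returns the new commitIndex. The candidate index is accepted
// only if its entry belongs to the leader's current term.
func advanceCommit(commitIndex, currentTerm int, terms, matchIndex []int) int {
	n := majorityMatchIndex(matchIndex)
	if n > commitIndex && terms[n] == currentTerm {
		return n
	}
	return commitIndex
}

func main() {
	match := []int{7, 5, 7, 3, 4} // server 0 is the leader, last index 7

	terms := []int{0, 1, 1, 1, 2, 2, 2, 2}
	fmt.Println(advanceCommit(3, 2, terms, match)) // 5

	oldTerms := []int{0, 1, 1, 1, 1, 1, 2, 2}
	fmt.Println(advanceCommit(3, 2, oldTerms, match)) // 3: entry 5 is from term 1
}
```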

The follower only needs to set commitIndex to the minimum of the leader's commit index and the index of the last entry it has just accepted from the leader.

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    // ... handle term
    // ... handle append entries

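    // update commitIndex
    if args.LeaderCommit > rf.commitIndex {
        // in paper figure 2 AppendEntries RPC (5): cap at the index of the last new entry
        lastNewIndex := args.PrevLogIndex + len(args.Entries)

        rf.commitIndex = max(rf.commitIndex, min(args.LeaderCommit, lastNewIndex)) // never move backwards
    }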
}
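
A few concrete numbers make the follower rule easier to check. The sketch below is a standalone version of the update; `followerCommit` is a made-up name, and the guard against moving backwards is an extra precaution added for this example.

```go
package main

import "fmt"

// followerCommit returns the follower's new commitIndex after a successful
// AppendEntries whose last accepted entry has index lastNewIndex.
func followerCommit(commitIndex, leaderCommit, lastNewIndex int) int {
	n := leaderCommit
	if lastNewIndex < n {
		n = lastNewIndex // cannot commit entries we have not verified
	}
	if n > commitIndex {
		return n
	}
	return commitIndex // commitIndex never decreases
}

func main() {
	fmt.Println(followerCommit(2, 5, 8)) // 5: leader's commit index is the limit
	fmt.Println(followerCommit(2, 9, 6)) // 6: follower has only verified up to 6
	fmt.Println(followerCommit(4, 3, 8)) // 4: nothing new to commit
}
```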

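After commitIndex is updated, the applier goroutine applies the newly committed log entries to the application state machine, from lastApplied + 1 up to commitIndex. It polls every 10 ms and uses a non-blocking send, so it never blocks on applyCh while holding the lock; entries that could not be sent are retried on the next round.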

func (rf *Raft) applyLogs() {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
        msg := raftapi.ApplyMsg{
            CommandValid: true,
            Command:      rf.logs[rf.getLogIndex(i)].Command,
            CommandIndex: i,
        }

        select {
        case rf.applyCh <- msg:
        default:
            return // avoid blocking
        }

        rf.lastApplied = i
    }
}

func (rf *Raft) applier() {
    for !rf.killed() {
        rf.applyLogs()

        time.Sleep(10 * time.Millisecond)
    }
}

func Make(peers []*labrpc.ClientEnd, me int,
    persister *tester.Persister, applyCh chan raftapi.ApplyMsg) raftapi.Raft {
    // ... init Raft
    go rf.ticker()
    go rf.applier()
    return rf
}
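
Polling is simple, but a condition variable is a common alternative: the applier sleeps until commitIndex moves, copies the pending commands under the lock, and sends them after releasing it. The sketch below is a self-contained illustration of that pattern, not the code used in this post. The `node` type, `setCommitIndex`, and the local `ApplyMsg` are stand-ins, logs[0] is an assumed dummy entry, and a real implementation would also check killed().

```go
package main

import (
	"fmt"
	"sync"
)

type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}

// node holds only the fields the applier needs (hypothetical stand-in).
type node struct {
	mu          sync.Mutex
	cond        *sync.Cond
	logs        []interface{} // logs[0] is an assumed dummy entry
	commitIndex int
	lastApplied int
	applyCh     chan ApplyMsg
}

func newNode(applyCh chan ApplyMsg) *node {
	n := &node{logs: []interface{}{nil}, applyCh: applyCh}
	n.cond = sync.NewCond(&n.mu)
	return n
}

// setCommitIndex advances commitIndex and wakes the applier.
func (n *node) setCommitIndex(i int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if i > n.commitIndex {
		n.commitIndex = i
		n.cond.Signal()
	}
}

// applier waits for newly committed entries and delivers them in order.
func (n *node) applier() {
	for {
		n.mu.Lock()
		for n.lastApplied >= n.commitIndex {
			n.cond.Wait()
		}
		first := n.lastApplied + 1
		batch := append([]interface{}(nil), n.logs[first:n.commitIndex+1]...)
		n.lastApplied = n.commitIndex
		n.mu.Unlock()

		// Send without holding the lock so a slow reader cannot stall RPCs.
		for k, cmd := range batch {
			n.applyCh <- ApplyMsg{CommandValid: true, Command: cmd, CommandIndex: first + k}
		}
	}
}

func main() {
	applyCh := make(chan ApplyMsg)
	n := newNode(applyCh)
	n.logs = append(n.logs, "a", "b", "c")
	go n.applier()

	n.setCommitIndex(2)
	n.setCommitIndex(3)
	for i := 0; i < 3; i++ {
		m := <-applyCh
		fmt.Println(m.CommandIndex, m.Command)
	}
}
```

The trade-off is that lastApplied is advanced before the messages are actually delivered, which is fine for this sketch but needs care once snapshots are involved.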

RequestVote RPC

The RequestVote RPC needs only a small change: the request now carries the index and term of the candidate's last log entry, and the follower rejects the candidate if its log is not at least as up-to-date as the follower's own.

type RequestVoteArgs struct {
    Term         int
    CandidateId  int
    LastLogIndex int
    LastLogTerm  int
}

type RequestVoteReply struct {
    Term        int
    VoteGranted bool
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    DPrintf("[%d] Server %d received vote request from %d", rf.currentTerm, rf.me, args.CandidateId)

    if args.Term < rf.currentTerm { // candidate's Term is lower
        reply.Term = rf.currentTerm
        reply.VoteGranted = false // not grant
        return
    } else if args.Term > rf.currentTerm {
        rf.toFollower(args.Term)
    }

    // grant
    reply.Term = rf.currentTerm
    if (rf.voteFor < 0 || rf.voteFor == args.CandidateId) &&
        rf.isLogUpper(args.LastLogTerm, args.LastLogIndex) {
        rf.voteFor = args.CandidateId
        reply.VoteGranted = true
        sendToChannel(rf.grantVoteCh)
        DPrintf("[%d] Server %d voted for %d", rf.currentTerm, rf.me, rf.voteFor)
    }

    return
}

func (rf *Raft) isLogUpper(term, index int) bool {
    lastTerm := rf.getLastTerm()
    lastIndex := rf.getLastIndex()

    if term == lastTerm { // same term, compare log index
        return index >= lastIndex
    }

    return term > lastTerm
}

Conclusion

In conclusion, log replication is a critical component of the Raft consensus algorithm. It ensures that log entries are consistently replicated across a majority of servers, maintaining the integrity and consistency of the system state. In this lab, we implemented the log replication mechanism of Raft, handling conflicting log entries, updating the commit index, and applying log entries to the application state machine. By implementing log replication, we have taken a significant step towards building a fault-tolerant and consistent distributed system.

In the next post, we will implement the Raft persistence and log compaction mechanism, ensuring that the Raft state is durable and efficient. Stay tuned for more updates!

My implementation of this lab can be found on GitHub. I hope it helps you understand the concepts better. If you have any questions or feedback, feel free to send me an email. Thank you for reading!
