MIT 6.5840 Notes: Lab 3C/D - Raft Persistence and Log Compaction

Introduction

In the past two labs, we implemented the core of the Raft consensus algorithm. In this lab, we focus on persistence and log compaction.

Principles

Persistence

In Raft, certain state information must be persisted to ensure data consistency across server restarts. This persistent state includes:

  • Current term number: Tracks the latest term this server has seen.
  • Voted candidate: Records which candidate received this server’s vote in the current term.
  • Log entries: Stores all log entries that drive the replicated state machine.

In Raft, state is persisted immediately after every change. This approach is simple and reliable, ensuring the persisted state is always up-to-date. Here are specific situations where persistence is required:

  • Before responding to a RequestVote RPC: Persist the vote immediately after granting it for a term.
  • Before responding to an AppendEntries RPC: Persist newly appended log entries before acknowledging them.
  • Before replying to any RPC after a term update: Persist the new term number to avoid inconsistencies.

By following these guidelines, Raft maintains a durable and consistent state, even in the event of server failures or restarts.
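
Concretely, in a RequestVote handler the persist calls sit right after each state change and before the reply leaves the server. The following is only a minimal sketch: the argument and reply field names (args.Term, args.CandidateId, reply.VoteGranted) and the assumption that toFollower clears voteFor are not part of the code shown later.

// Sketch: persistence points in a RequestVote handler.
// args.Term, args.CandidateId, and reply.VoteGranted are assumed field names.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    if args.Term > rf.currentTerm {
        rf.toFollower(args.Term) // adopts the new term (assumed to reset voteFor)
        rf.persist()             // persist the term change before any reply is sent
    }

    // log up-to-date check elided
    if args.Term == rf.currentTerm && (rf.voteFor == -1 || rf.voteFor == args.CandidateId) {
        rf.voteFor = args.CandidateId
        rf.persist() // persist the vote before granting it
        reply.VoteGranted = true
    }
    reply.Term = rf.currentTerm
}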

If you want to store the persistent state in a file, you should call the fsync system call to ensure the data is written to disk. This call guarantees that the data is flushed to stable storage and is not lost in the event of a crash. fsync is often the performance bottleneck of Raft, but it is necessary to ensure data consistency.
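
The lab’s Persister keeps state in memory, but outside the lab a durable write would look roughly like the following sketch, using only the standard os package (the file path and error handling are illustrative):

// Sketch: durably writing a state blob to disk outside the lab environment.
func saveStateDurably(path string, state []byte) error {
    f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
    if err != nil {
        return err
    }
    defer f.Close()

    if _, err := f.Write(state); err != nil {
        return err
    }
    // Sync flushes the data to stable storage (fsync), so it survives a crash.
    return f.Sync()
}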

Log Compaction

In Raft, the log can grow indefinitely over time, consuming more storage and slowing down log replication and server restarts. For example, in a Raft-based K/V store, a high volume of write requests can rapidly expand the log, leading to performance issues. During a restart, the server must replay all log entries, which can be time-consuming. Therefore, log compaction is crucial for maintaining performance and efficiency.

Snapshotting is a common approach to log compaction. A snapshot is a point-in-time copy of the application state that reflects every log entry up to a given index. The consensus layer doesn’t know how to apply log entries to the K/V store, so generating the snapshot is the application layer’s responsibility; the consensus layer only needs to record the snapshot’s last included index and term.
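
For example, the K/V server might watch the size of the persisted Raft state and hand a serialized copy of its own state to Raft when it grows too large. This is only a hedged sketch of the application side: maxraftstate, kv.persister, kv.rf, kv.data, and lastAppliedIndex are assumed names for the service layer, not part of the Raft code in this post.

// Sketch of the application layer deciding when to snapshot.
if kv.maxraftstate != -1 && kv.persister.RaftStateSize() > kv.maxraftstate {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(kv.data)                           // serialize the application state
    kv.rf.Snapshot(lastAppliedIndex, w.Bytes()) // hand the snapshot to the consensus layer
}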

When a snapshot is taken, the server discards all log entries up to and including the snapshot’s last included index, which shrinks the log and improves performance. The snapshot is then saved to disk alongside the remaining log. When a server restarts, it restores its state from the snapshot and replays only the log entries that come after it.

When a follower lags far behind, or a new server joins the cluster, the leader can send it the snapshot instead of the missing log entries. The follower installs the snapshot and then continues replication from the snapshot’s last included index. This reduces both the time and the network bandwidth consumed by log replication.

Implementation

Persistence States

In this lab, the Persister struct abstracts reading and writing persistent data. It offers Save to store the Raft state together with a snapshot, and ReadRaftState/ReadSnapshot to load them back, making it straightforward to encode and decode the persistent state with the labgob package.

type PersistentStatus struct {
    CurrentTerm       int
    VoteFor           int
    Logs              []LogEntry
    LastSnapshotIndex int
}

func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)

    persistentStatus := &PersistentStatus{
        CurrentTerm:       rf.currentTerm,
        VoteFor:           rf.voteFor,
        Logs:              rf.logs,
        LastSnapshotIndex: rf.lastSnapshotIndex,
    }

    if e.Encode(persistentStatus) != nil {
        panic("persist encode failed")
    }

    rf.persister.Save(w.Bytes(), rf.snapshot)
}

func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }

    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)

    persistentStatus := &PersistentStatus{}

    if d.Decode(persistentStatus) != nil {
        panic("failed to decode raft persistent state")
    }

    rf.currentTerm = persistentStatus.CurrentTerm
    rf.voteFor = persistentStatus.VoteFor
    rf.logs = persistentStatus.Logs
    rf.lastSnapshotIndex = persistentStatus.LastSnapshotIndex
    // everything up to the snapshot has already been applied
    rf.commitIndex = rf.lastSnapshotIndex
    rf.lastApplied = rf.lastSnapshotIndex
    rf.snapshot = rf.persister.ReadSnapshot()
}
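
readPersist is invoked once during initialization, before the server starts serving. The lines below are only a sketch of that spot in Make; the exact skeleton code and default values may differ in your implementation, and ReadRaftState is assumed to be the Persister accessor used by the skeleton.

// inside Make(), after default initialization (sketch; exact skeleton code may differ)
rf.logs = []LogEntry{{}}                  // dummy entry at slice position 0
rf.voteFor = -1                           // -1 means "voted for nobody"
rf.readPersist(persister.ReadRaftState()) // restores term, vote, logs, and snapshot metadata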

Create Snapshot

The application layer generates the snapshot and calls Raft’s Snapshot method to hand it over to the consensus layer.

After saving the snapshot, Raft discards all log entries up to and including the snapshot’s index. Because the remaining entries shift left in the log slice, we have to translate between global log indices and slice positions whenever we access them.

func (rf *Raft) Snapshot(index int, snapshot []byte) {
    // Your code here (3D).
    rf.mu.Lock()
    defer rf.mu.Unlock()

    if rf.lastSnapshotIndex >= index || // have snapshot
        index > rf.lastApplied { // not applied
        return // skip
    }

    defer rf.persist()

    // keep a dummy entry at slice position 0 carrying the snapshot's term,
    // and retain only the entries after the snapshot index
    split := rf.getLogIndex(index)
    rf.lastSnapshotIndex = index
    rf.logs = append([]LogEntry{{Term: rf.logs[split].Term}}, rf.logs[split+1:]...)
    rf.snapshot = snapshot
}

// getLastIndex returns the global index of the last log entry.
func (rf *Raft) getLastIndex() int {
    return rf.lastSnapshotIndex + len(rf.logs) - 1
}

// getLastTerm returns the term of the last log entry.
func (rf *Raft) getLastTerm() int {
    return rf.logs[len(rf.logs)-1].Term
}

// getLogIndex translates a global log index into a position in the logs slice.
func (rf *Raft) getLogIndex(originalIndex int) int {
    if originalIndex < rf.lastSnapshotIndex {
        panic("snapshot index")
    }

    return originalIndex - rf.lastSnapshotIndex
}

// getLogRealIndex translates a slice position back into a global log index.
func (rf *Raft) getLogRealIndex(index int) int {
    return index + rf.lastSnapshotIndex
}
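
For example, with lastSnapshotIndex = 100, the entry at global index 105 lives at slice position 5. A hypothetical accessor (entryAt is not part of the lab code, just an illustration) would look like this:

// Hypothetical helper illustrating the index translation above.
func (rf *Raft) entryAt(globalIndex int) LogEntry {
    return rf.logs[rf.getLogIndex(globalIndex)] // e.g. global 105 -> slice 105-100 = 5
}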

Install Snapshot

When a follower receives a snapshot from the leader, it installs the snapshot, discards the log entries covered by it, and keeps any entries that follow the snapshot’s last included index.

type InstallSnapshotArgs struct {
    Term              int
    LeaderId          int
    LastSnapshotIndex int
    LastSnapshotTerm  int
    Snapshot          []byte
}

type InstallSnapshotReply struct {
    Term int
}

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    DPrintf("[%d] Server %d received snapshot request from %d", rf.currentTerm, rf.me, args.LeaderId)
    rf.mu.Lock()
    defer rf.mu.Unlock()

    reply.Term = rf.currentTerm
    if args.Term < rf.currentTerm { // old term
        return
    }

    if args.Term > rf.currentTerm {
        rf.toFollower(args.Term)
    }

    sendToChannel(rf.recvCh)

    if args.LastSnapshotIndex <= rf.lastSnapshotIndex { // have snapshot
        return
    }

    defer rf.persist()

    // remember the old base so existing slice positions can still be translated below
    oldLastSnapshotIndex := rf.lastSnapshotIndex
    rf.lastSnapshotIndex = args.LastSnapshotIndex
    rf.commitIndex = args.LastSnapshotIndex
    rf.lastApplied = args.LastSnapshotIndex
    rf.snapshot = args.Snapshot

    msg := raftapi.ApplyMsg{
        SnapshotValid: true,
        Snapshot:      args.Snapshot,
        SnapshotTerm:  args.LastSnapshotTerm,
        SnapshotIndex: args.LastSnapshotIndex,
    }

    // if an existing entry matches the snapshot's last index and term, keep the entries after it
    for i := 1; i < len(rf.logs); i++ {
        if oldLastSnapshotIndex+i == args.LastSnapshotIndex && rf.logs[i].Term == args.LastSnapshotTerm {
            rf.logs = append([]LogEntry{{Term: args.LastSnapshotTerm}}, rf.logs[i+1:]...)
            rf.applyCh <- msg

            return
        }
    }

    // otherwise the snapshot covers the entire log, so discard it all
    rf.logs = []LogEntry{{Term: args.LastSnapshotTerm}}

    rf.applyCh <- msg
}
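
On the receiving side, the application layer sees the snapshot as an ApplyMsg with SnapshotValid set and restores its state from it. The following is a minimal sketch of the service-side applier; kv.mu, kv.data, and kv.lastApplied are assumed names, not part of the Raft code above.

// Sketch of the application-side applier handling a snapshot message.
for msg := range applyCh {
    if msg.SnapshotValid {
        r := bytes.NewBuffer(msg.Snapshot)
        d := labgob.NewDecoder(r)
        kv.mu.Lock()
        d.Decode(&kv.data)                 // restore application state from the snapshot
        kv.lastApplied = msg.SnapshotIndex // skip entries covered by the snapshot
        kv.mu.Unlock()
        continue
    }
    // ... apply ordinary command entries here ...
}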

Leader Sends Snapshot

When the leader finds that a follower lags so far behind that the required entries have already been compacted away (nextIndex ≤ lastSnapshotIndex), it sends the snapshot instead. The follower installs the snapshot and then continues replication from the snapshot’s last included index.

func (rf *Raft) leaderBroadcast() {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    for i := range rf.peers {
        if i == rf.me {
            continue
        }

        if rf.nextIndex[i] <= rf.lastSnapshotIndex { // follower needs entries that are already compacted; send the snapshot
            args := &InstallSnapshotArgs{
                Term:              rf.currentTerm,
                LeaderId:          rf.me,
                LastSnapshotIndex: rf.lastSnapshotIndex,
                LastSnapshotTerm:  rf.logs[0].Term,
                Snapshot:          rf.snapshot,
            }

            go rf.handleInstallSnapshot(i, args)
        } else {
            args := &AppendEntriesArgs{
                Term:         rf.currentTerm,
                LeaderId:     rf.me,
                PrevLogIndex: rf.nextIndex[i] - 1,
                PrevLogTerm:  rf.logs[rf.getLogIndex(rf.nextIndex[i]-1)].Term,
                Entries:      append([]LogEntry{}, rf.logs[rf.getLogIndex(rf.nextIndex[i]):]...), // copy so the RPC goroutine doesn't race with later log changes
                LeaderCommit: rf.commitIndex,
            }

            if args.PrevLogIndex == rf.lastSnapshotIndex {
                args.PrevLogTerm = rf.logs[0].Term
            }

            go rf.handleAppendEntries(i, args)
        }
    }
}

func (rf *Raft) handleInstallSnapshot(server int, args *InstallSnapshotArgs) {
    reply := &InstallSnapshotReply{}
    if ok := rf.sendInstallSnapshot(server, args, reply); !ok {
        return
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()
    defer rf.persist()

    if rf.state != State_Leader || args.Term != rf.currentTerm || reply.Term < rf.currentTerm {
        return
    }

    if reply.Term > rf.currentTerm {
        rf.toFollower(reply.Term)
        return
    }

    rf.nextIndex[server] = args.LastSnapshotIndex + 1
    rf.matchIndex[server] = args.LastSnapshotIndex

    // advance commitIndex; only entries from the current term may be committed by counting replicas (Raft figure 8)
    for n := rf.getLastIndex(); n > rf.commitIndex; n-- {
        if rf.logs[rf.getLogIndex(n)].Term != rf.currentTerm {
            continue
        }

        count := 1
        for j := range rf.peers {
            if j != rf.me && rf.matchIndex[j] >= n {
                count++
            }
        }

        if count > len(rf.peers)/2 {
            rf.commitIndex = n
            break
        }
    }
}
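
The sendInstallSnapshot helper used above is just a thin wrapper around the RPC call, mirroring sendAppendEntries. The sketch below assumes the usual labrpc Call interface used in the labs.

// Thin RPC wrapper; Call returns false if the RPC could not be delivered.
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
    return rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
}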

Conclusion

In this lab, we have implemented the persistence and log compaction of Raft. By persisting the state information and compacting the log entries, Raft ensures data consistency and maintains performance efficiency. The snapshot mechanism is a powerful tool for reducing log size and improving server restart times.

My implementation of this lab can be found on GitHub; I hope it helps you understand the concepts better. If you have any questions or feedback, feel free to send me an email. Thank you for reading!
