Introduction
In the past two labs, we implemented the core of the Raft consensus algorithm. In this lab, we focus on persistence and log compaction.
Principles
Persistence
In Raft, certain state information must be persisted to ensure data consistency across server restarts. This persistent state includes:
- Current term number: Tracks the current term in the Raft consensus process.
- Voted candidate: Records which candidate received the server’s vote in the current term.
- Log entries: Stores all log entries to maintain the replicated state machine.
In Raft, state is persisted immediately after every change. This approach is simple and reliable, ensuring the persisted state is always up-to-date. Here are specific situations where persistence is required:
- Before responding to a RequestVote RPC: Persist the current term and the voted candidate immediately after casting a vote.
- Before responding to an AppendEntries RPC: Persist new log entries as soon as they are appended.
- Before replying to any RPC after a term update: Persist the new term number to avoid inconsistencies.
By following these guidelines, Raft maintains a durable and consistent state, even in the event of server failures or restarts.
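As a concrete illustration, here is a minimal sketch of where the persist calls typically sit inside the RPC handlers. The handler bodies are elided, and the argument/reply fields (CandidateId, VoteGranted, Success) follow the lab skeleton, so treat this as an assumption about the surrounding code rather than a complete handler.

```go
// Sketch only: placement of persist() in the RPC handlers (fields assumed from the lab skeleton).
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// ... term checks and vote-granting logic elided ...
	rf.voteFor = args.CandidateId
	rf.persist() // make the vote durable before the reply leaves this server
	reply.VoteGranted = true
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// ... consistency checks and conflict handling elided ...
	rf.logs = append(rf.logs, args.Entries...)
	rf.persist() // make the new entries durable before acknowledging them
	reply.Success = true
}
```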
If you store the persistent state in a file, you should call the fsync system call after writing it, which guarantees the data is flushed to disk and not lost in a crash. This call is the performance bottleneck of Raft, but it is necessary to ensure data consistency.
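In Go, fsync is exposed as (*os.File).Sync(). The sketch below shows one way to write a state blob durably; the write-to-temp-file-then-rename pattern is an extra safety measure I am assuming here, not something the lab's Persister requires.

```go
package main

import "os"

// writeDurably writes data to path and fsyncs it so a crash cannot lose it.
// Sketch only: the temp-file-plus-rename pattern atomically replaces the old file.
func writeDurably(path string, data []byte) error {
	tmp := path + ".tmp"
	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // fsync: flush file contents to disk
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, path) // swap the new state file into place
}
```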
Log Compaction
In Raft, the log can grow indefinitely over time, consuming more storage and slowing down log replication and server restarts. For example, in a Raft-based K/V store, a high volume of write requests can rapidly expand the log, leading to performance issues. During a restart, the server must replay all log entries, which can be time-consuming. Therefore, log compaction is crucial for maintaining performance and efficiency.
A snapshot is a common approach to log compaction: a point-in-time copy of the server state that covers all log entries up to a given point. The consensus layer doesn't know how to apply log entries to the K/V store, so it is the application layer's responsibility to generate the snapshot. The consensus layer only needs to know the snapshot's last included index and term.
When a snapshot is generated, the server discards all log entries up to the snapshot’s index and term number. This process reduces the log size and improves performance. The snapshot is then saved to disk, and the server can resume normal operation. When a server restarts, it loads the snapshot and replays the log entries from that point onwards.
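For example, a Raft-backed K/V server might periodically serialize its map and hand it to Raft. This is only a sketch: KVServer, kv.data, and kv.rf are assumed names from the application layer, not part of this lab's Raft interface; the one real contract is the Snapshot(index, snapshot) call.

```go
// Sketch: the application layer produces the snapshot bytes itself,
// then tells Raft which applied log index the snapshot covers.
func (kv *KVServer) takeSnapshot(appliedIndex int) {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.data)                       // the K/V map (assumed field); errors ignored in this sketch
	kv.rf.Snapshot(appliedIndex, w.Bytes()) // hand the snapshot to the consensus layer
}
```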
When a lagging server or a new server joins the cluster, leader can send the snapshot to it. The follower can apply the snapshot and replay the log entries from the snapshot’s index and term number. This approach is efficient and reduces the time required and network bandwidth consumed during log replication.
Implementation
Persistence States
In this lab, the Persister struct abstracts reading and writing persistent data. It offers two methods, Save and Read, which make it straightforward to encode and decode the persistent state with the labgob package.
```go
type PersistentStatus struct {
	CurrentTerm       int
	VoteFor           int
	Logs              []LogEntry
	LastSnapshotIndex int
}

func (rf *Raft) persist() {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	persistentStatus := &PersistentStatus{
		CurrentTerm:       rf.currentTerm,
		VoteFor:           rf.voteFor,
		Logs:              rf.logs,
		LastSnapshotIndex: rf.lastSnapshotIndex,
	}
	if e.Encode(persistentStatus) != nil {
		panic("persist encode failed")
	}
	// Save the Raft state and the current snapshot together.
	rf.persister.Save(w.Bytes(), rf.snapshot)
}

func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	persistentStatus := &PersistentStatus{}
	if d.Decode(persistentStatus) != nil {
		panic("failed to decode raft persistent state")
	}
	rf.currentTerm = persistentStatus.CurrentTerm
	rf.voteFor = persistentStatus.VoteFor
	rf.logs = persistentStatus.Logs
	rf.lastSnapshotIndex = persistentStatus.LastSnapshotIndex
	// Everything up to the snapshot has already been applied to the state machine.
	rf.commitIndex = rf.lastSnapshotIndex
	rf.lastApplied = rf.lastSnapshotIndex
	rf.snapshot = rf.persister.ReadSnapshot()
}
```
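At startup, readPersist is fed whatever the Persister currently holds, so a restarted server resumes from its last durable state. The sketch below assumes the Make signature and ReadRaftState method from the course skeleton; only the readPersist call is the point here.

```go
// Sketch: restoring durable state on startup. The Make signature and
// persister.ReadRaftState() are assumptions taken from the course skeleton.
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan raftapi.ApplyMsg) *Raft {
	rf := &Raft{
		peers:     peers,
		persister: persister,
		me:        me,
		applyCh:   applyCh,
	}
	// ... other initialization and background goroutines elided ...
	rf.readPersist(persister.ReadRaftState()) // restore term, vote, log, and snapshot index
	return rf
}
```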
Create Snapshot
The application layer generates the snapshot and calls Raft's Snapshot method to hand it to the consensus layer.
After saving the snapshot, Raft discards all log entries up to the snapshot's index. Because the remaining entries are shifted left in the slice, we must translate between a global log index and its position in the slice whenever we access an entry.
```go
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (3D).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.lastSnapshotIndex >= index || // already covered by an existing snapshot
		index > rf.lastApplied { // not applied yet
		return // skip
	}
	defer rf.persist()
	// Truncate the log: keep a dummy entry at position 0 that records the
	// term of the last entry included in the snapshot.
	split := rf.getLogIndex(index)
	rf.lastSnapshotIndex = index
	rf.logs = append([]LogEntry{{Term: rf.logs[split].Term}}, rf.logs[split+1:]...)
	rf.snapshot = snapshot
}

// getLastIndex returns the global index of the last log entry.
func (rf *Raft) getLastIndex() int {
	return rf.lastSnapshotIndex + len(rf.logs) - 1
}

// getLastTerm returns the term of the last log entry.
func (rf *Raft) getLastTerm() int {
	return rf.logs[len(rf.logs)-1].Term
}

// getLogIndex translates a global log index into a position in rf.logs.
func (rf *Raft) getLogIndex(originalIndex int) int {
	if originalIndex < rf.lastSnapshotIndex {
		panic("index already covered by snapshot")
	}
	return originalIndex - rf.lastSnapshotIndex
}

// getLogRealIndex translates a position in rf.logs back into a global log index.
func (rf *Raft) getLogRealIndex(index int) int {
	return index + rf.lastSnapshotIndex
}
```
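To make the translation concrete, suppose lastSnapshotIndex is 100 and rf.logs holds the dummy entry plus entries 101 through 110: global index 105 then lives at slice position 105 - 100 = 5, and getLastIndex() returns 100 + 11 - 1 = 110. The helper below shows the usual access pattern; termAt is a hypothetical name used only for this illustration.

```go
// Sketch: reading a log entry by its global index after compaction.
// E.g. with lastSnapshotIndex == 100, termAt(105) reads rf.logs[5].
func (rf *Raft) termAt(globalIndex int) int {
	return rf.logs[rf.getLogIndex(globalIndex)].Term
}
```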
Install Snapshot
When a follower receives a snapshot from the leader, it installs the snapshot, hands it to the application layer, and discards the log entries that the snapshot already covers.
```go
type InstallSnapshotArgs struct {
	Term              int
	LeaderId          int
	LastSnapshotIndex int
	LastSnapshotTerm  int
	Snapshot          []byte
}

type InstallSnapshotReply struct {
	Term int
}

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	DPrintf("[%d] Server %d received snapshot request from %d", rf.currentTerm, rf.me, args.LeaderId)
	rf.mu.Lock()
	defer rf.mu.Unlock()
	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm { // stale leader
		return
	}
	if args.Term > rf.currentTerm {
		rf.toFollower(args.Term)
	}
	sendToChannel(rf.recvCh)
	if args.LastSnapshotIndex <= rf.lastSnapshotIndex { // we already cover this snapshot
		return
	}
	defer rf.persist()

	oldSnapshotIndex := rf.lastSnapshotIndex
	rf.lastSnapshotIndex = args.LastSnapshotIndex
	rf.commitIndex = args.LastSnapshotIndex
	rf.lastApplied = args.LastSnapshotIndex
	rf.snapshot = args.Snapshot

	msg := raftapi.ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Snapshot,
		SnapshotTerm:  args.LastSnapshotTerm,
		SnapshotIndex: args.LastSnapshotIndex,
	}

	// If we still hold the entry at the snapshot index with a matching term,
	// keep the entries that follow it. Note that i indexes the old log, so it
	// is translated with the snapshot index from before the update above.
	for i := 1; i < len(rf.logs); i++ {
		if i+oldSnapshotIndex == args.LastSnapshotIndex && rf.logs[i].Term == args.LastSnapshotTerm {
			rf.logs = append([]LogEntry{{Term: args.LastSnapshotTerm}}, rf.logs[i+1:]...) // keep later logs
			rf.applyCh <- msg
			return
		}
	}
	// No later logs to keep: the snapshot replaces the entire log.
	rf.logs = []LogEntry{{Term: args.LastSnapshotTerm}}
	rf.applyCh <- msg
}
```
|
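On the other side of applyCh, the service has to recognize snapshot messages and replace its state instead of applying a command. Below is a minimal sketch of that consumer loop; KVServer, kv.data, kv.lastApplied, and readSnapshot are assumed names, and CommandValid is the usual ApplyMsg field from the lab skeleton.

```go
// Sketch: the application-layer applier distinguishing snapshots from commands.
func (kv *KVServer) applier() {
	for msg := range kv.applyCh {
		if msg.SnapshotValid {
			// Replace the whole state machine with the snapshot contents.
			kv.mu.Lock()
			kv.readSnapshot(msg.Snapshot) // decode kv.data with labgob (assumed helper)
			kv.lastApplied = msg.SnapshotIndex
			kv.mu.Unlock()
			continue
		}
		if msg.CommandValid {
			// ... apply the command to the K/V map as usual ...
		}
	}
}
```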
Leader Sends Snapshot
When a leader detects that a follower lags behind its own snapshot, it sends the snapshot to that follower. The follower installs the snapshot, and log replication then continues from the snapshot's index onwards.
```go
func (rf *Raft) leaderBroadcast() {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		if rf.nextIndex[i] <= rf.lastSnapshotIndex {
			// The entries this follower needs have already been discarded,
			// so send the snapshot instead of AppendEntries.
			args := &InstallSnapshotArgs{
				Term:              rf.currentTerm,
				LeaderId:          rf.me,
				LastSnapshotIndex: rf.lastSnapshotIndex,
				LastSnapshotTerm:  rf.logs[0].Term,
				Snapshot:          rf.snapshot,
			}
			go rf.handleInstallSnapshot(i, args)
		} else {
			args := &AppendEntriesArgs{
				Term:         rf.currentTerm,
				LeaderId:     rf.me,
				PrevLogIndex: rf.nextIndex[i] - 1,
				PrevLogTerm:  rf.logs[rf.getLogIndex(rf.nextIndex[i]-1)].Term,
				Entries:      rf.logs[rf.getLogIndex(rf.nextIndex[i]):],
				LeaderCommit: rf.commitIndex,
			}
			if args.PrevLogIndex == rf.lastSnapshotIndex {
				args.PrevLogTerm = rf.logs[0].Term
			}
			go rf.handleAppendEntries(i, args)
		}
	}
}

func (rf *Raft) handleInstallSnapshot(server int, args *InstallSnapshotArgs) {
	reply := &InstallSnapshotReply{}
	if ok := rf.sendInstallSnapshot(server, args, reply); !ok {
		return
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()
	if rf.state != State_Leader || args.Term != rf.currentTerm || reply.Term < rf.currentTerm {
		return // stale reply, or we are no longer leader for that term
	}
	if reply.Term > rf.currentTerm {
		rf.toFollower(reply.Term)
		return
	}
	// The follower now has everything up to the snapshot index.
	rf.nextIndex[server] = args.LastSnapshotIndex + 1
	rf.matchIndex[server] = args.LastSnapshotIndex
	// Try to advance commitIndex with the updated matchIndex.
	for n := rf.getLastIndex(); n >= rf.commitIndex; n-- {
		count := 1
		for j := range rf.peers {
			if j != rf.me && rf.matchIndex[j] >= n {
				count++
			}
		}
		if count > len(rf.peers)/2 {
			rf.commitIndex = n
			break
		}
	}
}
```
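sendInstallSnapshot is just a thin RPC stub around labrpc, mirroring the sendRequestVote and sendAppendEntries helpers from the earlier labs; a sketch, assuming rf.peers is the usual slice of labrpc client ends:

```go
// Sketch: RPC stub for InstallSnapshot, following the labrpc pattern.
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
	return rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
}
```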
Conclusion
In this lab, we have implemented the persistence and log compaction of Raft. By persisting the state information and compacting the log entries, Raft ensures data consistency and maintains performance efficiency. The snapshot mechanism is a powerful tool for reducing log size and improving server restart times.
My implementation of this lab can be found on GitHub; I hope it helps you understand the concepts better. If you have any questions or feedback, feel free to send me an email. Thank you for reading!