CS 350 · Spring 2023 · Boston University
Distributed Systems — Raft + MapReduce
A from-scratch Raft consensus implementation in Go (leader election, log replication, snapshotting) plus a MapReduce coordinator/worker, evaluated under a custom RPC fault-injection harness.
● What I built
- Raft state machine — randomized election timeouts, AppendEntries-driven log replication, snapshot install RPC.
- MapReduce coordinator and worker with plugin-loaded map/reduce functions and crash-timeout reassignment.
- Custom labrpc network simulator used to inject delays, drops, reorders for chaos-style tests.
- Read the original Raft, GFS, MapReduce, Dynamo, and PNUTS papers across the semester.
● Stack
Go · labrpc (the course's RPC network simulator) · Go's plugin package for loading map/reduce functions
Raft is a consensus algorithm that lets a cluster of computers agree on state even when some fail. Most people learn it from the paper — Ongaro and Ousterhout, 2014 — but reading prose about how state machines coordinate is different from making them coordinate.
I implemented Raft from scratch in Go for CS 350 in Spring 2023, starting with a bare-bones struct and ending with a fully replicating log where the leader's entries are applied to followers' state machines only after a majority agrees. The key insight is that the algorithm works because it forces three separate roles to follow strict rules: followers listen for heartbeats, candidates compete for votes, and leaders enforce their will.
What it is
Raft solves the problem: how do N machines agree on a sequence of commands when some might crash or lose network connectivity? The answer is a replicated state machine: every replica executes the same sequence of commands in the same order, so they all end up in the same state.
Raft achieves this by having a single leader at any moment. The leader receives commands from clients, appends them to its own log, replicates them to followers, and only considers a command committed (safe to apply) once a majority of servers have the entry in their log. That majority property is crucial: even if the leader crashes, a majority of followers have the committed entries, so the next leader will never lose committed work.
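The replicated-state-machine idea fits in a few lines. This is an illustrative sketch (the kvStore type and the "set k v" command format are invented for the example, not taken from the project): two replicas that apply the same log independently end up identical.

```go
package main

import "fmt"

// Illustrative only: a replicated state machine is "same commands,
// same order, same result" on every replica.
type kvStore map[string]string

func apply(cmds []string, s kvStore) {
	for _, cmd := range cmds {
		var k, v string
		fmt.Sscanf(cmd, "set %s %s", &k, &v)
		s[k] = v
	}
}

func main() {
	cmds := []string{"set x 1", "set y 2", "set x 3"}
	a, b := kvStore{}, kvStore{}
	apply(cmds, a) // replica A applies the log
	apply(cmds, b) // replica B applies the same log, in the same order
	fmt.Println(a["x"] == b["x"] && a["y"] == b["y"]) // prints true
}
```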
The three node roles are:
- Follower: passive. Listens for heartbeats from the leader. If the timeout expires and no heartbeat arrives, it becomes a candidate.
- Candidate: competing for leadership. Increments the term, votes for itself, and broadcasts RequestVote RPCs to all peers. If it wins a majority, it becomes leader. If another candidate with a higher term appears, or if a new leader sends AppendEntries, it steps back down to follower.
- Leader: drives consensus. Sends periodic AppendEntries RPCs (even empty ones as heartbeats). When clients send commands, the leader appends to its log and replicates to followers. Once a majority acknowledges, the leader advances its commitIndex and applies entries to the state machine.
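The legal transitions between these roles can be summarized as a tiny table. The role names follow the prose above, but the nextRole function and the event names are invented here for illustration:

```go
package main

import "fmt"

type role string

const (
	follower  role = "follower"
	candidate role = "candidate"
	leader    role = "leader"
)

// nextRole maps (current role, event) to the new role.
// Event names are illustrative, not from the original code.
func nextRole(r role, event string) role {
	switch {
	case r == follower && event == "timeout":
		return candidate // election timer fired: start an election
	case r == candidate && event == "majority":
		return leader // won a majority of votes
	case r == candidate && event == "timeout":
		return candidate // split vote: bump term, retry
	case event == "higherTerm" || event == "appendFromLeader":
		return follower // always defer to a newer term or a live leader
	}
	return r // otherwise, stay put
}

func main() {
	fmt.Println(nextRole(follower, "timeout"))   // candidate
	fmt.Println(nextRole(candidate, "majority")) // leader
	fmt.Println(nextRole(leader, "higherTerm"))  // follower
}
```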
How it works: leader election
The election process is the engine of Raft. Every server has an election timer that resets when it hears from the leader or votes for a candidate. The timer is randomized (150–300ms in the reference implementation) to prevent two candidates from timing out simultaneously and splitting votes forever.
// When a follower's election timer expires, it becomes a candidate
type Raft struct {
	mu        sync.Mutex
	peers     []*labrpc.ClientEnd // RPC endpoints of all peers
	persister *Persister          // Persistent storage
	me        int                 // This peer's index

	CurrentTerm int       // Latest term this server has seen
	VotedFor    int       // Who we voted for in CurrentTerm (-1 = nobody)
	Log         []LogItem // Log entries
	CommitIndex int       // Index of highest log entry known to be committed
	LastApplied int       // Index of highest log entry applied to the state machine

	ServerState   string    // "follower", "candidate", or "leader"
	ElectionTimer time.Time // When the election timer fires
}
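The Log field holds LogItem entries, and later snippets call getLastLogIndex and getLastLogTerm. A plausible shape for the type and helpers, assumed here against a minimal stand-in struct (the repo's actual definitions may differ):

```go
package main

import "fmt"

// Assumed shape: in the real code these helpers hang off *Raft.
type LogItem struct {
	Term    int         // term in which the leader created the entry
	Command interface{} // opaque command for the state machine
}

type raftLog struct{ entries []LogItem }

// Index 0 holds a sentinel entry, so an "empty" log still has a last index.
func (l *raftLog) lastIndex() int { return len(l.entries) - 1 }
func (l *raftLog) lastTerm() int  { return l.entries[l.lastIndex()].Term }

func main() {
	l := &raftLog{entries: []LogItem{{Term: 0}, {Term: 1, Command: "set x 1"}}}
	fmt.Println(l.lastIndex(), l.lastTerm()) // 1 1
}
```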
The follower listens passively. When the election timer fires:
// Simplified: follower detects timeout and becomes candidate
rf.mu.Lock()
if time.Now().After(rf.ElectionTimer) && rf.ServerState == "follower" {
	rf.CurrentTerm++
	rf.ServerState = "candidate"
	rf.VotedFor = rf.me // vote for self
	rf.ElectionTimer = time.Now().Add(randomElectionTimeout())
	rf.mu.Unlock()
	// broadcast RequestVote to all peers (outside the lock)
	rf.broadcastRequestVote()
} else {
	rf.mu.Unlock()
}
Each peer that receives the RequestVote checks:
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// Rule 1: If term is newer, update ours and reset vote
	if args.Term > rf.CurrentTerm {
		rf.CurrentTerm = args.Term
		rf.VotedFor = -1 // reset, so we can vote again
		rf.ServerState = "follower"
	}
	reply.Term = rf.CurrentTerm
	reply.VoteGranted = false
	// Rule 2: Vote only if we haven't voted yet in this term
	// AND the candidate's log is at least as up-to-date as ours
	if args.Term == rf.CurrentTerm && rf.VotedFor == -1 {
		if args.LastLogTerm > rf.getLastLogTerm() ||
			(args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex >= rf.getLastLogIndex()) {
			rf.VotedFor = args.CandidateId
			reply.VoteGranted = true
		}
	}
}
A candidate that collects votes from a majority becomes leader. The term number is the tie-breaker: if two servers both claim leadership, the one with the higher term is legitimate; the other will see the higher term in an incoming RPC, step down, and follow the new leader.
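The majority check itself is small enough to show standalone. A hedged sketch (wonElection is an illustrative name, not the repo's; the candidate always counts its own vote):

```go
package main

import "fmt"

// wonElection tallies granted votes from peers plus the candidate's own
// vote, and reports whether that is a strict majority of the cluster.
func wonElection(votesGranted []bool, clusterSize int) bool {
	count := 1 // the candidate voted for itself
	for _, g := range votesGranted {
		if g {
			count++
		}
	}
	return count > clusterSize/2
}

func main() {
	// 5-server cluster: self + 2 grants = 3 votes, a majority.
	fmt.Println(wonElection([]bool{true, true, false, false}, 5)) // true
	// self + 1 grant = 2 votes: not a majority of 5.
	fmt.Println(wonElection([]bool{true, false, false, false}, 5)) // false
}
```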
How it works: log replication
Once a leader is elected, it sends AppendEntries RPCs to all followers. These messages carry:
- The leader's current term (so stale leaders and out-of-date followers can detect that they are behind)
- prevLogIndex and prevLogTerm: the index and term of the entry immediately before the new entries
- The new entries themselves
- leaderCommit: the leader's commit index (the highest entry known to be committed)
The follower checks the log consistency property: does the entry at prevLogIndex have term prevLogTerm?
If not, it rejects and tells the leader where the conflict is. The leader then backs up and retries.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	reply.Term = rf.CurrentTerm
	reply.Success = false
	// Reject if leader's term is old
	if args.Term < rf.CurrentTerm {
		return
	}
	// If leader's term is newer, step down
	if args.Term > rf.CurrentTerm {
		rf.CurrentTerm = args.Term
		rf.ServerState = "follower"
		rf.VotedFor = -1
	}
	// A valid AppendEntries from the current leader resets the election timer
	rf.ElectionTimer = time.Now().Add(randomElectionTimeout())
	// Check log consistency: do we have prevLogIndex with term prevLogTerm?
	lastIndex := rf.getLastLogIndex()
	if args.PrevLogIndex > lastIndex {
		// Our log is shorter; tell the leader where it ends
		reply.ConflictIndex = lastIndex + 1
		return
	}
	if rf.Log[args.PrevLogIndex].Term != args.PrevLogTerm {
		// Mismatch: report the conflicting term and its first index so the
		// leader can back up a whole term at a time, then truncate
		reply.ConflictTerm = rf.Log[args.PrevLogIndex].Term
		i := args.PrevLogIndex
		for i > 0 && rf.Log[i-1].Term == reply.ConflictTerm {
			i--
		}
		reply.ConflictIndex = i
		rf.Log = rf.Log[:args.PrevLogIndex]
		return
	}
	// Drop any stale suffix, then append the new entries
	// (simplified: a production version only truncates on an actual conflict)
	rf.Log = append(rf.Log[:args.PrevLogIndex+1], args.Entries...)
	reply.Success = true
	// Advance commitIndex up to leaderCommit
	if args.LeaderCommit > rf.CommitIndex {
		rf.CommitIndex = min(args.LeaderCommit, rf.getLastLogIndex())
		go rf.ApplyLog() // async apply to state machine
	}
}
When AppendEntries succeeds, the leader advances NextIndex[follower] past the entries it just sent and records MatchIndex[follower]. On failure, the leader backs NextIndex up (using the follower's conflict hint, where available) and retries. This ensures followers' logs converge to the leader's.
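That bookkeeping can be sketched as a pure function. The names here are illustrative (the real logic lives in the leader's reply handler on *Raft):

```go
package main

import "fmt"

// updateNextIndex computes the follower's new NextIndex after an
// AppendEntries reply: advance on success, jump to the follower's
// conflict hint on failure, or fall back to a slow decrement.
func updateNextIndex(next, prevLogIndex, entriesSent int, success bool, conflictIndex int) int {
	if success {
		// Follower accepted: skip past everything we just sent.
		return prevLogIndex + entriesSent + 1
	}
	if conflictIndex > 0 {
		// Follower told us where its log diverges: retry from there.
		return conflictIndex
	}
	// No hint: back up one entry and retry.
	return next - 1
}

func main() {
	fmt.Println(updateNextIndex(5, 4, 3, true, 0))  // 8: entries 5..7 replicated
	fmt.Println(updateNextIndex(5, 4, 0, false, 2)) // 2: jump to the conflict
	fmt.Println(updateNextIndex(5, 4, 0, false, 0)) // 4: slow decrement
}
```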
Once the leader sees that a majority of followers have replicated an entry, it advances CommitIndex
and applies entries to the state machine:
// Caller must hold rf.mu; runs after a successful AppendEntries reply.
func (rf *Raft) updateCommitIndex() {
	lastIndex := rf.getLastLogIndex()
	// Check each possible commit point, from the end of the log backwards
	for N := lastIndex; N > rf.CommitIndex; N-- {
		// Count how many replicas have this entry
		count := 1 // leader has it
		for i := range rf.peers {
			if i != rf.me && rf.MatchIndex[i] >= N {
				count++
			}
		}
		// If a majority has it AND it's from our current term, commit
		// (the current-term check is the paper's Figure 8 safety rule)
		if count > len(rf.peers)/2 && rf.Log[N].Term == rf.CurrentTerm {
			rf.CommitIndex = N
			go rf.ApplyLog()
			return
		}
	}
}
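The ApplyLog step the snippets hand off to can be sketched as a standalone function. ApplyMsg's shape is assumed from the course framework, and applyCommitted is an invented name for illustration:

```go
package main

import "fmt"

// Assumed shape of the message the service receives for each committed entry.
type ApplyMsg struct {
	CommandIndex int
	Command      interface{}
}

// applyCommitted delivers every newly committed entry, in order, exactly
// once, and returns the new LastApplied.
func applyCommitted(log []interface{}, lastApplied, commitIndex int, applyCh chan ApplyMsg) int {
	for i := lastApplied + 1; i <= commitIndex; i++ {
		applyCh <- ApplyMsg{CommandIndex: i, Command: log[i]}
	}
	return commitIndex
}

func main() {
	applyCh := make(chan ApplyMsg, 10)
	log := []interface{}{nil, "a", "b", "c"} // index 0 is a sentinel
	lastApplied := applyCommitted(log, 0, 2, applyCh)
	fmt.Println(lastApplied, len(applyCh)) // 2 2: entries 1 and 2 delivered
}
```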
Test scenarios and edge cases
The implementation passes a battery of tests:
- Election: Multiple followers, no leader. One times out and wins a majority vote.
- Log replication: Leader sends entries, followers append and apply.
- Persistence: Crash and restart; persistent state (term, vote, log) is recovered.
- Snapshot installation: For very long logs, leaders can install a snapshot on followers to avoid sending the entire log.
- Network partitions: A partition isolates some followers. The majority partition elects a new leader if the old one is isolated.
The trickiest part is the log consistency check with conflict resolution. If the leader and follower disagree on a log entry, the follower must tell the leader which entries are wrong. The Raft paper specifies this optimization: when a follower rejects AppendEntries, it returns not just the index but also the term of the conflicting entry, so the leader can skip backwards quickly.
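The leader's side of that fast-backup optimization can be sketched as a pure function over a slice of per-index terms (illustrative, not the repo's code): if the leader has entries from the follower's conflicting term, it retries from just past its last entry of that term; otherwise it retries from the follower's ConflictIndex.

```go
package main

import "fmt"

// fastBackup returns the leader's new NextIndex for a follower that
// rejected AppendEntries with (conflictTerm, conflictIndex).
func fastBackup(leaderTerms []int, conflictTerm, conflictIndex int) int {
	// Scan backwards for the leader's last entry of conflictTerm.
	for i := len(leaderTerms) - 1; i >= 0; i-- {
		if leaderTerms[i] == conflictTerm {
			return i + 1 // retry just past that term
		}
	}
	return conflictIndex // leader never saw that term: take the hint
}

func main() {
	leaderTerms := []int{0, 1, 1, 2, 3, 3} // term of each log index
	fmt.Println(fastBackup(leaderTerms, 1, 1)) // 3: skip past term 1 in one step
	fmt.Println(fastBackup(leaderTerms, 4, 2)) // 2: unknown term, use ConflictIndex
}
```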
What I learned
- Concurrency is hard; mutexes save you. Every access to the Raft state (term, log, commit index) is protected by a single mutex. No reader-writer locks, no atomics. Simple and correct.
- The paper is the spec. I referred to Figure 2 (the state machine table) constantly. It defines exactly when each role checks the term, when it votes, when it applies entries. Follow the table.
- Timeouts are not for wimps. The election timeout must be long enough that leaders have time to send heartbeats but short enough that a failed leader is detected quickly. 150–300ms is a practical range for same-datacenter clusters; larger networks need longer timeouts.
- Persistence is non-negotiable. Write currentTerm, votedFor, and the log to disk (or in the implementation, a Persister object) before sending any RPC reply. If you don't, a crash can replay stale votes and break safety.
- Test against chaos. The test harness kills servers at random, partitions the network, delays messages. Your implementation must survive. This forces you to think about every case where a message arrives out of order, where a peer crashes mid-operation, or where the network lies to you.
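The persistence rule can be sketched with encoding/gob, a plausible encoding for the byte slice the course's Persister stores; hardState and both function names are invented for the example:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// hardState is the state that must survive a crash: anything less and a
// restarted server could re-vote in an old term or forget log entries.
type hardState struct {
	CurrentTerm int
	VotedFor    int
	Log         []string
}

func encodeState(s hardState) []byte {
	var buf bytes.Buffer
	gob.NewEncoder(&buf).Encode(s) // error handling elided for brevity
	return buf.Bytes()
}

func decodeState(data []byte) hardState {
	var s hardState
	gob.NewDecoder(bytes.NewReader(data)).Decode(&s)
	return s
}

func main() {
	before := hardState{CurrentTerm: 7, VotedFor: 2, Log: []string{"a", "b"}}
	after := decodeState(encodeState(before)) // simulate crash + restart
	fmt.Println(after.CurrentTerm, after.VotedFor, len(after.Log)) // 7 2 2
}
```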
What I'd do differently
- Batch log entries. The implementation sends one AppendEntries at a time. In production, batch multiple entries per RPC to reduce overhead.
- Implement log compaction properly. Raft's snapshotting mechanism lets followers skip entries they've already applied. The reference implementation supports it, but true incremental snapshots (not just persisting the state machine snapshot) would speed up recovery.
- Add metrics. I'd instrument the code to track election times, replication latency, and log divergence. These tell you if the cluster is unhealthy.
- Benchmark against other consensus algorithms. Raft is simple and proven, but depending on the use case (Byzantine failures? Very large clusters?), other algorithms might fit better. A practical benchmark would tell.
The full implementation lives at github.com/ArkashJ/Raft. It's a teaching implementation, not production-ready, but it captures the core algorithm. The state machine apply loop, the conflict resolution, the term-based ordering — all the pieces that make Raft work are in there.
● Papers
- → In Search of an Understandable Consensus Algorithm (Raft), Ongaro & Ousterhout (2014)
- → MapReduce: Simplified Data Processing on Large Clusters, Dean & Ghemawat (2004)
- → The Google File System, Ghemawat et al. (2003)
● Code
Note: Code excerpts illustrate concepts. Full homework solutions are not published.