
CS 350 · Spring 2023 · Boston University

Distributed Systems — Raft + MapReduce

A from-scratch Raft consensus implementation in Go (leader election, log replication, snapshotting) plus a MapReduce coordinator/worker, evaluated under a custom RPC fault-injection harness.

● What I built

  • Raft state machine — randomized election timeouts, AppendEntries-driven log replication, snapshot install RPC.
  • MapReduce coordinator and worker with plugin-loaded map/reduce functions and crash-timeout reassignment.
  • Custom labrpc network simulator used to inject delays, drops, reorders for chaos-style tests.
  • Read the original Raft, GFS, MapReduce, Dynamo, and PNUTS papers across the semester.

● Stack

Go · gRPC · Distributed Systems · Raft Consensus

Raft is a consensus algorithm that lets a cluster of computers agree on state even when some fail. Most people learn it from the paper — Ongaro and Ousterhout, 2014 — but reading prose about how state machines coordinate is different from making them coordinate.

I implemented Raft from scratch in Go for CS 350 in Spring 2023, starting with a bare-bones struct and ending with a fully replicating log where the leader's entries are applied to followers' state machines only after a majority agrees. The key insight is that the algorithm works because it forces three separate roles to follow strict rules: followers listen for heartbeats, candidates compete for votes, and leaders enforce their will.

What it is

Raft solves the problem: how do N machines agree on a sequence of commands when some might crash or lose network connectivity? The answer is a replicated state machine: every replica executes the same sequence of commands in the same order, so they all end up with the same state.

Raft achieves this by having a single leader at any moment. The leader receives commands from clients, appends them to its own log, replicates them to followers, and only considers a command committed (safe to apply) once a majority of servers have the entry in their log. That majority property is crucial: a new leader also needs votes from a majority, and any two majorities overlap in at least one server, so at least one voter holds every committed entry. Because voters refuse candidates whose logs are less up-to-date than their own, the next leader will never lose committed work.
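The majority arithmetic is worth spelling out. The sketch below is illustrative only; the helper names quorum and maxFailures are not part of the implementation.

```go
package main

import "fmt"

// quorum returns the smallest number of servers that forms a majority of n.
func quorum(n int) int { return n/2 + 1 }

// maxFailures returns how many servers can be down while a majority survives.
func maxFailures(n int) int { return n - quorum(n) }

func main() {
	for _, n := range []int{3, 4, 5, 7} {
		fmt.Printf("cluster=%d quorum=%d tolerates=%d\n", n, quorum(n), maxFailures(n))
	}
}
```

A 4-server cluster tolerates no more failures than a 3-server one, which is why Raft clusters usually have an odd number of members.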

The three node roles are:

  • Follower: passive. Listens for heartbeats from the leader. If the timeout expires and no heartbeat arrives, it becomes a candidate.
  • Candidate: competing for leadership. Increments the term, votes for itself, and broadcasts RequestVote RPCs to all peers. If it wins a majority, it becomes leader. If another candidate with a higher term appears, or if a new leader sends AppendEntries, it steps back down to follower.
  • Leader: drives consensus. Sends periodic AppendEntries RPCs (even empty ones as heartbeats). When clients send commands, the leader appends to its log and replicates to followers. Once a majority acknowledges, the leader advances its commitIndex and applies entries to the state machine.

How it works: leader election

The election process is the engine of Raft. Every server has an election timer that resets when it hears from the current leader or grants its vote to a candidate. The timeout is randomized (150–300ms, the range the Raft paper suggests) so that servers rarely time out at the same moment. When a split vote does happen, the candidates pick fresh random timeouts and retry, so repeated ties are unlikely.

// Per-server Raft state (persistent and volatile fields together)
type Raft struct {
  mu         sync.Mutex            // Guards every field below
  peers      []*labrpc.ClientEnd  // RPC endpoints of all peers
  persister  *Persister            // Persistent storage
  me         int                   // This peer's index

  CurrentTerm int                  // Latest term this server has seen
  VotedFor    int                  // Who we voted for in CurrentTerm (-1 = nobody)
  Log         []LogItem            // Log entries

  CommitIndex int                  // Index of highest log entry known to be committed
  LastApplied int                  // Index of highest log entry applied to state machine

  NextIndex   []int                // Leader only: next log index to send to each peer
  MatchIndex  []int                // Leader only: highest index known replicated on each peer

  ServerState string               // "follower", "candidate", or "leader"
  ElectionTimer time.Time          // When the election timer fires
}

The follower listens passively. When the election timer fires:

// Simplified: on timeout, a follower (or a candidate whose election stalled)
// starts a new election. State is read and written only while holding the lock.
rf.mu.Lock()
if time.Now().After(rf.ElectionTimer) && rf.ServerState != "leader" {
  rf.CurrentTerm++
  rf.ServerState = "candidate"
  rf.VotedFor = rf.me  // vote for self
  rf.ElectionTimer = time.Now().Add(randomElectionTimeout())
  rf.mu.Unlock()

  // broadcast RequestVote to all peers (without holding the lock)
  rf.broadcastRequestVote()
} else {
  rf.mu.Unlock()
}
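The randomElectionTimeout() helper is not shown in the excerpt. A minimal sketch of what it could look like, assuming the 150–300ms range mentioned above and the standard math/rand package (the constant names are made up for illustration):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Assumed bounds; tune them to the network the cluster runs on.
const (
	minElectionTimeout = 150 * time.Millisecond
	maxElectionTimeout = 300 * time.Millisecond
)

// randomElectionTimeout returns a duration drawn uniformly from
// [minElectionTimeout, maxElectionTimeout).
func randomElectionTimeout() time.Duration {
	span := int64(maxElectionTimeout - minElectionTimeout)
	return minElectionTimeout + time.Duration(rand.Int63n(span))
}

func main() {
	// Each call yields a different timeout, so peers rarely fire together.
	for i := 0; i < 3; i++ {
		fmt.Println(randomElectionTimeout())
	}
}
```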

Each peer that receives the RequestVote checks:

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
  rf.mu.Lock()
  defer rf.mu.Unlock()

  // Rule 1: If term is newer, update ours and reset vote
  if args.Term > rf.CurrentTerm {
    rf.CurrentTerm = args.Term
    rf.VotedFor = -1  // reset, so we can vote again
    rf.ServerState = "follower"
  }

  reply.Term = rf.CurrentTerm
  reply.VoteGranted = false

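  // Rule 2: Vote only if we haven't voted yet in this term (or already
  // voted for this same candidate, in case the reply was lost and it retries)
  // AND the candidate's log is at least as up-to-date as ours
  if args.Term == rf.CurrentTerm && (rf.VotedFor == -1 || rf.VotedFor == args.CandidateId) {
    if args.LastLogTerm > rf.getLastLogTerm() ||
       (args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex >= rf.getLastLogIndex()) {
      rf.VotedFor = args.CandidateId
      reply.VoteGranted = true
      // Granting a vote resets our election timer
      rf.ElectionTimer = time.Now().Add(randomElectionTimeout())
    }
  }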
}

The candidate wins if it collects votes from a majority of the cluster (counting its own vote) and becomes leader. The term number is the tie-breaker: if two servers both believe they are leader, the one with the higher term is correct. The other will see the higher term in an RPC or reply, step down, and become a follower of the legitimate leader.
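The tally itself can be sketched in isolation. This is an illustration under assumptions, not the code behind broadcastRequestVote: the askVote callback stands in for one RequestVote RPC, and the sketch waits for every reply, whereas a real candidate would become leader as soon as a majority arrives and would also watch for higher terms.

```go
package main

import (
	"fmt"
	"sync"
)

// askVote is a hypothetical stand-in for sending RequestVote to one peer.
type askVote func(peer int) (granted bool)

// collectVotes asks every peer except me in parallel and reports whether
// the candidate holds a majority of the whole cluster, counting its own vote.
func collectVotes(me, clusterSize int, ask askVote) bool {
	var wg sync.WaitGroup
	results := make(chan bool, clusterSize) // buffered so senders never block
	for peer := 0; peer < clusterSize; peer++ {
		if peer == me {
			continue
		}
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			results <- ask(p)
		}(peer)
	}
	wg.Wait()
	close(results)

	votes := 1 // the candidate votes for itself
	for granted := range results {
		if granted {
			votes++
		}
	}
	return votes > clusterSize/2
}

func main() {
	// Five servers; peers 1 and 2 grant their votes: 3 of 5 is a majority.
	fmt.Println(collectVotes(0, 5, func(p int) bool { return p <= 2 }))
	// Only peer 1 grants: 2 of 5 is not.
	fmt.Println(collectVotes(0, 5, func(p int) bool { return p == 1 }))
}
```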

How it works: log replication

Once a leader is elected, it sends AppendEntries RPCs to all followers. These messages carry:

  • The leader's current term (so followers can reject a stale leader, or update their own term if they lag behind)
  • prevLogIndex and prevLogTerm: the log index and term immediately before the new entries
  • The new entries themselves
  • leaderCommit: the leader's commit index (highest entry it knows to be committed)

The follower checks the log consistency property: does the entry at prevLogIndex have term prevLogTerm? If not, it rejects and tells the leader where the conflict is. The leader then backs up and retries.

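func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
  rf.mu.Lock()
  defer rf.mu.Unlock()

  reply.Success = false

  // Reject if leader's term is old
  if args.Term < rf.CurrentTerm {
    reply.Term = rf.CurrentTerm
    return
  }

  // If leader's term is newer, adopt it and clear our vote
  if args.Term > rf.CurrentTerm {
    rf.CurrentTerm = args.Term
    rf.VotedFor = -1
  }

  // A legitimate leader exists for this term: stay (or become) a follower
  // and reset the election timer
  rf.ServerState = "follower"
  rf.ElectionTimer = time.Now().Add(randomElectionTimeout())
  reply.Term = rf.CurrentTerm

  // Check log consistency: do we have prevLogIndex with term prevLogTerm?
  // (Assumes Log[0] is a sentinel, so slice positions equal log indices.)
  lastIndex := rf.getLastLogIndex()
  if args.PrevLogIndex > lastIndex {
    // Our log is shorter; tell leader the gap
    reply.ConflictIndex = lastIndex + 1
    return
  }

  if rf.Log[args.PrevLogIndex].Term != args.PrevLogTerm {
    // Mismatch; delete this entry and all after
    rf.Log = rf.Log[:args.PrevLogIndex]
    reply.ConflictIndex = args.PrevLogIndex
    return
  }

  // Append new entries. Skip ones we already hold and truncate only at the
  // first real conflict, so a delayed or duplicated RPC cannot duplicate
  // entries or erase newer ones.
  for i, entry := range args.Entries {
    idx := args.PrevLogIndex + 1 + i
    if idx <= rf.getLastLogIndex() && rf.Log[idx].Term == entry.Term {
      continue
    }
    rf.Log = append(rf.Log[:idx], args.Entries[i:]...)
    break
  }
  reply.Success = true

  // Advance the commit index, never past the last entry this RPC verified
  newCommit := min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
  if newCommit > rf.CommitIndex {
    rf.CommitIndex = newCommit
    go rf.ApplyLog()  // async apply to state machine
  }
}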

When AppendEntries succeeds, the leader sets MatchIndex[follower] to the last index it sent and NextIndex[follower] to one past it. If it fails, the leader decrements NextIndex[follower] and retries. This ensures followers' logs converge to the leader's.
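A sketch of that leader-side bookkeeping, pulled out into a standalone type so it can run on its own. The leaderProgress type and onAppendReply method are hypothetical names; only NextIndex and MatchIndex come from the implementation.

```go
package main

import "fmt"

// leaderProgress holds the leader's per-follower replication state.
type leaderProgress struct {
	NextIndex  []int // next log index to send to each follower
	MatchIndex []int // highest index known to be replicated on each follower
}

// onAppendReply updates the bookkeeping for one follower. prevLogIndex and
// sent describe the request that this reply answers.
func (lp *leaderProgress) onAppendReply(follower, prevLogIndex, sent int, success bool) {
	if success {
		match := prevLogIndex + sent
		if match > lp.MatchIndex[follower] { // ignore stale or reordered replies
			lp.MatchIndex[follower] = match
		}
		lp.NextIndex[follower] = lp.MatchIndex[follower] + 1
		return
	}
	// Back up one entry, but never below what is already known to match.
	if lp.NextIndex[follower] > lp.MatchIndex[follower]+1 {
		lp.NextIndex[follower]--
	}
}

func main() {
	lp := &leaderProgress{NextIndex: []int{0, 6}, MatchIndex: []int{0, 0}}
	lp.onAppendReply(1, 5, 0, false) // rejected: NextIndex 6 -> 5
	lp.onAppendReply(1, 4, 1, false) // rejected: NextIndex 5 -> 4
	lp.onAppendReply(1, 3, 2, true)  // accepted entries 4 and 5
	fmt.Println(lp.NextIndex[1], lp.MatchIndex[1]) // 6 5
}
```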

Once the leader sees that a majority of servers (counting itself) have replicated an entry from its current term, it advances CommitIndex and applies entries to the state machine:

func (rf *Raft) updateCommitIndex() {
  lastIndex := rf.getLastLogIndex()

  // Check each possible commit point, from the end of the log backwards
  for N := lastIndex; N > rf.CommitIndex; N-- {
    // Count how many replicas have this entry
    count := 1  // leader has it
    for i := range rf.peers {
      if i != rf.me && rf.MatchIndex[i] >= N {
        count++
      }
    }

    // If majority has it AND it's in our current term, commit
    if count > len(rf.peers)/2 && rf.Log[N].Term == rf.CurrentTerm {
      rf.CommitIndex = N
      go rf.ApplyLog()
      return
    }
  }
}
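ApplyLog is not shown in the excerpts. Its core job is to hand every entry between LastApplied and CommitIndex to the state machine, in order. A sketch of that step as a pure function follows; the ApplyMsg shape and the Command field on LogItem are assumptions modelled on the usual lab skeleton, and pendingApplies is a made-up name.

```go
package main

import "fmt"

// LogItem mirrors the log entry type; Command is an assumed field.
type LogItem struct {
	Term    int
	Command interface{}
}

// ApplyMsg is what the state machine receives for each committed entry.
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}

// pendingApplies returns one message per log index in (lastApplied,
// commitIndex], in log order. log[0] is assumed to be a sentinel entry.
func pendingApplies(log []LogItem, lastApplied, commitIndex int) []ApplyMsg {
	var msgs []ApplyMsg
	for i := lastApplied + 1; i <= commitIndex && i < len(log); i++ {
		msgs = append(msgs, ApplyMsg{CommandValid: true, Command: log[i].Command, CommandIndex: i})
	}
	return msgs
}

func main() {
	log := []LogItem{{}, {1, "x=1"}, {1, "y=2"}, {2, "x=3"}}
	for _, m := range pendingApplies(log, 1, 3) {
		fmt.Println(m.CommandIndex, m.Command)
	}
}
```

In a real server the caller would build this slice while holding the mutex, release the lock, and only then send the messages on the apply channel, so a slow state machine cannot stall RPC handlers.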

Test scenarios and edge cases

The implementation passes a battery of tests:

  1. Election: Multiple followers, no leader. One times out and wins a majority vote.
  2. Log replication: Leader sends entries, followers append and apply.
  3. Persistence: Crash and restart; persistent state (term, vote, log) is recovered.
  4. Snapshot installation: For very long logs, leaders can install a snapshot on followers to avoid sending the entire log.
  5. Network partitions: A partition isolates some followers. The majority partition elects a new leader if the old one is isolated.

The trickiest part is the log consistency check with conflict resolution. If the leader and follower disagree on a log entry, the follower must tell the leader where its log diverges. The Raft paper sketches an optional optimization for this: when a follower rejects AppendEntries, it returns the term of the conflicting entry and the first index it stores for that term, so the leader can skip back over a whole term per round trip instead of one entry at a time.
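One common way to use that hint on the leader side is sketched below. It is an assumption-laden illustration, not the code in the repository: the ConflictTerm value (with -1 meaning "follower's log was too short") and the function name are hypothetical, and log[0] is taken to be a sentinel.

```go
package main

import "fmt"

// LogItem mirrors the log entry type; only Term matters here.
type LogItem struct {
	Term int
}

// nextIndexAfterConflict picks the next index to try for a follower that
// rejected AppendEntries. conflictIndex is the first index the follower
// stores for conflictTerm, or the first missing index when conflictTerm is -1.
func nextIndexAfterConflict(log []LogItem, conflictTerm, conflictIndex int) int {
	if conflictTerm == -1 {
		return conflictIndex // follower's log is short: resume at the gap
	}
	// If the leader also has entries from conflictTerm, resume just past
	// its last one; the logs may agree up to that point.
	for i := len(log) - 1; i >= 1; i-- {
		if log[i].Term == conflictTerm {
			return i + 1
		}
	}
	// Leader has no entry from that term: skip the follower's whole term.
	return conflictIndex
}

func main() {
	leaderLog := []LogItem{{0}, {1}, {1}, {4}, {4}, {5}}
	fmt.Println(nextIndexAfterConflict(leaderLog, 2, 3))  // 3
	fmt.Println(nextIndexAfterConflict(leaderLog, 4, 3))  // 5
	fmt.Println(nextIndexAfterConflict(leaderLog, -1, 2)) // 2
}
```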

What I learned

  • Concurrency is hard; mutexes save you. Every access to the Raft state (term, log, commit index) is protected by a single mutex. No reader-writer locks, no atomics. Simple and correct.

  • The paper is the spec. I referred to Figure 2 (the one-page summary of state, RPCs, and rules for servers) constantly. It defines exactly when each role checks the term, when it votes, when it applies entries. Follow the table.

  • Timeouts are not for wimps. The election timeout must be long enough that leaders have time to send heartbeats but short enough that a failed leader is detected quickly. 150–300ms is a practical range for same-datacenter clusters; larger networks need longer timeouts.

  • Persistence is non-negotiable. Write currentTerm, votedFor, and the log to disk (or, in this implementation, a Persister object) before sending any RPC reply. If you don't, a server that crashes and restarts can forget its vote, vote again in the same term, and let two leaders win that term.

  • Test against chaos. The test harness kills servers at random, partitions the network, delays messages. Your implementation must survive. This forces you to think about every case where a message arrives out of order, where a peer crashes mid-operation, or where the network lies to you.
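To make the persistence point concrete, here is a sketch of serializing the three persistent fields. It uses the standard encoding/gob package as a stand-in for whatever encoder the Persister is fed; the persistentState type, the helper names, and the string-typed Command field are assumptions for illustration.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// LogItem mirrors the log entry type; Command is a string here so gob
// needs no type registration.
type LogItem struct {
	Term    int
	Command string
}

// persistentState is the state that must survive a crash.
type persistentState struct {
	CurrentTerm int
	VotedFor    int
	Log         []LogItem
}

// encodeState serializes the state into bytes suitable for stable storage.
func encodeState(s persistentState) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeState restores the state after a restart.
func decodeState(data []byte) (persistentState, error) {
	var s persistentState
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&s)
	return s, err
}

func main() {
	before := persistentState{CurrentTerm: 7, VotedFor: 2, Log: []LogItem{{1, "x=1"}, {7, "y=2"}}}
	data, err := encodeState(before)
	if err != nil {
		panic(err)
	}
	after, err := decodeState(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(after.CurrentTerm, after.VotedFor, len(after.Log))
}
```

The ordering matters more than the encoding: the save has to complete before the RPC reply that depends on it leaves the server.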

What I'd do differently

  • Batch log entries. The implementation sends one AppendEntries at a time. In production, batch multiple entries per RPC to reduce overhead.

  • Implement log compaction properly. Raft's snapshotting mechanism lets followers skip entries they've already applied. The reference implementation supports it, but true incremental snapshots (not just persisting the state machine snapshot) would speed up recovery.

  • Add metrics. I'd instrument the code to track election times, replication latency, and log divergence. These tell you if the cluster is unhealthy.

  • Benchmark against other consensus algorithms. Raft is simple and proven, but it only tolerates crash faults, and every write goes through a single leader. For Byzantine failures or very large clusters, other algorithms might fit better. Practical tests would tell.
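On the batching point above, a rough sketch of how a leader could cut one AppendEntries payload for a follower. The function name and the maxBatch cap are hypothetical, and log[0] is assumed to be a sentinel so that prevLogIndex is always valid.

```go
package main

import "fmt"

// LogItem mirrors the log entry type; only Term matters here.
type LogItem struct {
	Term int
}

// nextBatch returns the consistency-check fields and a copy of up to
// maxBatch entries starting at nextIndex. It expects 1 <= nextIndex <= len(log).
// An empty result means the RPC is a pure heartbeat.
func nextBatch(log []LogItem, nextIndex, maxBatch int) (prevLogIndex, prevLogTerm int, entries []LogItem) {
	prevLogIndex = nextIndex - 1
	prevLogTerm = log[prevLogIndex].Term
	end := nextIndex + maxBatch
	if end > len(log) {
		end = len(log)
	}
	if nextIndex < end {
		// Copy so the RPC layer never shares memory with the live log.
		entries = append(entries, log[nextIndex:end]...)
	}
	return prevLogIndex, prevLogTerm, entries
}

func main() {
	log := []LogItem{{0}, {1}, {1}, {2}, {2}, {3}}
	prevIdx, prevTerm, batch := nextBatch(log, 2, 3)
	fmt.Println(prevIdx, prevTerm, len(batch)) // 1 1 3
}
```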

The full implementation lives at github.com/ArkashJ/Raft. It's a teaching implementation, not production-ready, but it captures the core algorithm. The state machine apply loop, the conflict resolution, the term-based ordering — all the pieces that make Raft work are in there.

● Papers

● Code

Note: Code excerpts illustrate concepts. Full homework solutions are not published.