// Source path: raft-key-value-store / labs-fall23-AnurimaVaishnavi-master / assignment3 / src / raft / raft.go

package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
	"bytes"
	"encoding/gob"
	"labrpc"
	"math/rand"
	"sync"
	"time"
)

// import "bytes"
// import "encoding/gob"

// ApplyMsg is what a Raft peer sends to the service (or tester) on the
// same server, via the applyCh passed to Make(), as it becomes aware that
// successive log entries are committed.
//
// NOTE(review): no code visible in this file sends on applyCh yet, so
// whether Index is 0- or 1-based cannot be confirmed from here.
type ApplyMsg struct {
	Index       int
	Command     interface{}
	UseSnapshot bool   // ignore for lab2; only used in lab3
	Snapshot    []byte // ignore for lab2; only used in lab3
}

// LogEntry is one element of a peer's log: a command together with a term
// number. The Term of the last entry is what RequestVote compares against
// a candidate's LastLogTerm.
type LogEntry struct {
	// Term is presumably the leader's term when the entry was created —
	// confirm once entries are actually appended.
	Term    int
	Command interface{}
}
// State is the role a peer currently plays in the cluster.
type State int8

// Roles a peer can be in. The numeric values come from iota in
// declaration order: FOLLOWER = 0, LEADER = 1, CANDIDATE = 2.
const (
	FOLLOWER State = iota
	LEADER
	CANDIDATE
)
// Timing parameters for the election and heartbeat timers.
const (
	// electiontimeoutmin is the lower bound of the election timeout. Each
	// time the timer is armed a random extra in [0, electiontimeoutmin) is
	// added, so timeouts fall in [400ms, 800ms).
	electiontimeoutmin = 400 * time.Millisecond
	// heartbeatvalue is the interval the heartbeat timer is armed with.
	heartbeatvalue     = 100 * time.Millisecond
)

// Raft holds all the state of a single Raft peer.
type Raft struct {
	// mu is held by the RPC handlers, by the vote-reply processing and by
	// BroadcastVoteRequest while they read or update the fields below.
	mu        sync.Mutex
	peers     []*labrpc.ClientEnd
	persister *Persister
	me        int // index into peers[]

	// persist() writes currentTerm, votedFor, votesReceived and log to
	// stable storage. isLeader is declared in this group but is NOT part
	// of what persist() encodes.
	//
	// votedFor is -1 when no vote is recorded.
	// isLeader mirrors state == LEADER and is what GetState reports.
	log         []*LogEntry
	votedFor    int
	currentTerm int
	isLeader    bool

	/*Timers for election and heart beat period*/
	electionTimeout *time.Timer
	heartBeatTimer  *time.Timer

	// Volatile state; Make initialises both to -1.
	commitIndex int
	lastApplied int
	// Volatile state used while leader; nextIndex and matchIndex are
	// re-initialised when this peer wins an election.
	// currentLeader is -1 until a leader is known.
	// votesReceived[i] is true once peer i has granted its vote in the
	// election this peer is running.
	nextIndex     []int
	matchIndex    []int
	currentLeader int
	votesReceived []bool
	state         State
}

// AppendEntriesArgs is the argument structure of the AppendEntries RPC.
// The heartbeats sent by this file leave Entries empty.
type AppendEntriesArgs struct {
	Term         int        // Leader's term
	LeaderID     int        // Leader's ID
	PrevLogIndex int        // Index of the log entry preceding the new entries
	PrevLogTerm  int        // Term of the log entry preceding the new entries
	Entries      []LogEntry // Log entries to store (empty for heartbeat; may send more than one for efficiency)
	LeaderCommit int        // Leader's commit index
}

// AppendEntriesReply is the reply structure of the AppendEntries RPC.
type AppendEntriesReply struct {
	// Term is the receiver's current term as filled in by the AppendEntries
	// handler (after it has adopted a higher term from the request, if any).
	Term    int
	// Success is meant to be true if the follower contained an entry
	// matching PrevLogIndex and PrevLogTerm. NOTE(review): the handler in
	// this file never assigns it, so it is always false at present.
	Success bool
}

// RequestVoteArgs is the argument structure of the RequestVote RPC, filled
// in by BroadcastVoteRequest.
type RequestVoteArgs struct {
	Term         int // candidate's current term
	CandidateId  int // index of the candidate in peers[]
	LastLogIndex int // position of the candidate's last log entry (0 when its log is empty)
	LastLogTerm  int // term of the candidate's last log entry (0 when its log is empty)
}

// RequestVoteReply is the reply structure of the RequestVote RPC.
type RequestVoteReply struct {
	Term        int  // voter's current term after handling the request
	VoteGranted bool // true if the voter gave its vote to the candidate
}

// GetState returns this peer's current term and whether it believes it
// is the leader.
//
// Both fields are read under rf.mu because the RPC handlers and the
// vote-reply processing update them concurrently while holding that same
// mutex; reading them unlocked is a data race.
func (rf *Raft) GetState() (int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	return rf.currentTerm, rf.isLeader
}

// start_your_application is the peer's timer loop. Make starts it as a
// goroutine and it never returns.
//
// Election timer: the timer is re-armed with a fresh random timeout. If
// the peer is the leader nothing else happens (a leader must not start
// an election against itself). Otherwise the peer advances to the next
// term, becomes a candidate, votes for itself and requests votes from
// the other peers.
//
// Heartbeat timer: if the peer is the leader it sends a round of
// heartbeats, which also re-arms the heartbeat timer.
func (rf *Raft) start_your_application() {
	for {
		select {
		case <-rf.electionTimeout.C:
			rf.mu.Lock()
			rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin))
			if rf.state == LEADER {
				rf.mu.Unlock()
				continue
			}
			rf.currentTerm++
			rf.votedFor = rf.me
			rf.state = CANDIDATE
			// Start the tally from scratch so votes granted in an earlier
			// election cannot count towards this term.
			for i := range rf.votesReceived {
				rf.votesReceived[i] = false
			}
			rf.votesReceived[rf.me] = true
			rf.persist()
			// BroadcastVoteRequest takes rf.mu itself, so release it first.
			rf.mu.Unlock()
			rf.BroadcastVoteRequest()
		case <-rf.heartBeatTimer.C:
			rf.mu.Lock()
			leading := rf.state == LEADER
			rf.mu.Unlock()
			if leading {
				rf.periodicHeartbeat()
			}
		}
	}
}

// heart_beat sends one AppendEntries RPC with no entries (a heartbeat)
// from rf to the peer at index server, then inspects the reply.
//
// The arguments are built under rf.mu so they form a consistent snapshot
// of the leader's state; the lock is released for the duration of the
// RPC. If rf is no longer the leader by the time this runs, nothing is
// sent. If the reply carries a term higher than rf's, rf adopts that
// term and steps down to follower.
func heart_beat(rf *Raft, server int) {
	rf.mu.Lock()
	if rf.state != LEADER {
		rf.mu.Unlock()
		return
	}
	prevIndex := rf.nextIndex[server] - 1
	prevTerm := 0
	// Guard both ends so a stale nextIndex can never index past the log.
	if prevIndex >= 0 && prevIndex < len(rf.log) {
		prevTerm = rf.log[prevIndex].Term
	}
	args := AppendEntriesArgs{
		LeaderID:     rf.me,
		Term:         rf.currentTerm,
		PrevLogIndex: prevIndex,
		PrevLogTerm:  prevTerm,
		LeaderCommit: rf.commitIndex,
	}
	rf.mu.Unlock()

	reply := AppendEntriesReply{}
	if !rf.peers[server].Call("Raft.AppendEntries", &args, &reply) {
		return
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()
	if reply.Term > rf.currentTerm {
		// A peer is in a newer term: this leader is stale.
		rf.currentTerm = reply.Term
		rf.state = FOLLOWER
		rf.isLeader = false
		rf.votedFor = -1
		for i := range rf.votesReceived {
			rf.votesReceived[i] = false
		}
		rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin))
		rf.persist()
	}
}

// periodicHeartbeat starts one heart_beat goroutine for every peer other
// than this one, then re-arms the heartbeat timer for the next interval.
func (rf *Raft) periodicHeartbeat() {
	for peer := 0; peer < len(rf.peers); peer++ {
		if peer == rf.me {
			continue
		}
		go heart_beat(rf, peer)
	}
	rf.heartBeatTimer.Reset(heartbeatvalue)
}

// persist gob-encodes currentTerm, votedFor, votesReceived and log, in
// that order, and hands the resulting bytes to the persister so they can
// be retrieved after a crash and restart.
//
// The values form a sequential gob stream, so a reader has to decode
// them in this same order. Encode errors are not checked.
func (rf *Raft) persist() {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	fields := []interface{}{
		rf.currentTerm,
		rf.votedFor,
		rf.votesReceived,
		rf.log,
	}
	for _, field := range fields {
		enc.Encode(field)
	}
	rf.persister.SaveRaftState(buf.Bytes())
}

// readPersist restores the state previously written by persist.
//
// data is the byte slice persist produced. Nil or empty data (nothing
// has been saved yet) leaves the current state untouched.
//
// The values are decoded in exactly the order persist encodes them:
// currentTerm, votedFor, votesReceived, log. A gob stream is sequential,
// so decoding in any other order fails with a type mismatch and the
// affected fields are silently not restored. Decoding stops at the first
// error; fields already decoded keep their restored values.
func (rf *Raft) readPersist(data []byte) {
	if len(data) == 0 {
		return
	}
	dec := gob.NewDecoder(bytes.NewBuffer(data))
	targets := []interface{}{
		&rf.currentTerm,
		&rf.votedFor,
		&rf.votesReceived,
		&rf.log,
	}
	for _, target := range targets {
		if err := dec.Decode(target); err != nil {
			return
		}
	}
}

// RequestVote is the RPC handler a candidate calls to ask for this peer's
// vote.
//
// A request whose term is not higher than this peer's current term is
// refused, and reply.Term reports the current term. A request with a
// higher term makes this peer adopt that term and step down to follower;
// the vote is then granted if the candidate's log is at least as
// up to date as this peer's (later last term, or same last term and a
// last index that is not smaller).
//
// State is persisted before the handler returns.
func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// Deferred after Unlock so that it runs first, while mu is still held.
	defer rf.persist()

	reply.VoteGranted = false
	if args.Term <= rf.currentTerm {
		reply.Term = rf.currentTerm
		return
	}

	// Newer term: adopt it and drop any leader/candidate role.
	rf.currentTerm = args.Term
	reply.Term = rf.currentTerm
	rf.state = FOLLOWER
	rf.isLeader = false
	rf.votedFor = -1
	for i := range rf.votesReceived {
		rf.votesReceived[i] = false
	}

	lastTerm := 0
	if len(rf.log) > 0 {
		lastTerm = rf.log[len(rf.log)-1].Term
	}
	logOk := args.LastLogTerm > lastTerm ||
		(args.LastLogTerm == lastTerm && args.LastLogIndex >= len(rf.log)-1)
	if logOk {
		rf.votedFor = args.CandidateId
		reply.VoteGranted = true
	}

	// Re-arm the existing timer instead of replacing it: the timer loop
	// may already be blocked on this timer's channel, and a replacement
	// timer would leave the old one running and still able to fire.
	rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin))
}

// AppendEntries is the RPC handler a leader calls to send heartbeats
// (and, eventually, log entries).
//
// A request from an older term is ignored apart from reporting this
// peer's current term in reply.Term. Otherwise the peer adopts the
// request's term if it is newer, records the sender as leader, becomes a
// follower and re-arms its election timer.
//
// State is persisted before the handler returns.
//
// TODO: the log consistency check (PrevLogIndex/PrevLogTerm) and the
// appending of args.Entries are not implemented; reply.Success is never
// set and therefore stays false.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// Deferred after Unlock so that it runs first, while mu is still held.
	defer rf.persist()

	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		return
	}

	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		// The vote is cleared only when the term changes; within a term it
		// must be kept so the record of who was voted for is not lost.
		rf.votedFor = -1
	}
	reply.Term = rf.currentTerm

	rf.currentLeader = args.LeaderID
	rf.state = FOLLOWER
	rf.isLeader = false
	for i := range rf.votesReceived {
		rf.votesReceived[i] = false
	}

	// Re-arm the existing timer instead of replacing it: the timer loop
	// may already be blocked on this timer's channel, and a replacement
	// timer would leave the old one running and still able to fire.
	rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin))
}

// sendRequestVote sends a RequestVote RPC to the peer at index server in
// rf.peers[] and processes the answer.
//
// args is passed by value and reply by pointer, matching the parameter
// types of the RequestVote handler; labrpc requires the types given to
// Call to be the same as the handler's.
//
// It returns true if labrpc says the RPC was delivered. On delivery:
//   - a reply with a higher term makes this peer adopt the term and step
//     down to follower;
//   - a granted vote is counted only if this peer is still a candidate in
//     the term the request was sent for, so a late reply from an earlier
//     election cannot contribute to the current one;
//   - once more than half of the peers have voted for it, the peer
//     becomes leader, resets nextIndex/matchIndex and immediately sends a
//     round of heartbeats.
func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool {
	// reply is already a pointer to the reply struct; pass it as is.
	if !rf.peers[server].Call("Raft.RequestVote", args, reply) {
		return false
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()
	// Deferred after Unlock so that it runs first, while mu is still held.
	defer rf.persist()

	if reply.Term > rf.currentTerm {
		rf.currentTerm = reply.Term
		rf.state = FOLLOWER
		rf.isLeader = false
		rf.votedFor = -1
		rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin))
		for i := range rf.votesReceived {
			rf.votesReceived[i] = false
		}
		return true
	}

	if rf.state != CANDIDATE || args.Term != rf.currentTerm || !reply.VoteGranted {
		return true
	}

	rf.votesReceived[server] = true
	votes := 0
	for _, granted := range rf.votesReceived {
		if granted {
			votes++
		}
	}
	if votes > len(rf.peers)/2 {
		rf.state = LEADER
		rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin))
		rf.currentLeader = rf.me
		rf.isLeader = true
		for i := range rf.nextIndex {
			rf.nextIndex[i] = len(rf.log)
			rf.matchIndex[i] = -1
		}
		rf.periodicHeartbeat()
	}
	return true
}

// Start is called by the service using Raft (e.g. a k/v server) to start
// agreement on the next command to be appended to Raft's log. It returns
// immediately; there is no guarantee that the command will ever be
// committed, since the leader may fail or lose an election.
//
// The first return value is the index the command will appear at if it
// is ever committed, the second is the current term, and the third is
// true if this server believes it is the leader.
//
// TODO: appending the command to the log and replicating it are not
// implemented, so the returned index is always -1. The term and the
// leadership flag reflect this peer's actual state.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	index := -1
	return index, rf.currentTerm, rf.isLeader
}

// Kill is called by the tester when this Raft instance won't be needed
// again. Nothing is required here, but it might be convenient to (for
// example) turn off debug output from this instance.
//
// It is currently a no-op: the timer-loop goroutine started by Make has
// no exit path and keeps running after Kill returns.
func (rf *Raft) Kill() {
	// Your code here, if desired.
}

// Make creates a Raft server for the service or tester.
//
// peers holds the ports of all Raft servers, this one included, in the
// same order on every server; this server's port is peers[me]. persister
// is where the server saves its persistent state, and it initially holds
// the most recently saved state, if any. applyCh is the channel on which
// the tester or service expects ApplyMsg messages (it is not used yet).
//
// The peer starts as a follower in term 0 with no vote and an empty log,
// arms both timers, restores whatever the persister holds, and launches
// the timer loop in its own goroutine so that Make returns quickly.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	peerCount := len(peers)
	rf := &Raft{
		peers:     peers,
		persister: persister,
		me:        me,

		currentTerm: 0,
		votedFor:    -1,
		log:         make([]*LogEntry, 0),

		commitIndex:   -1,
		lastApplied:   -1,
		currentLeader: -1,
		isLeader:      false,
		state:         FOLLOWER,

		// make zero-fills, so every vote flag starts out false.
		votesReceived: make([]bool, peerCount),
		nextIndex:     make([]int, peerCount),
		matchIndex:    make([]int, peerCount),
	}

	jitter := time.Duration(rand.Int63()) % electiontimeoutmin
	rf.electionTimeout = time.NewTimer(electiontimeoutmin + jitter)
	rf.heartBeatTimer = time.NewTimer(heartbeatvalue)

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
	rf.persist()

	go rf.start_your_application()
	return rf
}

// BroadcastVoteRequest sends a RequestVote RPC to every other peer, each
// from its own goroutine, on behalf of this peer as candidate.
//
// The request carries the current term, this peer's index and the
// position and term of its last log entry (both 0 when the log is
// empty). State is persisted before the function returns.
func (rf *Raft) BroadcastVoteRequest() {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()

	req := RequestVoteArgs{
		Term:        rf.currentTerm,
		CandidateId: rf.me,
	}
	if n := len(rf.log); n > 0 {
		req.LastLogIndex = n - 1
		req.LastLogTerm = rf.log[n-1].Term
	}

	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		// Every RPC gets its own reply value to fill in.
		go rf.sendRequestVote(peer, req, &RequestVoteReply{})
	}
}