package raft // // this is an outline of the API that raft must expose to // the service (or tester). see comments below for // each of these functions for more details. // // rf = Make(...) // create a new Raft server. // rf.Start(command interface{}) (index, term, isleader) // start agreement on a new log entry // rf.GetState() (term, isLeader) // ask a Raft for its current term, and whether it thinks it is leader // ApplyMsg // each time a new entry is committed to the log, each Raft peer // should send an ApplyMsg to the service (or tester) // in the same server. // import ( "bytes" "encoding/gob" "labrpc" "math/rand" "sync" "time" ) // import "bytes" // import "encoding/gob" // as each Raft peer becomes aware that successive log entries are // committed, the peer should send an ApplyMsg to the service (or // tester) on the same server, via the applyCh passed to Make(). type ApplyMsg struct { Index int Command interface{} UseSnapshot bool // ignore for lab2; only used in lab3 Snapshot []byte // ignore for lab2; only used in lab3 } type LogEntry struct { Term int Command interface{} } type State int8 const ( FOLLOWER State = iota LEADER CANDIDATE ) const ( electiontimeoutmin = 400 * time.Millisecond heartbeatvalue = 100 * time.Millisecond ) type Raft struct { mu sync.Mutex peers []*labrpc.ClientEnd persister *Persister me int // index into peers[] // Your data here. // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. // Persistent state log []*LogEntry votedFor int currentTerm int isLeader bool /*Timers for election and heart beat period*/ electionTimeout *time.Timer heartBeatTimer *time.Timer // Volatile state commitIndex int lastApplied int //volatile state on Leader nextIndex []int matchIndex []int currentLeader int votesReceived []bool state State } type AppendEntriesArgs struct { Term int // Leader's term LeaderID int // Leader's ID PrevLogIndex int // Index of the log entry preceding the new entries PrevLogTerm int // Term of the log entry preceding the new entries Entries []LogEntry // Log entries to store (empty for heartbeat; may send more than one for efficiency) LeaderCommit int // Leader's commit index } type AppendEntriesReply struct { Term int // Leader's term Success bool // True if the follower contained an entry matching PrevLogIndex and PrevLogTerm } // example RequestVote RPC arguments structure. type RequestVoteArgs struct { // Your data here. Term int CandidateId int LastLogIndex int LastLogTerm int } // example RequestVote RPC reply structure. type RequestVoteReply struct { // Your data here. Term int VoteGranted bool } // return currentTerm and whether this server // believes it is the leader. func (rf *Raft) GetState() (int, bool) { var term int var isleader bool // Your code here. term = rf.currentTerm isleader = rf.isLeader return term, isleader } func (rf *Raft) start_your_application() { for { select { case <-rf.electionTimeout.C: rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin)) rf.currentTerm++ rf.votedFor = rf.me rf.state = CANDIDATE rf.votesReceived[rf.me] = true rf.persist() rf.BroadcastVoteRequest() case <-rf.heartBeatTimer.C: if rf.state == LEADER { rf.periodicHeartbeat() } } } } func heart_beat(rf *Raft, server int) { reply := AppendEntriesReply{} prev_index := rf.nextIndex[server] - 1 prev_term := 0 if prev_index >= 0 { prev_term = rf.log[prev_index].Term } args := AppendEntriesArgs{ LeaderID: rf.me, Term: rf.currentTerm, PrevLogIndex: prev_index, PrevLogTerm: prev_term, LeaderCommit: rf.commitIndex, } rf.peers[server].Call("Raft.AppendEntries", &args, &reply) } func (rf *Raft) periodicHeartbeat() { for i := range rf.peers { if i != rf.me { go heart_beat(rf, i) } } rf.heartBeatTimer.Reset(heartbeatvalue) } // save Raft's persistent state to stable storage, // where it can later be retrieved after a crash and restart. // see paper's Figure 2 for a description of what should be persistent. func (rf *Raft) persist() { // Your code here. // Example: w := new(bytes.Buffer) e := gob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) e.Encode(rf.votesReceived) e.Encode(rf.log) data := w.Bytes() rf.persister.SaveRaftState(data) } // restore previously persisted state. func (rf *Raft) readPersist(data []byte) { // Your code here. // Example: r := bytes.NewBuffer(data) d := gob.NewDecoder(r) d.Decode(&rf.currentTerm) d.Decode(&rf.votedFor) d.Decode(&rf.log) d.Decode(&rf.votesReceived) } // example RequestVote RPC handler. func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) { // Your code here. rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() reply.Term = args.Term reply.VoteGranted = false if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.state = FOLLOWER rf.isLeader = false rf.votedFor = -1 lastTerm := 0 if len(rf.log) > 0 { lastTerm = rf.log[len(rf.log)-1].Term } logOk := (args.LastLogTerm > lastTerm) || (args.LastLogTerm == lastTerm && args.LastLogIndex >= len(rf.log)-1) if args.Term == rf.currentTerm && logOk && (rf.votedFor == -1 || rf.votedFor == args.CandidateId) { rf.votedFor = args.CandidateId reply.VoteGranted = true } rf.electionTimeout = time.NewTimer(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin)) for i := range rf.votesReceived { rf.votesReceived[i] = false } } else { reply.Term = rf.currentTerm } } func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { // ... logic to handle AppendEntries RPC rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() reply.Term = rf.currentTerm if args.Term > rf.currentTerm { reply.Term = args.Term rf.currentTerm = args.Term rf.votedFor = -1 rf.electionTimeout = time.NewTimer(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin)) } if rf.currentTerm == args.Term { rf.currentLeader = args.LeaderID rf.state = FOLLOWER rf.isLeader = false rf.votedFor = -1 rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin)) for i := range rf.votesReceived { rf.votesReceived[i] = false } } // logOk := (len(rf.log) >= args.PrevLogIndex) && (args.PrevLogIndex == 0 || // args.PrevLogTerm == rf.log[args.PrevLogIndex].Term) // if rf.currentTerm == args.Term && logOk { // if len(args.Entries) > 0 && len(rf.log) > args.PrevLogIndex { // } // } } // example code to send a RequestVote RPC to a server. // server is the index of the target server in rf.peers[]. // expects RPC arguments in args. // fills in *reply with RPC reply, so caller should // pass &reply. // the types of the args and reply passed to Call() must be // the same as the types of the arguments declared in the // handler function (including whether they are pointers). // // returns true if labrpc says the RPC was delivered. // // if you're having trouble getting RPC to work, check that you've // capitalized all field names in structs passed over RPC, and // that the caller passes the address of the reply struct with &, not // the struct itself. func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool { ok := rf.peers[server].Call("Raft.RequestVote", args, &reply) rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() if ok { if reply.Term > rf.currentTerm { rf.currentTerm = reply.Term rf.state = FOLLOWER rf.isLeader = false rf.votedFor = -1 rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin)) for i := range rf.votesReceived { rf.votesReceived[i] = false } } if rf.state == CANDIDATE && reply.VoteGranted { rf.votesReceived[server] = true } countVotes := 0 for i := range rf.votesReceived { if rf.votesReceived[i] { countVotes++ } } if countVotes > len(rf.peers)/2 && rf.state == CANDIDATE { rf.state = LEADER rf.electionTimeout.Reset(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin)) rf.currentLeader = rf.me rf.isLeader = true for i := range rf.nextIndex { rf.nextIndex[i] = len(rf.log) rf.matchIndex[i] = -1 } rf.periodicHeartbeat() } } return ok } // the service using Raft (e.g. a k/v server) wants to start // agreement on the next command to be appended to Raft's log. if this // server isn't the leader, returns false. otherwise start the // agreement and return immediately. there is no guarantee that this // command will ever be committed to the Raft log, since the leader // may fail or lose an election. // // the first return value is the index that the command will appear at // if it's ever committed. the second return value is the current // term. the third return value is true if this server believes it is // the leader. func (rf *Raft) Start(command interface{}) (int, int, bool) { index := -1 term := -1 isLeader := true return index, term, isLeader } // the tester calls Kill() when a Raft instance won't // be needed again. you are not required to do anything // in Kill(), but it might be convenient to (for example) // turn off debug output from this instance. func (rf *Raft) Kill() { // Your code here, if desired. } // the service or tester wants to create a Raft server. the ports // of all the Raft servers (including this one) are in peers[]. this // server's port is peers[me]. all the servers' peers[] arrays // have the same order. persister is a place for this server to // save its persistent state, and also initially holds the most // recent saved state, if any. applyCh is a channel on which the // tester or service expects Raft to send ApplyMsg messages. // Make() must return quickly, so it should start goroutines // for any long-running work. func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me // Your initialization code here. rf.currentTerm = 0 rf.votedFor = -1 rf.log = make([]*LogEntry, 0) rf.commitIndex = -1 rf.lastApplied = -1 rf.currentLeader = -1 rf.isLeader = false rf.state = FOLLOWER rf.votesReceived = make([]bool, len(peers)) rf.nextIndex = make([]int, len(peers)) rf.matchIndex = make([]int, len(peers)) rf.electionTimeout = time.NewTimer(electiontimeoutmin + (time.Duration(rand.Int63()) % electiontimeoutmin)) rf.heartBeatTimer = time.NewTimer(heartbeatvalue) for i := range rf.votesReceived { rf.votesReceived[i] = false } // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) rf.persist() go rf.start_your_application() return rf } func (rf *Raft) BroadcastVoteRequest() { // Send VoteRequest to all nodes rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() LastLogIndex := 0 LastLogTerm := 0 if len(rf.log) > 0 { LastLogIndex = len(rf.log) - 1 LastLogTerm = rf.log[LastLogIndex].Term } request := RequestVoteArgs{ Term: rf.currentTerm, CandidateId: rf.me, LastLogIndex: LastLogIndex, LastLogTerm: LastLogTerm, } for node := range rf.peers { if node != rf.me { reply := RequestVoteReply{} go rf.sendRequestVote(node, request, &reply) } } }