raft-key-value-store / labs-fall23-AnurimaVaishnavi-master / assignment2 / src / chandy-lamport / server.go
server.go
Raw
package chandy_lamport

import "log"

// The main participant of the distributed snapshot protocol.
// Servers exchange token messages and marker messages among each other.
// Token messages represent the transfer of tokens from one server to another.
// Marker messages represent the progress of the snapshot process. The bulk of
// the distributed protocol is implemented in `HandlePacket` and `StartSnapshot`.

// Snapshot is the local state one server records for a single snapshot.
// StartSnapshot creates it from the server's Id and current token count with
// an empty message list; HandlePacket appends token messages to it while the
// sending channel is still being recorded.
type Snapshot struct {
	id       string             // Id of the server that recorded this snapshot
	tokens   int                // the server's token count when the snapshot was started
	messages []*SnapshotMessage // token messages recorded after the snapshot started
}

// Server is a participant in the distributed snapshot protocol. It holds a
// token balance, the links to and from its neighbors, and the bookkeeping for
// the snapshots it takes part in.
type Server struct {
	Id            string           // identifier; used as src/dest on links and in events
	Tokens        int              // current token balance
	sim           *Simulator       // simulator used for logging, receive times and completion notices
	outboundLinks map[string]*Link // key = link.dest
	inboundLinks  map[string]*Link // key = link.src
	// Snapshot bookkeeping.
	//
	// record_as_part_of_channel maps a snapshot ID to the set of source
	// servers whose marker for that snapshot has already been received
	// (entries are set to true in HandlePacket). Token messages from sources
	// not yet in the set are recorded into the snapshot.
	//
	// state maps a snapshot ID to the *Snapshot stored by StartSnapshot.
	//
	// active starts as true in NewServer and is set to false by HandlePacket
	// when a snapshot completes; it is not read in the code visible here.
	record_as_part_of_channel map[int]map[string]bool
	state                     *SyncMap
	active                    bool
}

// Link is a unidirectional communication channel between two servers.
// Each link contains an event queue (as opposed to a packet queue).
// AddOutboundLink registers the same Link value in the sender's outbound map
// and in the receiver's inbound map.
type Link struct {
	src    string // Id of the sending server
	dest   string // Id of the receiving server
	events *Queue // events pushed by the sender; SendMessageEvent values in this file
}

// NewServer returns a server identified by id that starts with the given
// number of tokens and is attached to sim. Both link maps and the snapshot
// bookkeeping start out empty, and the server is marked active.
func NewServer(id string, tokens int, sim *Simulator) *Server {
	srv := &Server{
		Id:                        id,
		Tokens:                    tokens,
		sim:                       sim,
		outboundLinks:             make(map[string]*Link),
		inboundLinks:              make(map[string]*Link),
		record_as_part_of_channel: make(map[int]map[string]bool),
		state:                     NewSyncMap(),
		active:                    true,
	}
	return srv
}

// AddOutboundLink creates a one-way link from this server to dest and
// registers it on both ends: under dest's Id in this server's outbound map
// and under this server's Id in dest's inbound map. Linking a server to
// itself is a no-op.
func (server *Server) AddOutboundLink(dest *Server) {
	if server != dest {
		// Both maps hold the same *Link, so the two ends share one event queue.
		shared := &Link{src: server.Id, dest: dest.Id, events: NewQueue()}
		server.outboundLinks[shared.dest] = shared
		dest.inboundLinks[shared.src] = shared
	}
}

// SendToNeighbors sends message on every outbound link of this server.
// Destinations are visited in the order returned by getSortedKeys. For each
// link the send is first recorded with the simulator's logger, then a
// SendMessageEvent carrying a receive time from the simulator is pushed onto
// the link's event queue.
func (server *Server) SendToNeighbors(message interface{}) {
	for _, destID := range getSortedKeys(server.outboundLinks) {
		out := server.outboundLinks[destID]

		sent := SentMessageEvent{server.Id, out.dest, message}
		server.sim.logger.RecordEvent(server, sent)

		// The receive time is requested only after the send has been logged.
		pending := SendMessageEvent{server.Id, out.dest, message, server.sim.GetReceiveTime()}
		out.events.Push(pending)
	}
}

// SendTokens transfers numTokens from this server to the neighbor dest.
// The send is logged, the local balance is reduced, and a SendMessageEvent
// with a receive time from the simulator is pushed onto the outbound link.
//
// The process is terminated via log.Fatalf if the server holds fewer than
// numTokens tokens or if it has no outbound link to dest.
func (server *Server) SendTokens(numTokens int, dest string) {
	if numTokens > server.Tokens {
		log.Fatalf("Server %v attempted to send %v tokens when it only has %v\n",
			server.Id, numTokens, server.Tokens)
	}

	payload := TokenMessage{numTokens}
	server.sim.logger.RecordEvent(server, SentMessageEvent{server.Id, dest, payload})

	// The balance is debited before the message is queued on the link.
	server.Tokens -= numTokens

	out, known := server.outboundLinks[dest]
	if !known {
		log.Fatalf("Unknown dest ID %v from server %v\n", dest, server.Id)
	}

	departure := SendMessageEvent{server.Id, dest, payload, server.sim.GetReceiveTime()}
	out.events.Push(departure)
}

// HandlePacket is the callback invoked when a message sent by src is
// received on this server.
//
// MarkerMessage: if no snapshot with the marker's ID is stored yet, the
// snapshot is started locally via StartSnapshot. The inbound channel from src
// is then marked as finished for that snapshot. Once a marker has been seen
// on every inbound link, the simulator is told through
// sim.NotifySnapshotComplete and the server is flagged inactive.
//
// TokenMessage: the tokens are added to the server's balance, and the message
// is appended to every stored snapshot that has not yet received a marker
// from src.
//
// Messages of any other type are ignored.
func (server *Server) HandlePacket(src string, message interface{}) {
	switch msg := message.(type) {
	case MarkerMessage:
		snapshotID := msg.snapshotId
		if _, found := server.state.Load(snapshotID); !found {
			// First marker for this snapshot: record local state and
			// forward markers to all neighbors.
			server.StartSnapshot(snapshotID)
		}

		// From now on, tokens arriving from src no longer belong to this
		// snapshot.
		markers := server.record_as_part_of_channel[snapshotID]
		markers[src] = true

		received := 0
		for _, seen := range markers {
			if seen {
				received++
			}
		}
		if received == len(server.inboundLinks) {
			server.sim.NotifySnapshotComplete(server.Id, snapshotID)
			server.active = false
		}

	case TokenMessage:
		server.Tokens += msg.numTokens

		// Register the unlock right after taking the read lock so it is
		// released even if the loop below panics.
		server.state.lock.RLock()
		defer server.state.lock.RUnlock()

		for key, value := range server.state.internalMap {
			id, isInt := key.(int)
			if !isInt || server.record_as_part_of_channel[id][src] {
				// Not a snapshot ID, or the marker from src has already
				// arrived for this snapshot.
				continue
			}
			if snapshot, isSnapshot := value.(*Snapshot); isSnapshot {
				snapshot.messages = append(snapshot.messages, &SnapshotMessage{src, server.Id, message})
			}
		}
	}
}

// StartSnapshot begins the chandy-lamport snapshot identified by snapshotId
// on this server: it stores the server's Id and current token count as a new
// Snapshot, creates an empty set of received markers for the snapshot, and
// sends a MarkerMessage on every outbound link.
//
// It is meant to run only once per snapshot on a server; HandlePacket calls
// it only when no snapshot with that ID has been stored yet.
//
// NOTE(review): the completion check lives in HandlePacket's marker branch,
// so a server with no inbound links would never report completion — confirm
// whether that topology can occur.
func (server *Server) StartSnapshot(snapshotId int) {
	local := &Snapshot{
		id:       server.Id,
		tokens:   server.Tokens,
		messages: []*SnapshotMessage{},
	}
	server.state.Store(snapshotId, local)
	server.record_as_part_of_channel[snapshotId] = map[string]bool{}
	server.SendToNeighbors(MarkerMessage{snapshotId})
}