package chandy_lamport import "log" // The main participant of the distributed snapshot protocol. // Servers exchange token messages and marker messages among each other. // Token messages represent the transfer of tokens from one server to another. // Marker messages represent the progress of the snapshot process. The bulk of // the distributed protocol is implemented in `HandlePacket` and `StartSnapshot`. type Snapshot struct { id string tokens int messages []*SnapshotMessage } type Server struct { Id string Tokens int sim *Simulator outboundLinks map[string]*Link // key = link.dest inboundLinks map[string]*Link // key = link.src // TODO: ADD MORE FIELDS HERE record_as_part_of_channel map[int]map[string]bool state *SyncMap active bool } // A unidirectional communication channel between two servers // Each link contains an event queue (as opposed to a packet queue) type Link struct { src string dest string events *Queue } func NewServer(id string, tokens int, sim *Simulator) *Server { return &Server{ id, tokens, sim, make(map[string]*Link), make(map[string]*Link), make(map[int]map[string]bool), NewSyncMap(), true, } } // Add a unidirectional link to the destination server func (server *Server) AddOutboundLink(dest *Server) { if server == dest { return } l := Link{server.Id, dest.Id, NewQueue()} server.outboundLinks[dest.Id] = &l dest.inboundLinks[server.Id] = &l } // Send a message on all of the server's outbound links func (server *Server) SendToNeighbors(message interface{}) { for _, serverId := range getSortedKeys(server.outboundLinks) { link := server.outboundLinks[serverId] server.sim.logger.RecordEvent( server, SentMessageEvent{server.Id, link.dest, message}) link.events.Push(SendMessageEvent{ server.Id, link.dest, message, server.sim.GetReceiveTime()}) } } // Send a number of tokens to a neighbor attached to this server func (server *Server) SendTokens(numTokens int, dest string) { if server.Tokens < numTokens { log.Fatalf("Server %v attempted to send %v tokens when it only has %v\n", server.Id, numTokens, server.Tokens) } message := TokenMessage{numTokens} server.sim.logger.RecordEvent(server, SentMessageEvent{server.Id, dest, message}) // Update local state before sending the tokens server.Tokens -= numTokens link, ok := server.outboundLinks[dest] if !ok { log.Fatalf("Unknown dest ID %v from server %v\n", dest, server.Id) } link.events.Push(SendMessageEvent{ server.Id, dest, message, server.sim.GetReceiveTime()}) } // Callback for when a message is received on this server. // When the snapshot algorithm completes on this server, this function // should notify the simulator by calling `sim.NotifySnapshotComplete`. func (server *Server) HandlePacket(src string, message interface{}) { // TODO: IMPLEMENT ME marker, ok := message.(MarkerMessage) if ok { snapshotID := marker.snapshotId _, found := server.state.Load(snapshotID) if !found { server.StartSnapshot(snapshotID) } server.record_as_part_of_channel[marker.snapshotId][src] = true n := len(server.inboundLinks) trueCount := 0 for _, value := range server.record_as_part_of_channel[marker.snapshotId] { if value { trueCount++ } } if trueCount == n { server.sim.NotifySnapshotComplete(server.Id, marker.snapshotId) server.active = false } } else { if token, ok := message.(TokenMessage); ok { server.Tokens += token.numTokens server.state.lock.RLock() for snapshotID, value := range server.state.internalMap { if snapshotKey, ok := snapshotID.(int); ok { if !server.record_as_part_of_channel[snapshotKey][src] { if snapshot, ok := value.(*Snapshot); ok { snapshot.messages = append(snapshot.messages, &SnapshotMessage{src, server.Id, message}) } } } } defer server.state.lock.RUnlock() } } } // Start the chandy-lamport snapshot algorithm on this server. // This should be called only once per server. func (server *Server) StartSnapshot(snapshotId int) { // TODO: IMPLEMENT ME server.state.Store(snapshotId, &Snapshot{server.Id, server.Tokens, make([]*SnapshotMessage, 0)}) server.record_as_part_of_channel[snapshotId] = make(map[string]bool) server.SendToNeighbors(MarkerMessage{snapshotId}) }