package labrpc

//
// channel-based RPC, for 824 labs.
// allows tests to disconnect RPC connections.
//
// we will use the original labrpc.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test against the original before submitting.
//
// adapted from Go net/rpc/server.go.
//
// sends gob-encoded values to ensure that RPCs
// don't include references to program objects.
//
// net := MakeNetwork() -- holds network, clients, servers.
// end := net.MakeEnd(endname) -- create a client end-point, to talk to one server.
// net.AddServer(servername, server) -- adds a named server to network.
// net.DeleteServer(servername) -- eliminate the named server.
// net.Connect(endname, servername) -- connect a client to a server.
// net.Enable(endname, enabled) -- enable/disable a client.
// net.Reliable(bool) -- false means drop/delay messages.
//
// end.Call("Raft.AppendEntries", &args, &reply) -- send an RPC, wait for reply.
// the "Raft" is the name of the server struct to be called.
// the "AppendEntries" is the name of the method to be called.
// Call() returns true to indicate that the server executed the request
// and the reply is valid.
// Call() returns false if the network lost the request or reply
// or the server is down.
// It is OK to have multiple Call()s in progress at the same time on the
// same ClientEnd.
// Concurrent calls to Call() may be delivered to the server out of order,
// since the network may re-order messages.
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return. That is, there
// is no need to implement your own timeouts around Call().
// the server RPC handler function must declare its args and reply arguments
// as pointers, so that their types exactly match the types of the arguments
// to Call().
//
// srv := MakeServer()
// srv.AddService(svc) -- a server can have multiple services, e.g.
Raft and k/v // pass srv to net.AddServer() // // svc := MakeService(receiverObject) -- obj's methods will handle RPCs // much like Go's rpcs.Register() // pass svc to srv.AddService() // import "encoding/gob" import "bytes" import "reflect" import "sync" import "log" import "strings" import "math/rand" import "time" type reqMsg struct { endname interface{} // name of sending ClientEnd svcMeth string // e.g. "Raft.AppendEntries" argsType reflect.Type args []byte replyCh chan replyMsg } type replyMsg struct { ok bool reply []byte } type ClientEnd struct { endname interface{} // this end-point's name ch chan reqMsg // copy of Network.endCh } // send an RPC, wait for the reply. // the return value indicates success; false means the // server couldn't be contacted. func (e *ClientEnd) Call(svcMeth string, args interface{}, reply interface{}) bool { req := reqMsg{} req.endname = e.endname req.svcMeth = svcMeth req.argsType = reflect.TypeOf(args) req.replyCh = make(chan replyMsg) qb := new(bytes.Buffer) qe := gob.NewEncoder(qb) if err := qe.Encode(args); err != nil { // NOTE: not sure if making this be Fatal is appropriate // leaving as warning for now. 
log.Printf("Warning: %v", err) } req.args = qb.Bytes() e.ch <- req rep := <-req.replyCh if rep.ok { rb := bytes.NewBuffer(rep.reply) rd := gob.NewDecoder(rb) if err := rd.Decode(reply); err != nil { log.Fatalf("ClientEnd.Call(): decode reply: %v\n", err) } return true } else { return false } } type Network struct { mu sync.Mutex reliable bool longDelays bool // pause a long time on send on disabled connection longReordering bool // sometimes delay replies a long time ends map[interface{}]*ClientEnd // ends, by name enabled map[interface{}]bool // by end name servers map[interface{}]*Server // servers, by name connections map[interface{}]interface{} // endname -> servername endCh chan reqMsg } func MakeNetwork() *Network { rn := &Network{} rn.reliable = true rn.ends = map[interface{}]*ClientEnd{} rn.enabled = map[interface{}]bool{} rn.servers = map[interface{}]*Server{} rn.connections = map[interface{}](interface{}){} rn.endCh = make(chan reqMsg) // single goroutine to handle all ClientEnd.Call()s go func() { for xreq := range rn.endCh { go rn.ProcessReq(xreq) } }() return rn } func (rn *Network) Reliable(yes bool) { rn.mu.Lock() defer rn.mu.Unlock() rn.reliable = yes } func (rn *Network) LongReordering(yes bool) { rn.mu.Lock() defer rn.mu.Unlock() rn.longReordering = yes } func (rn *Network) LongDelays(yes bool) { rn.mu.Lock() defer rn.mu.Unlock() rn.longDelays = yes } func (rn *Network) ReadEndnameInfo(endname interface{}) (enabled bool, servername interface{}, server *Server, reliable bool, longreordering bool, ) { rn.mu.Lock() defer rn.mu.Unlock() enabled = rn.enabled[endname] servername = rn.connections[endname] if servername != nil { server = rn.servers[servername] } reliable = rn.reliable longreordering = rn.longReordering return } func (rn *Network) IsServerDead(endname interface{}, servername interface{}, server *Server) bool { rn.mu.Lock() defer rn.mu.Unlock() if rn.enabled[endname] == false || rn.servers[servername] != server { return true } return false 
} func (rn *Network) ProcessReq(req reqMsg) { enabled, servername, server, reliable, longreordering := rn.ReadEndnameInfo(req.endname) if enabled && servername != nil && server != nil { if reliable == false { // short delay ms := (rand.Int() % 27) time.Sleep(time.Duration(ms) * time.Millisecond) } if reliable == false && (rand.Int()%1000) < 100 { // drop the request, return as if timeout req.replyCh <- replyMsg{false, nil} return } // execute the request (call the RPC handler). // in a separate thread so that we can periodically check // if the server has been killed and the RPC should get a // failure reply. ech := make(chan replyMsg) go func() { r := server.dispatch(req) ech <- r }() // wait for handler to return, // but stop waiting if DeleteServer() has been called, // and return an error. var reply replyMsg replyOK := false serverDead := false for replyOK == false && serverDead == false { select { case reply = <-ech: replyOK = true case <-time.After(100 * time.Millisecond): serverDead = rn.IsServerDead(req.endname, servername, server) } } // do not reply if DeleteServer() has been called, i.e. // the server has been killed. this is needed to avoid // situation in which a client gets a positive reply // to an Append, but the server persisted the update // into the old Persister. config.go is careful to call // DeleteServer() before superseding the Persister. serverDead = rn.IsServerDead(req.endname, servername, server) if replyOK == false || serverDead == true { // server was killed while we were waiting; return error. 
req.replyCh <- replyMsg{false, nil} } else if reliable == false && (rand.Int()%1000) < 100 { // drop the reply, return as if timeout req.replyCh <- replyMsg{false, nil} } else if longreordering == true && rand.Intn(900) < 600 { // delay the response for a while ms := 200 + rand.Intn(1+rand.Intn(2000)) time.Sleep(time.Duration(ms) * time.Millisecond) req.replyCh <- reply } else { req.replyCh <- reply } } else { // simulate no reply and eventual timeout. ms := 0 if rn.longDelays { // let Raft tests check that leader doesn't send // RPCs synchronously. ms = (rand.Int() % 7000) } else { // many kv tests require the client to try each // server in fairly rapid succession. ms = (rand.Int() % 100) } time.Sleep(time.Duration(ms) * time.Millisecond) req.replyCh <- replyMsg{false, nil} } } // create a client end-point. // start the thread that listens and delivers. func (rn *Network) MakeEnd(endname interface{}) *ClientEnd { rn.mu.Lock() defer rn.mu.Unlock() if _, ok := rn.ends[endname]; ok { log.Fatalf("MakeEnd: %v already exists\n", endname) } e := &ClientEnd{} e.endname = endname e.ch = rn.endCh rn.ends[endname] = e rn.enabled[endname] = false rn.connections[endname] = nil return e } func (rn *Network) AddServer(servername interface{}, rs *Server) { rn.mu.Lock() defer rn.mu.Unlock() rn.servers[servername] = rs } func (rn *Network) DeleteServer(servername interface{}) { rn.mu.Lock() defer rn.mu.Unlock() rn.servers[servername] = nil } // connect a ClientEnd to a server. // a ClientEnd can only be connected once in its lifetime. func (rn *Network) Connect(endname interface{}, servername interface{}) { rn.mu.Lock() defer rn.mu.Unlock() rn.connections[endname] = servername } // enable/disable a ClientEnd. func (rn *Network) Enable(endname interface{}, enabled bool) { rn.mu.Lock() defer rn.mu.Unlock() rn.enabled[endname] = enabled } // get a server's count of incoming RPCs. 
func (rn *Network) GetCount(servername interface{}) int { rn.mu.Lock() defer rn.mu.Unlock() svr := rn.servers[servername] return svr.GetCount() } // // a server is a collection of services, all sharing // the same rpc dispatcher. so that e.g. both a Raft // and a k/v server can listen to the same rpc endpoint. // type Server struct { mu sync.Mutex services map[string]*Service count int // incoming RPCs } func MakeServer() *Server { rs := &Server{} rs.services = map[string]*Service{} return rs } func (rs *Server) AddService(svc *Service) { rs.mu.Lock() defer rs.mu.Unlock() rs.services[svc.name] = svc } func (rs *Server) dispatch(req reqMsg) replyMsg { rs.mu.Lock() rs.count += 1 // split Raft.AppendEntries into service and method dot := strings.LastIndex(req.svcMeth, ".") serviceName := req.svcMeth[:dot] methodName := req.svcMeth[dot+1:] service, ok := rs.services[serviceName] rs.mu.Unlock() if ok { return service.dispatch(methodName, req) } else { choices := []string{} for k, _ := range rs.services { choices = append(choices, k) } log.Fatalf("labrpc.Server.dispatch(): unknown service %v in %v.%v; expecting one of %v\n", serviceName, serviceName, methodName, choices) return replyMsg{false, nil} } } func (rs *Server) GetCount() int { rs.mu.Lock() defer rs.mu.Unlock() return rs.count } // an object with methods that can be called via RPC. // a single server may have more than one Service. 
type Service struct { name string rcvr reflect.Value typ reflect.Type methods map[string]reflect.Method } func MakeService(rcvr interface{}) *Service { svc := &Service{} svc.typ = reflect.TypeOf(rcvr) svc.rcvr = reflect.ValueOf(rcvr) svc.name = reflect.Indirect(svc.rcvr).Type().Name() svc.methods = map[string]reflect.Method{} for m := 0; m < svc.typ.NumMethod(); m++ { method := svc.typ.Method(m) mtype := method.Type mname := method.Name //fmt.Printf("%v pp %v ni %v 1k %v 2k %v no %v\n", // mname, method.PkgPath, mtype.NumIn(), mtype.In(1).Kind(), mtype.In(2).Kind(), mtype.NumOut()) if method.PkgPath != "" || // capitalized? mtype.NumIn() != 3 || //mtype.In(1).Kind() != reflect.Ptr || mtype.In(2).Kind() != reflect.Ptr || mtype.NumOut() != 0 { // the method is not suitable for a handler //fmt.Printf("bad method: %v\n", mname) } else { // the method looks like a handler svc.methods[mname] = method } } return svc } func (svc *Service) dispatch(methname string, req reqMsg) replyMsg { if method, ok := svc.methods[methname]; ok { // prepare space into which to read the argument. // the Value's type will be a pointer to req.argsType. args := reflect.New(req.argsType) // decode the argument. ab := bytes.NewBuffer(req.args) ad := gob.NewDecoder(ab) if err := ad.Decode(args.Interface()); err != nil { log.Fatal(err) } // allocate space for the reply. replyType := method.Type.In(2) replyType = replyType.Elem() replyv := reflect.New(replyType) // call the method. function := method.Func function.Call([]reflect.Value{svc.rcvr, args.Elem(), replyv}) // encode the reply. rb := new(bytes.Buffer) re := gob.NewEncoder(rb) if err := re.EncodeValue(replyv); err != nil { log.Fatal(err) } return replyMsg{true, rb.Bytes()} } else { choices := []string{} for k, _ := range svc.methods { choices = append(choices, k) } log.Fatalf("labrpc.Service.dispatch(): unknown method %v in %v; expecting one of %v\n", methname, req.svcMeth, choices) return replyMsg{false, nil} } }