package main import ( //"encoding/binary" "encoding/binary" "flag" "fmt" //"fmt" "log" "net" "os" "os/signal" "runtime" "sync" "sync/atomic" "time" ) const ( flushInterval = time.Duration(1) * time.Second maxQueueSize = 1000000 UDPPacketSize = 1500 locTS6 = 8 locTS5 = 14 locTS4 = 17 locTS3 = 20 locTS2 = 26 locTS1 = 32 ) var address string var bufferPool sync.Pool var ops uint64 = 0 var total uint64 = 0 var flushTicker *time.Ticker var nbWorkers int var m map[uint64]uint64 func init() { flag.StringVar(&address, "addr", ":54321", "Address of the UDP server to test") flag.IntVar(&nbWorkers, "concurrency", runtime.NumCPU(), "Number of workers to run in parallel") } type message struct { addr net.Addr msg []byte length int } type messageQueue chan message func (mq messageQueue) enqueue(m message) { mq <- m } func (mq messageQueue) dequeue() { for m := range mq { handleMessage(m.addr, m.msg[0:m.length]) bufferPool.Put(m.msg) } } var mq messageQueue func main() { m = make(map[uint64]uint64) runtime.GOMAXPROCS(runtime.NumCPU()) log.Printf("Cores: %d", runtime.NumCPU()) flag.Parse() bufferPool = sync.Pool{ New: func() interface{} { return make([]byte, UDPPacketSize) }, } mq = make(messageQueue, maxQueueSize) listenAndReceive(nbWorkers) c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) go func() { for range c { atomic.AddUint64(&total, ops) log.Printf("Total ops %d", total) os.Exit(0) } }() flushTicker = time.NewTicker(flushInterval) for range flushTicker.C { log.Printf("Ops/s %f", float64(ops)/flushInterval.Seconds()) atomic.AddUint64(&total, ops) atomic.StoreUint64(&ops, 0) } } func listenAndReceive(maxWorkers int) error { c, err := net.ListenPacket("udp4", address) if err != nil { return err } for i := 0; i < maxWorkers; i++ { go mq.dequeue() go receive(c) } return nil } // receive accepts incoming datagrams on c and calls handleMessage() for each message func receive(c net.PacketConn) { defer c.Close() for { msg := bufferPool.Get().([]byte) nbytes, addr, err := c.ReadFrom(msg[0:]) if err != nil { log.Printf("Error %s", err) continue } mq.enqueue(message{addr, msg, nbytes}) } } func handleMessage(addr net.Addr, buf []byte) { //log.Printf("Packet received\n") //var ts2Lat uint64 ts2Lat := uint64(0) ts3_unprocessed := append([]byte{0x0, 0x0, 0x0, 0x0, 0x0}, buf[locTS4:locTS3]...) ts2_unprocessed := append([]byte{0x0, 0x0}, buf[locTS3:locTS2]...) fmt.Printf("[% x]\n", ts3_unprocessed) fmt.Printf("[% x]\n", ts2_unprocessed) ts3 := binary.BigEndian.Uint64(ts3_unprocessed) & 0x3FFFF ts2 := binary.BigEndian.Uint64(ts2_unprocessed) & 0x3FFFF log.Printf("ts3 shifted: %d", uint64(ts3)) log.Printf("ts2 shifted: %d", uint64(ts2)) if ts3 < ts2 { ts2Lat = uint64(0x3FFFF) - ts2 + ts3 + uint64(1) } else { ts2Lat = ts3 - ts2 } log.Printf("Value of ingress latency: %d", uint64(ts2Lat)) atomic.AddUint64(&ops, 1) //log.Printf("Packet count: %d", uint64(ops)) }