TNSM_Latency_Prediction / UDPServer / Golang / src / main_bk.go
main_bk.go
Raw
package main

import (
	//"encoding/binary"
	"encoding/binary"
	"flag"
	"fmt"

	//"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// Server tuning parameters.
const (
	// flushInterval is the period at which main reports and resets the
	// per-interval packet counter.
	flushInterval = time.Second

	// maxQueueSize is the buffer capacity of the message queue channel.
	maxQueueSize = 1000000

	// UDPPacketSize is the size, in bytes, of every pooled receive buffer.
	UDPPacketSize = 1500
)

// Byte offsets into a datagram payload. handleMessage reads
// buf[locTS4:locTS3] as timestamp 3 and buf[locTS3:locTS2] as timestamp 2;
// the remaining offsets are not referenced by the code visible in this file.
// NOTE(review): presumably each locTSn marks the end of timestamp field n in
// the sender's packet layout; confirm against the packet format.
const (
	locTS6 = 8
	locTS5 = 14
	locTS4 = 17
	locTS3 = 20
	locTS2 = 26
	locTS1 = 32
)

// Package-level state shared by main and the worker goroutines.
var (
	// address is the listen address, filled in from the -addr flag.
	address string

	// nbWorkers is the worker count, filled in from the -concurrency flag.
	nbWorkers int

	// bufferPool recycles the byte slices that datagrams are read into.
	bufferPool sync.Pool

	// ops counts the datagrams handled since the last report.
	ops uint64

	// total accumulates ops across reports.
	total uint64

	// flushTicker drives the periodic report loop in main.
	flushTicker *time.Ticker

	// m is allocated in main; no other use is visible in this file.
	m map[uint64]uint64
)

// init registers the command-line flags. The values only become available
// in address and nbWorkers once flag.Parse has been called.
func init() {
	const defaultAddr = ":54321"
	defaultWorkers := runtime.NumCPU() // one worker pair per logical CPU

	flag.StringVar(
		&address,
		"addr",
		defaultAddr,
		"Address of the UDP server to test",
	)
	flag.IntVar(
		&nbWorkers,
		"concurrency",
		defaultWorkers,
		"Number of workers to run in parallel",
	)
}

// message is one received datagram travelling from a receive goroutine to a
// dequeue goroutine. Field order matters: receive builds it with a positional
// composite literal.
type message struct {
	addr   net.Addr // address returned by ReadFrom for this datagram
	msg    []byte   // full buffer taken from bufferPool; only the first length bytes are payload
	length int      // number of bytes ReadFrom wrote into msg
}

// messageQueue is a channel of received datagrams awaiting processing.
type messageQueue chan message

// enqueue sends m on the queue. Being a plain channel send, it blocks while
// the channel's buffer is full.
func (mq messageQueue) enqueue(m message) {
	mq <- m
}

// dequeue is the worker loop: it takes messages off the queue one at a time,
// passes the payload portion of each buffer to handleMessage, and then hands
// the buffer back to bufferPool. It returns once the queue has been closed
// and drained.
func (mq messageQueue) dequeue() {
	for {
		next, open := <-mq
		if !open {
			return
		}

		// Only the first next.length bytes were filled by the reader.
		payload := next.msg[0:next.length]
		handleMessage(next.addr, payload)

		// The handler is done with the bytes, so the buffer can be reused.
		bufferPool.Put(next.msg)
	}
}

// mq is the queue shared by the receive goroutines (senders) and the dequeue
// goroutines (receivers); main allocates it with maxQueueSize buffer slots.
var mq messageQueue

// main parses the flags, sets up the buffer pool and message queue, starts
// the receive/dequeue workers, and then reports the packet rate once per
// flushInterval until the process is interrupted. On interrupt it logs the
// overall packet count and exits.
func main() {
	m = make(map[uint64]uint64)

	runtime.GOMAXPROCS(runtime.NumCPU())
	log.Printf("Cores: %d", runtime.NumCPU())
	flag.Parse()

	bufferPool = sync.Pool{
		New: func() interface{} { return make([]byte, UDPPacketSize) },
	}
	mq = make(messageQueue, maxQueueSize)

	// Without a socket there is nothing to measure; stop instead of
	// reporting zero ops forever.
	if err := listenAndReceive(nbWorkers); err != nil {
		log.Fatalf("Cannot listen on %s: %v", address, err)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		// Fold the not-yet-flushed count into the total before reporting.
		// AddUint64 returns the updated total, so no separate read is needed.
		pending := atomic.LoadUint64(&ops)
		log.Printf("Total ops %d", atomic.AddUint64(&total, pending))
		os.Exit(0)
	}()

	flushTicker = time.NewTicker(flushInterval)
	for range flushTicker.C {
		// Read and reset in one atomic step so increments made by workers
		// between the read and the reset are not lost.
		n := atomic.SwapUint64(&ops, 0)
		atomic.AddUint64(&total, n)
		log.Printf("Ops/s %f", float64(n)/flushInterval.Seconds())
	}
}

// listenAndReceive opens a "udp4" packet socket on the package-level address
// and starts maxWorkers pairs of goroutines: one draining the message queue
// and one reading from the socket. It returns immediately after starting
// them. The only error it reports is a failure to open the socket.
func listenAndReceive(maxWorkers int) error {
	conn, listenErr := net.ListenPacket("udp4", address)
	if listenErr != nil {
		return listenErr
	}

	// All readers share the one socket.
	for remaining := maxWorkers; remaining > 0; remaining-- {
		go mq.dequeue()
		go receive(conn)
	}
	return nil
}

// receive reads datagrams from c in an endless loop. Each datagram is read
// into a buffer taken from bufferPool and pushed onto mq together with the
// sender address and byte count; the worker that dequeues it is responsible
// for returning the buffer to the pool.
//
// Read errors are logged and the loop carries on. The deferred Close only
// runs if the goroutine unwinds, since the loop itself never returns.
// NOTE(review): if c fails permanently (e.g. it gets closed) this loop will
// log the same error continuously; confirm whether it should stop instead.
func receive(c net.PacketConn) {
	defer c.Close()

	for {
		msg := bufferPool.Get().([]byte)
		nbytes, addr, err := c.ReadFrom(msg)
		if err != nil {
			// Nothing was queued, so no worker will ever return this
			// buffer; give it back here rather than dropping it.
			bufferPool.Put(msg)
			log.Printf("Error %s", err)
			continue
		}
		mq.enqueue(message{addr: addr, msg: msg, length: nbytes})
	}
}

func handleMessage(addr net.Addr, buf []byte) {

	//log.Printf("Packet received\n")
	//var ts2Lat uint64
	ts2Lat := uint64(0)

	ts3_unprocessed := append([]byte{0x0, 0x0, 0x0, 0x0, 0x0}, buf[locTS4:locTS3]...)
	ts2_unprocessed := append([]byte{0x0, 0x0}, buf[locTS3:locTS2]...)

	fmt.Printf("[% x]\n", ts3_unprocessed)
	fmt.Printf("[% x]\n", ts2_unprocessed)

	ts3 := binary.BigEndian.Uint64(ts3_unprocessed) & 0x3FFFF
	ts2 := binary.BigEndian.Uint64(ts2_unprocessed) & 0x3FFFF

	log.Printf("ts3 shifted: %d", uint64(ts3))
	log.Printf("ts2 shifted: %d", uint64(ts2))

	if ts3 < ts2 {
		ts2Lat = uint64(0x3FFFF) - ts2 + ts3 + uint64(1)
	} else {
		ts2Lat = ts3 - ts2
	}

	log.Printf("Value of ingress latency: %d", uint64(ts2Lat))

	atomic.AddUint64(&ops, 1)
	//log.Printf("Packet count: %d", uint64(ops))
}