TNSM_Latency_Prediction / UDPServer / Golang / src / main.go
main.go
Raw
package main

import (
	"encoding/binary"
	//"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"path"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/akamensky/argparse"
)

// Tunables and packet layout used throughout this file.
//
//   - flushInterval is the period of the throughput ticker created in main.
//   - maxQueueSize is the buffer capacity given to both the message channel
//     and the latency channel in main.
//   - UDPPacketSize is the length in bytes of every receive buffer that
//     bufferPool allocates.
//   - locTS6 .. locTS1 are byte offsets into a received payload. handleMessage
//     uses them as slice bounds: ts6 is read from [0, locTS6-2), ts5 from
//     [locTS6, locTS5), ts3 from [locTS4, locTS3), ts2 from [locTS3, locTS2)
//     and ts1 from [locTS2, locTS1). The bytes in [locTS5, locTS4) are not
//     decoded by handleMessage.
const (
	flushInterval = time.Duration(1) * time.Second
	maxQueueSize  = 10000000
	UDPPacketSize = 1500
	locTS6        = 8
	locTS5        = 14
	locTS4        = 17
	locTS3        = 20
	locTS2        = 26
	locTS1        = 32
)

// address is only referenced by the commented-out flag setup in init; nothing
// in the visible code assigns or reads it.
var address string

// bufferPool hands out UDPPacketSize-byte slices for incoming datagrams. It is
// initialised in main before any worker is started.
var bufferPool sync.Pool

// ops counts packets handled since the last throughput report. handleMessage
// increments it; main periodically folds it into total and resets it.
var ops uint64 = 0

// total accumulates the values of ops that main has already reported.
var total uint64 = 0

// flushTicker drives the periodic throughput report in main.
var flushTicker *time.Ticker

// nbWorkers is only referenced by the commented-out flag setup in init; the
// worker count actually used in the visible code is the workers variable.
var nbWorkers int

// smInLat, smEgLat and smE2ELat are histograms keyed by a latency value
// (uint64) whose stored value is the number of times that latency was
// observed (uint64). handleLatency updates them and smPrint reads them.
var smInLat sync.Map
var smEgLat sync.Map
var smE2ELat sync.Map

// init is intentionally empty. The standard-library flag definitions below are
// disabled; command-line options are declared with argparse in main instead.
func init() {
	//flag.StringVar(&address, "addr", ":54321", "Address of the UDP server to test")
	//flag.IntVar(&nbWorkers, "concurrency", runtime.NumCPU(), "Number of workers to run in parallel")
}

// message is one received datagram waiting to be processed.
type message struct {
	addr   net.Addr // source address returned by ReadFrom
	msg    []byte   // full buffer obtained from bufferPool that the datagram was read into
	length int      // number of bytes ReadFrom reported; only msg[0:length] is passed to handleMessage
}

// latency carries the three differences handleMessage computes from the
// timestamps of a single packet. The unit is whatever the packet timestamps
// use; it is not established anywhere in this file.
type latency struct {
	ingressLat uint64 // derived from ts3 and ts2
	egressLat  uint64 // derived from ts6 and ts5
	e2eLat     uint64 // derived from ts6 and ts1
}

// messageQueue is the channel that carries received datagrams from the
// receive goroutines to the msgDequeue workers.
type messageQueue chan message

// latencyQueue is the channel that carries computed latencies from
// handleMessage to the latDequeue workers.
type latencyQueue chan latency

// Command-line option values. Each pointer is assigned in main by the argparse
// parser and is nil until then.
//
// fileName and resultPath are combined by GetPathName to build the output
// directory; listenPort is the UDP port listenAndReceive binds to. intfw is
// parsed but not read anywhere else in the visible code.
var fileName *string
var resultPath *string
var intfw *string
var listenPort *string

// workers is the number of goroutines of each kind that main starts; it
// defaults to the number of logical CPUs.
var workers int = runtime.NumCPU()

// ingressLatencyFile is the base name of the files smPrint writes.
var ingressLatencyFile = "ingress_latency.txt"

// Placeholder names for egress and end-to-end output files; they are not
// declared as Go variables and nothing writes those files yet.
//string egressLatencyFile = "egress_latency.txt"
//string e2eLatencyFile = "e2e_latency.txt"

// msgEnqueue sends m on the queue. The send blocks while the channel buffer
// is full, so a caller is held up until a msgDequeue worker makes room.
func (mq messageQueue) msgEnqueue(m message) {
	mq <- m
}

// msgDequeue is the body of a message worker. It takes datagrams off the
// queue one at a time, passes the valid part of each buffer to handleMessage,
// and then gives the buffer back to bufferPool. It returns only once the
// channel has been closed and drained.
func (mq messageQueue) msgDequeue() {
	for {
		next, open := <-mq
		if !open {
			return
		}

		payload := next.msg[:next.length]
		handleMessage(next.addr, payload)

		// The whole buffer, not just the payload slice, goes back to the pool.
		bufferPool.Put(next.msg)
	}
}

// latEnqueue sends l on the queue. The send blocks while the channel buffer
// is full, so a caller is held up until a latDequeue worker makes room.
func (lq latencyQueue) latEnqueue(l latency) {
	lq <- l
}

// latDequeue is the body of a latency worker. It takes latency records off
// the queue one at a time and feeds each to handleLatency. It returns only
// once the channel has been closed and drained.
func (lq latencyQueue) latDequeue() {
	for {
		sample, open := <-lq
		if !open {
			return
		}
		handleLatency(sample)
	}
}

// mq and lq are the process-wide queues. Both are created in main with a
// buffer of maxQueueSize entries before any worker goroutine is started.
var mq messageQueue
var lq latencyQueue

// main wires the server together:
//
//  1. parses the required command-line options (listen port, result file
//     name, result path, interface) and aborts with the usage text when
//     parsing fails;
//  2. creates the buffer pool and the two work queues;
//  3. starts the receive/message workers and the latency workers;
//  4. installs an interrupt handler that logs the total packet count, writes
//     the collected histogram via smPrint and exits;
//  5. reports throughput once per flushInterval until the process is
//     interrupted.
func main() {
	parser := argparse.NewParser("main", "Listens for UDP packets")

	listenPort = parser.String(
		"p",
		"lPort",
		&argparse.Options{Required: true, Help: "Listen for the UDP server to listen on"})

	fileName = parser.String(
		"f",
		"filename",
		&argparse.Options{Required: true, Help: "Name of the file to be used in results"})

	resultPath = parser.String(
		"r",
		"resultpath",
		&argparse.Options{Required: true, Help: "Path to store the results"})

	intfw = parser.String(
		"i",
		"interface",
		&argparse.Options{Required: true, Help: "Interface ot listne on"})

	// Every option is required and is dereferenced below, so running on after
	// a parse failure would use empty values. Show the usage text and stop.
	if err := parser.Parse(os.Args); err != nil {
		log.Fatal(parser.Usage(err))
	}

	log.Printf("Path: %s", GetPathName())
	log.Printf("ResPath: %s", *resultPath)
	log.Printf("Filename: %s", *fileName)
	log.Printf("Port: %s", *listenPort)
	log.Printf("Cores: %d", runtime.NumCPU())

	bufferPool = sync.Pool{
		New: func() interface{} { return make([]byte, UDPPacketSize) },
	}

	mq = make(messageQueue, maxQueueSize)
	lq = make(latencyQueue, maxQueueSize)

	// Without a bound socket no packet can ever arrive, so a listen failure
	// is fatal rather than silently ignored.
	if err := listenAndReceive(workers); err != nil {
		log.Fatalf("listening on UDP port %s: %v", *listenPort, err)
	}
	if err := countOccurrences(workers); err != nil {
		log.Fatalf("starting latency workers: %v", err)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		// Swap takes the pending count and zeroes it in one atomic step, so
		// the ticker loop below cannot add the same packets a second time.
		grandTotal := atomic.AddUint64(&total, atomic.SwapUint64(&ops, 0))
		log.Printf("Total ops %d", grandTotal)
		smPrint()
		os.Exit(0)
	}()

	flushTicker = time.NewTicker(flushInterval)
	for range flushTicker.C {
		// Read-and-reset atomically: a separate load followed by a store of
		// zero would drop every packet counted between the two operations.
		handled := atomic.SwapUint64(&ops, 0)
		log.Printf("Ops/s %f", float64(handled)/flushInterval.Seconds())
		atomic.AddUint64(&total, handled)
	}
}

// countOccurrences launches maxWorkers goroutines, each running
// lq.latDequeue, to consume the latency queue. It does not wait for them and
// always returns nil.
func countOccurrences(maxWorkers int) error {
	for remaining := maxWorkers; remaining > 0; remaining-- {
		go lq.latDequeue()
	}

	return nil
}

// listenAndReceive binds a UDP/IPv4 socket on the port held in listenPort and
// then starts maxWorkers pairs of goroutines: one msgDequeue worker that
// processes queued datagrams and one receive loop that reads from the shared
// socket. It returns the listen error, if any, and nil otherwise; it does not
// wait for the goroutines it starts.
func listenAndReceive(maxWorkers int) error {
	conn, listenErr := net.ListenPacket("udp4", ":"+*listenPort)
	if listenErr != nil {
		return listenErr
	}

	log.Printf("Workers: %d", maxWorkers)

	for remaining := maxWorkers; remaining > 0; remaining-- {
		go mq.msgDequeue()
		go receive(conn)
	}

	return nil
}

// receive reads datagrams from c in an endless loop. Each datagram is read
// into a buffer taken from bufferPool and queued on mq together with its
// source address and length; the msgDequeue worker that processes it returns
// the buffer to the pool.
//
// A failed read is logged and the loop carries on with the next read. The
// buffer taken for that read is handed straight back to the pool, because no
// message is queued for it and nothing else would ever recycle it.
func receive(c net.PacketConn) {
	// The loop below has no exit, so this only runs if the goroutine unwinds
	// through a panic.
	defer c.Close()

	for {
		msg := bufferPool.Get().([]byte)
		nbytes, addr, err := c.ReadFrom(msg)
		if err != nil {
			bufferPool.Put(msg)
			log.Printf("Error %s", err)
			continue
		}
		mq.msgEnqueue(message{addr: addr, msg: msg, length: nbytes})
	}
}

// latencyMu serialises the read-modify-write that handleLatency performs on
// the latency histograms. sync.Map makes each Load and Store safe on its own,
// but a Load followed by a Store is not atomic, and several latDequeue
// workers call handleLatency concurrently.
var latencyMu sync.Mutex

// handleLatency records one packet's latencies: for each of the ingress,
// egress and end-to-end values it adds one to the counter stored under that
// value in the matching histogram (smInLat, smEgLat, smE2ELat), creating the
// counter on first sight. Counters are stored as uint64.
func handleLatency(latValues latency) {
	latencyMu.Lock()
	defer latencyMu.Unlock()

	bumpCount(&smInLat, latValues.ingressLat)
	bumpCount(&smEgLat, latValues.egressLat)
	bumpCount(&smE2ELat, latValues.e2eLat)
}

// bumpCount adds one to the uint64 counter stored under key in hist, starting
// from zero when the key is absent. The caller must hold latencyMu.
func bumpCount(hist *sync.Map, key uint64) {
	var seen uint64
	if stored, ok := hist.Load(key); ok {
		// A value of an unexpected type is treated as zero.
		seen, _ = stored.(uint64)
	}
	hist.Store(key, seen+1)
}

func handleMessage(addr net.Addr, buf []byte) {

	//log.Printf("Packet received\n")
	//var ts2Lat uint64

	ingressLat := uint64(0)
	egressLat := uint64(0)
	e2eLat := uint64(0)

	ts6_unprocessed := append([]byte{0x0, 0x0}, buf[0:locTS6-2]...)    // From 0 to 6 bytes (ignore last 2 bytes which are fraction)
	ts5_unprocessed := append([]byte{0x0, 0x0}, buf[locTS6:locTS5]...) // form byte 8 to byte 14
	ts3_unprocessed := append([]byte{0x0, 0x0, 0x0, 0x0, 0x0}, buf[locTS4:locTS3]...)
	ts2_unprocessed := append([]byte{0x0, 0x0}, buf[locTS3:locTS2]...)
	ts1_unprocessed := append([]byte{0x0, 0x0}, buf[locTS2:locTS1]...)

	//fmt.Printf("[% x]\n", ts3_unprocessed)
	//fmt.Printf("[% x]\n", ts2_unprocessed)

	ts6 := binary.BigEndian.Uint64(ts6_unprocessed) & 0x3FFFF
	ts5 := binary.BigEndian.Uint64(ts5_unprocessed) & 0x3FFFF
	ts3 := binary.BigEndian.Uint64(ts3_unprocessed) & 0x3FFFF
	ts2 := binary.BigEndian.Uint64(ts2_unprocessed) & 0x3FFFF
	ts1 := binary.BigEndian.Uint64(ts1_unprocessed) & 0x3FFFF

	//log.Printf("ts3 shifted: %d", uint64(ts3))
	//log.Printf("ts2 shifted: %d", uint64(ts2))

	if ts3 < ts2 {
		ingressLat = uint64(0x3FFFF) - ts2 + ts3 + uint64(1)
	} else {
		ingressLat = ts3 - ts2
	}

	egressLat = ts6 - ts5
	e2eLat = ts6 - ts1

	//log.Printf("Value of ingress latency: %d", uint64(ts2Lat))

	allLatencies := latency{
		ingressLat,
		egressLat,
		e2eLat,
	}
	lq.latEnqueue(allLatencies)
	//handleLatency(ts2Lat)

	atomic.AddUint64(&ops, 1)
	//log.Printf("Packet count: %d", uint64(ops))
}

func GetPathName() string {
	t := time.Now()
	return path.Join(*resultPath, *fileName, *fileName+"_"+t.Format("2006_01_02_15_04_05"))
}

// addToPath appends additional to pathName using slash-separated path rules
// and returns the cleaned result.
func addToPath(pathName string, additional string) string {
	joined := path.Join(pathName, additional)
	return joined
}

func smPrint() {
	mIngLat := map[string]interface{}{}
	smInLat.Range(func(key, value interface{}) bool {
		mIngLat[fmt.Sprint(key)] = value
		return true
	})

	mEgrLat := map[string]interface{}{}
	smEgLat.Range(func(key, value interface{}) bool {
		mEgrLat[fmt.Sprint(key)] = value
		return true
	})

	mE2ELat := map[string]interface{}{}
	smE2ELat.Range(func(key, value interface{}) bool {
		mE2ELat[fmt.Sprint(key)] = value
		return true
	})

	path := GetPathName()

	if _, err := os.Stat(path); os.IsNotExist(err) {
		fmt.Println("Creating directory")
		os.MkdirAll(path, os.ModePerm)
	}

	f1, err := os.Create(addToPath(path, ingressLatencyFile))
	if err != nil {
		fmt.Println(err)
		return
	}

	f2, err := os.Create(addToPath(path, "conv_"+ingressLatencyFile))
	if err != nil {
		fmt.Println(err)
		return
	}

	for key, element := range mIngLat {
		//fmt.Printf("Latency: %s => Counted: %d \n", key, element)
		line := fmt.Sprintf("%s,%d\n", key, element)
		_, err := f1.WriteString(line)
		if err != nil {
			fmt.Println(err)
			f1.Close()
			return
		}

		var timesFound uint64 = element.(uint64)

		sum := 0
		for i := 0; i < int(timesFound); i++ {
			line := fmt.Sprintf("%s\n", key)
			_, err := f2.WriteString(line)
			if err != nil {
				fmt.Println(err)
				f2.Close()
				return
			}
			sum += i
		}
	}

	//fmt.Println(l, "bytes written successfully")
	err = f1.Close()
	if err != nil {
		fmt.Println(err)
		return
	}

	err = f2.Close()
	if err != nil {
		fmt.Println(err)
		return
	}

}