# TNSM_Latency_Prediction / UDPServer / BPF / collector.py
from __future__ import print_function

import argparse
import threading
import time
import sys

import pyximport; pyximport.install()
import bpf

def parse_params():
    """Parse the collector's command-line arguments.

    Registers one positional argument (``ifaces``, one or more values) and
    the optional flags listed in the table below, then parses the process
    command line.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``, with the
        attributes ``ifaces``, ``int_port``, ``host``, ``database``,
        ``period``, ``event_period``, ``int_time``, ``event_mode`` and
        ``debug_mode``.
    """
    cli = argparse.ArgumentParser(description='InfluxBD INTCollector client.')

    # Positional: at least one interface name is required.
    cli.add_argument("ifaces", nargs='+',
                     help="List of ifaces to receive INT reports")

    # Optional flags, registered in the order they appear in the help text.
    option_table = (
        (("-i", "--int_port"),
         {"default": 54321, "type": int,
          "help": "Destination port of INT Telemetry reports"}),
        (("-H", "--host"),
         {"default": "localhost",
          "help": "InfluxDB server address"}),
        (("-D", "--database"),
         {"default": "INTdatabase",
          "help": "Database name"}),
        (("-p", "--period"),
         {"default": 10, "type": int,
          "help": "Time period to push data in normal condition"}),
        (("-P", "--event_period"),
         {"default": 1, "type": float,
          "help": "Time period to push event data"}),
        (("-t", "--int_time"),
         {"action": 'store_true',
          "help": "Use INT timestamp instead of local time"}),
        (("-e", "--event_mode"),
         {"default": "THRESHOLD",
          "help": "Event detection mode: INTERVAL or THRESHOLD. \
        Option -p is disabled for THRESHOLD and is hard-coded instead"}),
        (("-d", "--debug_mode"),
         {"default": 0, "type": int,
          "help": "Set to 1 to print event"}),
    )
    for flag_names, settings in option_table:
        cli.add_argument(*flag_names, **settings)

    return cli.parse_args()


if __name__ == "__main__":
    # Script entry point: build the collector, attach it to the requested
    # interfaces, reset the database, start the background push thread(s),
    # then poll events until interrupted with Ctrl-C.

    args = parse_params()

    # NOTE(review): `InDBCollector` is not imported in the visible part of
    # this file (only `bpf` is) -- confirm which module provides it.
    collector = InDBCollector.InDBCollector(int_dst_port=args.int_port,
        debug_mode=args.debug_mode, host=args.host,
        database=args.database, int_time=args.int_time,
        event_mode=args.event_mode)

    for iface in args.ifaces:
        collector.attach_iface(iface)

    # Clear all old dbs, for easy testing.
    # WARNING: this drops EVERY database the client lists, not only
    # args.database -- do not point this at a server holding other data.
    for db in collector.client.get_list_database():
        collector.client.drop_database(db["name"])
    collector.client.create_database(args.database)

    # Set on shutdown to ask the push threads to leave their loops.
    push_stop_flag = threading.Event()

    # A separate thread to push event data
    def _event_push():
        """Every args.event_period seconds, swap out and write event data."""
        while not push_stop_flag.is_set():

            time.sleep(args.event_period)

            # Take the pending events and install a fresh list under the
            # lock; the release is in `finally` so the lock cannot stay held
            # if anything in between raises.
            collector.lock.acquire()
            try:
                data = collector.event_data
                collector.event_data = []
            finally:
                collector.lock.release()

            if args.debug_mode == 2:
                print("Len of events: ", len(data))

            if data:
                collector.client.write_points(points=data, protocol="line")

    # A separate thread to push data; only used in INTERVAL mode.
    periodically_push = None
    if args.event_mode == "INTERVAL":
        def _periodically_push():
            """Every args.period seconds, collect and write the data."""
            cnt = 0
            while not push_stop_flag.is_set():
                # use cnt to partition sleep time,
                # so Ctrl-C could terminate the program earlier
                time.sleep(1)
                cnt += 1
                if cnt < args.period:
                    continue
                cnt = 0

                data = collector.collect_data()
                if data:
                    # Same protocol as the event push above. The original
                    # passed an undefined name `protocol` here, which raised
                    # NameError on the first periodic write.
                    collector.client.write_points(points=data,
                                                  protocol="line")
                    if args.debug_mode == 2:
                        print("Periodically push: ", len(data))

        periodically_push = threading.Thread(target=_periodically_push)
        periodically_push.start()

    event_push = threading.Thread(target=_event_push)
    event_push.start()

    try:
        # Start polling events. This sits inside the try so that a failure
        # here still stops the (non-daemon) push threads in `finally`
        # instead of leaving the process hanging.
        collector.open_events()

        print("eBPF progs Loaded")
        sys.stdout.flush()

        while True:
            collector.poll_events()

    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the collector.
        pass

    finally:
        push_stop_flag.set()
        if periodically_push is not None:
            periodically_push.join()
        event_push.join()

        collector.detach_all_iface()
        print("Done")