Routing-Packets-In-A-Network-Overlay / src / main / java / csx55 / overlay / node / Registry.java
Registry.java
Raw
package csx55.overlay.node;

//imports
import java.io.*;
import java.net.*;
import java.util.HashMap;
import java.util.List;
import java.util.Scanner;
import java.util.ArrayList;
import csx55.overlay.transport.TCPReceiverThread;
import csx55.overlay.transport.TCPSenderThread;
import csx55.overlay.transport.TCPServerThread;
import csx55.overlay.util.OverlayCreator;
import csx55.overlay.wireformats.Deregister;
import csx55.overlay.wireformats.DeregisterResponse;
import csx55.overlay.wireformats.Event;
import csx55.overlay.wireformats.LinkWeights;
import csx55.overlay.wireformats.Protocol;
import csx55.overlay.wireformats.Register;
import csx55.overlay.wireformats.RegisterResponse;
import csx55.overlay.wireformats.TaskInitiate;
import csx55.overlay.wireformats.TaskSummaryRequest;
import csx55.overlay.wireformats.TaskSummaryResponse;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Registry class that accepts connection from messaging nodes and coordinates their actions based on the 
 * input provided in the foreground, main thread. Implements Node so receiver thread can use node.onEvent.
 */
public class Registry implements Node {
    private ArrayList<TaskSummaryResponse> statsForAllNodes = new ArrayList<>();    //stores a list of all stats from all nodes connected to the registry
    private HashMap<String, Object[]> clientConnections = new HashMap<>();          //<ip:port, [TCPSenderThread, TCPReceiverThread]>
    private LinkWeights weights = null;                                             //gets initialized after calling setup-overlay
    private AtomicInteger taskCompleted = new AtomicInteger(0);                     //used to print summary after all nodes finish sending messages
    private boolean sentWeights = false;                                            //used to deny messaging nodes from deregistering

    /**
     * The main method is called when the Registry starts. It will first check to see if the arguments
     * provided match the required number. If so, it will create a server socket using that host name.
     * It passes that server socket to the server thread so it can block and wait for any new connections.
     * The main thread will continue to run as a foreground process using showOptions(). This allows
     * for the user to input commands for the registry. The server socket is closed when the program exits.
     */
    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.out.println("Please provide a port num");
            System.exit(0);
        }
        System.out.println("Registry starting");

        Registry registry = new Registry();//crete an instance of Registry so the receiver can use node.onEvent()
        ServerSocket serverSocket = null;
        
        try {
            serverSocket = new ServerSocket(Integer.parseInt(args[0]));
            TCPServerThread serverThread = new TCPServerThread(registry, serverSocket);

            Thread threadForServer = new Thread(serverThread);
            threadForServer.start();
            
            registry.showOptions();
        }
        finally {
            serverSocket.close();
        }
    }

    /**
     * This is called by the receiver thread each time it gets a new message from the sender of any messaging node.
     * The event is an instance of that protocol, which could be to register, deregister, completion of message
     * passing, or arrival of statistics after passing messages. Along with the event, the sender and receiver
     * used to communicate with that messaging node is also passed in. This allows for the registry to send back
     * information to the messaging node if required.
     */
    @Override
    public void onEvent(Event e, TCPSenderThread sender, TCPReceiverThread receiver) {
        switch (e.getType()) {
            case Protocol.REGISTER_REQUEST://messaging node wants to register
                registerNode(e, sender, receiver);
                break;
            case Protocol.DEREGISTER_REQUEST://messaging wants to exit the overlay
                deregisterNode(e, sender, receiver);
                break;
            case Protocol.TASK_COMPLETE://messaging node has completed all of its tasks
                completeTask();
                break;
            case Protocol.TRAFFIC_SUMMARY://messaging node sent the statistics collected when performing tasks
                showSummaryStatistics(e);
                break;
            default:
                break;
        }
    }

    /**
     * This method is called after all messages have completed their tasks and sent the TASK_COMPLETE message to this
     * registry. After this, the registry requests a task summary request from all nodes. When the nodes send back their
     * task summaries, this method is called by the onEvent method. It will add task summaries to a list in a synchronized
     * block since many nodes could be sending this at the same time. When all nodes have sent their summaries, the displayStats
     * method is called and cleared after all summaries have been printed to prepare for the next command.
     */
    private synchronized void showSummaryStatistics(Event e) {
        statsForAllNodes.add((TaskSummaryResponse) e);
        if (statsForAllNodes.size() == clientConnections.size()) {//all clients accounted for
            displayStats(statsForAllNodes);
            statsForAllNodes.clear();
        }
    }

    /**
     * This method will loop over the stats list that includes all summaries from all nodes.
     * It also keeps a running sum and count of messages for the sum row of the displayed table.
     */
    private void displayStats(List<TaskSummaryResponse> stats) {
        int sumSent = 0;
        long sumSentSum = 0L;
        int sumReceived = 0;
        long sumReceivedSum = 0L;

        //use tabs to format table; could be improved
        System.out.println("\t\tSent\t\tReceived\tSum of Sent\tSum of Received\t\tNum relayed");//header
        for (TaskSummaryResponse s: stats) {
            System.out.println(s.getInfo() + "\t" +
                            s.getMessagesSent() + "\t\t" + 
                            s.getMessagesRec() + "\t\t" + 
                            s.getSentSummation() + "\t" + 
                            s.getRecSummation() + "\t\t" + 
                            s.getMessagesRelayed());
            sumSent += s.getMessagesSent().get();
            sumReceived += s.getMessagesRec().get();
            sumReceivedSum += s.getRecSummation().get();
            sumSentSum += s.getSentSummation().get();
        }

        System.out.println(String.format("Sum\t\t%d\t\t%d\t\t%d\t%d", sumSent, sumReceived, sumSentSum, sumReceivedSum));//sum row
    }

    /**
     * This method is called from onEvent() after it receives a message from the messaging nodes saying that
     * it has completed all of its tasks. When the last node sends the request, the taskCompleted count is equal
     * to the number of clients and proceeds within the if statement. It will loop through each connection in the map
     * and send a request using TaskSummaryRequest usng the sender for that connection. After doing so, taskCompleted is
     * reset to 0 to prepare for the next start command.
     */
    private synchronized void completeTask() {
        taskCompleted.getAndIncrement();
        if (taskCompleted.get() == clientConnections.size()) {
            try {
                Thread.sleep(1500);//allow time for messages in transit to reach their destination
                //stats will be wrong if this is not done; they will be a part of the next operation
            }
            catch (InterruptedException ie) {
                System.out.println("There was an error when trying to sleep in completeTask()");
            }
            for (HashMap.Entry<String, Object[]> node: clientConnections.entrySet()) {
                TaskSummaryRequest request = new TaskSummaryRequest();
                try {
                    TCPSenderThread sender = (TCPSenderThread) node.getValue()[0];
                    sender.sendData(request.getBytes());
                }
                catch (IOException ioe) {
                    System.out.println("There was an error when sending the task summary to the messaging nodes");
                    return;
                }
            }
            taskCompleted.set(0);
        }
    }

    /**
     * This method is called from onEvent() after it receives a message from the messaging nodes saying that
     * they want to exit from the overlay. It is very similar to the registerNode method. It will first
     * check if the node is registered in the registry and if the connection information match. If not, the
     * node is removed from the registry and a response is created with the status and message. That response
     * is sent to the node using the sender. It is synchronized because many nodes can send deregistration
     * requests at the same time.
     */
    private synchronized void deregisterNode(Event e, TCPSenderThread sender, TCPReceiverThread receiver) {
        String ipPort = ((Deregister) e).toString();
        String connectedIP = receiver.getSocket().getInetAddress().getHostName();//ip address of the socket
        String responseMessage = "";
        byte status = Protocol.FAILURE;
        
        if (!clientConnections.containsKey(ipPort)) {//not registered in registry
            responseMessage = "This node does not exist in the Registry";
        }
        else if (!ipPort.split(":")[0].equals(connectedIP.split("\\.")[0])) {//mismath between information
            responseMessage = "There is a mismatch between the registration request and the socket's input stream" +
                                " Request: " + ipPort.split(":")[0] + " Input Stream: " + connectedIP;
        }

        //message is initially set to "" so no errors were appended
        if (responseMessage.equals("")) {
            responseMessage = "Deregistration request successful.";//success message
            status = Protocol.SUCCESS;
            clientConnections.remove(ipPort);
        }

        DeregisterResponse response = new DeregisterResponse(status, responseMessage);
        try {
            sender.sendData(response.getBytes());
        }
        catch (IOException ioe) {
            System.out.println("There was an error when sending the response to the Messaging Node");
        }
    }

    /**
     * This method is called from onEvent() after it receives a message from the messaging nodes saying that
     * they want to register themselves in the registru. It is very similar to the deregisterNode method. It will first
     * check if the node is already registered in the registry and if the connection information match. If not, the
     * node is added to the registry and a response is created with the status and message. That response
     * is sent to the node using the sender. It is synchronized because many nodes can send registration requests
     * at the same time. It accepts the sender and receiver used to communicate with the node, which is created
     * when the serverSocket accepts a new socket connection and creates a receiver thread.
     */
    private synchronized void registerNode(Event e, TCPSenderThread sender, TCPReceiverThread receiver) {
        String ipPort = ((Register) e).toString();
        String connectedIP = receiver.getSocket().getInetAddress().getHostName();
        String responseMessage = "";
        byte status = Protocol.FAILURE;
        
        if (clientConnections.containsKey(ipPort)) {//already registered
            responseMessage = "This node already exists in the registry";
        }
        else if (!ipPort.split(":")[0].equals(connectedIP.split("\\.")[0])) {//mismatched information
            responseMessage = "There is a mismatch between the registration request and the socket's input stream\n" +
                                "Request: " + ipPort.split(":")[0] + "!= Input Stream: " + connectedIP;
        }

        //message is initially set to "" so no errors were appended
        if (responseMessage.equals("")) {
            clientConnections.put(ipPort, new Object[]{sender, receiver});//add info, sender, and receiver to map
            System.out.println("Messaging Node connected on: " + ipPort);
            responseMessage = "Registration request successful. The number of messaging nodes currently constituting the overlay is (" + clientConnections.size() + ").";
            status = Protocol.SUCCESS;
        }

        RegisterResponse response = new RegisterResponse(status, responseMessage);
        try {
            sender.sendData(response.getBytes());
        }
        catch (IOException ioe) {
            clientConnections.remove(ipPort);//incase the data could not be sent, so the client does not know if the request was a success or failure
        }
    }

   /**
     * This method is a part of the foreground process of the main method. It allows the user to enter commands
     * and tell the register what to do. These include all the commands specified as a part of this assignment.
     * each command will call another method to process the requests.
     */
    private void showOptions() {
        try (Scanner scnr = new Scanner(System.in)) {//akin to closing scanner using finally
            while (true) {
                System.out.println("Input a command to interact with the server: ");
                String line = scnr.nextLine().toLowerCase();
                String[] command = line.split(" ");//could be two words like start 1 so split by space

                //process only the first string
                switch (command[0]) {
                    case "list-messaging-nodes":
                        listMessagingNodes();
                        break;
                    case "list-weights":
                        listWeights();
                        break;
                    case "setup-overlay":
                        setupOverlay(command);
                        break;
                    case "send-overlay-link-weights":
                        sendLinkWeights();
                        break;
                    case "start":
                        startMessagePassing(command);
                        break;
                    case "test":
                        System.out.println("Active Thread Count: " + Thread.activeCount());//test registration and deregistration
                        break;
                    default:
                        System.out.println("Incorrect command entered. Please enter a valid command!");
                        break;
                }
            }
        }
    }

    /**
     * This method is called when the foreground process receives a command from the user to start the tasks.
     * This must be performed only after the overlay has been created and sent to the messaging nodes. If these
     * are true, a TaskInitiate message is created and sent to every node registered along with the number of 
     * rounds to perform the tasks. 
     */
    private void startMessagePassing(String[] command) {
        if (weights == null) {//weights have not been setup using setup-overlay
            System.out.println("Link weights have not been setup yet. Try creating the overlay first");
            return;
        }
        if (clientConnections.size() < 2) {//there are no clients registered 
            System.out.println("There must be at least 2 nodes registerd to start sending messages");
            return;
        }
        if (command.length < 2) {//user did not provide the number of rounds
            System.out.println("You must provide a number of rounds along with the command: 'start 10'");
            return;
        }
        if (!sentWeights) {//weights have not been sent to messaging nodes using send-overlay-link-weights
            System.out.println("Link weights have not been sent yet. Try running send-overlay-link-weights first");
            return;
        }

        int rounds = 1;
        try {
            rounds = Integer.parseInt(command[1]);
        }
        catch (NumberFormatException nfe) {
            System.out.println("The number of rounds specified could not be parsed. Try again with an integer");
            return;
        }

        TaskInitiate task = new TaskInitiate(rounds);
        for (HashMap.Entry<String, Object[]> node: clientConnections.entrySet()) {
            try {
                TCPSenderThread sender = (TCPSenderThread) node.getValue()[0];
                sender.sendData(task.getBytes());
            }
            catch (IOException ioe) {
                System.out.println("There was an error when sending the task to the messaging nodes");
                return;
            }
        }
        System.out.println("Messages are being sent.");
    }

    /**
     * This method is called when the user enters the send-overlay-link-weights command in the foreground
     * process. The overlay must have already been set up before this command using setup-overlay.
     * If true, it loops through the connnections and sends the link weights using the LinkWeights wireformat.
     */
    private void sendLinkWeights() {
        if (weights == null) {//weights have not been setup
            System.out.println("Link weights have not been setup yet. Try creating the overlay first");
            return;
        }
        
        for (HashMap.Entry<String, Object[]> node: clientConnections.entrySet()) {
            try {
                TCPSenderThread sender = (TCPSenderThread) node.getValue()[0];
                sender.sendData(weights.getBytes());
            }
            catch (IOException ioe) {
                System.out.println("There was an error sending the link weights to messaging nodes");
                return;
            }
        }

        sentWeights = true;
        System.out.println("Link weights were sent to messaging nodes successfully");
    }

    /**
     * This method is called when the user enters the setup-overlay command in the foreground process.
     * It will create a new object of OverlayCreator and use its createOverlay method, which takes in a
     * hashmap of connections. The returned LinkWeights objects will contain all of the link weights in
     * the overlay. For more information, look at OverlayCreator or LinkWeights.
     */
    private void setupOverlay(String[] command) {
        int k = 4;
        try {
            k = Integer.parseInt(command[1]);
        }
        catch (Exception e) {
            System.out.println("Enter the command and also the num of messaging node: 'setup-overlay 4'");
            return;
        }

        try {
            this.weights = (new OverlayCreator()).createOverlay(clientConnections, k);//named OverlayCreator object not required
            System.out.println("Overlay was created successfully and sent to messaging nodes");
        }
        catch (Exception e) {
            System.out.println(e.getMessage());
            return;
        }
    }

    /**
     * This method is called when the user enters the list-weights command in the foreground process.
     * It will iterate through the list in LinkWeights and print out each one. Links have to be setup
     * using setup-overlay before this is called.
     */
    private void listWeights() {
        if (weights == null) System.out.println("Link weights have not been setup yet. Try creating the overlay first");
        else {
            ArrayList<String> linksInOverlay = weights.getLinks();
            String str = "All edges in the topology (" + linksInOverlay.size() + "): \n";

            for (String s: linksInOverlay) {
                str += s + "\n";
            }

            System.out.println(str);
        }
    }

    /**
     * This method is called when the user enters list-messaging-nodes command in the foreground process.
     * It will simply just iterate through the clientConnections hashmap and print out the keyset if any
     * nodes are registered.
     */
    private void listMessagingNodes() {
        if (clientConnections.size() == 0) {//no nodes connected
            System.out.println("There are no nodes connected to the Registry");
        }
        else {
            for (String key: clientConnections.keySet()) {
                System.out.println(key);
            }
        }
    }
}