// Routing-Packets-In-A-Network-Overlay / src / main / java / csx55 / overlay / node / MessagingNode.java
// MessagingNode.java
// Raw
package csx55.overlay.node;

//imports
import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.Random;
import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import csx55.overlay.dijkstra.RoutingCache;
import csx55.overlay.transport.TCPReceiverThread;
import csx55.overlay.transport.TCPSenderThread;
import csx55.overlay.transport.TCPServerThread;
import csx55.overlay.util.StatisticsCollector;
import csx55.overlay.wireformats.Deregister;
import csx55.overlay.wireformats.DeregisterResponse;
import csx55.overlay.wireformats.Event;
import csx55.overlay.wireformats.LinkWeights;
import csx55.overlay.wireformats.Message;
import csx55.overlay.wireformats.MessagingNodesList;
import csx55.overlay.wireformats.Protocol;
import csx55.overlay.wireformats.Register;
import csx55.overlay.wireformats.RegisterResponse;
import csx55.overlay.wireformats.TaskComplete;
import csx55.overlay.wireformats.TaskInitiate;
import csx55.overlay.wireformats.TaskSummaryResponse;

/**
 * MessagingNode class that registers itself with the Registry and accepts connections from peers. Its main
 * task is to listen for messages from the Registry and act on them. This could include connecting
 * to other nodes in the overlay or sending messages to them. Implements Node so a receiver thread can call node.onEvent.
 */
public class MessagingNode implements Node {
    private TCPReceiverThread serverReceiver;                                                                   //receiver thread used to receive events from the registry
    private TCPSenderThread serverSender;                                                                       //sender thread used to send events to the registry
    private int nodePort;                                                                                       //the port number of this messaging node
    private String nodeIP;                                                                                      //the hostname of this messaging node
    private LinkWeights weights = null;                                                                         //used to store the link weights sent by registry
    private RoutingCache msgRoutes = null;                                                                      //used to cache the routes for this node
    private StatisticsCollector stats = new StatisticsCollector();                                              //collects the stats for this node when sending messages
    private Map<String, Object[]> connections = Collections.synchronizedMap(new HashMap<String, Object[]>());   //stores all peers connected to the node
    
    /**
     * Creates a messaging node identified by the given hostname and port.
     *
     * @param ipAddress hostname recorded as this node's address
     * @param numPort   port number recorded as this node's port
     */
    public MessagingNode(String ipAddress, int numPort) {
        nodePort = numPort;
        nodeIP = ipAddress;
    }

    /**
     * Entry point for a messaging node. Expects two arguments: the hostname and the port of the
     * Registry. A server socket is opened on port 0 so the OS picks a free port; peers connect to it
     * while the overlay is being built. The node then registers with the Registry and the main thread
     * stays in the foreground reading user commands until input ends.
     *
     * The process exits with status 0 when the command loop ends and with status 1 when the arguments
     * are invalid or an I/O error occurs.
     *
     * @param args args[0] is the Registry hostname, args[1] is the Registry port
     */
    public static void main(String[] args) {
        if (args.length < 2) {//not enough arguments provided
            System.out.println("Please provide the ip-address and port num");
            System.exit(1);
        }

        //validate the port up front so a typo is reported instead of being swallowed by the exit below
        int registryPort = 0;
        try {
            registryPort = Integer.parseInt(args[1]);
        }
        catch (NumberFormatException nfe) {
            System.out.println("The registry port must be an integer but was: " + args[1]);
            System.exit(1);
        }

        int exitStatus = 0;
        //try-with-resources closes the server socket before any catch/finally clause runs
        try (ServerSocket serverSocket = new ServerSocket(0)) {//use 0 to let OS decide which port to use
            int portNum = serverSocket.getLocalPort();//get the port number after OS assigns it
            MessagingNode mNode = new MessagingNode(InetAddress.getLocalHost().getHostName(), portNum);//create an instance of MessagingNode so the receiver can use node.onEvent()

            //Server thread for incoming messaging node connections when creating overlay
            TCPServerThread acceptor = new TCPServerThread(mNode, serverSocket);
            Thread acceptorThread = new Thread(acceptor);
            acceptorThread.start();

            mNode.registerToRegistry(args[0], registryPort);//call register to send registration request
            mNode.showOptions();//allows users to input commands
        }
        catch (SocketException se) {
            System.out.println("Exiting due to refused connection");
            exitStatus = 1;
        }
        catch (IOException ioe) {//also covers a failure while closing the server socket
            System.out.println("Exiting due to error with IO");
            exitStatus = 1;
        }
        finally {
            //always terminate explicitly: the threads started above would otherwise keep the JVM alive
            System.exit(exitStatus);
        }
    }

    /**
     * Registers this node with the Registry when starting up. A socket is opened to the given
     * Registry address, a sender and a receiver are created on top of it, a Register wireformat
     * carrying this node's hostname and port is sent, and the receiver is started on its own thread.
     * If the connection or the send fails, the process exits with a non-zero status because the node
     * cannot take part in the overlay without the Registry.
     *
     * @param ipAddress hostname of the Registry
     * @param port      port of the Registry's server socket
     */
    public void registerToRegistry(String ipAddress, int port) {
        try {
            Socket newSocket = new Socket(ipAddress, port);//socket connection to Registry
            serverSender = new TCPSenderThread(newSocket);
            serverReceiver = new TCPReceiverThread(this, newSocket, serverSender);

            Register register = new Register(this.nodeIP, this.nodePort);
            serverSender.sendData(register.getBytes());

            //Thread for receiver from registry
            Thread registryReceiver = new Thread(serverReceiver);
            registryReceiver.start();
            System.out.println("Messaging node starting connection at: " + this.nodeIP + ":" + this.nodePort);
        }
        catch (IOException ioe) {
            System.out.println("There was an error connecting to the Registry/Messaging Node");
            System.exit(1);//exit with a failure status if not able to connect to the Registry
        }
    }

    /**
     * Dispatches an incoming event to the handler for its type. Only the event types listed in the
     * switch are handled; any other type is ignored because there is no default branch.
     *
     * @param e        the event that was received; its type selects the handler
     * @param sender   sender tied to the connection the event arrived on; only used for
     *                 REGISTER_REQUEST, where it is stored with the new peer connection
     * @param receiver receiver tied to the connection the event arrived on; only used for
     *                 REGISTER_REQUEST, where it is stored with the new peer connection
     */
    @Override
    public void onEvent(Event e, TCPSenderThread sender, TCPReceiverThread receiver) {
        switch (e.getType()) {
            case Protocol.REGISTER_REQUEST://a peer registered with this node; remember its sender/receiver
                createConnection(e, sender, receiver);
                break;
            case Protocol.REGISTER_RESPONSE://print the response to this node's registration
                System.out.println(((RegisterResponse) e).toString());
                break;
            case Protocol.MESSAGING_NODES_LIST://open connections to the listed peers
                connectToPeers(e);
                break;
            case Protocol.LINK_WEIGHTS://store the weights and build the routing cache
                createRoutesFromLinkWeights(e);
                break;
            case Protocol.TASK_INITIATE://start the rounds of message sending
                startMessagingPeers(e);
                break;
            case Protocol.MESSAGE://consume the message or relay it to the next hop
                messagePeer(e);
                break;
            case Protocol.PULL_TRAFFIC_SUMMARY://report this node's statistics to the Registry
                sendStats();
                break;
            case Protocol.DEREGISTER_RESPONSE://close the Registry connection and exit on success
                closeAndExit(e);
                break;    
        }
    }

    /**
     * Handles the Registry's response to this node's deregistration request. The response is printed,
     * and if its status is Protocol.SUCCESS the sender and receiver to the Registry are closed and the
     * process exits. The exit happens even when closing fails, so a deregistered node never lingers.
     * On any other status the node stays connected.
     *
     * @param e the DeregisterResponse event received from the Registry
     */
    private void closeAndExit(Event e) {
        DeregisterResponse response = (DeregisterResponse) e;
        byte status = response.getStatus();
        System.out.println(response.toString());

        if (status == Protocol.SUCCESS) {//if success
            try {
                serverSender.close();
                serverReceiver.close();
            }
            catch (IOException ioe) {
                System.out.println("There was an error when trying to close the sender and receiver");
            }
            finally {
                System.exit(0);//the Registry has already removed this node, so exit regardless of close errors
            }
        }
        else {//if failure; cannot see why this would happen but is here just in case
            System.out.println("Deregistration failed; this node is still connected to the Registry");
        }

    }

    /**
     * Handles a LinkWeights event: keeps the received weights and builds a new RoutingCache from
     * them together with this node's hostname and port, then reports that the node is ready.
     *
     * @param e the LinkWeights event received by this node
     */
    private void createRoutesFromLinkWeights(Event e) {
        LinkWeights received = (LinkWeights) e;
        this.weights = received;
        this.msgRoutes = new RoutingCache(received, this.nodeIP, this.nodePort);
        System.out.println("Link weights received and processed. Ready to send messages");
    }

    /**
     * Handles a PULL_TRAFFIC_SUMMARY event by sending this node's statistics to the Registry.
     * A TaskSummaryResponse is built from this node's hostname, port and StatisticsCollector;
     * after a successful send the collector is reset to zero.
     */
    private void sendStats() {
        try {
            TaskSummaryResponse summary = new TaskSummaryResponse(this.nodeIP, this.nodePort, this.stats);
            serverSender.sendData(summary.getBytes());
            stats.setToZero();//only reached when the summary was sent
        }
        catch (IOException failure) {
            System.out.println("There was an error sending the stats of this node to the registry");
        }
    }

    /**
     * Handles a Message event received from a peer. If the message's position equals the length of
     * its routing plan, this node is the sink and the payload is added to the received statistics.
     * Otherwise the message is relayed: the sender for the next entry in the routing plan is looked
     * up in the connections map, the position is incremented, the message is sent, and the relayed
     * counter in the StatisticsCollector is increased. If no connection exists for the next entry
     * the message is dropped with an error instead of failing with a NullPointerException.
     * The method is synchronized because messages may arrive from several peers at once.
     *
     * @param e the Message event received by this node
     */
    private synchronized void messagePeer(Event e) {
        Message message = (Message) e;
        String[] msgRoute = message.getRoutingPlan();
        int curr = message.getPosition().get();

        if (msgRoute.length == curr) {//this node is the sink node and should receive the payload
            stats.receiveInfo(message.getPayload());//add to received count/sum
            return;
        }

        //this node is not the sink node, so relay this message to the next node in route using list of connections
        Object[] nextHop = connections.get(msgRoute[curr]);
        if (nextHop == null) {//no sender is known for the next node, so the message cannot be relayed
            System.out.println("There was an error sending the message: no connection to " + msgRoute[curr]);
            return;
        }

        TCPSenderThread nextNodeSender = (TCPSenderThread) nextHop[0];//get sender of next node in route
        message.increment();//increment the position of the message in route

        try {
            nextNodeSender.sendData(message.getBytes());
            stats.relayInfo();//increase the count of relayed messages in stats
        }
        catch (IOException ioe) {
            System.out.println("There was an error sending the message");
        }
    }

    /**
     * Handles a TaskInitiate event by sending messages for the number of rounds it specifies.
     * In each round a random sink is picked from the routing cache, the cached route to it is
     * looked up, and five messages with random integer payloads are sent to the first node on
     * that route. A payload is added to the sent statistics only when its send succeeded, so a
     * failed send does not inflate the sent count and sum. When all rounds are done a TaskComplete
     * message is sent to the Registry.
     *
     * @param e the TaskInitiate event received from the Registry
     */
    private synchronized void startMessagingPeers(Event e) {
        final int messagesPerRound = 5;//number of messages sent to the chosen sink in every round
        Random rand = new Random();
        int numRounds = ((TaskInitiate) e).getNumRounds();

        for (int i = 0; i < numRounds; i++) {//loop numRounds times
            String sink = msgRoutes.get(rand.nextInt(msgRoutes.size()));//get a random sink node from the list of nodes
            String[] route = msgRoutes.getMsgRoute(sink);//get the route from the cache to the sink node
            TCPSenderThread nSender = (TCPSenderThread) connections.get(route[0])[0];//get the sender to the first node in the route; this could or could not be the sink node

            for (int j = 0; j < messagesPerRound; j++) {
                int payload = rand.nextInt();//random integer with no bounds
                Message message = new Message(payload, 1, route);//create a Message object
                boolean delivered = false;
                try {
                    nSender.sendData(message.getBytes());//send Message
                    delivered = true;
                }
                catch (Exception ex) {
                    System.out.println("There was an error when sending the message to a sink node");
                }
                if (delivered) {
                    stats.sendInfo(payload);//add to the sent count/sum in stats only for messages that went out
                }
            }
        }

        sendTaskCompletion();//send TaskComplete message
    }

    /**
     * Tells the Registry that this node has finished sending its messages by sending a
     * TaskComplete wireformat carrying this node's hostname and port. Called by
     * startMessagingPeers once all rounds are done.
     */
    private synchronized void sendTaskCompletion() {
        try {
            TaskComplete completion = new TaskComplete(nodeIP, nodePort);
            serverSender.sendData(completion.getBytes());
            System.out.println("Completed message communication with no errors");
        }
        catch (IOException failure) {
            System.out.println("There was an error sending the task completion message to the registry");
        }
    }

    /**
     * Handles a MessagingNodesList event by connecting to every peer in the list. Each entry is
     * in the format ip:port. For each peer a socket is opened, a sender and receiver are created,
     * the receiver is started on its own thread, a Register wireformat is sent so the peer learns
     * about this node, and the sender/receiver pair is stored in the connections map under the
     * peer's ip:port string. The confirmation line is printed once, after the loop, and only if
     * every connection was established.
     *
     * @param e the MessagingNodesList event received from the Registry
     */
    private void connectToPeers(Event e) {
        MessagingNodesList msgList = (MessagingNodesList) e;
        ArrayList<String> neighbors = msgList.getMessagingNodes();//list of peers to connect to
        int established = 0;//number of peers successfully connected and registered with

        for (String n: neighbors) {//loop over peers list
            String[] node = n.split(":");//split by : because the neighbors are in the format ip:port
            try {
                //create a socket using the ip and port of the neighbor, which will create a connection to the serverSocket of that node
                //create sender and receiver to communicate with that node
                Socket nSocket = new Socket(node[0], Integer.parseInt(node[1]));
                TCPSenderThread nSender = new TCPSenderThread(nSocket);
                TCPReceiverThread nReceiver = new TCPReceiverThread(this, nSocket, nSender);
                Register register = new Register(this.nodeIP, this.nodePort);//create a new wireformat to register with that peer

                Thread threadForReceiver = new Thread(nReceiver);
                threadForReceiver.start();

                nSender.sendData(register.getBytes());
                connections.put(n, new Object[] {nSender, nReceiver});
                established++;
            }
            catch (IOException ioe) {
                System.out.println("There was an error when connecting to the messaging node");
            }
        }

        if (established == neighbors.size()) {//report success once, and only when no connection failed
            System.out.println("All connections are established. Number of connections(" + msgList.getNumNodes() + ")");
        }
    }

    /**
     * Handles a Register event sent by a peer that connected to this node. The sender and receiver
     * for that connection are stored in the connections map under the Register's string form, so
     * that connections opened by peers are tracked alongside those opened by this node.
     *
     * @param e        the Register event received from the peer
     * @param sender   sender for the connection the event arrived on
     * @param receiver receiver for the connection the event arrived on
     */
    private void createConnection(Event e, TCPSenderThread sender, TCPReceiverThread receiver) {
        String peerKey = ((Register) e).toString();//presumably ip:port, matching the keys used in connectToPeers; confirm in Register
        Object[] channel = {sender, receiver};
        connections.put(peerKey, channel);
    }

    /**
     * Reads commands from standard input until input ends. Recognized commands are
     * print-shortest-path, exit-overlay and test (a debugging aid that lists the sockets of every
     * connected peer). Matching is case-insensitive and ignores surrounding whitespace; only the
     * first word of the line is examined. The method returns normally when standard input is
     * closed instead of throwing NoSuchElementException.
     */
    public void showOptions() {
        try (Scanner scnr = new Scanner(System.in)) {
            System.out.println("Input a command to interact with the node: ");
            while (scnr.hasNextLine()) {//stop cleanly at end of input
                String line = scnr.nextLine().trim().toLowerCase();
                String[] command = line.split("\\s+");//a blank line yields a single empty token

                if (command[0].equals("print-shortest-path")) {
                    printShortestPath();
                }
                else if (command[0].equals("exit-overlay")) {
                    exitOverlay();
                }
                else if (command[0].equals("test")) {//used for testing; not part of the assignment
                    //iterating a synchronizedMap view requires holding the map's lock; other threads add peers concurrently
                    synchronized (connections) {
                        for (Map.Entry<String, Object[]> entry: connections.entrySet()) {
                            Socket tempSenderSocket = ((TCPSenderThread) entry.getValue()[0]).getSocket();
                            Socket tempReceiverSocket = ((TCPReceiverThread) entry.getValue()[1]).getSocket();

                            System.out.println("Peer: " + entry.getKey() + " -> Socket for Sender: " + tempSenderSocket + " -> Socket for Receiver: " + tempReceiverSocket);
                        }
                    }
                }
                else {
                    System.out.println("Command not recognized. Try again");
                }
            }
        }
    }

    /**
     * Handles the print-shortest-path command. If the routing cache has been built, it is asked
     * to print the shortest paths using the stored link weights; otherwise the user is told that
     * the link weights have not been set up yet.
     */
    private void printShortestPath() {
        if (msgRoutes != null) {
            msgRoutes.printSP(weights);//see method definition for more details
            return;
        }
        System.out.println("Link weights have not been setup yet. Try creating the overlay first");
    }

    /**
     * Handles the exit-overlay command. When no link weights have been received, a Deregister
     * wireformat with this node's hostname and port is sent to the Registry; the node itself only
     * shuts down later, when the deregister response arrives. Once link weights have been set up
     * the request is refused and nothing is sent; use ctrl+c to exit in that case.
     */
    private void exitOverlay() {
        if (weights == null) {//overlay not set up yet, so deregistration is allowed
            try {
                Deregister request = new Deregister(nodeIP, nodePort);
                serverSender.sendData(request.getBytes());//send to registry
            }
            catch (IOException failure) {
                System.out.println("There was an error when sending a deregistration request to the registry");
            }
            return;
        }

        System.out.println("Link weights have already been set up. Cannot deregister");
    }
}