package csx55.overlay.node; //imports import java.io.*; import java.net.*; import java.util.HashMap; import java.util.List; import java.util.Scanner; import java.util.ArrayList; import csx55.overlay.transport.TCPReceiverThread; import csx55.overlay.transport.TCPSenderThread; import csx55.overlay.transport.TCPServerThread; import csx55.overlay.util.OverlayCreator; import csx55.overlay.wireformats.Deregister; import csx55.overlay.wireformats.DeregisterResponse; import csx55.overlay.wireformats.Event; import csx55.overlay.wireformats.LinkWeights; import csx55.overlay.wireformats.Protocol; import csx55.overlay.wireformats.Register; import csx55.overlay.wireformats.RegisterResponse; import csx55.overlay.wireformats.TaskInitiate; import csx55.overlay.wireformats.TaskSummaryRequest; import csx55.overlay.wireformats.TaskSummaryResponse; import java.util.concurrent.atomic.AtomicInteger; /** * Registry class that accepts connection from messaging nodes and coordinates their actions based on the * input provided in the foreground, main thread. Implements Node so receiver thread can use node.onEvent. */ public class Registry implements Node { private ArrayList<TaskSummaryResponse> statsForAllNodes = new ArrayList<>(); //stores a list of all stats from all nodes connected to the registry private HashMap<String, Object[]> clientConnections = new HashMap<>(); //<ip:port, [TCPSenderThread, TCPReceiverThread]> private LinkWeights weights = null; //gets initialized after calling setup-overlay private AtomicInteger taskCompleted = new AtomicInteger(0); //used to print summary after all nodes finish sending messages private boolean sentWeights = false; //used to deny messaging nodes from deregistering /** * The main method is called when the Registry starts. It will first check to see if the arguments * provided match the required number. If so, it will create a server socket using that host name. * It passes that server socket to the server thread so it can block and wait for any new connections. * The main thread will continue to run as a foreground process using showOptions(). This allows * for the user to input commands for the registry. The server socket is closed when the program exits. */ public static void main(String[] args) throws IOException { if (args.length < 1) { System.out.println("Please provide a port num"); System.exit(0); } System.out.println("Registry starting"); Registry registry = new Registry();//crete an instance of Registry so the receiver can use node.onEvent() ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(Integer.parseInt(args[0])); TCPServerThread serverThread = new TCPServerThread(registry, serverSocket); Thread threadForServer = new Thread(serverThread); threadForServer.start(); registry.showOptions(); } finally { serverSocket.close(); } } /** * This is called by the receiver thread each time it gets a new message from the sender of any messaging node. * The event is an instance of that protocol, which could be to register, deregister, completion of message * passing, or arrival of statistics after passing messages. Along with the event, the sender and receiver * used to communicate with that messaging node is also passed in. This allows for the registry to send back * information to the messaging node if required. */ @Override public void onEvent(Event e, TCPSenderThread sender, TCPReceiverThread receiver) { switch (e.getType()) { case Protocol.REGISTER_REQUEST://messaging node wants to register registerNode(e, sender, receiver); break; case Protocol.DEREGISTER_REQUEST://messaging wants to exit the overlay deregisterNode(e, sender, receiver); break; case Protocol.TASK_COMPLETE://messaging node has completed all of its tasks completeTask(); break; case Protocol.TRAFFIC_SUMMARY://messaging node sent the statistics collected when performing tasks showSummaryStatistics(e); break; default: break; } } /** * This method is called after all messages have completed their tasks and sent the TASK_COMPLETE message to this * registry. After this, the registry requests a task summary request from all nodes. When the nodes send back their * task summaries, this method is called by the onEvent method. It will add task summaries to a list in a synchronized * block since many nodes could be sending this at the same time. When all nodes have sent their summaries, the displayStats * method is called and cleared after all summaries have been printed to prepare for the next command. */ private synchronized void showSummaryStatistics(Event e) { statsForAllNodes.add((TaskSummaryResponse) e); if (statsForAllNodes.size() == clientConnections.size()) {//all clients accounted for displayStats(statsForAllNodes); statsForAllNodes.clear(); } } /** * This method will loop over the stats list that includes all summaries from all nodes. * It also keeps a running sum and count of messages for the sum row of the displayed table. */ private void displayStats(List<TaskSummaryResponse> stats) { int sumSent = 0; long sumSentSum = 0L; int sumReceived = 0; long sumReceivedSum = 0L; //use tabs to format table; could be improved System.out.println("\t\tSent\t\tReceived\tSum of Sent\tSum of Received\t\tNum relayed");//header for (TaskSummaryResponse s: stats) { System.out.println(s.getInfo() + "\t" + s.getMessagesSent() + "\t\t" + s.getMessagesRec() + "\t\t" + s.getSentSummation() + "\t" + s.getRecSummation() + "\t\t" + s.getMessagesRelayed()); sumSent += s.getMessagesSent().get(); sumReceived += s.getMessagesRec().get(); sumReceivedSum += s.getRecSummation().get(); sumSentSum += s.getSentSummation().get(); } System.out.println(String.format("Sum\t\t%d\t\t%d\t\t%d\t%d", sumSent, sumReceived, sumSentSum, sumReceivedSum));//sum row } /** * This method is called from onEvent() after it receives a message from the messaging nodes saying that * it has completed all of its tasks. When the last node sends the request, the taskCompleted count is equal * to the number of clients and proceeds within the if statement. It will loop through each connection in the map * and send a request using TaskSummaryRequest usng the sender for that connection. After doing so, taskCompleted is * reset to 0 to prepare for the next start command. */ private synchronized void completeTask() { taskCompleted.getAndIncrement(); if (taskCompleted.get() == clientConnections.size()) { try { Thread.sleep(1500);//allow time for messages in transit to reach their destination //stats will be wrong if this is not done; they will be a part of the next operation } catch (InterruptedException ie) { System.out.println("There was an error when trying to sleep in completeTask()"); } for (HashMap.Entry<String, Object[]> node: clientConnections.entrySet()) { TaskSummaryRequest request = new TaskSummaryRequest(); try { TCPSenderThread sender = (TCPSenderThread) node.getValue()[0]; sender.sendData(request.getBytes()); } catch (IOException ioe) { System.out.println("There was an error when sending the task summary to the messaging nodes"); return; } } taskCompleted.set(0); } } /** * This method is called from onEvent() after it receives a message from the messaging nodes saying that * they want to exit from the overlay. It is very similar to the registerNode method. It will first * check if the node is registered in the registry and if the connection information match. If not, the * node is removed from the registry and a response is created with the status and message. That response * is sent to the node using the sender. It is synchronized because many nodes can send deregistration * requests at the same time. */ private synchronized void deregisterNode(Event e, TCPSenderThread sender, TCPReceiverThread receiver) { String ipPort = ((Deregister) e).toString(); String connectedIP = receiver.getSocket().getInetAddress().getHostName();//ip address of the socket String responseMessage = ""; byte status = Protocol.FAILURE; if (!clientConnections.containsKey(ipPort)) {//not registered in registry responseMessage = "This node does not exist in the Registry"; } else if (!ipPort.split(":")[0].equals(connectedIP.split("\\.")[0])) {//mismath between information responseMessage = "There is a mismatch between the registration request and the socket's input stream" + " Request: " + ipPort.split(":")[0] + " Input Stream: " + connectedIP; } //message is initially set to "" so no errors were appended if (responseMessage.equals("")) { responseMessage = "Deregistration request successful.";//success message status = Protocol.SUCCESS; clientConnections.remove(ipPort); } DeregisterResponse response = new DeregisterResponse(status, responseMessage); try { sender.sendData(response.getBytes()); } catch (IOException ioe) { System.out.println("There was an error when sending the response to the Messaging Node"); } } /** * This method is called from onEvent() after it receives a message from the messaging nodes saying that * they want to register themselves in the registru. It is very similar to the deregisterNode method. It will first * check if the node is already registered in the registry and if the connection information match. If not, the * node is added to the registry and a response is created with the status and message. That response * is sent to the node using the sender. It is synchronized because many nodes can send registration requests * at the same time. It accepts the sender and receiver used to communicate with the node, which is created * when the serverSocket accepts a new socket connection and creates a receiver thread. */ private synchronized void registerNode(Event e, TCPSenderThread sender, TCPReceiverThread receiver) { String ipPort = ((Register) e).toString(); String connectedIP = receiver.getSocket().getInetAddress().getHostName(); String responseMessage = ""; byte status = Protocol.FAILURE; if (clientConnections.containsKey(ipPort)) {//already registered responseMessage = "This node already exists in the registry"; } else if (!ipPort.split(":")[0].equals(connectedIP.split("\\.")[0])) {//mismatched information responseMessage = "There is a mismatch between the registration request and the socket's input stream\n" + "Request: " + ipPort.split(":")[0] + "!= Input Stream: " + connectedIP; } //message is initially set to "" so no errors were appended if (responseMessage.equals("")) { clientConnections.put(ipPort, new Object[]{sender, receiver});//add info, sender, and receiver to map System.out.println("Messaging Node connected on: " + ipPort); responseMessage = "Registration request successful. The number of messaging nodes currently constituting the overlay is (" + clientConnections.size() + ")."; status = Protocol.SUCCESS; } RegisterResponse response = new RegisterResponse(status, responseMessage); try { sender.sendData(response.getBytes()); } catch (IOException ioe) { clientConnections.remove(ipPort);//incase the data could not be sent, so the client does not know if the request was a success or failure } } /** * This method is a part of the foreground process of the main method. It allows the user to enter commands * and tell the register what to do. These include all the commands specified as a part of this assignment. * each command will call another method to process the requests. */ private void showOptions() { try (Scanner scnr = new Scanner(System.in)) {//akin to closing scanner using finally while (true) { System.out.println("Input a command to interact with the server: "); String line = scnr.nextLine().toLowerCase(); String[] command = line.split(" ");//could be two words like start 1 so split by space //process only the first string switch (command[0]) { case "list-messaging-nodes": listMessagingNodes(); break; case "list-weights": listWeights(); break; case "setup-overlay": setupOverlay(command); break; case "send-overlay-link-weights": sendLinkWeights(); break; case "start": startMessagePassing(command); break; case "test": System.out.println("Active Thread Count: " + Thread.activeCount());//test registration and deregistration break; default: System.out.println("Incorrect command entered. Please enter a valid command!"); break; } } } } /** * This method is called when the foreground process receives a command from the user to start the tasks. * This must be performed only after the overlay has been created and sent to the messaging nodes. If these * are true, a TaskInitiate message is created and sent to every node registered along with the number of * rounds to perform the tasks. */ private void startMessagePassing(String[] command) { if (weights == null) {//weights have not been setup using setup-overlay System.out.println("Link weights have not been setup yet. Try creating the overlay first"); return; } if (clientConnections.size() < 2) {//there are no clients registered System.out.println("There must be at least 2 nodes registerd to start sending messages"); return; } if (command.length < 2) {//user did not provide the number of rounds System.out.println("You must provide a number of rounds along with the command: 'start 10'"); return; } if (!sentWeights) {//weights have not been sent to messaging nodes using send-overlay-link-weights System.out.println("Link weights have not been sent yet. Try running send-overlay-link-weights first"); return; } int rounds = 1; try { rounds = Integer.parseInt(command[1]); } catch (NumberFormatException nfe) { System.out.println("The number of rounds specified could not be parsed. Try again with an integer"); return; } TaskInitiate task = new TaskInitiate(rounds); for (HashMap.Entry<String, Object[]> node: clientConnections.entrySet()) { try { TCPSenderThread sender = (TCPSenderThread) node.getValue()[0]; sender.sendData(task.getBytes()); } catch (IOException ioe) { System.out.println("There was an error when sending the task to the messaging nodes"); return; } } System.out.println("Messages are being sent."); } /** * This method is called when the user enters the send-overlay-link-weights command in the foreground * process. The overlay must have already been set up before this command using setup-overlay. * If true, it loops through the connnections and sends the link weights using the LinkWeights wireformat. */ private void sendLinkWeights() { if (weights == null) {//weights have not been setup System.out.println("Link weights have not been setup yet. Try creating the overlay first"); return; } for (HashMap.Entry<String, Object[]> node: clientConnections.entrySet()) { try { TCPSenderThread sender = (TCPSenderThread) node.getValue()[0]; sender.sendData(weights.getBytes()); } catch (IOException ioe) { System.out.println("There was an error sending the link weights to messaging nodes"); return; } } sentWeights = true; System.out.println("Link weights were sent to messaging nodes successfully"); } /** * This method is called when the user enters the setup-overlay command in the foreground process. * It will create a new object of OverlayCreator and use its createOverlay method, which takes in a * hashmap of connections. The returned LinkWeights objects will contain all of the link weights in * the overlay. For more information, look at OverlayCreator or LinkWeights. */ private void setupOverlay(String[] command) { int k = 4; try { k = Integer.parseInt(command[1]); } catch (Exception e) { System.out.println("Enter the command and also the num of messaging node: 'setup-overlay 4'"); return; } try { this.weights = (new OverlayCreator()).createOverlay(clientConnections, k);//named OverlayCreator object not required System.out.println("Overlay was created successfully and sent to messaging nodes"); } catch (Exception e) { System.out.println(e.getMessage()); return; } } /** * This method is called when the user enters the list-weights command in the foreground process. * It will iterate through the list in LinkWeights and print out each one. Links have to be setup * using setup-overlay before this is called. */ private void listWeights() { if (weights == null) System.out.println("Link weights have not been setup yet. Try creating the overlay first"); else { ArrayList<String> linksInOverlay = weights.getLinks(); String str = "All edges in the topology (" + linksInOverlay.size() + "): \n"; for (String s: linksInOverlay) { str += s + "\n"; } System.out.println(str); } } /** * This method is called when the user enters list-messaging-nodes command in the foreground process. * It will simply just iterate through the clientConnections hashmap and print out the keyset if any * nodes are registered. */ private void listMessagingNodes() { if (clientConnections.size() == 0) {//no nodes connected System.out.println("There are no nodes connected to the Registry"); } else { for (String key: clientConnections.keySet()) { System.out.println(key); } } } }