package csx55.overlay.node;

//imports
import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.Random;
import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.util.Locale;

import csx55.overlay.dijkstra.RoutingCache;
import csx55.overlay.transport.TCPReceiverThread;
import csx55.overlay.transport.TCPSenderThread;
import csx55.overlay.transport.TCPServerThread;
import csx55.overlay.util.StatisticsCollector;
import csx55.overlay.wireformats.Deregister;
import csx55.overlay.wireformats.DeregisterResponse;
import csx55.overlay.wireformats.Event;
import csx55.overlay.wireformats.LinkWeights;
import csx55.overlay.wireformats.Message;
import csx55.overlay.wireformats.MessagingNodesList;
import csx55.overlay.wireformats.Protocol;
import csx55.overlay.wireformats.Register;
import csx55.overlay.wireformats.RegisterResponse;
import csx55.overlay.wireformats.TaskComplete;
import csx55.overlay.wireformats.TaskInitiate;
import csx55.overlay.wireformats.TaskSummaryResponse;

/**
 * MessagingNode registers itself with the Registry and accepts connections from peers. Its main
 * task is to listen for messages from the Registry and act on them, which may mean connecting to
 * other nodes in the overlay or sending messages to them. Implements {@link Node} so that receiver
 * threads can deliver events through {@code node.onEvent}.
 *
 * <p>Threading: {@link #onEvent} is invoked from receiver threads, while {@link #showOptions} runs
 * on the main thread. State shared between the two ({@code weights}, {@code msgRoutes},
 * {@code connections}) is therefore volatile or held in a synchronized map.
 */
public class MessagingNode implements Node {

    /** Number of messages sent to the chosen sink in every round (value fixed by the assignment). */
    private static final int MESSAGES_PER_ROUND = 5;

    /** Sender/receiver pair used to talk to one connected peer. */
    private static final class PeerLink {
        private final TCPSenderThread sender;
        private final TCPReceiverThread receiver;

        PeerLink(TCPSenderThread sender, TCPReceiverThread receiver) {
            this.sender = sender;
            this.receiver = receiver;
        }
    }

    private TCPReceiverThread serverReceiver; //receiver thread used to receive events from the registry
    private TCPSenderThread serverSender; //sender thread used to send events to the registry
    private final int nodePort; //the port number of this messaging node
    private final String nodeIP; //the hostname of this messaging node
    private volatile LinkWeights weights = null; //link weights sent by the registry; read by the console thread
    private volatile RoutingCache msgRoutes = null; //cached routes for this node; read by the console thread
    private final StatisticsCollector stats = new StatisticsCollector(); //collects the stats for this node when sending messages
    private final Map<String, PeerLink> connections =
            Collections.synchronizedMap(new HashMap<String, PeerLink>()); //all peers connected to this node, keyed by peer identifier

    /**
     * Creates a node that identifies itself by the given hostname and port.
     *
     * @param ipAddress hostname of this node
     * @param numPort   port of the server socket peers connect to
     */
    public MessagingNode(String ipAddress, int numPort) {
        this.nodeIP = ipAddress;
        this.nodePort = numPort;
    }

    /**
     * Entry point of the messaging node. Expects the Registry host and port as the first two
     * arguments. Opens a server socket on an OS-assigned port for incoming peer connections,
     * starts the acceptor thread, registers with the Registry and then reads user commands on the
     * main thread until standard input ends.
     *
     * @param args {@code args[0]} is the Registry host, {@code args[1]} the Registry port
     */
    public static void main(String[] args) {
        if (args.length < 2) {//not enough arguments provided
            System.out.println("Please provide the ip-address and port num");
            System.exit(0);
        }

        //validate the port before any resource is opened so a typo is reported instead of silently exiting
        final int registryPort;
        try {
            registryPort = Integer.parseInt(args[1]);
        } catch (NumberFormatException nfe) {
            System.out.println("The registry port must be an integer, but was: " + args[1]);
            System.exit(0);
            return;
        }

        //port 0 lets the OS choose a free port; try-with-resources closes the socket and never
        //dereferences it when construction itself failed
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            int portNum = serverSocket.getLocalPort();//the port the OS actually assigned
            MessagingNode mNode = new MessagingNode(InetAddress.getLocalHost().getHostName(), portNum);

            //server thread for incoming messaging node connections when creating the overlay
            Thread acceptorThread = new Thread(new TCPServerThread(mNode, serverSocket));
            acceptorThread.start();

            mNode.registerToRegistry(args[0], registryPort);//send the registration request
            mNode.showOptions();//blocks while the user enters commands
        } catch (SocketException se) {
            System.out.println("Exiting due to refused connection");
            System.exit(0);
        } catch (IOException ioe) {
            System.out.println("Exiting due to error with IO");
            System.exit(0);
        }
        System.exit(0);//other threads are still running, so terminate explicitly
    }

    /**
     * Registers this node with the Registry. Opens a socket to the given host and port, creates
     * the sender and receiver used for all further Registry communication, sends a Register
     * wireformat carrying this node's hostname and port, and starts the receiver thread.
     * The process exits if the connection cannot be established.
     *
     * @param ipAddress hostname of the Registry
     * @param port      port of the Registry's server socket
     */
    public void registerToRegistry(String ipAddress, int port) {
        try {
            Socket registrySocket = new Socket(ipAddress, port);//socket connection to Registry
            serverSender = new TCPSenderThread(registrySocket);
            serverReceiver = new TCPReceiverThread(this, registrySocket, serverSender);

            Register request = new Register(this.nodeIP, this.nodePort);
            serverSender.sendData(request.getBytes());

            //thread that receives events from the registry
            new Thread(serverReceiver).start();
            System.out.println("Messaging node starting connection at: " + this.nodeIP + ":" + this.nodePort);
        } catch (IOException ioe) {
            System.out.println("There was an error connecting to the Registry/Messaging Node");
            System.exit(0);//exit if not able to connect to the Registry
        }
    }

    /**
     * Dispatches an event delivered by a receiver thread, coming either from a peer or from the
     * Registry. The sender and receiver belonging to the connection the event arrived on are
     * passed along so that a peer registration can be stored for later replies.
     *
     * @param e        the received event
     * @param sender   sender bound to the connection the event arrived on
     * @param receiver receiver bound to the connection the event arrived on
     */
    @Override
    public void onEvent(Event e, TCPSenderThread sender, TCPReceiverThread receiver) {
        switch (e.getType()) {
            case Protocol.REGISTER_REQUEST:
                createConnection(e, sender, receiver);
                break;
            case Protocol.REGISTER_RESPONSE:
                System.out.println(((RegisterResponse) e).toString());
                break;
            case Protocol.MESSAGING_NODES_LIST:
                connectToPeers(e);
                break;
            case Protocol.LINK_WEIGHTS:
                createRoutesFromLinkWeights(e);
                break;
            case Protocol.TASK_INITIATE:
                startMessagingPeers(e);
                break;
            case Protocol.MESSAGE:
                messagePeer(e);
                break;
            case Protocol.PULL_TRAFFIC_SUMMARY:
                sendStats();
                break;
            case Protocol.DEREGISTER_RESPONSE:
                closeAndExit(e);
                break;
            default:
                //event types this node does not handle are ignored on purpose
                break;
        }
    }

    /**
     * Handles the Registry's answer to a deregistration request. On success the sender and
     * receiver to the Registry are closed and the process exits, even if closing fails, because
     * the node is no longer part of the overlay. On failure the node stays connected.
     *
     * @param e the DeregisterResponse event
     */
    private void closeAndExit(Event e) {
        DeregisterResponse response = (DeregisterResponse) e;
        System.out.println(response.toString());

        if (response.getStatus() != Protocol.SUCCESS) {//not expected to happen, handled just in case
            System.out.println("Deregistration failed; this node is still connected to the Registry");
            return;
        }

        try {
            serverSender.close();
            serverReceiver.close();
        } catch (IOException ioe) {
            System.out.println("There was an error when trying to close the sender and receiver");
        }
        System.exit(0);
    }

    /**
     * Handles a LinkWeights wireformat from the Registry. Stores the weights and builds the
     * routing cache for this node from them (see LinkWeights and RoutingCache for details).
     *
     * @param e the LinkWeights event
     */
    private void createRoutesFromLinkWeights(Event e) {
        LinkWeights received = (LinkWeights) e;
        weights = received;//published before msgRoutes so a reader that sees the routes also sees the weights
        msgRoutes = new RoutingCache(received, nodeIP, nodePort);
        System.out.println("Link weights received and processed. Ready to send messages");
    }

    /**
     * Handles a traffic summary request from the Registry. Sends this node's statistics in a
     * TaskSummaryResponse and resets the counters once the send has succeeded.
     */
    private void sendStats() {
        TaskSummaryResponse summary = new TaskSummaryResponse(nodeIP, nodePort, stats);//ip and port of this node, and its StatisticsCollector
        try {
            serverSender.sendData(summary.getBytes());
            stats.setToZero();
        } catch (IOException ioe) {
            System.out.println("There was an error sending the stats of this node to the registry");
        }
    }

    /**
     * Handles a Message wireformat received from a peer. A message carries its payload, its
     * current position within the route and the route itself. When the position equals the route
     * length this node is the sink and the payload is added to the received statistics. Otherwise
     * the message is forwarded to the peer at the current position and the relayed counter is
     * increased. Messages whose position is out of range, or whose next hop is not connected, are
     * reported and dropped. Synchronized because messages may arrive from several receiver threads.
     *
     * @param e the Message event
     */
    private synchronized void messagePeer(Event e) {
        Message message = (Message) e;
        String[] msgRoute = message.getRoutingPlan();
        int curr = message.getPosition().get();

        if (curr == msgRoute.length) {//this node is the sink node and should receive the payload
            stats.receiveInfo(message.getPayload());//add to received count/sum
            return;
        }
        if (curr < 0 || curr > msgRoute.length) {//position outside the route; nothing sensible to do with it
            System.out.println("There was an error sending the message: invalid route position " + curr);
            return;
        }

        //this node is not the sink node, so relay the message to the next node in the route
        String nextHop = msgRoute[curr];
        TCPSenderThread nextNodeSender = senderFor(nextHop);
        if (nextNodeSender == null) {
            System.out.println("There was an error sending the message: no connection to " + nextHop);
            return;
        }

        message.increment();//advance the position of the message in the route
        try {
            nextNodeSender.sendData(message.getBytes());
            stats.relayInfo();//count the relayed message only when it was actually sent
        } catch (IOException ioe) {
            System.out.println("There was an error sending the message");
        }
    }

    /**
     * Handles a TaskInitiate wireformat from the Registry. For each of the requested rounds a
     * random sink is drawn from the routing cache and {@value #MESSAGES_PER_ROUND} messages with
     * random integer payloads are sent along the cached route to it. A message is counted as sent
     * only when the send succeeded. Afterwards a TaskComplete is reported to the Registry.
     *
     * <p>NOTE(review): this method and {@code messagePeer} lock the same monitor, so incoming
     * messages wait while this node is sending. Whether that can stall peers depends on
     * TCPSenderThread and StatisticsCollector, which are not visible here. Confirm before changing.
     *
     * @param e the TaskInitiate event
     */
    private synchronized void startMessagingPeers(Event e) {
        RoutingCache routes = msgRoutes;//read the volatile once
        if (routes == null) {
            System.out.println("Link weights have not been setup yet. Cannot start messaging peers");
            return;
        }

        int numRounds = ((TaskInitiate) e).getNumRounds();
        int sinkCount = routes.size();
        Random rand = new Random();

        //with no known sinks there is nothing to send; Random.nextInt(0) would throw
        for (int round = 0; sinkCount > 0 && round < numRounds; round++) {
            String sink = routes.get(rand.nextInt(sinkCount));//random sink node from the cache
            String[] route = routes.getMsgRoute(sink);//cached route to the sink node
            if (route == null || route.length == 0) {
                System.out.println("There was an error when sending the message to a sink node: no route to " + sink);
                continue;
            }

            //sender to the first node in the route; this may or may not be the sink itself
            TCPSenderThread nSender = senderFor(route[0]);
            if (nSender == null) {
                System.out.println("There was an error when sending the message to a sink node: no connection to " + route[0]);
                continue;
            }

            for (int j = 0; j < MESSAGES_PER_ROUND; j++) {
                int payload = rand.nextInt();//random integer with no bounds
                Message message = new Message(payload, 1, route);//position 1: the first hop is the receiver
                try {
                    nSender.sendData(message.getBytes());
                    stats.sendInfo(payload);//add to the sent count/sum only after a successful send
                } catch (IOException ex) {
                    System.out.println("There was an error when sending the message to a sink node");
                }
            }
        }
        sendTaskCompletion();//send TaskComplete message
    }

    /**
     * Reports to the Registry that this node has finished sending its messages by sending a
     * TaskComplete wireformat.
     */
    private synchronized void sendTaskCompletion() {
        TaskComplete task = new TaskComplete(this.nodeIP, this.nodePort);//informs the Registry of completion
        try {
            serverSender.sendData(task.getBytes());
            System.out.println("Completed message communication with no errors");
        } catch (IOException ioe) {
            System.out.println("There was an error sending the task completion message to the registry");
        }
    }

    /**
     * Handles a MessagingNodesList wireformat from the Registry by connecting to every peer in
     * the list. Each entry has the form {@code ip:port}. For every peer a socket, sender and
     * receiver are created, a Register wireformat is sent so the peer can store the reverse
     * direction, and the connection is recorded. The summary line is printed once after all
     * peers were processed, and only when every connection succeeded.
     *
     * @param e the MessagingNodesList event
     */
    private void connectToPeers(Event e) {
        MessagingNodesList msgList = (MessagingNodesList) e;
        ArrayList<String> neighbors = msgList.getMessagingNodes();//list of peers to connect to
        boolean allConnected = true;

        for (String n : neighbors) {
            String[] node = n.split(":");//neighbors are in the format ip:port
            if (node.length < 2) {
                System.out.println("There was an error when connecting to the messaging node: malformed address " + n);
                allConnected = false;
                continue;
            }

            Socket nSocket = null;
            try {
                //connects to the server socket of the peer
                nSocket = new Socket(node[0], Integer.parseInt(node[1]));
                TCPSenderThread nSender = new TCPSenderThread(nSocket);
                TCPReceiverThread nReceiver = new TCPReceiverThread(this, nSocket, nSender);
                Register register = new Register(this.nodeIP, this.nodePort);//registers this node with that peer

                new Thread(nReceiver).start();
                nSender.sendData(register.getBytes());
                connections.put(n, new PeerLink(nSender, nReceiver));
            } catch (NumberFormatException nfe) {
                System.out.println("There was an error when connecting to the messaging node: malformed address " + n);
                allConnected = false;
            } catch (IOException ioe) {
                System.out.println("There was an error when connecting to the messaging node");
                allConnected = false;
                closeQuietly(nSocket);//do not leak the socket of a half-established connection
            }
        }

        if (allConnected) {
            System.out.println("All connections are established. Number of connections(" + msgList.getNumNodes() + ")");
        }
    }

    /**
     * Handles a Register wireformat received from a peer. Stores the sender and receiver of that
     * connection under the key produced by the Register's {@code toString()}, which is what makes
     * the link usable in both directions.
     *
     * @param e        the Register event
     * @param sender   sender bound to the peer's connection
     * @param receiver receiver bound to the peer's connection
     */
    private void createConnection(Event e, TCPSenderThread sender, TCPReceiverThread receiver) {
        Register node = (Register) e;
        connections.put(node.toString(), new PeerLink(sender, receiver));
    }

    /**
     * Reads commands from standard input on the main thread until input ends. Supported commands
     * are {@code print-shortest-path}, {@code exit-overlay} and the debugging aid {@code test}.
     * Commands are matched case-insensitively on the first word of the line.
     */
    public void showOptions() {
        try (Scanner scnr = new Scanner(System.in)) {
            System.out.println("Input a command to interact with the node: ");
            while (scnr.hasNextLine()) {//ends cleanly at end of input instead of throwing
                //Locale.ROOT keeps the matching independent of the default locale
                String line = scnr.nextLine().trim().toLowerCase(Locale.ROOT);
                String command = line.split("\\s+")[0];

                switch (command) {
                    case "print-shortest-path":
                        printShortestPath();
                        break;
                    case "exit-overlay":
                        exitOverlay();
                        break;
                    case "test"://used for testing; not part of the assignment
                        printConnections();
                        break;
                    default:
                        System.out.println("Command not recognized. Try again");
                        break;
                }
            }
        }
    }

    /**
     * Prints the sockets behind every stored peer connection. The map's own lock is held while
     * iterating, as a synchronized map requires.
     */
    private void printConnections() {
        synchronized (connections) {
            for (Map.Entry<String, PeerLink> entry : connections.entrySet()) {
                Socket tempSenderSocket = entry.getValue().sender.getSocket();
                Socket tempReceiverSocket = entry.getValue().receiver.getSocket();
                System.out.println("Peer: " + entry.getKey() + " -> Socket for Sender: " + tempSenderSocket
                        + " -> Socket for Receiver: " + tempReceiverSocket);
            }
        }
    }

    /**
     * Prints the shortest path from this node to every other node. The Registry must have sent
     * the link weights first; otherwise a hint is printed instead.
     */
    private void printShortestPath() {
        RoutingCache routes = msgRoutes;//read the volatile once
        if (routes == null) {
            System.out.println("Link weights have not been setup yet. Try creating the overlay first");
        } else {
            routes.printSP(weights);//see method definition for more details
        }
    }

    /**
     * Sends a Deregister wireformat carrying this node's hostname and port to the Registry. The
     * node only leaves once the matching response arrives. Deregistration is refused after link
     * weights were received; use ctrl+c in that case.
     */
    private void exitOverlay() {
        if (weights != null) {//cannot deregister after creating overlay; use ctrl+c in this case
            System.out.println("Link weights have already been set up. Cannot deregister");
            return;
        }

        Deregister deregister = new Deregister(nodeIP, nodePort);//deregistration wireformat
        try {
            this.serverSender.sendData(deregister.getBytes());//send to registry
        } catch (IOException ioe) {
            System.out.println("There was an error when sending a deregistration request to the registry");
        }
    }

    /** Returns the sender for the given peer key, or {@code null} when no such peer is connected. */
    private TCPSenderThread senderFor(String peerKey) {
        PeerLink link = connections.get(peerKey);
        return link == null ? null : link.sender;
    }

    /** Closes the socket if it was opened, ignoring failures because the connection is already being abandoned. */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException ignored) {
            //best effort: the caller has already reported the connection error
        }
    }
}