package csx55.overlay.transport;

import java.io.*;
import java.net.*;

import csx55.overlay.node.Node;
import csx55.overlay.wireformats.Event;
import csx55.overlay.wireformats.EventFactory;

/**
 * Receives events over a single socket connection and hands them to the owning {@link Node}.
 *
 * <p>Each connection is expected to have one receiver on either side: A runs a receiver for
 * data coming from B, and B runs a receiver for data coming from A. The registry creates a
 * receiver for every socket it accepts; a messaging node creates one for the registry and one
 * for every peer it connects to.
 *
 * <p>Messages are read as a 4-byte length prefix followed by that many payload bytes.
 */
public class TCPReceiverThread implements Runnable {

    /** Loop flag; cleared by {@link #close()} so that {@link #run()} terminates. */
    private volatile boolean running = true;

    private final Socket socket;
    private final DataInputStream din;
    private final Node node;
    private final TCPSenderThread sender;

    /**
     * Creates a receiver bound to the given socket.
     *
     * @param node   the node whose {@code onEvent} is invoked for every received event
     * @param socket the connected socket to read from
     * @param sender the sender paired with this connection, passed along to {@code onEvent}
     * @throws IOException if the socket's input stream cannot be obtained
     */
    public TCPReceiverThread(Node node, Socket socket, TCPSenderThread sender) throws IOException {
        this.node = node;
        this.socket = socket;
        this.sender = sender;
        this.din = new DataInputStream(socket.getInputStream());
    }

    /**
     * Reads length-prefixed messages until the receiver is closed or the connection fails.
     *
     * <p>For every message the payload is turned into an {@link Event} by the
     * {@link EventFactory} singleton and dispatched through {@code node.onEvent}, together with
     * the paired sender and this receiver.
     *
     * <p>The loop ends when:
     * <ul>
     *   <li>{@link #close()} has been called,</li>
     *   <li>the peer closes the connection (end of stream) - sender and receiver are closed,</li>
     *   <li>a socket or I/O error occurs, or the length prefix is negative - the receiver is
     *       closed so the stream and socket are not leaked,</li>
     *   <li>a {@link NullPointerException} surfaces while processing an event.</li>
     * </ul>
     */
    @Override
    public void run() {
        while (running) {
            try {
                int dataLength = din.readInt();
                if (dataLength < 0) {
                    // A negative prefix means the stream is corrupt or out of sync; allocating
                    // the buffer would throw an uncaught NegativeArraySizeException.
                    System.out.println("Received an invalid message length (" + dataLength
                            + ") in TCPReceiverThread");
                    closeQuietly();
                    break;
                }

                byte[] data = new byte[dataLength];
                din.readFully(data, 0, dataLength);

                Event event = EventFactory.getInstance().processEvent(data);
                node.onEvent(event, sender, this);
            } catch (EOFException eofe) {
                // Peer closed the connection. Close the receiver even when closing the sender
                // fails, and always leave the loop so a failed close cannot cause a busy loop
                // of repeated EOFExceptions.
                try {
                    try {
                        sender.close();
                    } finally {
                        close();
                    }
                } catch (IOException e) {
                    System.out.println("There was an error when closing the sender/receiver (EOFException)");
                }
                break;
            } catch (NullPointerException npe) {
                // Kept as a best-effort guard around event creation/dispatch; the connection is
                // left open exactly as before.
                System.out.println("There was a null value in the data input stream in TCPReceiverThread (Most likely event)");
                break;
            } catch (SocketException se) {
                // Also reached when close() is called while the thread is blocked in a read.
                System.out.println("There was an error with the socket when receiving information");
                closeQuietly();
                break;
            } catch (IOException ioe) {
                // The failure happened while reading, so there is no event to report on.
                System.out.println("There was an I/O error when receiving an event: " + ioe.getMessage());
                closeQuietly();
                break;
            }
        }
    }

    /**
     * Returns the socket associated with this receiver thread.
     *
     * @return the socket this receiver reads from
     */
    public Socket getSocket() {
        return this.socket;
    }

    /**
     * Stops the receive loop and closes the data input stream and the socket.
     *
     * <p>The running flag is cleared first so the loop terminates even if closing fails, and
     * the socket is closed even when closing the stream throws.
     *
     * @throws IOException if closing the stream or the socket fails
     */
    public void close() throws IOException {
        this.running = false;
        try {
            this.din.close();
        } finally {
            this.socket.close();
        }
    }

    /** Closes this receiver after a failure, reporting instead of propagating close errors. */
    private void closeQuietly() {
        try {
            close();
        } catch (IOException e) {
            System.out.println("There was an error when closing the receiver: " + e.getMessage());
        }
    }
}