// Source: Routing-Packets-In-A-Network-Overlay/src/main/java/csx55/overlay/transport/TCPReceiverThread.java
package csx55.overlay.transport;

import java.io.*;
import java.net.*;
import csx55.overlay.node.Node;
import csx55.overlay.wireformats.Event;
import csx55.overlay.wireformats.EventFactory;

/**
 * This receiver thread is created to receive events from the registry/messaging nodes for 
 * all nodes in the system. Each connection will consist of A having a receiver to receive
 * information from B and B having a receiver for A. On start, the registry will accept new
 * socket connections and create receiver threads for each new connection. The messaging node
 * will also create a thread for the registry and any peers it connects to.
 */
public class TCPReceiverThread implements Runnable {
    // Loop flag for run(); volatile so that a close() issued from another thread is visible to the loop.
    private volatile boolean running = true;
    private final Socket socket;
    private final DataInputStream din;
    private final Node node;
    private final TCPSenderThread sender;

    /**
     * Creates a receiver that reads length-prefixed frames from the given socket.
     *
     * @param node   the node whose {@code onEvent} is invoked for every decoded event
     * @param socket the connected socket to read from; its input stream is wrapped immediately
     * @param sender the sender handed to {@code onEvent} and closed when the peer disconnects
     * @throws IOException if the socket's input stream cannot be obtained
     */
    public TCPReceiverThread(Node node, Socket socket, TCPSenderThread sender) throws IOException {
        this.node = node;
        this.socket = socket;
        this.sender = sender;
        this.din = new DataInputStream(socket.getInputStream());
    }

    /**
     * Reads frames until {@link #close()} clears the running flag or an error ends the loop.
     *
     * Each frame is a four-byte length followed by that many payload bytes. The payload is handed
     * to the singleton EventFactory, and the resulting event is passed to the node together with
     * the sender (for replies) and this receiver.
     *
     * An EOFException (the stream ended, i.e. the peer closed the connection) closes both the
     * sender and this receiver and always ends the loop, even if closing fails. Every other
     * failure is reported on standard output and ends the loop without closing the socket.
     */
    @Override
    public void run() {
        while (running) {
            // Reset per frame so the I/O error report never names a previously handled event.
            Event event = null;
            try {
                int dataLength = din.readInt();
                if (dataLength < 0) {
                    // A negative length cannot be allocated; treat it as a corrupt stream.
                    System.out.println("Received a negative frame length (" + dataLength + ") in TCPReceiverThread");
                    break;
                }
                byte[] data = new byte[dataLength];
                din.readFully(data, 0, dataLength);

                event = EventFactory.getInstance().processEvent(data);
                node.onEvent(event, sender, this);
            }
            catch (EOFException eofe) {//client closed connection
                closeAfterPeerDisconnect();
                break;//never retry a stream that has already ended, even if closing failed
            }
            catch (NullPointerException npe) {
                System.out.println("There was a null value in the data input stream in TCPReceiverThread (Most likely event)");
                break;
            }
            catch (SocketException se) {
                System.out.println("There was an error with the socket when receiving information");
                break;
            }
            catch (IOException ioe) {
                if (event != null) {
                    System.out.println("There is an error with event " + event.getType());//debugging
                } else {
                    // The failure happened before an event was decoded, so there is no type to report.
                    System.out.println("There was an I/O error before an event could be decoded: " + ioe.getMessage());
                }
                break;
            }
        }
    }

    //close the sender (when present) and then this receiver; the receiver is closed even if the sender fails
    private void closeAfterPeerDisconnect() {
        try {
            try {
                if (sender != null) {
                    sender.close();//close sender
                }
            } finally {
                this.close();//close receiver
            }
        } catch (IOException e) {
            System.out.println("There was an error when closing the sender/receiver (EOFException)");
        }
    }

    //return the socket associated with this receiver thread
    public Socket getSocket() {
        return this.socket;
    }

    /**
     * Closes all resources for this thread, which includes the data input stream and the socket,
     * and sets running to false so the receive loop stops.
     *
     * The flag is cleared first and the socket is closed in a finally block, so a failure while
     * closing the stream can neither leave the loop running nor leave the socket open.
     *
     * @throws IOException if closing the stream or the socket fails
     */
    public void close() throws IOException {
        this.running = false;
        try {
            this.din.close();
        } finally {
            this.socket.close();
        }
    }
}