Routing-Packets-In-A-Network-Overlay / src / main / java / csx55 / overlay / wireformats / Message.java
Message.java
Raw
package csx55.overlay.wireformats;

import java.io.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * This is a wireformat used by the messaging nodes to pass messages to one another after the start command in the registry.
 * It follows the same format as all of the other wireformats and is built using the provided example in the slides. 
 * This wireformat accepts the payload, which is a random integer, current position of the message in the routing path, 
 * and a list of the routing plan as one constructor and intializes the instance variables. Another constructor takes in the byte 
 * stream and unmarshalls the information sent by the node. It also implemnts the getBytes() method from the 
 * Event interface, which marshalls the variables for this wireformat. The first constructor is used when creating an 
 * instance of the class and the second is used by the event factory to create a new instance and return a class that the bytes
 * unmarshalled. The getBytes() is used before the data is sent using the sender thread.
 */
public class Message implements Event {
    private int type;
    private int payload;
    private AtomicInteger currPosition;
    private String[] routingPlan;

    public Message(int payload, int position, String[] routingPlan) {
        this.type = Protocol.MESSAGE;
        this.payload = payload;
        this.currPosition = new AtomicInteger(position);//set as Atomic because it is incremented in every new node
        this.routingPlan = routingPlan;
    }

    public Message(byte[] marshalledByte) throws IOException {
        ByteArrayInputStream baInputStream = new ByteArrayInputStream(marshalledByte);
        DataInputStream din = new DataInputStream(new BufferedInputStream(baInputStream));

        this.type = din.readInt();
        this.payload = din.readInt();
        this.currPosition = new AtomicInteger(din.readInt());

        int size = din.readInt();
        this.routingPlan = new String[size];
        for (int i = 0; i < size; i++) {
            int planSize = din.readInt();
            byte[] planBytes = new byte[planSize];
            din.readFully(planBytes);
            this.routingPlan[i] = (new String(planBytes));
        }

        baInputStream.close();
        din.close();
    }

    @Override
    public int getType() {
        return type;
    }

    @Override
    public byte[] getBytes() throws IOException {
        byte[] marshalledBytes = null;
        ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(new BufferedOutputStream(baOutputStream));

        dout.writeInt(type);
        dout.writeInt(payload);
        dout.writeInt(currPosition.get());
        dout.writeInt(routingPlan.length);

        for (String s: routingPlan) {
            byte[] planBytes = s.getBytes();
            int size = planBytes.length;
            dout.writeInt(size);
            dout.write(planBytes);
        }

        dout.flush();
        marshalledBytes = baOutputStream.toByteArray();

        baOutputStream.close();
        dout.close();
        return marshalledBytes;
    }

    //get the value needed to send to the peer
    public int getPayload() {
        return this.payload;
    }

    //get the route needed to be followed to get to sink node
    public String[] getRoutingPlan() {
        return this.routingPlan;
    }

    //get the current position
    public AtomicInteger getPosition() {
        return this.currPosition;
    }

    //increment the atomic integer for current position
    public void increment() {
        currPosition.getAndIncrement();
    }
}