package csx55.overlay.wireformats; import java.io.*; import java.util.concurrent.atomic.AtomicInteger; /** * This is a wireformat used by the messaging nodes to pass messages to one another after the start command in the registry. * It follows the same format as all of the other wireformats and is built using the provided example in the slides. * This wireformat accepts the payload, which is a random integer, current position of the message in the routing path, * and a list of the routing plan as one constructor and intializes the instance variables. Another constructor takes in the byte * stream and unmarshalls the information sent by the node. It also implemnts the getBytes() method from the * Event interface, which marshalls the variables for this wireformat. The first constructor is used when creating an * instance of the class and the second is used by the event factory to create a new instance and return a class that the bytes * unmarshalled. The getBytes() is used before the data is sent using the sender thread. */ public class Message implements Event { private int type; private int payload; private AtomicInteger currPosition; private String[] routingPlan; public Message(int payload, int position, String[] routingPlan) { this.type = Protocol.MESSAGE; this.payload = payload; this.currPosition = new AtomicInteger(position);//set as Atomic because it is incremented in every new node this.routingPlan = routingPlan; } public Message(byte[] marshalledByte) throws IOException { ByteArrayInputStream baInputStream = new ByteArrayInputStream(marshalledByte); DataInputStream din = new DataInputStream(new BufferedInputStream(baInputStream)); this.type = din.readInt(); this.payload = din.readInt(); this.currPosition = new AtomicInteger(din.readInt()); int size = din.readInt(); this.routingPlan = new String[size]; for (int i = 0; i < size; i++) { int planSize = din.readInt(); byte[] planBytes = new byte[planSize]; din.readFully(planBytes); this.routingPlan[i] = (new String(planBytes)); } baInputStream.close(); din.close(); } @Override public int getType() { return type; } @Override public byte[] getBytes() throws IOException { byte[] marshalledBytes = null; ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(new BufferedOutputStream(baOutputStream)); dout.writeInt(type); dout.writeInt(payload); dout.writeInt(currPosition.get()); dout.writeInt(routingPlan.length); for (String s: routingPlan) { byte[] planBytes = s.getBytes(); int size = planBytes.length; dout.writeInt(size); dout.write(planBytes); } dout.flush(); marshalledBytes = baOutputStream.toByteArray(); baOutputStream.close(); dout.close(); return marshalledBytes; } //get the value needed to send to the peer public int getPayload() { return this.payload; } //get the route needed to be followed to get to sink node public String[] getRoutingPlan() { return this.routingPlan; } //get the current position public AtomicInteger getPosition() { return this.currPosition; } //increment the atomic integer for current position public void increment() { currPosition.getAndIncrement(); } }