package csx55.overlay.wireformats; import java.io.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import csx55.overlay.util.StatisticsCollector; /** * This is a wireformat used by the messaging nodes to send a summary of the statistics collected when passing * messages to peers. It follows the same format as all of the other wireformats and is built using the provided * example in the slides. The 1st constructor accepts the hostname, port number, and the StatisticsCollector object * that has all of the stats for the node. The 2nd constructor takes in the byte array and unmarshals the information * sent by the node. It also implements the getBytes() method from the Event interface, which marshals * the variables for this wireformat. The first constructor is used when creating an instance of the class * and the second is used by the event factory to create a new instance populated from the unmarshalled * bytes. The getBytes() method is called before the data is sent using the sender thread. 
*/ public class TaskSummaryResponse implements Event { private int type; private String ipAddress; private int portNum; private AtomicInteger messagesSent; private AtomicLong sentSummation; private AtomicInteger messagesRec; private AtomicLong recSummation; private AtomicInteger messagesRelayed; public TaskSummaryResponse(String ipAddress, int portNum, StatisticsCollector statistics) { this.type = Protocol.TRAFFIC_SUMMARY; this.ipAddress = ipAddress; this.portNum = portNum; this.messagesSent = statistics.getMessagesSent(); this.sentSummation = statistics.getSentSummation(); this.messagesRec = statistics.getMessagesRec(); this.recSummation = statistics.getRecSummation(); this.messagesRelayed = statistics.getMessagesRelayed(); } public TaskSummaryResponse(byte[] marshalledByte) throws IOException { ByteArrayInputStream baInputStream = new ByteArrayInputStream(marshalledByte); DataInputStream din = new DataInputStream(new BufferedInputStream(baInputStream)); this.type = din.readInt(); int size = din.readInt(); byte[] ipAddressBytes = new byte[size]; din.readFully(ipAddressBytes, 0, size); this.ipAddress = new String(ipAddressBytes); this.portNum = din.readInt(); this.messagesSent = new AtomicInteger(din.readInt()); this.sentSummation = new AtomicLong(din.readLong()); this.messagesRec = new AtomicInteger(din.readInt()); this.recSummation = new AtomicLong(din.readLong()); this.messagesRelayed = new AtomicInteger(din.readInt()); baInputStream.close(); din.close(); } //getters for all variables public AtomicInteger getMessagesSent() { return this.messagesSent; } public AtomicLong getSentSummation() { return this.sentSummation; } public AtomicInteger getMessagesRec() { return this.messagesRec; } public AtomicLong getRecSummation() { return this.recSummation; } public AtomicInteger getMessagesRelayed() { return this.messagesRelayed; } public String getInfo() { return this.ipAddress + ":" + Integer.toString(portNum); } @Override public int getType() { return this.type; } 
@Override public byte[] getBytes() throws IOException { byte[] marshalledBytes = null; ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(new BufferedOutputStream(baOutputStream)); dout.writeInt(type); byte[] identifierBytes = ipAddress.getBytes(); int elementLength = identifierBytes.length; dout.writeInt(elementLength); dout.write(identifierBytes); dout.writeInt(portNum); dout.writeInt(messagesSent.get()); dout.writeLong(sentSummation.get()); dout.writeInt(messagesRec.get()); dout.writeLong(recSummation.get()); dout.writeInt(messagesRelayed.get()); dout.flush(); marshalledBytes = baOutputStream.toByteArray(); baOutputStream.close(); dout.close(); return marshalledBytes; } }