package dslab.monitoring; import dslab.rmi.channel.DatagramReadOnlyChannel; import dslab.rmi.serialize.ParseException; import java.net.DatagramSocket; import java.net.SocketException; import java.nio.channels.ClosedChannelException; import java.util.logging.Logger; import static java.util.logging.Level.WARNING; /** * The monitoring server-sided end of the udp channel between transfer servers and monitoring server. */ public class MonitoringServerStub extends Thread { private static final Logger LOG = Logger.getLogger(MonitoringServerStub.class.getSimpleName()); private final MonitoringServiceImpl monitoringService; private final MonitoringSerializer serializer = new MonitoringSerializer(); private final DatagramReadOnlyChannel channel; public MonitoringServerStub(MonitoringServiceImpl monitoringService, int listeningPort) throws SocketException { this.monitoringService = monitoringService; this.channel = new DatagramReadOnlyChannel(new DatagramSocket(listeningPort)); } /** * Listens for UDP packets, deserializes the received packets into object representations and passes * them to {@link MonitoringServiceImpl} for processing */ @Override public void run() { try (channel) { LOG.info("Monitoring-Server is listening for packets at " + channel.toString()); while (!isInterrupted()) { try { var rawMessage = channel.read(); monitoringService.save(serializer.deserialize(rawMessage)); } catch (ParseException e) { LOG.log(WARNING, "Failed to parse message:", e); } } } catch (ClosedChannelException e) { LOG.info("The channel was closed. Shutting down."); } } @Override public void interrupt() { super.interrupt(); channel.close(); //"A common idiom for interrupting network IO is to close the channel." } }