package dslab.threading; import dslab.ComponentFactory; import dslab.protocol.Protocol; import dslab.rmi.channel.EncryptableSocketChannel; import dslab.rmi.stub.ServerStub; import dslab.util.ComponentId; import java.io.IOException; import java.io.UncheckedIOException; import java.net.ServerSocket; import java.net.SocketException; import java.nio.channels.Channel; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.logging.Logger; import static dslab.ComponentFactory.shutdownThreadpool; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.logging.Level.SEVERE; /** * A dispatcher listens for client connections on a specific port and hands off client connections to delegates * that it runs in new threads. */ public class Dispatcher extends Thread { private final Function> channelHandlerFactory; private final ServerSocket serverSocket; private static final Logger LOG = Logger.getLogger(Dispatcher.class.getSimpleName()); private final ExecutorService threadPool; private final String protocolName; private final List openChannels = new ArrayList<>(); private final ComponentId componentId; /** * @param port the dispatcher listens on * @param channelHandlerFactory for the object that conducts the conversation with the client. The dispatcher * accepts the client connection, instantiates a session instance (passing it the tcp * @param componentFactory */ public Dispatcher(int port, Function> channelHandlerFactory, String protocolName, ComponentFactory componentFactory) { super(protocolName + "Dispatcher"); this.channelHandlerFactory = channelHandlerFactory; this.protocolName = protocolName; this.threadPool = newCachedThreadPool(); this.componentId = componentFactory.getComponentId(); try { serverSocket = new ServerSocket(port); } catch (IOException e) { throw new UncheckedIOException("The server couldn't create the server socket", e); } } @Override public void run() { LOG.info(protocolName + "-Dispatcher is listening for connections at " + serverSocket); while (!isInterrupted()) { try { try { // Hand off new connections to a serverStub in a dedicated thread immediately openChannels.removeIf(c -> !c.isOpen()); var channel = new EncryptableSocketChannel(serverSocket.accept(),componentId); openChannels.add(channel); LOG.info("Client " + channel.getId() + " connected. Dispatching to handler."); threadPool.execute(channelHandlerFactory.apply(channel)); } catch (SocketException e) { if (currentThread().isInterrupted()) { LOG.info("The server socket was closed as part of server shutdown"); LOG.info("Closing " + openChannels.size() + " open channels"); for (var c : openChannels) c.close(); } else throw e; } } catch (IOException e) { LOG.log(SEVERE, "Failed when trying to construct a channel to the client", e); //bad, but the show must go on. Keep running in a degraded state } } shutdownThreadpool(threadPool, Dispatcher.class); LOG.info("Dispatcher thread has shut down."); } @Override public void interrupt() { super.interrupt(); try { serverSocket.close(); //"A common idiom for interrupting network IO is to close the channel." } catch (IOException e) { throw new UncheckedIOException("Error when closing server socket", e); } } }