// DS-Lab/src/main/java/dslab/threading/Dispatcher.java
package dslab.threading;

import dslab.ComponentFactory;
import dslab.protocol.Protocol;
import dslab.rmi.channel.EncryptableSocketChannel;
import dslab.rmi.stub.ServerStub;
import dslab.util.ComponentId;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.channels.Channel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.logging.Logger;

import static dslab.ComponentFactory.shutdownThreadpool;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.logging.Level.SEVERE;

/**
 * A dispatcher listens for client connections on a specific port and hands off client connections to delegates
 * that it runs in new threads.
 *
 * <p>The dispatcher is itself a thread. It is stopped by calling {@link #interrupt()}, which also closes the server
 * socket so that a blocked {@code accept()} returns. On exit the dispatcher closes every channel it still tracks
 * and shuts down its handler thread pool.
 *
 * @param <PROTOCOL> the protocol spoken by the server stubs this dispatcher creates
 */
public class Dispatcher<PROTOCOL extends Protocol> extends Thread {

    private static final Logger LOG = Logger.getLogger(Dispatcher.class.getSimpleName());

    private final Function<EncryptableSocketChannel, ServerStub<PROTOCOL>> channelHandlerFactory;
    private final ServerSocket serverSocket;
    private final ExecutorService threadPool;
    private final String protocolName;

    /** Channels handed to handlers; only touched from the dispatcher thread inside {@link #run()}. */
    private final List<Channel> openChannels = new ArrayList<>();
    private final ComponentId componentId;

    /**
     * Creates the dispatcher and binds its server socket. The dispatcher does not accept connections until the
     * thread is started.
     *
     * @param port                  the port the dispatcher listens on
     * @param channelHandlerFactory creates, for each accepted client channel, the server stub that conducts the
     *                              conversation with that client; the stub is executed on the dispatcher's thread pool
     * @param protocolName          name of the protocol, used for the thread name and in log messages
     * @param componentFactory      supplies the component id that is passed to every accepted channel
     * @throws UncheckedIOException if the server socket cannot be created
     */
    public Dispatcher(int port, Function<EncryptableSocketChannel, ServerStub<PROTOCOL>> channelHandlerFactory,
                      String protocolName, ComponentFactory componentFactory) {
        super(protocolName + "Dispatcher");
        this.channelHandlerFactory = channelHandlerFactory;
        this.protocolName = protocolName;
        this.threadPool = newCachedThreadPool();
        this.componentId = componentFactory.getComponentId();

        try {
            serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            throw new UncheckedIOException("The server couldn't create the server socket", e);
        }
    }

    /**
     * Accepts client connections until the thread is interrupted, dispatching each one to a handler on the thread
     * pool. Whichever way the loop ends, all tracked channels are closed and the thread pool is shut down.
     */
    @Override
    public void run() {
        LOG.info(protocolName + "-Dispatcher is listening for connections at " + serverSocket);
        try {
            while (!isInterrupted()) {
                try {
                    acceptAndDispatch();
                } catch (IOException e) {
                    if (e instanceof SocketException && isInterrupted()) {
                        // interrupt() closed the server socket to unblock accept(); this is the regular shutdown path
                        LOG.info("The server socket was closed as part of server shutdown");
                    } else {
                        LOG.log(SEVERE, "Failed when trying to construct a channel to the client", e);
                        //bad, but the show must go on. Keep running in a degraded state
                    }
                }
            }
        } finally {
            // Clean up in a finally block so that channels and pool are released even if the interrupt was
            // noticed by the loop condition (not by accept()) or an unexpected runtime exception ends the loop.
            try {
                closeOpenChannels();
            } finally {
                shutdownThreadpool(threadPool, Dispatcher.class);
                LOG.info("Dispatcher thread has shut down.");
            }
        }
    }

    /**
     * Waits for the next client, wraps its socket in a channel and hands the channel to a new handler.
     *
     * @throws IOException if accepting the connection or constructing the channel fails
     */
    private void acceptAndDispatch() throws IOException {
        // Drop channels that have been closed in the meantime so the list does not grow without bound
        openChannels.removeIf(c -> !c.isOpen());
        // Hand off new connections to a serverStub in a dedicated thread immediately
        var channel = new EncryptableSocketChannel(serverSocket.accept(), componentId);
        openChannels.add(channel);
        LOG.info("Client " + channel.getId() + " connected. Dispatching to handler.");
        threadPool.execute(channelHandlerFactory.apply(channel));
    }

    /** Closes every tracked channel; a failure to close one channel does not prevent closing the others. */
    private void closeOpenChannels() {
        LOG.info("Closing " + openChannels.size() + " open channels");
        for (var channel : openChannels) {
            try {
                channel.close();
            } catch (IOException | RuntimeException e) {
                LOG.log(SEVERE, "Failed to close a client channel during shutdown", e);
            }
        }
        openChannels.clear();
    }

    /**
     * Interrupts the dispatcher and closes the server socket so that a blocked {@code accept()} call returns.
     *
     * @throws UncheckedIOException if closing the server socket fails
     */
    @Override
    public void interrupt() {
        // Set the flag first so run() can tell a shutdown-induced SocketException from a genuine failure
        super.interrupt();
        try {
            serverSocket.close(); //"A common idiom for interrupting network IO is to close the channel."
        } catch (IOException e) {
            throw new UncheckedIOException("Error when closing server socket", e);
        }
    }
}