// DS-Lab/src/main/java/dslab/ComponentFactory.java
package dslab;

import at.ac.tuwien.dsg.orvell.Shell;
import dslab.authentication.AuthenticationService;
import dslab.authentication.User;
import dslab.client.IMessageClient;
import dslab.client.MessageClient;
import dslab.client.MessageClientDeliveryService;
import dslab.mailbox.IMailboxServer;
import dslab.mailbox.MailboxManager;
import dslab.mailbox.MailboxServer;
import dslab.monitoring.IMonitoringServer;
import dslab.monitoring.MonitoringClient;
import dslab.monitoring.MonitoringServer;
import dslab.monitoring.MonitoringServiceImpl;
import dslab.nameserver.INameserver;
import dslab.nameserver.INameserverRemote;
import dslab.nameserver.Nameserver;
import dslab.util.NameserverConfig;
import dslab.rmi.channel.EncryptableSocketChannel;
import dslab.rmi.channel.SocketChannel;
import dslab.rmi.stub.dmap.DmapClientStub;
import dslab.rmi.stub.dmap.DmapClientStubImpl;
import dslab.rmi.stub.dmtp.DmtpClientStub;
import dslab.rmi.stub.dmtp.DmtpClientStubImpl;
import dslab.routing.*;
import dslab.transfer.ITransferServer;
import dslab.transfer.TransferServer;
import dslab.util.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.ExportException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Logger;

import static dslab.util.ComponentType.*;
import static java.lang.reflect.Proxy.newProxyInstance;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toUnmodifiableSet;

/**
 * The component factory provides methods to create the core components of the application. You can edit the method body
 * if the component instantiation requires additional logic.
 * <p>
 * Besides the static {@code create*} entry points, an instance of this class lazily creates and caches the
 * collaborators of a single component (identified by its {@link ComponentId}) and collects shutdown hooks that are
 * executed by {@link #shutdown()}.
 * <p>
 * Do not change the existing method signatures!
 */
public class ComponentFactory {

    private static final Logger LOG = Logger.getLogger(ComponentFactory.class.getSimpleName());
    private final Set<Runnable> shutdownHooks = ConcurrentHashMap.newKeySet();
    private volatile AuthenticationService authenticationService;
    private volatile Collection<User> users;
    private final ComponentId componentId;
    private volatile DeliveryService deliveryService;
    private volatile DomainNameResolver domainNameResolver;
    private final Config config;
    private volatile MailboxManager mailboxManager;
    private volatile MonitoringClient monitoringClient;
    private volatile MonitoringServiceImpl monitoringService;
    // Only accessed from the synchronized getNameserverConfig().
    private NameserverConfig nameserverConfig;

    /**
     * Creates a factory for a single component.
     *
     * @param componentId the id of the component this factory serves
     * @param config      the configuration of that component
     */
    public ComponentFactory(ComponentId componentId, Config config) {
        this.componentId = componentId;
        this.config = config;
    }

    /**
     * Creates a new {@link IMonitoringServer} instance.
     *
     * @param componentId the component id
     * @param in          the input stream used for accepting cli commands
     * @param out         the output stream to print to
     * @return a new MonitoringServer instance
     */
    public static IMonitoringServer createMonitoringServer(String componentId, InputStream in, PrintStream out)
            throws Exception {

        return new MonitoringServer(componentId, new Config(componentId), in, out);
    }

    /**
     * Creates a new {@link IMailboxServer} instance.
     *
     * @param componentId the component id
     * @param in          the input stream used for accepting cli commands
     * @param out         the output stream to print to
     * @return a new MailboxServer instance
     */
    public static IMailboxServer createMailboxServer(String componentId, InputStream in, PrintStream out)
            throws Exception {

        return new MailboxServer(new ComponentFactory(new ComponentId(componentId), new Config(componentId)), in, out);
    }

    /**
     * Creates a new {@link ITransferServer} instance.
     *
     * @param componentId the component id
     * @param in          the input stream used for accepting cli commands
     * @param out         the output stream to print to
     * @return a new TransferServer instance
     */
    public static ITransferServer createTransferServer(String componentId, InputStream in, PrintStream out)
            throws Exception {

        return new TransferServer(
                in,
                out,
                new ComponentFactory(new ComponentId(componentId), new Config(componentId)));
    }

    /**
     * Shuts down the given thread pool, waiting up to two seconds for termination before forcing it.
     * <p>
     * If the calling thread is interrupted while waiting, the pool is terminated forcefully and the interrupt status
     * of the calling thread is restored so that callers can still observe the interruption.
     *
     * @param threadPool  the thread pool to shut down
     * @param owningClass the class owning the thread pool; only used in log messages
     */
    public static void shutdownThreadpool(ExecutorService threadPool, Class<?> owningClass) {
        if (!threadPool.isTerminated()) {
            try {
                threadPool.shutdown();
                LOG.info("Awaiting termination of " + owningClass.getSimpleName() + "'s threadpool");
                if (!threadPool.awaitTermination(2, SECONDS)) {
                    LOG.warning("Threadpool didn't shut down after 2 seconds. Forcing termination.");
                    threadPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                LOG.warning("Interrupted while awaiting threadpool termination. Forcing termination.");
                // Don't leave worker threads behind just because the wait was cut short ...
                threadPool.shutdownNow();
                // ... and keep the interruption visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("Threadpool of " + owningClass.getSimpleName() + " " + (threadPool.isTerminated() ? "" : "DID NOT ") +
                "shut down successfully");
    }

    /**
     * @return the configuration this factory was created with
     */
    public synchronized Config getConfig() {
        return config;
    }

    /**
     * Returns the delivery service matching the type of this component, creating and caching it on first use.
     *
     * @return the cached delivery service
     * @throws IllegalStateException if the component type is neither MAILBOX, TRANSFER nor CLIENT
     */
    public synchronized DeliveryService getDeliveryService() {
        if (deliveryService == null) {
            if (componentId.getType() == MAILBOX) {
                deliveryService = getMailboxManager();
            } else if (componentId.getType() == TRANSFER) {
                deliveryService = new TransferServerDeliveryService(getMonitoringClient(), getDomainNameResolver(),
                        this);
            } else if (componentId.getType() == CLIENT) {
                deliveryService = new MessageClientDeliveryService(getMessageClientConfig(), this);
            } else {
                throw new IllegalStateException("Don't know which delivery service to instantiate for " + getComponentId());
            }
        }
        return deliveryService;
    }

    /**
     * @return the mailbox manager of this component, created on first use from the home domain and the
     * authentication service
     */
    public synchronized MailboxManager getMailboxManager() {
        if (mailboxManager == null) {
            mailboxManager = new MailboxManager(getHomeDomain(), getAuthenticationService());
        }
        return mailboxManager;
    }

    /**
     * @return a new {@link Domain} built from the domain of the mailbox server configuration
     */
    private synchronized Domain getHomeDomain() {
        return new Domain(getMailboxServerConfig().domain());
    }

    /**
     * @return the authentication service of this component, created on first use from the configured users
     */
    public synchronized AuthenticationService getAuthenticationService() {
        if (authenticationService == null) {
            authenticationService = new AuthenticationService(getUsers());
        }
        return authenticationService;
    }

    /**
     * Loads the users on first use. The config named by the {@code users.config} property is read; each of its keys
     * is used as a username in the home domain, and the corresponding value is passed as second argument to
     * {@link User}.
     *
     * @return an unmodifiable set of the configured users
     */
    private synchronized Collection<User> getUsers() {
        if (users == null) {
            var credentials = new Config(getConfig().getString("users.config"));
            users = credentials.listKeys()
                    .stream()
                    .map(username -> new User(
                            new Address(username, getHomeDomain()),
                            credentials.getString(username)))
                    .collect(toUnmodifiableSet());
        }
        return users;
    }

    /**
     * @return the id of the component this factory serves
     */
    public ComponentId getComponentId() {
        return componentId;
    }

    /**
     * Creates a shell that reads from {@code in}, prints to {@code out} and has {@code commandObject} registered.
     * The prompt is the component id followed by {@code "> "}.
     *
     * @param commandObject the object registered with the shell
     * @param in            the input stream used for accepting cli commands
     * @param out           the output stream to print to
     * @return the shell, to be run by the caller
     */
    public Runnable createShell(Object commandObject, InputStream in, PrintStream out) {
        var shell = new Shell(in, out).register(commandObject);
        shell.setPrompt(getComponentId() + "> ");
        return shell;
    }

    /**
     * Returns the domain name resolver, creating it on first use by looking up the remote object bound under the
     * {@code root_id} config property in the registry.
     *
     * @return the cached domain name resolver
     * @throws RuntimeException if the registry lookup fails
     */
    public synchronized DomainNameResolver getDomainNameResolver() {
        if (domainNameResolver == null) {
            try {
                var rootId = getConfig().getString("root_id");
                domainNameResolver = new DomainNameResolver((INameserverRemote) getRegistry().lookup(rootId));
            } catch (RemoteException | NotBoundException e) {
                LOG.severe("Couldn't connect to root DNS. Are you sure it is running?");
                throw new RuntimeException(e);
            }
        }
        return domainNameResolver;
    }

    /**
     * Returns the monitoring client, creating it on first use from the transfer server configuration.
     *
     * @return the cached monitoring client
     * @throws IllegalStateException if the client cannot be created
     */
    public synchronized MonitoringClient getMonitoringClient() {
        if (monitoringClient == null) {
            try {
                monitoringClient = new MonitoringClient(getTransferServerConfig());
            } catch (Exception e) {
                throw new IllegalStateException("Failed to initialize MonitoringClient", e);
            }
        }

        return monitoringClient;
    }

    /**
     * @return a new typed view on the component config for transfer servers
     */
    private TransferServerConfig getTransferServerConfig() {
        return new TransferServerConfig(getConfig());
    }

    /**
     * @return the monitoring service, created on first use
     */
    public synchronized MonitoringServiceImpl getMonitoringService() {
        if (monitoringService == null) {
            monitoringService = new MonitoringServiceImpl();
        }
        return monitoringService;
    }

    /**
     * Runs all registered shutdown hooks.
     * <p>
     * Every hook is run even if an earlier one fails. Failures are logged; after all hooks ran, the first failure is
     * rethrown with any later failures attached as suppressed exceptions.
     */
    public synchronized void shutdown() {

        LOG.info("Shutting down ComponentFactory");

        LOG.info("Running " + shutdownHooks.size() + " shutdown hooks");
        RuntimeException firstFailure = null;
        for (Runnable hook : shutdownHooks) {
            try {
                hook.run();
            } catch (RuntimeException e) {
                // One misbehaving hook must not prevent the remaining hooks from releasing their resources.
                LOG.log(WARNING, "A shutdown hook failed. Continuing with the remaining hooks.", e);
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
        LOG.info("ComponentFactory shutdown complete");
    }

    /**
     * Creates a new {@link INameserver} instance.
     *
     * @param componentId the component id
     * @param in          the input stream used for accepting cli commands
     * @param out         the output stream to print to
     * @return a new Nameserver instance
     */
    public static INameserver createNameserver(String componentId, InputStream in, PrintStream out)
            throws Exception {

        Config config = new Config(componentId);
        return new Nameserver(componentId, config, in, out);
    }

    /**
     * Creates a new {@link IMessageClient} instance.
     *
     * @param componentId the component id
     * @param in          the input stream used for accepting cli commands
     * @param out         the output stream to print to
     * @return a new MessageClient instance
     */
    public static IMessageClient createMessageClient(String componentId, InputStream in, PrintStream out)
            throws Exception {

        Config config = new Config(componentId);
        return new MessageClient(componentId, config, in, out);
    }

    /**
     * Registers a hook that is run by {@link #shutdown()}.
     *
     * @param runnable the hook to run on shutdown
     */
    public void registerShutdownHook(Runnable runnable) {
        shutdownHooks.add(runnable);
    }

    /**
     * @return the typed nameserver view on the component config, created on first use
     */
    public synchronized NameserverConfig getNameserverConfig() {
        if (this.nameserverConfig == null) {
            this.nameserverConfig = new NameserverConfig(getConfig());
        }
        return nameserverConfig;
    }

    /**
     * Returns a reference to the RMI registry located at {@code registry.host}:{@code registry.port}.
     * <p>
     * If this component is a nameserver configured as root, creating the registry on the configured registry port
     * is attempted first; an {@link ExportException} from that attempt is logged and otherwise ignored.
     *
     * @return the registry reference
     * @throws RuntimeException if a {@link RemoteException} occurs
     */
    public synchronized Registry getRegistry() {
        try {
            if (componentId.getType() == NS && getNameserverConfig().isRoot()) {
                try {
                    LocateRegistry.createRegistry(getNameserverConfig().registryPort());
                } catch (ExportException e) {
                    LOG.log(WARNING, "It appears the registry was already created. Will skip creation.", e);
                }
            }

            return LocateRegistry.getRegistry(
                    //Don't use typesafe config classes here, because these parameters could be required by
                    //ns, ts or mbox server
                    config.getString("registry.host"), config.getInt("registry.port"));
        } catch (RemoteException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return a new typed mail server view on the component config
     */
    public MailserverConfig getMailboxServerConfig() {
        return new MailserverConfig(componentId, getConfig());
    }

    /**
     * Looks up the remote object bound in the registry under the root id of the mailbox server configuration.
     *
     * @return the remote root nameserver
     * @throws NotBoundException if nothing is bound under the root id
     * @throws RemoteException   if communication with the registry fails
     */
    public INameserverRemote getRootNameserverRemote() throws NotBoundException, RemoteException {
        return (INameserverRemote) getRegistry().lookup(getMailboxServerConfig().rootId());
    }

    /**
     * Opens a new socket to the mailbox host and port of the message client configuration and wraps it in a DMAP
     * client stub.
     * <p>
     * If wrapping the socket fails, the socket is closed again before the failure is propagated.
     *
     * @return a new DMAP client stub backed by a fresh connection
     * @throws IllegalStateException if an {@link IOException} occurs while connecting
     */
    public DmapClientStub getDmapClientStub() {
        try {
            var clientConfig = getMessageClientConfig();
            var socket = new Socket(clientConfig.mailboxHost(), clientConfig.mailboxPort());
            try {
                return newDmapInstance(new EncryptableSocketChannel(socket, getComponentId()));
            } catch (Exception e) {
                // No stub was handed out, so nobody else will close the socket: release it here to avoid a leak.
                try {
                    socket.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        } catch (IOException e) {
            throw new IllegalStateException("Error when trying to reach mailbox server. Are you sure it is running?", e);
        }
    }

    /**
     * @return a new typed message client view on the component config
     */
    public MessageClientConfig getMessageClientConfig() {
        return new MessageClientConfig(getConfig());
    }

    /**
     * Creates a dynamic proxy implementing {@link DmtpClientStub} whose invocations are handled by a
     * {@link DmtpClientStubImpl} on the given channel.
     *
     * @param channel the channel passed to the invocation handler
     * @return the proxy
     */
    public DmtpClientStub newDmtpInstance(SocketChannel channel) {
        return (DmtpClientStub) newProxyInstance(
                ComponentFactory.class.getClassLoader(),
                new Class[]{DmtpClientStub.class},
                new DmtpClientStubImpl(channel));
    }

    /**
     * Creates a dynamic proxy implementing {@link DmapClientStub} whose invocations are handled by a
     * {@link DmapClientStubImpl} on the given channel.
     *
     * @param channel the channel passed to the invocation handler
     * @return the proxy
     */
    public DmapClientStub newDmapInstance(EncryptableSocketChannel channel) {
        return (DmapClientStub) newProxyInstance(
                ComponentFactory.class.getClassLoader(),
                new Class[]{DmapClientStub.class},
                new DmapClientStubImpl(channel));
    }
}