DS-Lab / src / main / java / dslab / routing / TransferServerDeliveryService.java
TransferServerDeliveryService.java
Raw
package dslab.routing;

import dslab.ComponentFactory;
import dslab.monitoring.MonitoringClient;
import dslab.protocol.Message;
import dslab.protocol.dmtp.DeliveryFailureReport;
import dslab.rmi.channel.SocketChannel;

import java.util.concurrent.ExecutorService;
import java.util.logging.Logger;

import static dslab.ComponentFactory.shutdownThreadpool;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.logging.Level.SEVERE;

/**
 * Delivery of a message by a transfer server entails contacting the recipient's mailbox server and
 * transmitting the message via DMTP. See assignment 1, sec. 2.4
 */
public class TransferServerDeliveryService implements DeliveryService {

    private static final Logger LOG = Logger.getLogger(TransferServerDeliveryService.class.getSimpleName());

    /* Using separate threadpools in different parts of the application makes the system more resilient.
     * Imagine having to send a mail with a million receivers. If using one common threadpool, that would
     * impede on the dispatcher's ability to create new conversations to receive mails. */
    private final ExecutorService threadPool = newCachedThreadPool();
    private final MonitoringClient monitoring;
    private final DomainNameResolver domainNameResolver;
    private final ComponentFactory componentFactory;

    /**
     * @param monitoring         to send telemetry to
     * @param domainNameResolver this instance will deliver mails to
     */
    public TransferServerDeliveryService(MonitoringClient monitoring, DomainNameResolver domainNameResolver, ComponentFactory factory) {
        this.monitoring = monitoring;
        this.domainNameResolver = domainNameResolver;
        this.componentFactory = factory;
        factory.registerShutdownHook(() -> shutdownThreadpool(threadPool, TransferServerDeliveryService.class));
    }

    @Override
    public void deliver(Message message) {
        LOG.info("Sending " + message);

        message.getRecipients().stream().map(Address::getDomain).distinct().forEach(domain ->
                threadPool.execute(() -> deliver(message, domain)));
    }

    void deliver(Message message, Domain domain) {

        try {
            domainNameResolver.resolve(domain);
            LOG.info("Sending " + message);

            try (var dmtpClient = componentFactory.newDmtpInstance(new SocketChannel(domain))) {

                dmtpClient.begin();
                dmtpClient.from(message.getSender());
                var validationResult = dmtpClient.validate(message.getRecipients(), domain);
                dmtpClient.to(validationResult.getValidAndIgnored());
                dmtpClient.subject(message.getSubject());
                dmtpClient.data(message.getData());
                dmtpClient.hash(message.getHash());
                dmtpClient.send();
                dmtpClient.quit();

                monitoring.report(message);

                if (validationResult.hasInvalid()) throw validationResult.asException();
                else LOG.info(message + " was sent successfully");
            }

        } catch (Exception e) {
            if (message instanceof DeliveryFailureReport)
                LOG.log(SEVERE, "Delivery of error report failed. It will be discarded.", e);
            else {
                var report = new DeliveryFailureReport(e, message);
                LOG.warning(report.getData());
                deliver(report);
            }
        }
    }
}