package dslab.routing;

import dslab.ComponentFactory;
import dslab.monitoring.MonitoringClient;
import dslab.protocol.Message;
import dslab.protocol.dmtp.DeliveryFailureReport;
import dslab.rmi.channel.SocketChannel;

import java.util.concurrent.ExecutorService;
import java.util.logging.Logger;

import static dslab.ComponentFactory.shutdownThreadpool;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;

/**
 * Delivery of a message by a transfer server entails contacting the recipient's mailbox server and
 * transmitting the message via DMTP. See assignment 1, sec. 2.4
 *
 * <p>Each distinct recipient domain of a message is handled by its own task on a thread pool owned
 * by this service. If delivery to a domain fails, a {@link DeliveryFailureReport} is created and
 * delivered through this same service; a failure report that itself cannot be delivered is logged
 * and discarded.
 */
public class TransferServerDeliveryService implements DeliveryService {

    private static final Logger LOG = Logger.getLogger(TransferServerDeliveryService.class.getSimpleName());

    /* Using separate thread pools in different parts of the application makes the system more resilient.
     * Imagine having to send a mail with a million receivers. If one common thread pool were used, that
     * would impede the dispatcher's ability to create new conversations to receive mails.
     */
    private final ExecutorService threadPool = newCachedThreadPool();
    private final MonitoringClient monitoring;
    private final DomainNameResolver domainNameResolver;
    private final ComponentFactory componentFactory;

    /**
     * Creates the service and registers a shutdown hook on {@code factory} that shuts down this
     * service's thread pool.
     *
     * @param monitoring         client that sent messages are reported to
     * @param domainNameResolver resolver consulted for every recipient domain before connecting
     * @param factory            used to create DMTP client instances and to register the shutdown hook
     */
    public TransferServerDeliveryService(MonitoringClient monitoring,
                                         DomainNameResolver domainNameResolver,
                                         ComponentFactory factory) {
        this.monitoring = monitoring;
        this.domainNameResolver = domainNameResolver;
        this.componentFactory = factory;
        factory.registerShutdownHook(() -> shutdownThreadpool(threadPool, TransferServerDeliveryService.class));
    }

    /**
     * Schedules delivery of {@code message} once per distinct recipient domain. The call returns
     * after the tasks have been handed to the thread pool; the deliveries themselves run
     * asynchronously.
     *
     * @param message the message to deliver
     */
    @Override
    public void deliver(Message message) {
        LOG.info("Sending " + message);
        message.getRecipients().stream()
                .map(Address::getDomain)
                .distinct()
                .forEach(domain -> threadPool.execute(() -> deliver(message, domain)));
    }

    /**
     * Delivers {@code message} to the server responsible for {@code domain} via DMTP.
     *
     * <p>Any exception raised while delivering is turned into a {@link DeliveryFailureReport} that
     * is handed to {@link #deliver(Message)}. If {@code message} already is a failure report, the
     * error is only logged so that failures cannot trigger further reports.
     *
     * @param message the message to deliver
     * @param domain  the recipient domain to deliver to
     */
    void deliver(Message message, Domain domain) {
        try {
            // The return value is not used here; presumably resolve() throws for a domain that
            // cannot be resolved, which aborts this delivery - TODO confirm against DomainNameResolver.
            domainNameResolver.resolve(domain);
            LOG.info("Sending " + message);
            try (var dmtpClient = componentFactory.newDmtpInstance(new SocketChannel(domain))) {
                dmtpClient.begin();
                dmtpClient.from(message.getSender());
                var validationResult = dmtpClient.validate(message.getRecipients(), domain);
                dmtpClient.to(validationResult.getValidAndIgnored());
                dmtpClient.subject(message.getSubject());
                dmtpClient.data(message.getData());
                dmtpClient.hash(message.getHash());
                dmtpClient.send();
                dmtpClient.quit();

                reportToMonitoring(message);

                // The message has been sent to the valid recipients at this point; invalid ones
                // are surfaced as an exception so the sender receives a failure report for them.
                if (validationResult.hasInvalid()) {
                    throw validationResult.asException();
                } else {
                    LOG.info(message + " was sent successfully");
                }
            }
        } catch (Exception e) {
            if (message instanceof DeliveryFailureReport) {
                LOG.log(SEVERE, "Delivery of error report failed. It will be discarded.", e);
            } else {
                var report = new DeliveryFailureReport(e, message);
                LOG.warning(report.getData());
                deliver(report);
            }
        }
    }

    /**
     * Reports a sent message to the monitoring client. A failing telemetry call is logged and
     * otherwise ignored: the message has already been transmitted by the time this is called, so
     * a monitoring problem must neither produce a delivery failure report nor prevent the check
     * for invalid recipients that follows.
     */
    private void reportToMonitoring(Message message) {
        try {
            monitoring.report(message);
        } catch (Exception e) {
            LOG.log(WARNING, "Reporting " + message + " to monitoring failed.", e);
        }
    }
}