package dslab.rmi.serialize.dmap;

import dslab.protocol.Message;
import dslab.protocol.ProtocolException;
import dslab.protocol.dmap.MessageMetadata;
import dslab.rmi.serialize.ClientSerializer;
import dslab.rmi.serialize.Executors.ThrowingSupplier;
import dslab.rmi.serialize.ParseException;
import dslab.rmi.serialize.dmap.DmapServerSerializer.DmapCommand;
import dslab.routing.Address;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

import static dslab.protocol.MessageBuilder.aMessage;
import static java.lang.Integer.parseInt;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.*;

/**
 * Client-side serializer for DMAP commands.
 *
 * <p>{@link #serialize} turns a command and its arguments into a request string and remembers
 * the command; {@link #deserialize} then interprets the response according to that remembered
 * command.
 *
 * <p>NOTE(review): the overriding methods below are declared with the raw types visible in this
 * file so that they stay override-compatible with whatever {@code ClientSerializer} declares.
 * If the superclass uses type arguments, restore them here - confirm against
 * {@code ClientSerializer}.
 */
public class DmapClientSerializer extends ClientSerializer {

    /**
     * The command last serialized by this serializer. We have to remember it to figure out how
     * to parse the response.
     */
    protected final AtomicReference<DmapCommand> currentCommand = new AtomicReference<>();

    private static final Logger LOG = Logger.getLogger(DmapClientSerializer.class.getSimpleName());

    /**
     * Parses a server response.
     *
     * <p>A response starting with {@code "error"} is split on spaces into at most three parts;
     * the second and third parts are handed to {@link ProtocolException#from} and the returned
     * supplier throws the resulting exception. Any other response is parsed according to the
     * command remembered by the last {@link #serialize} call, which is cleared in the process.
     *
     * @param response the raw response text
     * @return a supplier yielding the parsed result, or throwing the protocol error
     * @throws ParseException if the response cannot be parsed, or if no command is pending;
     *                        the underlying failure is attached as the cause
     */
    @Override
    @SuppressWarnings("rawtypes") // see class-level review note
    public ThrowingSupplier deserialize(String response) throws ParseException {
        try {
            super.tokenizer = new StringTokenizer(response);

            if (response.startsWith("error")) {
                // A response with fewer than three parts fails on the index access below and is
                // reported as a ParseException by the catch block.
                var split = response.split(" ", 3);
                return error(split[1], split[2]);
            }

            var command = currentCommand.getAndSet(null);
            if (command == null) {
                // Switching on null would only surface as a bare NullPointerException cause.
                throw new ParseException(
                        "No pending command to interpret response '" + response + "'");
            }

            switch (command) {
                case login:
                case delete:
                case logout:
                case quit:
                    return ok(response);
                case show:
                    return show(response);
                case list:
                    return list(response);
                default:
                    throw new ParseException("Could not parse reponse'" + response + "'");
            }
        } catch (Exception e) {
            throw new ParseException("Failed to parse response '" + response + "'", e);
        }
    }

    /**
     * Parses the response to a {@code list} command.
     *
     * <p>Every line before the first line equal to {@code "ok"} is split on spaces into exactly
     * three parts: an integer, an address, and the remaining text (presumably the subject -
     * confirm against {@code MessageMetadata}).
     *
     * @param response the raw response text
     * @return a supplier yielding the parsed entries in response order
     */
    private ThrowingSupplier<List<MessageMetadata>, ? extends ProtocolException> list(String response) {
        var parsed = response
                .lines()
                .takeWhile(line -> !line.equals("ok"))
                .map(line -> {
                    var tokens = line.split(" ", 3);
                    return new MessageMetadata(
                            parseInt(tokens[0]),
                            new Address(tokens[1]),
                            tokens[2]);
                })
                .collect(toList());
        return () -> parsed;
    }

    /**
     * Serializes a command and its arguments into a single request string and remembers the
     * command so that the next response can be interpreted.
     *
     * @param dmapCommand the command to send; must be a {@link DmapCommand}
     * @param arguments   {@code null} for no arguments, a {@link List} whose elements are joined
     *                    with single spaces, or any other object appended via string conversion
     * @return the command name, followed by a space and the arguments if there are any
     */
    @Override
    @SuppressWarnings("rawtypes") // see class-level review note
    public String serialize(Enum dmapCommand, Object arguments) {
        currentCommand.set((DmapCommand) dmapCommand);

        String result = dmapCommand.toString();
        if (arguments == null) {
            return result;
        }
        if (arguments instanceof List) {
            return result + " " + ((List<?>) arguments).stream()
                    .map(Object::toString)
                    .collect(joining(" "));
        }
        return result + " " + arguments;
    }

    /**
     * Parses the response to a {@code show} command.
     *
     * <p>Every line before the first line equal to {@code "ok"} is split at its first space into
     * a key and a value (empty if the line has no space). The message is built from the keys
     * {@code from}, {@code to} (comma-separated addresses), {@code subject}, {@code hash} and
     * {@code data}.
     *
     * @param response the raw response text
     * @return a supplier yielding the parsed message
     */
    private ThrowingSupplier<Message, ? extends ProtocolException> show(String response) {
        var map = response.lines()
                .takeWhile(line -> !line.equals("ok"))
                .map(line -> {
                    var pair = line.split(" ", 2);
                    return Map.entry(pair[0], pair.length == 2 ? pair[1] : "");
                })
                .collect(toMap(Entry::getKey, Entry::getValue));

        var parsedMessage = aMessage()
                .sender(new Address(map.get("from")))
                .recipients(stream(map.get("to").split(",")).map(Address::new).collect(toList()))
                .subject(map.get("subject"))
                .hash(map.get("hash"))
                .data(map.get("data"))
                .build();
        return () -> parsedMessage;
    }

    /**
     * Parses a plain {@code ok} response, optionally followed by a payload.
     *
     * @param response the raw response text
     * @return a supplier yielding {@code null} if nothing follows {@code "ok"}, an
     *         {@link Integer} if the payload consists only of digits, and otherwise the payload
     *         as a {@link String}
     * @throws ParseException if the response does not start with {@code "ok"}
     */
    private ThrowingSupplier<Object, ? extends ProtocolException> ok(String response) throws ParseException {
        if (!response.startsWith("ok")) {
            throw new ParseException("Expected response to start with 'ok', got '" + response + "'");
        }

        // Drop "ok" AND the whitespace separating it from the payload. Leaving the separator in
        // place made the digit check fail for responses such as "ok 5".
        var payload = response.substring(2).strip();

        final Object result;
        if (payload.isEmpty()) {
            result = null;
        } else if (payload.matches("\\d+")) {
            result = parseInt(payload);
        } else {
            result = payload;
        }
        return () -> result;
    }

    /**
     * Builds a supplier that throws the protocol exception described by an error response.
     *
     * @param code    the second part of the error response
     * @param message the rest of the error response; trimmed before use
     * @return a supplier that always throws the exception created by
     *         {@link ProtocolException#from}
     */
    private ThrowingSupplier<Object, ? extends ProtocolException> error(String code, String message) {
        message = message.trim();
        var e = ProtocolException.from(code, message);
        return () -> {
            throw e;
        };
    }
}