DS-Lab / src / main / java / dslab / rmi / serialize / dmap / DmapClientSerializer.java
DmapClientSerializer.java
Raw
package dslab.rmi.serialize.dmap;

import dslab.protocol.Message;
import dslab.protocol.ProtocolException;
import dslab.protocol.dmap.MessageMetadata;
import dslab.rmi.serialize.ClientSerializer;
import dslab.rmi.serialize.Executors.ThrowingSupplier;
import dslab.rmi.serialize.ParseException;
import dslab.rmi.serialize.dmap.DmapServerSerializer.DmapCommand;
import dslab.routing.Address;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

import static dslab.protocol.MessageBuilder.aMessage;
import static java.lang.Integer.parseInt;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.*;

public class DmapClientSerializer extends ClientSerializer {

    //The command last serialized by this serializer. We have to remember it to figure out how to parse the response.
    protected final AtomicReference<DmapCommand> currentCommand = new AtomicReference<>();
    private static final Logger LOG = Logger.getLogger(DmapClientSerializer.class.getSimpleName());

    @Override
    public ThrowingSupplier<?, ? extends ProtocolException> deserialize(String response) throws ParseException {

        try {

            super.tokenizer = new StringTokenizer(response);

            if (response.startsWith("error")) {
                var split = response.split(" ", 3);
                return error(split[1], split[2]);
            }

            switch (currentCommand.getAndSet(null)) {

                case login:
                case delete:
                case logout:
                case quit: return ok(response);
                case show: return show(response);
                case list: return list(response);
                default: throw new ParseException("Could not parse reponse'" + response + "'");

            }

        } catch (Exception e) {
            throw new ParseException("Failed to parse response '" + response + "'", e);
        }
    }

    /*
     * parse response of format
     *
     * <message-id> <sender> <subject>
     * <message-id> <sender> <subject>
     * ok
     */
    private ThrowingSupplier<List<MessageMetadata>, ? extends ProtocolException> list(String response) {

        var parsed = response
                .lines()
                .takeWhile(line -> !line.equals("ok"))
                .map(line -> {
                    var tokens = line.split(" ", 3);

                    return new MessageMetadata(
                            parseInt(tokens[0]),
                            new Address(tokens[1]),
                            tokens[2]);
                })
                .collect(toList());

        return () -> parsed;
    }

    @Override
    public String serialize(Enum<?> dmapCommand, Object arguments) {

        currentCommand.set((DmapCommand) dmapCommand);

        String result = dmapCommand.toString();

        if (arguments == null)
            return result;

        if (arguments instanceof List)
            return result + " " + ((List<?>) arguments).stream()
                    .map(Object::toString)
                    .collect(joining(" "));

        return result + " " + arguments;
    }

    private ThrowingSupplier<Message, ? extends ProtocolException> show(String response) {
        var map = response.lines()
                .takeWhile(line -> !line.equals("ok"))
                .map(line -> {
                    var pair = line.split(" ", 2);
                    return Map.entry(pair[0], pair.length == 2 ? pair[1] : "");
                })
                .collect(toMap(Entry::getKey, Entry::getValue));

        var parsedMessage = aMessage()
                .sender(new Address(map.get("from")))
                .recipients(stream(map.get("to").split(",")).map(Address::new).collect(toList()))
                .subject(map.get("subject"))
                .hash(map.get("hash"))
                .data(map.get("data"))
                .build();

        return () -> parsedMessage;
    }

    private ThrowingSupplier<?, ParseException> ok(String response) throws ParseException {

        if (!response.startsWith("ok"))
            throw new ParseException("Expected response to start with 'ok', got '" + response + "'");

        response = response.substring(2); //strip "ok "

        var result = response.isBlank() ? null :
                     response.matches("\\d+") ? parseInt(response) :
                     response;

        return () -> result;
    }

    private ThrowingSupplier<?, ProtocolException> error(String code, String message) {
        message = message.trim();
        var e = ProtocolException.from(code, message);
        return () -> {throw e;};
    }

}