distributed-storage-system / Dstore.java
Dstore.java
Raw
import java.io.*;
import java.net.*;
import java.util.*;

public class Dstore {
    
    int port, cport, timeout;
    Socket controller;
    String file_folder;
//    ArrayList<String> fileList = new ArrayList<String>();

    ArrayList<Thread> cliThreads = new ArrayList<>();

    public static void main(String[] args) throws IOException, InterruptedException {
        
        int p = Integer.parseInt(args[0]);
        int cp = Integer.parseInt(args[1]);
        int t = Integer.parseInt(args[2]);
        String ff = args[3];

        Dstore dstore = new Dstore(p, cp, t, ff);
        DstoreLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL, p);
        dstore.start();
        

    }

    Dstore(int port, int cport, int timeout, String file_folder) {
        this.port = port;
        this.cport = cport;
        this.timeout = timeout;
        this.file_folder = file_folder;



        File folder = new File(file_folder);
        if (!folder.exists()) {
            folder.mkdir();
        } else {
            String[] entries = folder.list();
            for (String s : entries) {
                File newFile = new File(folder.getPath(), s);
                newFile.delete();
            }
        }
    }

    private void sendMessage(Socket socket, PrintWriter writer, String message) {
        DstoreLogger.getInstance().messageSent(socket, message);
        writer.println(message);
    }

    public void start() throws IOException, InterruptedException {
        ServerSocket ss = new ServerSocket(this.port);
        controller = new Socket("localhost", this.cport);
        OutputStream out = controller.getOutputStream();
        PrintWriter writer = new PrintWriter(out, true);
        sendMessage(controller, writer, Protocol.JOIN_TOKEN + " " + this.port);
        ListenToController lst = new ListenToController(controller);
        new Thread(lst).start();
        while(true) {
            Socket client = ss.accept();
            HandleClients hnd = new HandleClients(client);
            Thread t = new Thread(hnd);
            t.start();
        }

    }

    class ListenToController implements Runnable {

        Socket controller;

        ListenToController(Socket controller) {
            this.controller = controller;
        }

        public void run() {
            try {
                InputStream in = controller.getInputStream();
                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                OutputStream out = controller.getOutputStream();
                PrintWriter writer = new PrintWriter(out, true);
                while(!Thread.interrupted()) {
                    String line = reader.readLine();
                    if(line == null) {
                        System.err.println("Controller has disconnected");
                        controller.close();
                        System.exit(0);
                        break;
                    }
                    DstoreLogger.getInstance().messageReceived(controller, line);
                    String[] args = line.split(" ");
                    switch(args[0]) {
                        case Protocol.REMOVE_TOKEN:
                            File f = new File(file_folder + "/" + args[1]);
                            if(f.exists()) {
                                f.delete();
                                sendMessage(controller, writer, Protocol.REMOVE_ACK_TOKEN + " " + args[1]);
                            } else {
                                sendMessage(controller, writer, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + args[1]);
                            }
                            break;
                        case Protocol.LIST_TOKEN:
                            String[] fileList = new File(file_folder).list();
                            if(fileList != null)
                                sendMessage(controller, writer, Protocol.LIST_TOKEN + " " +  String.join(" ", fileList));
                            else
                                sendMessage(controller, writer, Protocol.LIST_TOKEN + " ");
                            break;
                        case Protocol.REBALANCE_TOKEN:
                            boolean success = handleRebalance(args);
                            if(success) {
                                sendMessage(controller, writer, Protocol.REBALANCE_COMPLETE_TOKEN);
                            }
                            break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    private boolean handleRebalance(String[] args) throws IOException {
        int currentpos = 1;
        HashMap<String, ArrayList<Integer>> filesToSend = new HashMap<>();
        ArrayList<String> filesToRemove = new ArrayList<>();
        try {
            int numberToSend = Integer.parseInt(args[currentpos]);
            currentpos += 1;
            for(int i = 0; i < numberToSend; i++) {
                String filename = args[currentpos];
                filesToSend.put(filename, new ArrayList<>());
                currentpos += 1;
                int placesToSend = Integer.parseInt(args[currentpos]);
                currentpos += 1;
                for(int j = 0; j < placesToSend; j++) {
                    Integer port = Integer.parseInt(args[currentpos]);
                    currentpos += 1;
                    filesToSend.get(filename).add(port);
                }
            }
            int numberToRemove = Integer.parseInt(args[currentpos]);
            currentpos += 1;
            for(int i = 0; i < numberToRemove; i++) {
                filesToRemove.add(args[currentpos]);
                currentpos += 1;
            }
        } catch (NumberFormatException e) {
            System.err.println("Malformed message");
            System.err.println(Arrays.toString(args));
            return false;
        }

        for(String filename : filesToSend.keySet()) {
            for(Integer targetPort : filesToSend.get(filename)) {
                File fileOut = new File(file_folder + "/" + filename);
                if(!fileOut.exists()) {
                    System.err.println("File " + filename + " does not exist");
                } else {
                    try {
                        Socket sendTo = new Socket("localhost", targetPort);
                        sendTo.setSoTimeout(timeout);
                        InputStream in = sendTo.getInputStream();
                        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                        OutputStream out = sendTo.getOutputStream();
                        PrintWriter writer = new PrintWriter(out, true);
                        sendMessage(sendTo, writer, Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileOut.length());
                        String ack = reader.readLine();
                        if (!ack.equals(Protocol.ACK_TOKEN)) {
                            System.err.println("Ack not received");
                            return false;
                        }
                        InputStream toSendStream = new FileInputStream(fileOut);
                        int bufLen;
                        byte[] buf = new byte[4096];
                        while ((bufLen = toSendStream.read(buf)) != -1) {
                            out.write(buf, 0, bufLen);
                        }
                        sendTo.close();
                    } catch (SocketTimeoutException e) {
                        System.err.println("Dstore timed out waiting for response");
                        return false;
                    }
                }
            }
        }

        for(String filename : filesToRemove) {
            File newFile = new File(file_folder + "/" + filename);
            newFile.delete();
        }

        return true;
    }

    class HandleClients implements Runnable {

        Socket client;

        HandleClients(Socket client) {
            this.client = client;
        }

        public void run() {
            try {
                InputStream in = client.getInputStream();
                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                String arg = reader.readLine();
                if(arg == null) {
                    System.err.println("Client disconnected");
                    client.close();
                    return;
                }
                DstoreLogger.getInstance().messageReceived(client, arg);
                String[] args = arg.split(" ");
                OutputStream out = client.getOutputStream();
                PrintWriter writer = new PrintWriter(out, true);
                String filename;
                switch(args[0]) {
                    case Protocol.LOAD_DATA_TOKEN:
                        filename = args[1];
                        File toSend = new File(file_folder + "/" + filename);
                        if(!toSend.exists()) {
                            client.close();
                            return;
                        }
                        InputStream toSendStream = new FileInputStream(toSend);
                        int bufLen;
                        byte[] buf = new byte[4096];
                        while((bufLen = toSendStream.read(buf)) != -1) {
                            out.write(buf, 0, bufLen);
                        }
                        break;
                    case Protocol.STORE_TOKEN:
                        filename = args[1];
                        int filesize = Integer.parseInt(args[2]);
                        sendMessage(client, writer, Protocol.ACK_TOKEN);
                        byte[] file = in.readNBytes(filesize);
                        try (FileOutputStream stream = new FileOutputStream(file_folder + File.separator + filename)) {
                            stream.write(file);
                        } catch (Exception e) {
                            System.err.println("Error writing file");
                        }
                        OutputStream outc = controller.getOutputStream();
                        PrintWriter writerc = new PrintWriter(outc, true);
                        sendMessage(controller, writerc, Protocol.STORE_ACK_TOKEN + " " + args[1]);
                        break;
                    case Protocol.REBALANCE_STORE_TOKEN:
                        filename = args[1];
                        int size = Integer.parseInt(args[2]);
                        sendMessage(client, writer, Protocol.ACK_TOKEN);
                        byte[] f = in.readNBytes(size);
                        try (FileOutputStream stream = new FileOutputStream(file_folder + "/" + filename)) {
                            stream.write(f);
                        } catch (Exception e) {
                            System.err.println("Error writing file");
                        }
                        break;
                }
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}