import java.io.*; import java.net.*; import java.util.*; public class Dstore { int port, cport, timeout; Socket controller; String file_folder; // ArrayList<String> fileList = new ArrayList<String>(); ArrayList<Thread> cliThreads = new ArrayList<>(); public static void main(String[] args) throws IOException, InterruptedException { int p = Integer.parseInt(args[0]); int cp = Integer.parseInt(args[1]); int t = Integer.parseInt(args[2]); String ff = args[3]; Dstore dstore = new Dstore(p, cp, t, ff); DstoreLogger.init(Logger.LoggingType.ON_FILE_AND_TERMINAL, p); dstore.start(); } Dstore(int port, int cport, int timeout, String file_folder) { this.port = port; this.cport = cport; this.timeout = timeout; this.file_folder = file_folder; File folder = new File(file_folder); if (!folder.exists()) { folder.mkdir(); } else { String[] entries = folder.list(); for (String s : entries) { File newFile = new File(folder.getPath(), s); newFile.delete(); } } } private void sendMessage(Socket socket, PrintWriter writer, String message) { DstoreLogger.getInstance().messageSent(socket, message); writer.println(message); } public void start() throws IOException, InterruptedException { ServerSocket ss = new ServerSocket(this.port); controller = new Socket("localhost", this.cport); OutputStream out = controller.getOutputStream(); PrintWriter writer = new PrintWriter(out, true); sendMessage(controller, writer, Protocol.JOIN_TOKEN + " " + this.port); ListenToController lst = new ListenToController(controller); new Thread(lst).start(); while(true) { Socket client = ss.accept(); HandleClients hnd = new HandleClients(client); Thread t = new Thread(hnd); t.start(); } } class ListenToController implements Runnable { Socket controller; ListenToController(Socket controller) { this.controller = controller; } public void run() { try { InputStream in = controller.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); OutputStream out = controller.getOutputStream(); PrintWriter writer = new PrintWriter(out, true); while(!Thread.interrupted()) { String line = reader.readLine(); if(line == null) { System.err.println("Controller has disconnected"); controller.close(); System.exit(0); break; } DstoreLogger.getInstance().messageReceived(controller, line); String[] args = line.split(" "); switch(args[0]) { case Protocol.REMOVE_TOKEN: File f = new File(file_folder + "/" + args[1]); if(f.exists()) { f.delete(); sendMessage(controller, writer, Protocol.REMOVE_ACK_TOKEN + " " + args[1]); } else { sendMessage(controller, writer, Protocol.ERROR_FILE_DOES_NOT_EXIST_TOKEN + " " + args[1]); } break; case Protocol.LIST_TOKEN: String[] fileList = new File(file_folder).list(); if(fileList != null) sendMessage(controller, writer, Protocol.LIST_TOKEN + " " + String.join(" ", fileList)); else sendMessage(controller, writer, Protocol.LIST_TOKEN + " "); break; case Protocol.REBALANCE_TOKEN: boolean success = handleRebalance(args); if(success) { sendMessage(controller, writer, Protocol.REBALANCE_COMPLETE_TOKEN); } break; } } } catch (Exception e) { e.printStackTrace(); } } } private boolean handleRebalance(String[] args) throws IOException { int currentpos = 1; HashMap<String, ArrayList<Integer>> filesToSend = new HashMap<>(); ArrayList<String> filesToRemove = new ArrayList<>(); try { int numberToSend = Integer.parseInt(args[currentpos]); currentpos += 1; for(int i = 0; i < numberToSend; i++) { String filename = args[currentpos]; filesToSend.put(filename, new ArrayList<>()); currentpos += 1; int placesToSend = Integer.parseInt(args[currentpos]); currentpos += 1; for(int j = 0; j < placesToSend; j++) { Integer port = Integer.parseInt(args[currentpos]); currentpos += 1; filesToSend.get(filename).add(port); } } int numberToRemove = Integer.parseInt(args[currentpos]); currentpos += 1; for(int i = 0; i < numberToRemove; i++) { filesToRemove.add(args[currentpos]); currentpos += 1; } } catch (NumberFormatException e) { System.err.println("Malformed message"); System.err.println(Arrays.toString(args)); return false; } for(String filename : filesToSend.keySet()) { for(Integer targetPort : filesToSend.get(filename)) { File fileOut = new File(file_folder + "/" + filename); if(!fileOut.exists()) { System.err.println("File " + filename + " does not exist"); } else { try { Socket sendTo = new Socket("localhost", targetPort); sendTo.setSoTimeout(timeout); InputStream in = sendTo.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); OutputStream out = sendTo.getOutputStream(); PrintWriter writer = new PrintWriter(out, true); sendMessage(sendTo, writer, Protocol.REBALANCE_STORE_TOKEN + " " + filename + " " + fileOut.length()); String ack = reader.readLine(); if (!ack.equals(Protocol.ACK_TOKEN)) { System.err.println("Ack not received"); return false; } InputStream toSendStream = new FileInputStream(fileOut); int bufLen; byte[] buf = new byte[4096]; while ((bufLen = toSendStream.read(buf)) != -1) { out.write(buf, 0, bufLen); } sendTo.close(); } catch (SocketTimeoutException e) { System.err.println("Dstore timed out waiting for response"); return false; } } } } for(String filename : filesToRemove) { File newFile = new File(file_folder + "/" + filename); newFile.delete(); } return true; } class HandleClients implements Runnable { Socket client; HandleClients(Socket client) { this.client = client; } public void run() { try { InputStream in = client.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String arg = reader.readLine(); if(arg == null) { System.err.println("Client disconnected"); client.close(); return; } DstoreLogger.getInstance().messageReceived(client, arg); String[] args = arg.split(" "); OutputStream out = client.getOutputStream(); PrintWriter writer = new PrintWriter(out, true); String filename; switch(args[0]) { case Protocol.LOAD_DATA_TOKEN: filename = args[1]; File toSend = new File(file_folder + "/" + filename); if(!toSend.exists()) { client.close(); return; } InputStream toSendStream = new FileInputStream(toSend); int bufLen; byte[] buf = new byte[4096]; while((bufLen = toSendStream.read(buf)) != -1) { out.write(buf, 0, bufLen); } break; case Protocol.STORE_TOKEN: filename = args[1]; int filesize = Integer.parseInt(args[2]); sendMessage(client, writer, Protocol.ACK_TOKEN); byte[] file = in.readNBytes(filesize); try (FileOutputStream stream = new FileOutputStream(file_folder + File.separator + filename)) { stream.write(file); } catch (Exception e) { System.err.println("Error writing file"); } OutputStream outc = controller.getOutputStream(); PrintWriter writerc = new PrintWriter(outc, true); sendMessage(controller, writerc, Protocol.STORE_ACK_TOKEN + " " + args[1]); break; case Protocol.REBALANCE_STORE_TOKEN: filename = args[1]; int size = Integer.parseInt(args[2]); sendMessage(client, writer, Protocol.ACK_TOKEN); byte[] f = in.readNBytes(size); try (FileOutputStream stream = new FileOutputStream(file_folder + "/" + filename)) { stream.write(f); } catch (Exception e) { System.err.println("Error writing file"); } break; } client.close(); } catch (Exception e) { e.printStackTrace(); } } } }