use std::collections::HashMap; use std::fs; use std::path::Path; use std::sync::Arc; use serde::Deserialize; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::Mutex; #[derive(Clone, Debug, Deserialize)] #[serde(rename_all = "PascalCase")] struct App { name: String, ports: Vec<u16>, targets: Vec<String>, // TODO: put IpAddr here directly } impl App { fn parse(json: &serde_json::Value) -> App { serde_json::from_value(json.clone()).expect("Unable to deserialize into App") } } fn parse_config_file(path: &Path) -> Vec<App> { // We need to read the configuration file to see what needs to be done let contents = fs::read_to_string(path).expect("Unable to read config.json file."); // TODO: Make it more robust -> Parse into a specific, strongly typed, data structure let json: serde_json::Value = serde_json::from_str(&contents).expect("Unable to read config.json"); if let Some(apps) = json["Apps"].as_array() { return apps.iter().map(App::parse).collect(); } else { // Nothing to do if there are no apps to handle, so panicking is OK here. panic!( "Could not parse apps from config file json. File path of config file attempted: {}", path.display() ); } } // This is used in order to route different requests to different target servers type AppToLastTarget = Arc<Mutex<HashMap<String, Option<usize>>>>; #[tokio::main] async fn main() { // To avoid sending all the app requests to the same target, we keep track of the last target used for each app // Apps are indexed by their `name`. let last_target_used = Arc::new(Mutex::new(HashMap::new())); // We are going to handle each "App" in the config file separately // So first we parse the config file into the "App" struct let config_file_path = Path::new("config.json"); let apps = parse_config_file(config_file_path); // TODO: There's probably a cleaner way for this snippet let mut handles = Vec::new(); apps.iter().for_each(|app| { handles.push(tokio::spawn(handle_app( app.clone(), last_target_used.clone(), ))); }); // Wait for all the futures to complete // (Will actually run forever because we are always waiting for connections) futures::future::join_all(handles).await; } async fn handle_app(app: App, last_target: AppToLastTarget) { let mut handles = Vec::new(); // Listen on each port assigned to this app for port in app.ports.clone() { let last_target = last_target.clone(); let app = app.clone(); handles.push(tokio::spawn(async move { let listener = TcpListener::bind(format!("localhost:{}", port)) .await .unwrap_or_else(|_| panic!("Failed trying to bind to {}", port)); // println!("Listening on port {} for app {}", port, app.name); while let Ok((socket, _address)) = listener.accept().await { tokio::spawn(handle_connection( socket, app.clone(), port, last_target.clone(), )); } })); } futures::future::join_all(handles).await; } // Largely based on Lily Mara's https://www.youtube.com/watch?v=Iapc-qGTEBQ async fn handle_connection( mut socket: TcpStream, app: App, port_used: u16, // For debug purposes last_target: AppToLastTarget, ) { // println!( // "Connection on app: {} through port: {}", // app.name, port_used // ); let (reader, mut writer) = socket.split(); let mut reader = BufReader::new(reader); let mut line = String::new(); let bytes_read = reader.read_line(&mut line).await.unwrap(); if bytes_read == 0 { // println!("[{}] @ {} [CLOSED]", app.name, port_used); return; } // println!("[{}] @ {} -> {}", app.name, port_used, line); // This mutex is held while gathering the target list to try, and released on line // 154, until it is locked again to update the hash map when routing is completed let mut access_to_last_target = last_target.lock().await; // println!( // "Last we routed to {:?}", // last_target.entry(app.name.clone()).or_default() // ); let last_target_for_this_app = access_to_last_target.entry(app.name.clone()).or_default(); // In order to balance load between the App's targets, we keep track of the previous target used // This way we can "cycle" through the target list. If there was no previous target used, we will // attempt them in the order specified by the config file. type Target<'a> = (usize, &'a String); let target_order_to_try: Vec<Target> = match last_target_for_this_app { Some(previous_index_used) => { let targets_after = app .targets .iter() .enumerate() .filter(|(index, _target)| index > previous_index_used); let targets_before = app .targets .iter() .enumerate() .filter(|(index, _target)| index <= previous_index_used); // Note the <= here, as we also want to include the last used target. It may be that it's the // only one available. // Note that order here is important: // We first need to try all the targets *after* our previous one // Otherwise we would be stuck using target 0, then target 1, then target 0, ... targets_after.chain(targets_before).collect() } None => app.targets.iter().enumerate().collect(), }; drop(access_to_last_target); for (index, target) in target_order_to_try { if let Ok(answer) = route_to(target.clone(), line.clone()).await { // We were successful in routing to this target // Write back its response to the user if let Some(bytes) = answer { writer.write_all(bytes.as_bytes()).await.unwrap(); } let mut access_to_last_target = last_target.lock().await; access_to_last_target.insert(app.name.clone(), Some(index)); return; // We have accomplished our task. } else { // The connection was not successful with this target, but there are potentially others still // to be attempted, so no need to worry yet. } } // We could not establish a connection to any target (otherwise we would have returned from the function already) // In this case, we could continue trying to route (by running the for loop above again), or we could // terminate the connection and inform the user. Here we will terminate and inform the user, in order // to avoid Denial of Service attacks (if a user knows all the targets for an app are unavailable and keeps asking // for a connection to the app, we would be busy trying to connect to unavailable servers forever). // TODO: What to do in this case? Should we return something specific to the user maybe? // A error string may not be the best. writer .write_all( "There were no available targets to handle your request. Please try again later." .as_bytes(), ) .await .unwrap(); } async fn route_to(ip: String, request: String) -> anyhow::Result<Option<String>> { let mut stream = TcpStream::connect(ip).await?; // println!("Established connection to {:?}", stream.peer_addr()); // Write the user request to the target stream.write_all(request.as_bytes()).await.unwrap(); // The target will send us a request back, which we should then route to the user let mut reader = BufReader::new(stream); let mut line = String::new(); let bytes_read = reader.read_line(&mut line).await.unwrap(); // Connection was OK, but there is no answer if bytes_read == 0 { // Not sure what to do here return Ok(None); } Ok(Some(line)) }