rust-networking-proxy / src / main.rs
main.rs
Raw
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Arc;

use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct App {
    name: String,
    ports: Vec<u16>,
    targets: Vec<String>, // TODO: put IpAddr here directly
}

impl App {
    fn parse(json: &serde_json::Value) -> App {
        serde_json::from_value(json.clone()).expect("Unable to deserialize into App")
    }
}

fn parse_config_file(path: &Path) -> Vec<App> {
    // We need to read the configuration file to see what needs to be done
    let contents = fs::read_to_string(path).expect("Unable to read config.json file.");

    // TODO: Make it more robust -> Parse into a specific, strongly typed, data structure
    let json: serde_json::Value =
        serde_json::from_str(&contents).expect("Unable to read config.json");

    if let Some(apps) = json["Apps"].as_array() {
        return apps.iter().map(App::parse).collect();
    } else {
        // Nothing to do if there are no apps to handle, so panicking is OK here.
        panic!(
            "Could not parse apps from config file json. File path of config file attempted: {}",
            path.display()
        );
    }
}

// This is used in order to route different requests to different target servers
type AppToLastTarget = Arc<Mutex<HashMap<String, Option<usize>>>>;

#[tokio::main]
async fn main() {
    // To avoid sending all the app requests to the same target, we keep track of the last target used for each app
    // Apps are indexed by their `name`.
    let last_target_used = Arc::new(Mutex::new(HashMap::new()));

    // We are going to handle each "App" in the config file separately
    // So first we parse the config file into the "App" struct
    let config_file_path = Path::new("config.json");
    let apps = parse_config_file(config_file_path);

    // TODO: There's probably a cleaner way for this snippet
    let mut handles = Vec::new();
    apps.iter().for_each(|app| {
        handles.push(tokio::spawn(handle_app(
            app.clone(),
            last_target_used.clone(),
        )));
    });

    // Wait for all the futures to complete
    // (Will actually run forever because we are always waiting for connections)
    futures::future::join_all(handles).await;
}

async fn handle_app(app: App, last_target: AppToLastTarget) {
    let mut handles = Vec::new();
    // Listen on each port assigned to this app
    for port in app.ports.clone() {
        let last_target = last_target.clone();
        let app = app.clone();
        handles.push(tokio::spawn(async move {
            let listener = TcpListener::bind(format!("localhost:{}", port))
                .await
                .unwrap_or_else(|_| panic!("Failed trying to bind to {}", port));

            // println!("Listening on port {} for app {}", port, app.name);
            while let Ok((socket, _address)) = listener.accept().await {
                tokio::spawn(handle_connection(
                    socket,
                    app.clone(),
                    port,
                    last_target.clone(),
                ));
            }
        }));
    }
    futures::future::join_all(handles).await;
}

// Largely based on Lily Mara's https://www.youtube.com/watch?v=Iapc-qGTEBQ
async fn handle_connection(
    mut socket: TcpStream,
    app: App,
    port_used: u16, // For debug purposes
    last_target: AppToLastTarget,
) {
    // println!(
    //     "Connection on app: {} through port: {}",
    //     app.name, port_used
    // );

    let (reader, mut writer) = socket.split();
    let mut reader = BufReader::new(reader);
    let mut line = String::new();
    let bytes_read = reader.read_line(&mut line).await.unwrap();
    if bytes_read == 0 {
        // println!("[{}] @ {} [CLOSED]", app.name, port_used);
        return;
    }
    // println!("[{}] @ {} -> {}", app.name, port_used, line);

    // This mutex is held while gathering the target list to try, and released on line
    // 154, until it is locked again to update the hash map when routing is completed
    let mut access_to_last_target = last_target.lock().await;

    // println!(
    //     "Last we routed to {:?}",
    //     last_target.entry(app.name.clone()).or_default()
    // );

    let last_target_for_this_app = access_to_last_target.entry(app.name.clone()).or_default();
    // In order to balance load between the App's targets, we keep track of the previous target used
    // This way we can "cycle" through the target list. If there was no previous target used, we will
    // attempt them in the order specified by the config file.
    type Target<'a> = (usize, &'a String);
    let target_order_to_try: Vec<Target> = match last_target_for_this_app {
        Some(previous_index_used) => {
            let targets_after = app
                .targets
                .iter()
                .enumerate()
                .filter(|(index, _target)| index > previous_index_used);
            let targets_before = app
                .targets
                .iter()
                .enumerate()
                .filter(|(index, _target)| index <= previous_index_used);
            // Note the <= here, as we also want to include the last used target. It may be that it's the
            // only one available.
            // Note that order here is important:
            // We first need to try all the targets *after* our previous one
            // Otherwise we would be stuck using target 0, then target 1, then target 0, ...
            targets_after.chain(targets_before).collect()
        }
        None => app.targets.iter().enumerate().collect(),
    };
    drop(access_to_last_target);

    for (index, target) in target_order_to_try {
        if let Ok(answer) = route_to(target.clone(), line.clone()).await {
            // We were successful in routing to this target

            // Write back its response to the user
            if let Some(bytes) = answer {
                writer.write_all(bytes.as_bytes()).await.unwrap();
            }
            let mut access_to_last_target = last_target.lock().await;
            access_to_last_target.insert(app.name.clone(), Some(index));
            return; // We have accomplished our task.
        } else {
            // The connection was not successful with this target, but there are potentially others still
            // to be attempted, so no need to worry yet.
        }
    }
    // We could not establish a connection to any target (otherwise we would have returned from the function already)
    // In this case, we could continue trying to route (by running the for loop above again), or we could
    // terminate the connection and inform the user. Here we will terminate and inform the user, in order
    // to avoid Denial of Service attacks (if a user knows all the targets for an app are unavailable and keeps asking
    // for a connection to the app, we would be busy trying to connect to unavailable servers forever).
    // TODO: What to do in this case? Should we return something specific to the user maybe?
    //       A error string may not be the best.
    writer
        .write_all(
            "There were no available targets to handle your request. Please try again later."
                .as_bytes(),
        )
        .await
        .unwrap();
}

async fn route_to(ip: String, request: String) -> anyhow::Result<Option<String>> {
    let mut stream = TcpStream::connect(ip).await?;
    // println!("Established connection to {:?}", stream.peer_addr());
    // Write the user request to the target
    stream.write_all(request.as_bytes()).await.unwrap();

    // The target will send us a request back, which we should then route to the user
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    let bytes_read = reader.read_line(&mut line).await.unwrap();
    // Connection was OK, but there is no answer
    if bytes_read == 0 {
        // Not sure what to do here
        return Ok(None);
    }

    Ok(Some(line))
}