#![feature(return_position_impl_trait_in_trait)] use std::{ future::Future, io::{self, Read, Write}, sync::Arc, thread, }; use msg::{MaelstromBody, Message, MessageHeader, Output}; use msg_id::gen_msg_id; use serde::{Deserialize, Serialize}; use serde_json::Deserializer; use smol::{ channel::{self, Receiver, Sender}, stream::StreamExt, }; pub mod msg; pub mod msg_id; pub trait Handler: Send + Sync + 'static { type Body: Serialize + for<'a> Deserialize<'a> + Send + Clone; fn init(node_id: String, node_ids: Vec, output: Output) -> Arc; fn handle<'a>( self: &'a Arc, header: MessageHeader, body: Self::Body, ) -> impl Future + Send + 'a; } pub fn run_server() { // Perform sync initialisation of the handler // This is a special case so that we can use a different message body type just for init messages let (handler, out_recv) = sync_init_handler::(io::stdin(), io::stdout()); let (inp_send, mut inp_recv) = channel::unbounded::>(); thread::scope(|s| { // Worker threads for receiving and sending // This is easier than making it async, and good enough for our usecase. s.spawn(|| recv_loop(io::stdin(), inp_send)); s.spawn(|| send_loop(io::stdout(), out_recv)); // As we receive messages, spawn a future for each let handler = Arc::new(handler); smol::block_on(async move { while let Some(msg) = inp_recv.next().await { let handler = handler.clone(); smol::spawn(async move { handler.handle(msg.header, msg.body).await; }) .detach(); } }); }); } /// Initialises the handler synchronously. /// /// This is done as a seperate step because we initially deserialize into a different type /// than our handler will accept, so there's no point spawning and immediately finishing threads. fn sync_init_handler( reader: R, mut writer: W, ) -> (Arc, Receiver>) { // Receive the init message let deser = Deserializer::from_reader(reader); let mut deser = deser.into_iter::>(); let (init_header, node_id, node_ids, init_msg_id) = match deser.next() { Some(Ok(Message { header, body: MaelstromBody::Init { node_id, node_ids, msg_id, }, })) => (header, node_id, node_ids, msg_id), Some(Err(e)) => panic!("invalid init message: {}", e), _ => { panic!("expected init message to be first message"); } }; // Write the init_ok message write_newline( &mut writer, &Message { header: init_header.flip(), body: MaelstromBody::InitOk { in_reply_to: init_msg_id, msg_id: gen_msg_id(), }, }, ); // Create handler, and channel to go with it let (send, recv) = channel::unbounded(); ( H::init(node_id.clone(), node_ids, Output::new(node_id, send)), recv, ) } /// Receives JSON from a reader, and outputs the deserialised result to a channel fn recv_loop Deserialize<'a>>(reader: impl Read, channel: Sender) { let deser = Deserializer::from_reader(reader); for msg in deser.into_iter() { let msg = msg.unwrap(); channel.send_blocking(msg).unwrap(); } } /// Receives things to send, and outputs them as JSON to writer fn send_loop(mut writer: impl Write, channel: Receiver) { while let Ok(msg) = channel.recv_blocking() { write_newline(&mut writer, msg); } } /// Write a message to writer, followed by a newline fn write_newline(mut writer: impl Write, msg: impl Serialize) { serde_json::to_writer(&mut writer, &msg).unwrap(); writer.write(&[b'\n']).unwrap(); }