summaryrefslogtreecommitdiff
path: root/common/src/lib.rs
blob: 69a872b5b1ca7fbb665d38bd469408269b54f9a2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#![feature(return_position_impl_trait_in_trait)]
use std::{
    future::Future,
    io::{self, Read, Write},
    sync::Arc,
    thread,
};

use msg::{MaelstromBody, Message, MessageHeader, Output};
use msg_id::gen_msg_id;
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use smol::{
    channel::{self, Receiver, Sender},
    stream::StreamExt,
};

pub mod msg;
pub mod msg_id;

/// A node implementation driven by `run_server`.
///
/// `run_server` builds a single handler from the init message, wraps it in an
/// `Arc`, and calls [`Handler::handle`] from a freshly spawned task for every
/// message it reads afterwards. Handlers must therefore be shareable across
/// threads (`Send + Sync + 'static`).
pub trait Handler: Send + Sync + 'static {
    /// Message body type for this handler.
    ///
    /// Incoming messages are deserialized into `Message<Self::Body>`, and the
    /// handler's `Output` is parameterised over the same type.
    type Body: Serialize + for<'a> Deserialize<'a> + Send + Clone;

    /// Builds the handler once the init message has been received.
    ///
    /// `node_id` and `node_ids` are taken from the init message. `output` is
    /// constructed from the node id and the sending half of the channel whose
    /// receiving half is written to stdout (presumably the handler's way of
    /// emitting messages; see `Output` for the actual API).
    fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self;

    /// Handles one received message, passed as its `header` and `body`.
    ///
    /// The returned future may borrow the `Arc` for `'a` and must be `Send`,
    /// since it is awaited inside a spawned task.
    fn handle<'a>(
        self: &'a Arc<Self>,
        header: MessageHeader,
        body: Self::Body,
    ) -> impl Future<Output = ()> + Send + 'a;
}

pub fn run_server<H: Handler>() {
    // Perform sync initialisation of the handler
    // This is a special case so that we can use a different message body type just for init messages
    let (handler, out_recv) = sync_init_handler::<H, _, _>(io::stdin(), io::stdout());

    let (inp_send, mut inp_recv) = channel::unbounded::<Message<H::Body>>();

    thread::scope(|s| {
        // Worker threads for receiving and sending
        // This is easier than making it async, and good enough for our usecase.
        s.spawn(|| recv_loop(io::stdin(), inp_send));
        s.spawn(|| send_loop(io::stdout(), out_recv));

        // As we receive messages, spawn a future for each
        let handler = Arc::new(handler);
        smol::block_on(async move {
            while let Some(msg) = inp_recv.next().await {
                let handler = handler.clone();
                smol::spawn(async move {
                    handler.handle(msg.header, msg.body).await;
                })
                .detach();
            }
        });
    });
}

/// Performs the init handshake synchronously and constructs the handler.
///
/// Reads one message from `reader`, which must be an init message, replies on
/// `writer` with the matching init_ok, and then builds `H` together with the
/// receiving end of its output channel.
///
/// This is kept as a separate, blocking step because the init message is
/// deserialized into a different body type than the one the handler accepts,
/// so there is no point routing it through the worker threads.
///
/// # Panics
///
/// Panics if the first message cannot be parsed, if the input ends before any
/// message arrives, if the first message is not an init message, or if
/// writing the reply fails.
fn sync_init_handler<H: Handler, R: Read, W: Write>(
    reader: R,
    mut writer: W,
) -> (H, Receiver<Message<H::Body>>) {
    // Pull exactly one message off the input stream.
    let mut incoming = Deserializer::from_reader(reader).into_iter::<Message<MaelstromBody>>();
    let first = match incoming.next() {
        Some(Ok(message)) => message,
        Some(Err(e)) => panic!("invalid init message: {}", e),
        None => panic!("expected init message to be first message"),
    };

    // Anything other than an init body is rejected.
    let Message {
        header,
        body:
            MaelstromBody::Init {
                node_id,
                node_ids,
                msg_id,
            },
    } = first
    else {
        panic!("expected init message to be first message");
    };

    // Acknowledge the init message.
    let init_ok = Message {
        header: header.flip(),
        body: MaelstromBody::InitOk {
            in_reply_to: msg_id,
            msg_id: gen_msg_id(),
        },
    };
    write_newline(&mut writer, &init_ok);

    // Build the handler along with the channel backing its output.
    let (out_tx, out_rx) = channel::unbounded();
    let output = Output::new(node_id.clone(), out_tx);
    let handler = H::init(node_id, node_ids, output);

    (handler, out_rx)
}

/// Deserializes a stream of JSON values from `reader` and forwards each one
/// to `channel`, in order, until the input is exhausted.
///
/// # Panics
///
/// Panics if a value fails to deserialize or if the channel rejects a send.
fn recv_loop<M: for<'a> Deserialize<'a>>(reader: impl Read, channel: Sender<M>) {
    Deserializer::from_reader(reader)
        .into_iter::<M>()
        .map(|parsed| parsed.unwrap())
        .for_each(|message| channel.send_blocking(message).unwrap());
}

/// Drains `channel`, writing every received item to `writer` as one line of
/// JSON. Returns as soon as receiving from the channel fails.
fn send_loop<M: Serialize>(mut writer: impl Write, channel: Receiver<M>) {
    loop {
        match channel.recv_blocking() {
            Ok(outgoing) => write_newline(&mut writer, outgoing),
            Err(_) => break,
        }
    }
}

/// Serialises `msg` as JSON into `writer`, followed by a single `\n`.
///
/// # Panics
///
/// Panics if serialisation fails or if the writer returns an error.
fn write_newline(mut writer: impl Write, msg: impl Serialize) {
    serde_json::to_writer(&mut writer, &msg).unwrap();
    // `write_all` rather than `write`: a bare `write` may accept zero bytes
    // and still return `Ok`, which would silently drop the terminator and
    // glue this message to the next one on the wire.
    writer.write_all(b"\n").unwrap();
}