1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#![feature(return_position_impl_trait_in_trait)]
use std::{
future::Future,
io::{self, Read, Write},
sync::Arc,
thread,
};
use msg::{MaelstromBody, Message, MessageHeader, Output};
use msg_id::gen_msg_id;
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use smol::{
channel::{self, Receiver, Sender},
stream::StreamExt,
};
pub mod msg;
pub mod msg_id;
pub trait Handler: Send + Sync + 'static {
type Body: Serialize + for<'a> Deserialize<'a> + Send + Clone;
fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self;
fn handle(
&self,
header: MessageHeader,
body: Self::Body,
) -> impl Future<Output = ()> + Send + '_;
}
pub fn run_server<H: Handler>() {
// Perform sync initialisation of the handler
// This is a special case so that we can use a different message body type just for init messages
let (handler, out_recv) = sync_init_handler::<H, _, _>(io::stdin(), io::stdout());
let (inp_send, mut inp_recv) = channel::unbounded::<Message<H::Body>>();
thread::scope(|s| {
// Worker threads for receiving and sending
// This is easier than making it async, and good enough for our usecase.
s.spawn(|| recv_loop(io::stdin(), inp_send));
s.spawn(|| send_loop(io::stdout(), out_recv));
// As we receive messages, spawn a future for each
let handler = Arc::new(handler);
smol::block_on(async move {
while let Some(msg) = inp_recv.next().await {
let handler = handler.clone();
smol::spawn(async move {
handler.handle(msg.header, msg.body).await;
})
.detach();
}
});
});
}
/// Initialises the handler synchronously.
///
/// This is done as a seperate step because we initially deserialize into a different type
/// than our handler will accept, so there's no point spawning and immediately finishing threads.
fn sync_init_handler<H: Handler, R: Read, W: Write>(
reader: R,
mut writer: W,
) -> (H, Receiver<Message<H::Body>>) {
// Receive the init message
let deser = Deserializer::from_reader(reader);
let mut deser = deser.into_iter::<Message<MaelstromBody>>();
let (init_header, node_id, node_ids, init_msg_id) = match deser.next() {
Some(Ok(Message {
header,
body:
MaelstromBody::Init {
node_id,
node_ids,
msg_id,
},
})) => (header, node_id, node_ids, msg_id),
Some(Err(e)) => panic!("invalid init message: {}", e),
_ => {
panic!("expected init message to be first message");
}
};
// Write the init_ok message
write_newline(
&mut writer,
&Message {
header: init_header.flip(),
body: MaelstromBody::InitOk {
in_reply_to: init_msg_id,
msg_id: gen_msg_id(),
},
},
);
// Create handler, and channel to go with it
let (send, recv) = channel::unbounded();
(
H::init(node_id.clone(), node_ids, Output::new(node_id, send)),
recv,
)
}
/// Receives JSON from a reader, and outputs the deserialised result to a channel
fn recv_loop<M: for<'a> Deserialize<'a>>(reader: impl Read, channel: Sender<M>) {
let deser = Deserializer::from_reader(reader);
for msg in deser.into_iter() {
let msg = msg.unwrap();
channel.send_blocking(msg).unwrap();
}
}
/// Receives things to send, and outputs them as JSON to writer
fn send_loop<M: Serialize>(mut writer: impl Write, channel: Receiver<M>) {
while let Ok(msg) = channel.recv_blocking() {
write_newline(&mut writer, msg);
}
}
/// Write a message to writer, followed by a newline
fn write_newline(mut writer: impl Write, msg: impl Serialize) {
serde_json::to_writer(&mut writer, &msg).unwrap();
writer.write(&[b'\n']).unwrap();
}
|