From 1b6c1b425f78f4ec3eb275f21a792776e50cbf93 Mon Sep 17 00:00:00 2001 From: Aria Date: Fri, 13 Oct 2023 01:33:38 +0100 Subject: start using async --- Cargo.lock | 533 ++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 9 +- common/Cargo.toml | 6 +- common/src/lib.rs | 154 ++++++++------ common/src/msg.rs | 50 +++-- common/src/msg_id.rs | 16 ++ echo/src/main.rs | 66 +++--- rust-toolchain.toml | 2 + unique_ids/src/main.rs | 94 ++++----- 9 files changed, 764 insertions(+), 166 deletions(-) create mode 100644 common/src/msg_id.rs create mode 100644 rust-toolchain.toml diff --git a/Cargo.lock b/Cargo.lock index 2f2ae17..7c56de2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,174 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c1da3ae8dabd9c00f453a329dfe1fb28da3c0a72e2478cdcd93171740c20499" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite", + "slab", +] + +[[package]] +name = "async-fs" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "279cf904654eeebfa37ac9bb1598880884924aab82e290aa65c9e77a0e142e06" +dependencies = [ + "async-lock", + "autocfg", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.25", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-net" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0434b1ed18ce1cf5769b8ac540e33f01fa9471058b5e89da9e06f3c882a8c12f" +dependencies = [ + "async-io", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-process" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +dependencies = [ + "async-io", + "async-lock", + "async-signal", + "blocking", + "cfg-if", + "event-listener 3.0.0", + "futures-lite", + "rustix 0.38.19", + "windows-sys", +] + +[[package]] +name = "async-signal" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2a5415b7abcdc9cd7d63d6badba5288b2ca017e3fbd4173b8f405449f1a2399" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix 0.38.19", + "signal-hook-registry", + "slab", + "windows-sys", +] + +[[package]] +name = "async-task" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921" + +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitflags" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" + +[[package]] +name = "blocking" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.1", + "futures-io", + "futures-lite", + "piper", + "tracing", +] + +[[package]] +name = "broadcast" +version = "0.1.0" +dependencies = [ + "common", + "serde", + "serde_json", + "smol", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -12,8 +180,28 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "common" version = "0.1.0" dependencies = [ + "rand", "serde", "serde_json", + "smol", +] + +[[package]] +name = "concurrent-queue" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", ] [[package]] @@ -25,6 +213,75 @@ dependencies = [ "serde_json", ] +[[package]] +name = "errno" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e56284f00d94c1bc7fd3c77027b4623c88c1f53d8d2394c6199f2921dea325" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "getrandom" version = "0.2.10" @@ -36,6 +293,32 @@ dependencies = [ "wasi", ] +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", +] + [[package]] name = "itoa" version = "1.0.9" @@ -44,9 +327,72 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" + +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + +[[package]] +name = "linux-raw-sys" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" + +[[package]] +name = "log" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "memchr" +version = "2.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" + +[[package]] +name = "parking" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067" + +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", +] [[package]] name = "ppv-lite86" @@ -102,6 +448,33 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rustix" +version = "0.37.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4eb579851244c2c03e7c24f501c3432bed80b8f720af1d6e5b0e0f01555a035" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys", +] + +[[package]] +name = "rustix" +version = "0.38.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" +dependencies = [ + "bitflags 2.4.0", + "errno", + "libc", + "linux-raw-sys 0.4.10", + "windows-sys", +] + [[package]] name = "ryu" version = "1.0.15" @@ -139,6 +512,51 @@ dependencies = [ "serde", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smol" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13f2b548cd8447f8de0fdf1c592929f70f4fc7039a05e47404b0d096ec6987a1" +dependencies = [ + "async-channel", + "async-executor", + "async-fs", + "async-io", + "async-lock", + "async-net", + "async-process", + "blocking", + "futures-lite", +] + +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "syn" version = "2.0.29" @@ -150,6 +568,23 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" + [[package]] name = "unicode-ident" version = "1.0.11" @@ -166,8 +601,102 @@ dependencies = [ "serde_json", ] +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/Cargo.toml b/Cargo.toml index 5ec9429..97657b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,13 @@ [workspace] +resolver = "2" members = [ "common", "echo", - "unique_ids" + "unique_ids", + "broadcast", ] + +[workspace.dependencies] +smol = "1.3.0" +serde = { version = "1.0.185", features = ["derive"] } +serde_json = "1.0.105" \ No newline at end of file diff --git a/common/Cargo.toml b/common/Cargo.toml index 3abc652..b184b3c 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -6,5 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0.185", features = ["derive"] } -serde_json = "1.0.105" \ No newline at end of file +smol = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +rand = "0.8.5" \ No newline at end of file diff --git a/common/src/lib.rs b/common/src/lib.rs index 616dfcc..5317b3e 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,94 +1,122 @@ -use std::io::{Read, Write}; +#![feature(return_position_impl_trait_in_trait)] +use std::{ + future::Future, + io::{self, Read, Write}, + thread, +}; -use msg::{MaelstromBody, MaelstromBodyOr, Message, MessageHeader}; +use msg::{MaelstromBody, Message, MessageHeader, Output}; +use msg_id::gen_msg_id; use serde::{Deserialize, Serialize}; use serde_json::Deserializer; +use smol::{ + channel::{self, Receiver, Sender}, + future, + stream::StreamExt, + Executor, +}; pub mod msg; +pub mod msg_id; pub trait Handler { - type Body: Serialize + for<'a> Deserialize<'a>; + type Body: Serialize + for<'a> Deserialize<'a> + Send + Clone; - fn init(node_id: String, node_ids: Vec, msg_id: usize) -> Self; - fn handle( - &mut self, - header: MessageHeader, - body: Self::Body, - writer: &mut MsgWriter, - ) -> (); + fn init(node_id: String, node_ids: Vec, output: Output) -> Self; + fn handle(&self, header: MessageHeader, body: Self::Body) -> impl Future + Send; } -pub struct MsgWriter { - node_id: String, - writer: W, -} +pub fn run_server() { + // Perform sync initialisation of the handler + // This is a special case so that we can use a different message body type just for init messages + let (handler, out_recv) = sync_init_handler::(io::stdin(), io::stdout()); -impl MsgWriter { - pub fn new(node_id: String, writer: W) -> Self { - Self { node_id, writer } - } + let (inp_send, mut inp_recv) = channel::unbounded::>(); - pub fn write(&mut self, dst: String, msg: &T) { - let msg = Message { - header: MessageHeader { - src: self.node_id.clone(), - dst, - }, - body: MaelstromBodyOr::Other { inner: msg }, - }; - serde_json::to_writer(&mut self.writer, &msg).unwrap(); - self.writer.write(&[b'\n']).unwrap(); - } -} - -pub fn run_with(mut reader: impl Read, mut writer: impl Write) { - let (mut handler, mut msg_writer) = init_handler::(&mut reader, &mut writer); + thread::scope(|s| { + // Worker threads for receiving and sending + // This is easier than making it async, and good enough for our usecase. + s.spawn(|| recv_loop(io::stdin(), inp_send)); + s.spawn(|| send_loop(io::stdout(), out_recv)); - let deser = Deserializer::from_reader(reader); - for msg in deser.into_iter::>() { - let msg = msg.unwrap(); - match msg.body { - MaelstromBodyOr::Other { inner } => { - handler.handle(msg.header, inner, &mut msg_writer); + // As we receive messages, spawn a future for each + let executor = Executor::new(); + future::block_on(executor.run(async { + while let Some(msg) = inp_recv.next().await { + executor + .spawn(handler.handle(msg.header, msg.body)) + .detach(); } - _ => todo!(), - }; - } + })); + }); } -pub fn init_handler(reader: R, writer: W) -> (T, MsgWriter) { +/// Initialises the handler synchronously. +/// +/// This is done as a seperate step because we initially deserialize into a different type +/// than our handler will accept, so there's no point spawning and immediately finishing threads. +fn sync_init_handler( + reader: R, + mut writer: W, +) -> (H, Receiver>) { + // Receive the init message let deser = Deserializer::from_reader(reader); - let mut deser = deser.into_iter::>(); - let Some(msg) = deser.next() else { - panic!("stream ended before init message"); - }; - let Ok(msg) = msg else { - panic!("{}", msg.unwrap_err()); - }; - - let (node_id, node_ids, msg_id) = match msg.body { - MaelstromBodyOr::MaelstromBody { - inner: + let mut deser = deser.into_iter::>(); + let (init_header, node_id, node_ids, init_msg_id) = match deser.next() { + Some(Ok(Message { + header, + body: MaelstromBody::Init { node_id, node_ids, msg_id, }, - } => (node_id, node_ids, msg_id), + })) => (header, node_id, node_ids, msg_id), + Some(Err(e)) => panic!("invalid init message: {}", e), _ => { panic!("expected init message to be first message"); } }; - let mut writer = MsgWriter::new(node_id.clone(), writer); - - writer.write( - msg.header.src, - &MaelstromBody::InitOk { - msg_id: 0, - in_reply_to: msg_id, + // Write the init_ok message + write_newline( + &mut writer, + &Message { + header: init_header.flip(), + body: MaelstromBody::InitOk { + in_reply_to: init_msg_id, + msg_id: gen_msg_id(), + }, }, ); - (T::init(node_id, node_ids, msg_id), writer) + // Create handler, and channel to go with it + let (send, recv) = channel::unbounded(); + + ( + H::init(node_id.clone(), node_ids, Output::new(node_id, send)), + recv, + ) +} + +/// Receives JSON from a reader, and outputs the deserialised result to a channel +fn recv_loop Deserialize<'a>>(reader: impl Read, channel: Sender) { + let deser = Deserializer::from_reader(reader); + for msg in deser.into_iter() { + let msg = msg.unwrap(); + channel.send_blocking(msg).unwrap(); + } +} + +/// Receives things to send, and outputs them as JSON to writer +fn send_loop(mut writer: impl Write, channel: Receiver) { + while let Ok(msg) = channel.recv_blocking() { + write_newline(&mut writer, msg); + } +} + +/// Write a message to writer, followed by a newline +fn write_newline(mut writer: impl Write, msg: impl Serialize) { + serde_json::to_writer(&mut writer, &msg).unwrap(); + writer.write(&[b'\n']).unwrap(); } diff --git a/common/src/msg.rs b/common/src/msg.rs index 23db171..7e9863f 100644 --- a/common/src/msg.rs +++ b/common/src/msg.rs @@ -1,10 +1,13 @@ use serde::{Deserialize, Serialize}; +use smol::channel::Sender; + +use crate::msg_id::MessageID; #[derive(Debug, Serialize, Deserialize)] pub struct Message { #[serde(flatten)] pub header: MessageHeader, - pub body: MaelstromBodyOr, + pub body: B, } #[derive(Debug, Serialize, Deserialize)] @@ -23,19 +26,6 @@ impl MessageHeader { } } -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged)] -pub enum MaelstromBodyOr { - MaelstromBody { - #[serde(flatten)] - inner: MaelstromBody, - }, - Other { - #[serde(flatten)] - inner: B, - }, -} - #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type")] pub enum MaelstromBody { @@ -43,8 +33,36 @@ pub enum MaelstromBody { Init { node_id: String, node_ids: Vec, - msg_id: usize, + msg_id: MessageID, }, #[serde(rename = "init_ok")] - InitOk { msg_id: usize, in_reply_to: usize }, + InitOk { + msg_id: MessageID, + in_reply_to: MessageID, + }, +} + +pub struct Output { + node_id: String, + channel: Sender>, +} + +impl Output { + pub fn new(node_id: String, channel: Sender>) -> Self { + Self { node_id, channel } + } + + pub async fn send(&self, dst: &str, body: &B) { + self.send_raw(Message { + header: MessageHeader { + src: self.node_id.clone(), + dst: dst.to_string(), + }, + body: body.clone(), + }) + .await; + } + pub async fn send_raw(&self, msg: Message) { + self.channel.send(msg).await.unwrap(); + } } diff --git a/common/src/msg_id.rs b/common/src/msg_id.rs new file mode 100644 index 0000000..e953f08 --- /dev/null +++ b/common/src/msg_id.rs @@ -0,0 +1,16 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use rand::{thread_rng, Rng}; + +pub type MessageID = u64; + +pub fn gen_msg_id() -> MessageID { + // Time since UNIX epoch in milliseconds, (48 bits) + let now = SystemTime::now(); + let time_millis: u128 = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); + + // 16 bits of randomness + let rand: u16 = thread_rng().gen(); + + ((time_millis as u64) << 16) | (rand as u64) +} diff --git a/echo/src/main.rs b/echo/src/main.rs index 5ae72f3..fed0059 100644 --- a/echo/src/main.rs +++ b/echo/src/main.rs @@ -1,57 +1,59 @@ -use std::io::{self, Write}; - -use common::{msg::*, run_with, Handler, MsgWriter}; +#![feature(return_position_impl_trait_in_trait)] +use std::future::Future; + +use common::{ + msg::*, + msg_id::{gen_msg_id, MessageID}, + run_server, Handler, +}; use serde::{Deserialize, Serialize}; fn main() { - run_with::(io::stdin(), io::stdout()); + run_server::(); } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum EchoBody { #[serde(rename = "echo")] - Echo { msg_id: usize, echo: String }, + Echo { msg_id: MessageID, echo: String }, #[serde(rename = "echo_ok")] EchoOk { - msg_id: usize, - in_reply_to: usize, + msg_id: MessageID, + in_reply_to: MessageID, echo: String, }, } pub struct EchoHandler { - next_msg_id: usize, + output: Output, } impl Handler for EchoHandler { type Body = EchoBody; - fn init(_node_id: String, _node_ids: Vec, _msg_id: usize) -> Self { - EchoHandler { next_msg_id: 1 } + fn init(_node_id: String, _node_ids: Vec, output: Output) -> Self { + EchoHandler { output } } - fn handle( - &mut self, - header: MessageHeader, - body: Self::Body, - writer: &mut MsgWriter, - ) { - match body { - EchoBody::Echo { msg_id, echo } => { - writer.write( - header.src, - &EchoBody::EchoOk { - msg_id: self.next_msg_id, - in_reply_to: msg_id, - echo, - }, - ); - - self.next_msg_id += 1; - } - EchoBody::EchoOk { .. } => (), - }; + fn handle(&self, header: MessageHeader, body: Self::Body) -> impl Future + Send { + async move { + match body { + EchoBody::Echo { msg_id, echo } => { + self.output + .send( + &header.src, + &EchoBody::EchoOk { + msg_id: gen_msg_id(), + in_reply_to: msg_id, + echo, + }, + ) + .await; + } + EchoBody::EchoOk { .. } => (), + }; + } } } diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..271800c --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly" \ No newline at end of file diff --git a/unique_ids/src/main.rs b/unique_ids/src/main.rs index 3787033..f899e2b 100644 --- a/unique_ids/src/main.rs +++ b/unique_ids/src/main.rs @@ -1,83 +1,77 @@ +#![feature(return_position_impl_trait_in_trait)] use std::{ - io, + future::Future, time::{SystemTime, UNIX_EPOCH}, }; -use common::{run_with, Handler}; -use rand::{rngs::StdRng, Rng, SeedableRng}; +use common::{ + msg::{MessageHeader, Output}, + msg_id::{gen_msg_id, MessageID}, + run_server, Handler, +}; +use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; fn main() { - run_with::(io::stdin(), io::stdout()) + run_server::() } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] enum UniqueIdsBody { #[serde(rename = "generate")] - Generate { msg_id: usize }, + Generate { msg_id: MessageID }, #[serde(rename = "generate_ok")] GenerateOk { - msg_id: usize, - in_reply_to: usize, + msg_id: MessageID, + in_reply_to: MessageID, id: u128, }, } -type RngMethod = StdRng; - struct UniqueIdsHandler { - rng: RngMethod, - next_msg_id: usize, + output: Output, } impl Handler for UniqueIdsHandler { type Body = UniqueIdsBody; - fn init(_node_id: String, _node_ids: Vec, _msg_id: usize) -> Self { - Self { - rng: RngMethod::from_entropy(), // TODO: This could be seeded from the node ID - next_msg_id: 1, - } + fn init(_node_id: String, _node_ids: Vec, output: Output) -> Self { + Self { output } } - fn handle( - &mut self, - header: common::msg::MessageHeader, - body: Self::Body, - writer: &mut common::MsgWriter, - ) -> () { - match body { - UniqueIdsBody::Generate { msg_id } => { - let id = self.gen_id(); - writer.write( - header.src, - &UniqueIdsBody::GenerateOk { - msg_id: self.next_msg_id, - in_reply_to: msg_id, - id, - }, - ); - - self.next_msg_id += 1; - } - UniqueIdsBody::GenerateOk { .. } => (), - }; + fn handle(&self, header: MessageHeader, body: Self::Body) -> impl Future + Send { + async move { + match body { + UniqueIdsBody::Generate { msg_id } => { + let id = gen_id(); + self.output + .send( + &header.src, + &UniqueIdsBody::GenerateOk { + msg_id: gen_msg_id(), + in_reply_to: msg_id, + id, + }, + ) + .await; + } + UniqueIdsBody::GenerateOk { .. } => (), + }; + } } } -impl UniqueIdsHandler { - fn gen_id(&mut self) -> u128 { - // Time since UNIX epoch in milliseconds - let now = SystemTime::now(); - let time_millis: u128 = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); +fn gen_id() -> u128 { + // Time since UNIX epoch in milliseconds + let now = SystemTime::now(); + let time_millis: u128 = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); - // 80 bits of randomness - let rand1: u16 = self.rng.gen(); - let rand2: u64 = self.rng.gen(); - let rand: u128 = rand1 as u128 | ((rand2 as u128) << 64); + // 80 bits of randomness + let rand1: u16 = thread_rng().gen(); + let rand2: u64 = thread_rng().gen(); + let rand: u128 = rand1 as u128 | ((rand2 as u128) << 64); - (time_millis << 80) | rand - } + (time_millis << 80) | rand } -- cgit v1.2.3