summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock533
-rw-r--r--Cargo.toml9
-rw-r--r--common/Cargo.toml6
-rw-r--r--common/src/lib.rs154
-rw-r--r--common/src/msg.rs50
-rw-r--r--common/src/msg_id.rs16
-rw-r--r--echo/src/main.rs66
-rw-r--r--rust-toolchain.toml2
-rw-r--r--unique_ids/src/main.rs94
9 files changed, 764 insertions, 166 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2f2ae17..7c56de2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3,6 +3,174 @@
version = 3
[[package]]
+name = "async-channel"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
+dependencies = [
+ "concurrent-queue",
+ "event-listener 2.5.3",
+ "futures-core",
+]
+
+[[package]]
+name = "async-executor"
+version = "1.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c1da3ae8dabd9c00f453a329dfe1fb28da3c0a72e2478cdcd93171740c20499"
+dependencies = [
+ "async-lock",
+ "async-task",
+ "concurrent-queue",
+ "fastrand 2.0.1",
+ "futures-lite",
+ "slab",
+]
+
+[[package]]
+name = "async-fs"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "279cf904654eeebfa37ac9bb1598880884924aab82e290aa65c9e77a0e142e06"
+dependencies = [
+ "async-lock",
+ "autocfg",
+ "blocking",
+ "futures-lite",
+]
+
+[[package]]
+name = "async-io"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
+dependencies = [
+ "async-lock",
+ "autocfg",
+ "cfg-if",
+ "concurrent-queue",
+ "futures-lite",
+ "log",
+ "parking",
+ "polling",
+ "rustix 0.37.25",
+ "slab",
+ "socket2",
+ "waker-fn",
+]
+
+[[package]]
+name = "async-lock"
+version = "2.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
+dependencies = [
+ "event-listener 2.5.3",
+]
+
+[[package]]
+name = "async-net"
+version = "1.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0434b1ed18ce1cf5769b8ac540e33f01fa9471058b5e89da9e06f3c882a8c12f"
+dependencies = [
+ "async-io",
+ "blocking",
+ "futures-lite",
+]
+
+[[package]]
+name = "async-process"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88"
+dependencies = [
+ "async-io",
+ "async-lock",
+ "async-signal",
+ "blocking",
+ "cfg-if",
+ "event-listener 3.0.0",
+ "futures-lite",
+ "rustix 0.38.19",
+ "windows-sys",
+]
+
+[[package]]
+name = "async-signal"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2a5415b7abcdc9cd7d63d6badba5288b2ca017e3fbd4173b8f405449f1a2399"
+dependencies = [
+ "async-io",
+ "async-lock",
+ "atomic-waker",
+ "cfg-if",
+ "futures-core",
+ "futures-io",
+ "rustix 0.38.19",
+ "signal-hook-registry",
+ "slab",
+ "windows-sys",
+]
+
+[[package]]
+name = "async-task"
+version = "4.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921"
+
+[[package]]
+name = "atomic-waker"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
+
+[[package]]
+name = "autocfg"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+
+[[package]]
+name = "bitflags"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
+
+[[package]]
+name = "bitflags"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
+
+[[package]]
+name = "blocking"
+version = "1.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a"
+dependencies = [
+ "async-channel",
+ "async-lock",
+ "async-task",
+ "fastrand 2.0.1",
+ "futures-io",
+ "futures-lite",
+ "piper",
+ "tracing",
+]
+
+[[package]]
+name = "broadcast"
+version = "0.1.0"
+dependencies = [
+ "common",
+ "serde",
+ "serde_json",
+ "smol",
+]
+
+[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -12,8 +180,28 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
name = "common"
version = "0.1.0"
dependencies = [
+ "rand",
"serde",
"serde_json",
+ "smol",
+]
+
+[[package]]
+name = "concurrent-queue"
+version = "2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
+dependencies = [
+ "cfg-if",
]
[[package]]
@@ -26,6 +214,75 @@ dependencies = [
]
[[package]]
+name = "errno"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860"
+dependencies = [
+ "libc",
+ "windows-sys",
+]
+
+[[package]]
+name = "event-listener"
+version = "2.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
+
+[[package]]
+name = "event-listener"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29e56284f00d94c1bc7fd3c77027b4623c88c1f53d8d2394c6199f2921dea325"
+dependencies = [
+ "concurrent-queue",
+ "parking",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "fastrand"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
+dependencies = [
+ "instant",
+]
+
+[[package]]
+name = "fastrand"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
+
+[[package]]
+name = "futures-core"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
+
+[[package]]
+name = "futures-io"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
+
+[[package]]
+name = "futures-lite"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
+dependencies = [
+ "fastrand 1.9.0",
+ "futures-core",
+ "futures-io",
+ "memchr",
+ "parking",
+ "pin-project-lite",
+ "waker-fn",
+]
+
+[[package]]
name = "getrandom"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -37,6 +294,32 @@ dependencies = [
]
[[package]]
+name = "hermit-abi"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
+
+[[package]]
+name = "instant"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
+name = "io-lifetimes"
+version = "1.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "windows-sys",
+]
+
+[[package]]
name = "itoa"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -44,9 +327,72 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "libc"
-version = "0.2.147"
+version = "0.2.149"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
+
+[[package]]
+name = "linux-raw-sys"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
+
+[[package]]
+name = "linux-raw-sys"
+version = "0.4.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f"
+
+[[package]]
+name = "log"
+version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
+checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
+
+[[package]]
+name = "memchr"
+version = "2.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
+
+[[package]]
+name = "parking"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067"
+
+[[package]]
+name = "pin-project-lite"
+version = "0.2.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
+
+[[package]]
+name = "piper"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4"
+dependencies = [
+ "atomic-waker",
+ "fastrand 2.0.1",
+ "futures-io",
+]
+
+[[package]]
+name = "polling"
+version = "2.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
+dependencies = [
+ "autocfg",
+ "bitflags 1.3.2",
+ "cfg-if",
+ "concurrent-queue",
+ "libc",
+ "log",
+ "pin-project-lite",
+ "windows-sys",
+]
[[package]]
name = "ppv-lite86"
@@ -103,6 +449,33 @@ dependencies = [
]
[[package]]
+name = "rustix"
+version = "0.37.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4eb579851244c2c03e7c24f501c3432bed80b8f720af1d6e5b0e0f01555a035"
+dependencies = [
+ "bitflags 1.3.2",
+ "errno",
+ "io-lifetimes",
+ "libc",
+ "linux-raw-sys 0.3.8",
+ "windows-sys",
+]
+
+[[package]]
+name = "rustix"
+version = "0.38.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed"
+dependencies = [
+ "bitflags 2.4.0",
+ "errno",
+ "libc",
+ "linux-raw-sys 0.4.10",
+ "windows-sys",
+]
+
+[[package]]
name = "ryu"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -140,6 +513,51 @@ dependencies = [
]
[[package]]
+name = "signal-hook-registry"
+version = "1.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "slab"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "smol"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "13f2b548cd8447f8de0fdf1c592929f70f4fc7039a05e47404b0d096ec6987a1"
+dependencies = [
+ "async-channel",
+ "async-executor",
+ "async-fs",
+ "async-io",
+ "async-lock",
+ "async-net",
+ "async-process",
+ "blocking",
+ "futures-lite",
+]
+
+[[package]]
+name = "socket2"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
+[[package]]
name = "syn"
version = "2.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -151,6 +569,23 @@ dependencies = [
]
[[package]]
+name = "tracing"
+version = "0.1.37"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+dependencies = [
+ "cfg-if",
+ "pin-project-lite",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-core"
+version = "0.1.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+
+[[package]]
name = "unicode-ident"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -167,7 +602,101 @@ dependencies = [
]
[[package]]
+name = "waker-fn"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690"
+
+[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
+[[package]]
+name = "windows-sys"
+version = "0.48.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
+dependencies = [
+ "windows-targets",
+]
+
+[[package]]
+name = "windows-targets"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
+dependencies = [
+ "windows_aarch64_gnullvm",
+ "windows_aarch64_msvc",
+ "windows_i686_gnu",
+ "windows_i686_msvc",
+ "windows_x86_64_gnu",
+ "windows_x86_64_gnullvm",
+ "windows_x86_64_msvc",
+]
+
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
+
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
+
+[[package]]
+name = "windows_i686_gnu"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
+
+[[package]]
+name = "windows_i686_msvc"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
+
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
+
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
+
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.48.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
diff --git a/Cargo.toml b/Cargo.toml
index 5ec9429..97657b0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,13 @@
[workspace]
+resolver = "2"
members = [
"common",
"echo",
- "unique_ids"
+ "unique_ids",
+ "broadcast",
]
+
+[workspace.dependencies]
+smol = "1.3.0"
+serde = { version = "1.0.185", features = ["derive"] }
+serde_json = "1.0.105" \ No newline at end of file
diff --git a/common/Cargo.toml b/common/Cargo.toml
index 3abc652..b184b3c 100644
--- a/common/Cargo.toml
+++ b/common/Cargo.toml
@@ -6,5 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-serde = { version = "1.0.185", features = ["derive"] }
-serde_json = "1.0.105" \ No newline at end of file
+smol = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+rand = "0.8.5" \ No newline at end of file
diff --git a/common/src/lib.rs b/common/src/lib.rs
index 616dfcc..5317b3e 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -1,94 +1,122 @@
-use std::io::{Read, Write};
+#![feature(return_position_impl_trait_in_trait)]
+use std::{
+ future::Future,
+ io::{self, Read, Write},
+ thread,
+};
-use msg::{MaelstromBody, MaelstromBodyOr, Message, MessageHeader};
+use msg::{MaelstromBody, Message, MessageHeader, Output};
+use msg_id::gen_msg_id;
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
+use smol::{
+ channel::{self, Receiver, Sender},
+ future,
+ stream::StreamExt,
+ Executor,
+};
pub mod msg;
+pub mod msg_id;
pub trait Handler {
- type Body: Serialize + for<'a> Deserialize<'a>;
+ type Body: Serialize + for<'a> Deserialize<'a> + Send + Clone;
- fn init(node_id: String, node_ids: Vec<String>, msg_id: usize) -> Self;
- fn handle(
- &mut self,
- header: MessageHeader,
- body: Self::Body,
- writer: &mut MsgWriter<impl Write>,
- ) -> ();
+ fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self;
+ fn handle(&self, header: MessageHeader, body: Self::Body) -> impl Future<Output = ()> + Send;
}
-pub struct MsgWriter<W> {
- node_id: String,
- writer: W,
-}
+pub fn run_server<H: Handler>() {
+ // Perform sync initialisation of the handler
+ // This is a special case so that we can use a different message body type just for init messages
+ let (handler, out_recv) = sync_init_handler::<H, _, _>(io::stdin(), io::stdout());
-impl<W: Write> MsgWriter<W> {
- pub fn new(node_id: String, writer: W) -> Self {
- Self { node_id, writer }
- }
+ let (inp_send, mut inp_recv) = channel::unbounded::<Message<H::Body>>();
- pub fn write<T: Serialize>(&mut self, dst: String, msg: &T) {
- let msg = Message {
- header: MessageHeader {
- src: self.node_id.clone(),
- dst,
- },
- body: MaelstromBodyOr::Other { inner: msg },
- };
- serde_json::to_writer(&mut self.writer, &msg).unwrap();
- self.writer.write(&[b'\n']).unwrap();
- }
-}
-
-pub fn run_with<T: Handler>(mut reader: impl Read, mut writer: impl Write) {
- let (mut handler, mut msg_writer) = init_handler::<T, _, _>(&mut reader, &mut writer);
+ thread::scope(|s| {
+ // Worker threads for receiving and sending
+ // This is easier than making it async, and good enough for our usecase.
+ s.spawn(|| recv_loop(io::stdin(), inp_send));
+ s.spawn(|| send_loop(io::stdout(), out_recv));
- let deser = Deserializer::from_reader(reader);
- for msg in deser.into_iter::<Message<T::Body>>() {
- let msg = msg.unwrap();
- match msg.body {
- MaelstromBodyOr::Other { inner } => {
- handler.handle(msg.header, inner, &mut msg_writer);
+ // As we receive messages, spawn a future for each
+ let executor = Executor::new();
+ future::block_on(executor.run(async {
+ while let Some(msg) = inp_recv.next().await {
+ executor
+ .spawn(handler.handle(msg.header, msg.body))
+ .detach();
}
- _ => todo!(),
- };
- }
+ }));
+ });
}
-pub fn init_handler<T: Handler, R: Read, W: Write>(reader: R, writer: W) -> (T, MsgWriter<W>) {
+/// Initialises the handler synchronously.
+///
+/// This is done as a seperate step because we initially deserialize into a different type
+/// than our handler will accept, so there's no point spawning and immediately finishing threads.
+fn sync_init_handler<H: Handler, R: Read, W: Write>(
+ reader: R,
+ mut writer: W,
+) -> (H, Receiver<Message<H::Body>>) {
+ // Receive the init message
let deser = Deserializer::from_reader(reader);
- let mut deser = deser.into_iter::<Message<()>>();
- let Some(msg) = deser.next() else {
- panic!("stream ended before init message");
- };
- let Ok(msg) = msg else {
- panic!("{}", msg.unwrap_err());
- };
-
- let (node_id, node_ids, msg_id) = match msg.body {
- MaelstromBodyOr::MaelstromBody {
- inner:
+ let mut deser = deser.into_iter::<Message<MaelstromBody>>();
+ let (init_header, node_id, node_ids, init_msg_id) = match deser.next() {
+ Some(Ok(Message {
+ header,
+ body:
MaelstromBody::Init {
node_id,
node_ids,
msg_id,
},
- } => (node_id, node_ids, msg_id),
+ })) => (header, node_id, node_ids, msg_id),
+ Some(Err(e)) => panic!("invalid init message: {}", e),
_ => {
panic!("expected init message to be first message");
}
};
- let mut writer = MsgWriter::new(node_id.clone(), writer);
-
- writer.write(
- msg.header.src,
- &MaelstromBody::InitOk {
- msg_id: 0,
- in_reply_to: msg_id,
+ // Write the init_ok message
+ write_newline(
+ &mut writer,
+ &Message {
+ header: init_header.flip(),
+ body: MaelstromBody::InitOk {
+ in_reply_to: init_msg_id,
+ msg_id: gen_msg_id(),
+ },
},
);
- (T::init(node_id, node_ids, msg_id), writer)
+ // Create handler, and channel to go with it
+ let (send, recv) = channel::unbounded();
+
+ (
+ H::init(node_id.clone(), node_ids, Output::new(node_id, send)),
+ recv,
+ )
+}
+
+/// Receives JSON from a reader, and outputs the deserialised result to a channel
+fn recv_loop<M: for<'a> Deserialize<'a>>(reader: impl Read, channel: Sender<M>) {
+ let deser = Deserializer::from_reader(reader);
+ for msg in deser.into_iter() {
+ let msg = msg.unwrap();
+ channel.send_blocking(msg).unwrap();
+ }
+}
+
+/// Receives things to send, and outputs them as JSON to writer
+fn send_loop<M: Serialize>(mut writer: impl Write, channel: Receiver<M>) {
+ while let Ok(msg) = channel.recv_blocking() {
+ write_newline(&mut writer, msg);
+ }
+}
+
+/// Write a message to writer, followed by a newline
+fn write_newline(mut writer: impl Write, msg: impl Serialize) {
+ serde_json::to_writer(&mut writer, &msg).unwrap();
+ writer.write(&[b'\n']).unwrap();
}
diff --git a/common/src/msg.rs b/common/src/msg.rs
index 23db171..7e9863f 100644
--- a/common/src/msg.rs
+++ b/common/src/msg.rs
@@ -1,10 +1,13 @@
use serde::{Deserialize, Serialize};
+use smol::channel::Sender;
+
+use crate::msg_id::MessageID;
#[derive(Debug, Serialize, Deserialize)]
pub struct Message<B> {
#[serde(flatten)]
pub header: MessageHeader,
- pub body: MaelstromBodyOr<B>,
+ pub body: B,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -24,27 +27,42 @@ impl MessageHeader {
}
#[derive(Debug, Serialize, Deserialize)]
-#[serde(untagged)]
-pub enum MaelstromBodyOr<B> {
- MaelstromBody {
- #[serde(flatten)]
- inner: MaelstromBody,
- },
- Other {
- #[serde(flatten)]
- inner: B,
- },
-}
-
-#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MaelstromBody {
#[serde(rename = "init")]
Init {
node_id: String,
node_ids: Vec<String>,
- msg_id: usize,
+ msg_id: MessageID,
},
#[serde(rename = "init_ok")]
- InitOk { msg_id: usize, in_reply_to: usize },
+ InitOk {
+ msg_id: MessageID,
+ in_reply_to: MessageID,
+ },
+}
+
+pub struct Output<B> {
+ node_id: String,
+ channel: Sender<Message<B>>,
+}
+
+impl<B: Serialize + Clone> Output<B> {
+ pub fn new(node_id: String, channel: Sender<Message<B>>) -> Self {
+ Self { node_id, channel }
+ }
+
+ pub async fn send(&self, dst: &str, body: &B) {
+ self.send_raw(Message {
+ header: MessageHeader {
+ src: self.node_id.clone(),
+ dst: dst.to_string(),
+ },
+ body: body.clone(),
+ })
+ .await;
+ }
+ pub async fn send_raw(&self, msg: Message<B>) {
+ self.channel.send(msg).await.unwrap();
+ }
}
diff --git a/common/src/msg_id.rs b/common/src/msg_id.rs
new file mode 100644
index 0000000..e953f08
--- /dev/null
+++ b/common/src/msg_id.rs
@@ -0,0 +1,16 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use rand::{thread_rng, Rng};
+
+pub type MessageID = u64;
+
+pub fn gen_msg_id() -> MessageID {
+ // Time since UNIX epoch in milliseconds, (48 bits)
+ let now = SystemTime::now();
+ let time_millis: u128 = now.duration_since(UNIX_EPOCH).unwrap().as_millis();
+
+ // 16 bits of randomness
+ let rand: u16 = thread_rng().gen();
+
+ ((time_millis as u64) << 16) | (rand as u64)
+}
diff --git a/echo/src/main.rs b/echo/src/main.rs
index 5ae72f3..fed0059 100644
--- a/echo/src/main.rs
+++ b/echo/src/main.rs
@@ -1,57 +1,59 @@
-use std::io::{self, Write};
-
-use common::{msg::*, run_with, Handler, MsgWriter};
+#![feature(return_position_impl_trait_in_trait)]
+use std::future::Future;
+
+use common::{
+ msg::*,
+ msg_id::{gen_msg_id, MessageID},
+ run_server, Handler,
+};
use serde::{Deserialize, Serialize};
fn main() {
- run_with::<EchoHandler>(io::stdin(), io::stdout());
+ run_server::<EchoHandler>();
}
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum EchoBody {
#[serde(rename = "echo")]
- Echo { msg_id: usize, echo: String },
+ Echo { msg_id: MessageID, echo: String },
#[serde(rename = "echo_ok")]
EchoOk {
- msg_id: usize,
- in_reply_to: usize,
+ msg_id: MessageID,
+ in_reply_to: MessageID,
echo: String,
},
}
pub struct EchoHandler {
- next_msg_id: usize,
+ output: Output<EchoBody>,
}
impl Handler for EchoHandler {
type Body = EchoBody;
- fn init(_node_id: String, _node_ids: Vec<String>, _msg_id: usize) -> Self {
- EchoHandler { next_msg_id: 1 }
+ fn init(_node_id: String, _node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
+ EchoHandler { output }
}
- fn handle(
- &mut self,
- header: MessageHeader,
- body: Self::Body,
- writer: &mut MsgWriter<impl Write>,
- ) {
- match body {
- EchoBody::Echo { msg_id, echo } => {
- writer.write(
- header.src,
- &EchoBody::EchoOk {
- msg_id: self.next_msg_id,
- in_reply_to: msg_id,
- echo,
- },
- );
-
- self.next_msg_id += 1;
- }
- EchoBody::EchoOk { .. } => (),
- };
+ fn handle(&self, header: MessageHeader, body: Self::Body) -> impl Future<Output = ()> + Send {
+ async move {
+ match body {
+ EchoBody::Echo { msg_id, echo } => {
+ self.output
+ .send(
+ &header.src,
+ &EchoBody::EchoOk {
+ msg_id: gen_msg_id(),
+ in_reply_to: msg_id,
+ echo,
+ },
+ )
+ .await;
+ }
+ EchoBody::EchoOk { .. } => (),
+ };
+ }
}
}
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
new file mode 100644
index 0000000..271800c
--- /dev/null
+++ b/rust-toolchain.toml
@@ -0,0 +1,2 @@
+[toolchain]
+channel = "nightly" \ No newline at end of file
diff --git a/unique_ids/src/main.rs b/unique_ids/src/main.rs
index 3787033..f899e2b 100644
--- a/unique_ids/src/main.rs
+++ b/unique_ids/src/main.rs
@@ -1,83 +1,77 @@
+#![feature(return_position_impl_trait_in_trait)]
use std::{
- io,
+ future::Future,
time::{SystemTime, UNIX_EPOCH},
};
-use common::{run_with, Handler};
-use rand::{rngs::StdRng, Rng, SeedableRng};
+use common::{
+ msg::{MessageHeader, Output},
+ msg_id::{gen_msg_id, MessageID},
+ run_server, Handler,
+};
+use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};
fn main() {
- run_with::<UniqueIdsHandler>(io::stdin(), io::stdout())
+ run_server::<UniqueIdsHandler>()
}
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UniqueIdsBody {
#[serde(rename = "generate")]
- Generate { msg_id: usize },
+ Generate { msg_id: MessageID },
#[serde(rename = "generate_ok")]
GenerateOk {
- msg_id: usize,
- in_reply_to: usize,
+ msg_id: MessageID,
+ in_reply_to: MessageID,
id: u128,
},
}
-type RngMethod = StdRng;
-
struct UniqueIdsHandler {
- rng: RngMethod,
- next_msg_id: usize,
+ output: Output<UniqueIdsBody>,
}
impl Handler for UniqueIdsHandler {
type Body = UniqueIdsBody;
- fn init(_node_id: String, _node_ids: Vec<String>, _msg_id: usize) -> Self {
- Self {
- rng: RngMethod::from_entropy(), // TODO: This could be seeded from the node ID
- next_msg_id: 1,
- }
+ fn init(_node_id: String, _node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
+ Self { output }
}
- fn handle(
- &mut self,
- header: common::msg::MessageHeader,
- body: Self::Body,
- writer: &mut common::MsgWriter<impl std::io::Write>,
- ) -> () {
- match body {
- UniqueIdsBody::Generate { msg_id } => {
- let id = self.gen_id();
- writer.write(
- header.src,
- &UniqueIdsBody::GenerateOk {
- msg_id: self.next_msg_id,
- in_reply_to: msg_id,
- id,
- },
- );
-
- self.next_msg_id += 1;
- }
- UniqueIdsBody::GenerateOk { .. } => (),
- };
+ fn handle(&self, header: MessageHeader, body: Self::Body) -> impl Future<Output = ()> + Send {
+ async move {
+ match body {
+ UniqueIdsBody::Generate { msg_id } => {
+ let id = gen_id();
+ self.output
+ .send(
+ &header.src,
+ &UniqueIdsBody::GenerateOk {
+ msg_id: gen_msg_id(),
+ in_reply_to: msg_id,
+ id,
+ },
+ )
+ .await;
+ }
+ UniqueIdsBody::GenerateOk { .. } => (),
+ };
+ }
}
}
-impl UniqueIdsHandler {
- fn gen_id(&mut self) -> u128 {
- // Time since UNIX epoch in milliseconds
- let now = SystemTime::now();
- let time_millis: u128 = now.duration_since(UNIX_EPOCH).unwrap().as_millis();
+fn gen_id() -> u128 {
+ // Time since UNIX epoch in milliseconds
+ let now = SystemTime::now();
+ let time_millis: u128 = now.duration_since(UNIX_EPOCH).unwrap().as_millis();
- // 80 bits of randomness
- let rand1: u16 = self.rng.gen();
- let rand2: u64 = self.rng.gen();
- let rand: u128 = rand1 as u128 | ((rand2 as u128) << 64);
+ // 80 bits of randomness
+ let rand1: u16 = thread_rng().gen();
+ let rand2: u64 = thread_rng().gen();
+ let rand: u128 = rand1 as u128 | ((rand2 as u128) << 64);
- (time_millis << 80) | rand
- }
+ (time_millis << 80) | rand
}