diff options
-rw-r--r-- | Cargo.lock | 60 | ||||
-rw-r--r-- | Justfile | 4 | ||||
-rw-r--r-- | broadcast/Cargo.toml | 3 | ||||
-rw-r--r-- | broadcast/src/handler.rs | 167 | ||||
-rw-r--r-- | broadcast/src/main.rs | 136 | ||||
-rw-r--r-- | broadcast/src/msg.rs | 36 | ||||
-rw-r--r-- | common/src/lib.rs | 6 |
7 files changed, 276 insertions, 136 deletions
@@ -165,6 +165,7 @@ name = "broadcast" version = "0.1.0" dependencies = [ "common", + "futures", "serde", "serde_json", "smol", @@ -256,6 +257,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] name = "futures-core" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -283,6 +308,35 @@ dependencies = [ ] [[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] name = "getrandom" version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -368,6 +422,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] name = "piper" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -14,4 +14,8 @@ test-broadcast-multi: cargo build --bin broadcast maelstrom test -w broadcast --bin target/debug/broadcast --node-count 5 --time-limit 20 --rate 10 +test-broadcast-fault: + cargo build --bin broadcast + maelstrom test -w broadcast --bin target/debug/broadcast --node-count 5 --time-limit 20 --rate 10 --nemesis partition + test: test-echo test-unique-ids test-broadcast-single test-broadcast-multi
\ No newline at end of file diff --git a/broadcast/Cargo.toml b/broadcast/Cargo.toml index f3e4cc7..06e4426 100644 --- a/broadcast/Cargo.toml +++ b/broadcast/Cargo.toml @@ -9,4 +9,5 @@ edition = "2021" smol = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -common = { path = "../common/" }
\ No newline at end of file +common = { path = "../common/" } +futures = { version = "0.3.28", default_features = false, features = ["std"] }
\ No newline at end of file diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs new file mode 100644 index 0000000..09c66b9 --- /dev/null +++ b/broadcast/src/handler.rs @@ -0,0 +1,167 @@ +use smol::{ + lock::{Mutex, RwLock}, + prelude::*, + Task, Timer, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; + +use crate::msg::*; + +use common::{ + msg::{MessageHeader, Output}, + msg_id::{gen_msg_id, MessageID}, + Handler, +}; + +const RETRY_TIMEOUT: u64 = 2; + +pub struct BroadcastHandler { + node_id: String, + seen: RwLock<HashSet<BroadcastTarget>>, + broadcast_targets: RwLock<Vec<String>>, + output: Output<BroadcastBody>, + attempted_broadcasts: Mutex<HashMap<MessageID, Task<()>>>, +} + +impl Handler for BroadcastHandler { + type Body = BroadcastBody; + + fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self { + node_ids.retain(|x| *x != node_id); + + BroadcastHandler { + node_id, + broadcast_targets: RwLock::new(node_ids), + seen: RwLock::new(HashSet::new()), + output, + attempted_broadcasts: Mutex::default(), + } + } + + fn handle<'a>( + self: &'a Arc<Self>, + header: MessageHeader, + body: BroadcastBody, + ) -> impl Future<Output = ()> + Send + 'a { + async move { + match body { + BroadcastBody::Broadcast { + msg_id: Some(msg_id), + message, + } => { + self.receive_broadcast(&header.src, message).await; + self.send_broadcast_ok(&header.src, msg_id).await; + } + BroadcastBody::Broadcast { + msg_id: None, + message, + } => { + self.receive_broadcast(&header.src, message).await; + } + + BroadcastBody::Topology { + msg_id, + mut topology, + } => { + // Start using the new topology + if let Some(broadcast_targets) = topology.remove(&self.node_id) { + *self.broadcast_targets.write().await = broadcast_targets; + } + + // Send reply if needed + if let Some(msg_id) = msg_id { + self.output + .send( + &header.src, + &BroadcastBody::TopologyOk { + in_reply_to: msg_id, + }, + ) + .await; + } + } + + BroadcastBody::Read { msg_id } => { + // Send all received messages back + self.output + .send( + &header.src, + &BroadcastBody::ReadOk { + in_reply_to: msg_id, + messages: self.seen.read().await.clone(), + }, + ) + .await + } + + // Ignore OK messages - we never actually request them + BroadcastBody::BroadcastOk { in_reply_to } => { + if let Some(task) = self.attempted_broadcasts.lock().await.remove(&in_reply_to) + { + task.cancel().await; + } + } + BroadcastBody::TopologyOk { .. } => {} + BroadcastBody::ReadOk { .. } => {} + } + } + } +} + +impl BroadcastHandler { + /// Reply with a broadcast OK message + async fn send_broadcast_ok(&self, src: &str, msg_id: MessageID) { + self.output + .send( + &src, + &BroadcastBody::BroadcastOk { + in_reply_to: msg_id, + }, + ) + .await; + } + + /// Receive a given message, and broadcast it onwards if it is new + async fn receive_broadcast(self: &Arc<Self>, src: &str, message: BroadcastTarget) { + let new = self.seen.write().await.insert(message); + if !new { + return; + } + + // Ensure we don't keep holding the read lock + let targets = self.broadcast_targets.read().await.clone(); + + // Race all send futures + let mut tasks = self.attempted_broadcasts.lock().await; + for target in targets.into_iter() { + if &target == &src { + return; + } + + let msg_id = gen_msg_id(); + let this = self.clone(); + tasks.insert( + msg_id, + smol::spawn(async move { + loop { + this.output + .send( + &target, + &BroadcastBody::Broadcast { + msg_id: None, + message, + }, + ) + .await; + + Timer::after(Duration::from_secs(RETRY_TIMEOUT)).await; + } + }), + ); + } + } +} diff --git a/broadcast/src/main.rs b/broadcast/src/main.rs index 09c0268..bdc8413 100644 --- a/broadcast/src/main.rs +++ b/broadcast/src/main.rs @@ -1,140 +1,12 @@ #![feature(return_position_impl_trait_in_trait)] -use smol::{lock::RwLock, prelude::*}; -use std::collections::{HashMap, HashSet}; +use common::run_server; -use common::{ - msg::{MessageHeader, Output}, - run_server, Handler, -}; -use serde::{Deserialize, Serialize}; +mod handler; +mod msg; -type BroadcastTarget = usize; +use handler::BroadcastHandler; fn main() { run_server::<BroadcastHandler>(); } - -#[derive(Debug, Deserialize, Serialize, Clone)] -#[serde(tag = "type")] -pub enum BroadcastBody { - #[serde(rename = "broadcast")] - Broadcast { - msg_id: Option<usize>, - message: BroadcastTarget, - }, - - #[serde(rename = "broadcast_ok")] - BroadcastOk { in_reply_to: usize }, - - #[serde(rename = "topology")] - Topology { - msg_id: Option<usize>, - topology: HashMap<String, Vec<String>>, - }, - - #[serde(rename = "topology_ok")] - TopologyOk { in_reply_to: usize }, - - #[serde(rename = "read")] - Read { msg_id: usize }, - - #[serde(rename = "read_ok")] - ReadOk { - in_reply_to: usize, - messages: HashSet<BroadcastTarget>, - }, -} - -pub struct BroadcastHandler { - node_id: String, - seen: RwLock<HashSet<BroadcastTarget>>, - broadcast_targets: RwLock<Vec<String>>, - output: Output<BroadcastBody>, -} - -impl Handler for BroadcastHandler { - type Body = BroadcastBody; - - fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self { - node_ids.retain(|x| *x != node_id); - - BroadcastHandler { - node_id, - broadcast_targets: RwLock::new(node_ids), - seen: RwLock::new(HashSet::new()), - output, - } - } - - fn handle( - &self, - header: MessageHeader, - body: BroadcastBody, - ) -> impl Future<Output = ()> + Send { - async move { - match body { - BroadcastBody::Broadcast { msg_id, message } => { - self.seen.write().await.insert(message); - if let Some(msg_id) = msg_id { - self.output - .send( - &header.src, - &BroadcastBody::BroadcastOk { - in_reply_to: msg_id, - }, - ) - .await; - } - - for target in self.broadcast_targets.read().await.iter() { - self.output - .send( - target, - &BroadcastBody::Broadcast { - msg_id: None, - message, - }, - ) - .await; - } - } - BroadcastBody::Topology { - msg_id, - mut topology, - } => { - if let Some(broadcast_targets) = topology.remove(&self.node_id) { - *self.broadcast_targets.write().await = broadcast_targets; - } - - if let Some(msg_id) = msg_id { - self.output - .send( - &header.src, - &BroadcastBody::TopologyOk { - in_reply_to: msg_id, - }, - ) - .await; - } - } - - BroadcastBody::Read { msg_id } => { - self.output - .send( - &header.src, - &BroadcastBody::ReadOk { - in_reply_to: msg_id, - messages: self.seen.read().await.clone(), - }, - ) - .await - } - - BroadcastBody::BroadcastOk { .. } => {} - BroadcastBody::TopologyOk { .. } => {} - BroadcastBody::ReadOk { .. } => {} - } - } - } -} diff --git a/broadcast/src/msg.rs b/broadcast/src/msg.rs new file mode 100644 index 0000000..6433982 --- /dev/null +++ b/broadcast/src/msg.rs @@ -0,0 +1,36 @@ +use common::msg_id::MessageID; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; + +pub type BroadcastTarget = usize; + +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(tag = "type")] +pub enum BroadcastBody { + #[serde(rename = "broadcast")] + Broadcast { + msg_id: Option<MessageID>, + message: BroadcastTarget, + }, + + #[serde(rename = "broadcast_ok")] + BroadcastOk { in_reply_to: MessageID }, + + #[serde(rename = "topology")] + Topology { + msg_id: Option<MessageID>, + topology: HashMap<String, Vec<String>>, + }, + + #[serde(rename = "topology_ok")] + TopologyOk { in_reply_to: MessageID }, + + #[serde(rename = "read")] + Read { msg_id: MessageID }, + + #[serde(rename = "read_ok")] + ReadOk { + in_reply_to: MessageID, + messages: HashSet<BroadcastTarget>, + }, +} diff --git a/common/src/lib.rs b/common/src/lib.rs index d58d10b..69a872b 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -22,11 +22,11 @@ pub trait Handler: Send + Sync + 'static { type Body: Serialize + for<'a> Deserialize<'a> + Send + Clone; fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self; - fn handle( - &self, + fn handle<'a>( + self: &'a Arc<Self>, header: MessageHeader, body: Self::Body, - ) -> impl Future<Output = ()> + Send + '_; + ) -> impl Future<Output = ()> + Send + 'a; } pub fn run_server<H: Handler>() { |