diff options
author | Aria <me@aria.rip> | 2023-10-14 22:40:55 +0100 |
---|---|---|
committer | Aria <me@aria.rip> | 2023-10-14 22:40:55 +0100 |
commit | 2c6c06a9c31049567f1c1e0eff522c5a71d6c680 (patch) | |
tree | 9fa62b94122a539b2bdda97993d0bb906c9171b8 /broadcast/src | |
parent | b2d679f05d04052bfc25167eaaf09c60c03251cb (diff) |
fault tolerant broadcast
Diffstat (limited to 'broadcast/src')
-rw-r--r-- | broadcast/src/handler.rs | 51 | ||||
-rw-r--r-- | broadcast/src/msg.rs | 2 |
2 files changed, 32 insertions, 21 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index 09c66b9..5ebed27 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -1,4 +1,5 @@ use smol::{ + future, lock::{Mutex, RwLock}, prelude::*, Task, Timer, @@ -17,12 +18,12 @@ use common::{ Handler, }; -const RETRY_TIMEOUT: u64 = 2; +const RETRY_TIMEOUT_SECS: u64 = 1; pub struct BroadcastHandler { node_id: String, seen: RwLock<HashSet<BroadcastTarget>>, - broadcast_targets: RwLock<Vec<String>>, + topology: RwLock<HashMap<String, HashSet<String>>>, output: Output<BroadcastBody>, attempted_broadcasts: Mutex<HashMap<MessageID, Task<()>>>, } @@ -30,12 +31,16 @@ pub struct BroadcastHandler { impl Handler for BroadcastHandler { type Body = BroadcastBody; - fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self { - node_ids.retain(|x| *x != node_id); + fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self { + // Initial topology assumes all nodes are neighbours + let mut topology = HashMap::new(); + for id in node_ids.iter() { + topology.insert(id.clone(), node_ids.iter().cloned().collect()); + } BroadcastHandler { node_id, - broadcast_targets: RwLock::new(node_ids), + topology: RwLock::new(topology), seen: RwLock::new(HashSet::new()), output, attempted_broadcasts: Mutex::default(), @@ -53,8 +58,11 @@ impl Handler for BroadcastHandler { msg_id: Some(msg_id), message, } => { - self.receive_broadcast(&header.src, message).await; - self.send_broadcast_ok(&header.src, msg_id).await; + future::zip( + self.receive_broadcast(&header.src, message), + self.send_broadcast_ok(&header.src, msg_id), + ) + .await; } BroadcastBody::Broadcast { msg_id: None, @@ -63,14 +71,9 @@ impl Handler for BroadcastHandler { self.receive_broadcast(&header.src, message).await; } - BroadcastBody::Topology { - msg_id, - mut topology, - } => { + BroadcastBody::Topology { msg_id, topology } => { // Start using the new topology - if let Some(broadcast_targets) = topology.remove(&self.node_id) { - *self.broadcast_targets.write().await = broadcast_targets; - } + *self.topology.write().await = topology; // Send reply if needed if let Some(msg_id) = msg_id { @@ -133,13 +136,21 @@ impl BroadcastHandler { } // Ensure we don't keep holding the read lock - let targets = self.broadcast_targets.read().await.clone(); + let mut targets = self.topology.read().await.clone(); + + // Only send to neighbours that the source has not sent to. + // This isn't technically optimal, but its as close as we can get without + // tracking the path of each broadcast message. + let our_targets = targets.remove(&self.node_id).unwrap(); + let their_targets = targets + .remove(&src.to_string()) + .unwrap_or_else(|| HashSet::new()); // Race all send futures let mut tasks = self.attempted_broadcasts.lock().await; - for target in targets.into_iter() { - if &target == &src { - return; + for target in our_targets.into_iter() { + if &target == &src || &target == &self.node_id || their_targets.contains(&target) { + continue; } let msg_id = gen_msg_id(); @@ -152,13 +163,13 @@ impl BroadcastHandler { .send( &target, &BroadcastBody::Broadcast { - msg_id: None, + msg_id: Some(msg_id), message, }, ) .await; - Timer::after(Duration::from_secs(RETRY_TIMEOUT)).await; + Timer::after(Duration::from_secs(RETRY_TIMEOUT_SECS)).await; } }), ); diff --git a/broadcast/src/msg.rs b/broadcast/src/msg.rs index 6433982..81abb2d 100644 --- a/broadcast/src/msg.rs +++ b/broadcast/src/msg.rs @@ -19,7 +19,7 @@ pub enum BroadcastBody { #[serde(rename = "topology")] Topology { msg_id: Option<MessageID>, - topology: HashMap<String, Vec<String>>, + topology: HashMap<String, HashSet<String>>, }, #[serde(rename = "topology_ok")] |