diff options
author | Aria <me@aria.rip> | 2023-10-15 00:57:41 +0100 |
---|---|---|
committer | Aria <me@aria.rip> | 2023-10-15 00:57:41 +0100 |
commit | 42123efe8fd92d6d81b6d5d10ae86866ea9b6a3c (patch) | |
tree | 2dfcb1a27c337d1c9b89f54aff35e94795970b74 /broadcast/src/handler.rs | |
parent | 7447f3fb801ba954c7b8cbf3f47700ffcc562d20 (diff) |
some refactors
Diffstat (limited to 'broadcast/src/handler.rs')
-rw-r--r-- | broadcast/src/handler.rs | 33 |
1 files changed, 7 insertions, 26 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index d4bb94a..b55cd9b 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -6,11 +6,12 @@ use smol::{ }; use std::{ collections::{HashMap, HashSet}, + iter, sync::Arc, time::Duration, }; -use crate::msg::*; +use crate::{msg::*, topology::Topology}; use common::{ msg::{MessageHeader, Output}, @@ -23,7 +24,7 @@ const RETRY_TIMEOUT_SECS: u64 = 1; pub struct BroadcastHandler { node_id: String, seen: RwLock<HashSet<BroadcastTarget>>, - topology: RwLock<HashMap<String, HashSet<String>>>, + topology: Topology, output: Output<BroadcastBody>, attempted_broadcasts: Mutex<HashMap<MessageID, Task<()>>>, } @@ -32,15 +33,9 @@ impl Handler for BroadcastHandler { type Body = BroadcastBody; fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self { - // Initial topology assumes all nodes are neighbours - let mut topology = HashMap::new(); - for id in node_ids.iter() { - topology.insert(id.clone(), node_ids.iter().cloned().collect()); - } - BroadcastHandler { node_id, - topology: RwLock::new(topology), + topology: Topology::dense(node_ids.clone()), seen: RwLock::new(HashSet::new()), output, attempted_broadcasts: Mutex::default(), @@ -73,7 +68,7 @@ impl Handler for BroadcastHandler { BroadcastBody::Topology { msg_id, topology } => { // Start using the new topology - *self.topology.write().await = topology; + self.topology.replace(topology).await; // Send reply if needed if let Some(msg_id) = msg_id { @@ -137,24 +132,10 @@ impl BroadcastHandler { return; } - // Ensure we don't keep holding the read lock - let mut targets = self.topology.read().await.clone(); - - // Only send to neighbours that the source has not sent to. - // This isn't technically optimal, but its as close as we can get without - // tracking the path of each broadcast message. - let our_targets = targets.remove(&self.node_id).unwrap(); - let their_targets = targets - .remove(&src.to_string()) - .unwrap_or_else(|| HashSet::new()); - // Race all send futures + let path = iter::once(src); let mut tasks = self.attempted_broadcasts.lock().await; - for target in our_targets.into_iter() { - if &target == &src || &target == &self.node_id || their_targets.contains(&target) { - continue; - } - + for target in self.topology.targets(&self.node_id, path).await { let msg_id = gen_msg_id(); let this = self.clone(); tasks.insert( |