From 42123efe8fd92d6d81b6d5d10ae86866ea9b6a3c Mon Sep 17 00:00:00 2001 From: Aria Date: Sun, 15 Oct 2023 00:57:41 +0100 Subject: some refactors --- broadcast/src/handler.rs | 33 +++++++-------------------------- 1 file changed, 7 insertions(+), 26 deletions(-) (limited to 'broadcast/src/handler.rs') diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index d4bb94a..b55cd9b 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -6,11 +6,12 @@ use smol::{ }; use std::{ collections::{HashMap, HashSet}, + iter, sync::Arc, time::Duration, }; -use crate::msg::*; +use crate::{msg::*, topology::Topology}; use common::{ msg::{MessageHeader, Output}, @@ -23,7 +24,7 @@ const RETRY_TIMEOUT_SECS: u64 = 1; pub struct BroadcastHandler { node_id: String, seen: RwLock>, - topology: RwLock>>, + topology: Topology, output: Output, attempted_broadcasts: Mutex>>, } @@ -32,15 +33,9 @@ impl Handler for BroadcastHandler { type Body = BroadcastBody; fn init(node_id: String, node_ids: Vec, output: Output) -> Self { - // Initial topology assumes all nodes are neighbours - let mut topology = HashMap::new(); - for id in node_ids.iter() { - topology.insert(id.clone(), node_ids.iter().cloned().collect()); - } - BroadcastHandler { node_id, - topology: RwLock::new(topology), + topology: Topology::dense(node_ids.clone()), seen: RwLock::new(HashSet::new()), output, attempted_broadcasts: Mutex::default(), @@ -73,7 +68,7 @@ impl Handler for BroadcastHandler { BroadcastBody::Topology { msg_id, topology } => { // Start using the new topology - *self.topology.write().await = topology; + self.topology.replace(topology).await; // Send reply if needed if let Some(msg_id) = msg_id { @@ -137,24 +132,10 @@ impl BroadcastHandler { return; } - // Ensure we don't keep holding the read lock - let mut targets = self.topology.read().await.clone(); - - // Only send to neighbours that the source has not sent to. - // This isn't technically optimal, but its as close as we can get without - // tracking the path of each broadcast message. - let our_targets = targets.remove(&self.node_id).unwrap(); - let their_targets = targets - .remove(&src.to_string()) - .unwrap_or_else(|| HashSet::new()); - // Race all send futures + let path = iter::once(src); let mut tasks = self.attempted_broadcasts.lock().await; - for target in our_targets.into_iter() { - if &target == &src || &target == &self.node_id || their_targets.contains(&target) { - continue; - } - + for target in self.topology.targets(&self.node_id, path).await { let msg_id = gen_msg_id(); let this = self.clone(); tasks.insert( -- cgit v1.2.3