summaryrefslogtreecommitdiff
path: root/broadcast/src/handler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'broadcast/src/handler.rs')
-rw-r--r--broadcast/src/handler.rs33
1 files changed, 7 insertions, 26 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs
index d4bb94a..b55cd9b 100644
--- a/broadcast/src/handler.rs
+++ b/broadcast/src/handler.rs
@@ -6,11 +6,12 @@ use smol::{
};
use std::{
collections::{HashMap, HashSet},
+ iter,
sync::Arc,
time::Duration,
};
-use crate::msg::*;
+use crate::{msg::*, topology::Topology};
use common::{
msg::{MessageHeader, Output},
@@ -23,7 +24,7 @@ const RETRY_TIMEOUT_SECS: u64 = 1;
pub struct BroadcastHandler {
node_id: String,
seen: RwLock<HashSet<BroadcastTarget>>,
- topology: RwLock<HashMap<String, HashSet<String>>>,
+ topology: Topology,
output: Output<BroadcastBody>,
attempted_broadcasts: Mutex<HashMap<MessageID, Task<()>>>,
}
@@ -32,15 +33,9 @@ impl Handler for BroadcastHandler {
type Body = BroadcastBody;
fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
- // Initial topology assumes all nodes are neighbours
- let mut topology = HashMap::new();
- for id in node_ids.iter() {
- topology.insert(id.clone(), node_ids.iter().cloned().collect());
- }
-
BroadcastHandler {
node_id,
- topology: RwLock::new(topology),
+ topology: Topology::dense(node_ids.clone()),
seen: RwLock::new(HashSet::new()),
output,
attempted_broadcasts: Mutex::default(),
@@ -73,7 +68,7 @@ impl Handler for BroadcastHandler {
BroadcastBody::Topology { msg_id, topology } => {
// Start using the new topology
- *self.topology.write().await = topology;
+ self.topology.replace(topology).await;
// Send reply if needed
if let Some(msg_id) = msg_id {
@@ -137,24 +132,10 @@ impl BroadcastHandler {
return;
}
- // Ensure we don't keep holding the read lock
- let mut targets = self.topology.read().await.clone();
-
- // Only send to neighbours that the source has not sent to.
- // This isn't technically optimal, but its as close as we can get without
- // tracking the path of each broadcast message.
- let our_targets = targets.remove(&self.node_id).unwrap();
- let their_targets = targets
- .remove(&src.to_string())
- .unwrap_or_else(|| HashSet::new());
-
// Race all send futures
+ let path = iter::once(src);
let mut tasks = self.attempted_broadcasts.lock().await;
- for target in our_targets.into_iter() {
- if &target == &src || &target == &self.node_id || their_targets.contains(&target) {
- continue;
- }
-
+ for target in self.topology.targets(&self.node_id, path).await {
let msg_id = gen_msg_id();
let this = self.clone();
tasks.insert(