diff options
author | Aria <me@aria.rip> | 2023-10-15 01:07:02 +0100 |
---|---|---|
committer | Aria <me@aria.rip> | 2023-10-15 01:07:12 +0100 |
commit | 07e2085190e30010ad595369a07842413bacd3d1 (patch) | |
tree | 8a7e1a42f894a75f35e315729d93c439f015958d /broadcast | |
parent | 42123efe8fd92d6d81b6d5d10ae86866ea9b6a3c (diff) |
track path through network for broadcast
Diffstat (limited to 'broadcast')
-rw-r--r-- | broadcast/src/handler.rs | 25 | ||||
-rw-r--r-- | broadcast/src/msg.rs | 1 |
2 files changed, 20 insertions, 6 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index b55cd9b..b38e2b7 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -6,7 +6,6 @@ use smol::{ }; use std::{ collections::{HashMap, HashSet}, - iter, sync::Arc, time::Duration, }; @@ -52,9 +51,10 @@ impl Handler for BroadcastHandler { BroadcastBody::Broadcast { msg_id: Some(msg_id), message, + path, } => { future::zip( - self.receive_broadcast(&header.src, message), + self.receive_broadcast(&header.src, path, message), self.send_broadcast_ok(&header.src, msg_id), ) .await; @@ -62,8 +62,9 @@ impl Handler for BroadcastHandler { BroadcastBody::Broadcast { msg_id: None, message, + path, } => { - self.receive_broadcast(&header.src, message).await; + self.receive_broadcast(&header.src, path, message).await; } BroadcastBody::Topology { msg_id, topology } => { @@ -126,18 +127,29 @@ impl BroadcastHandler { } /// Receive a given message, and broadcast it onwards if it is new - async fn receive_broadcast(self: &Arc<Self>, src: &str, message: BroadcastTarget) { + async fn receive_broadcast( + self: &Arc<Self>, + src: &str, + previous_path: Option<Vec<String>>, + message: BroadcastTarget, + ) { let new = self.seen.write().await.insert(message); if !new { return; } // Race all send futures - let path = iter::once(src); + let mut previous_path = previous_path.unwrap_or_else(|| vec![]); + previous_path.push(src.to_string()); let mut tasks = self.attempted_broadcasts.lock().await; - for target in self.topology.targets(&self.node_id, path).await { + for target in self + .topology + .targets(&self.node_id, previous_path.iter().map(String::as_str)) + .await + { let msg_id = gen_msg_id(); let this = self.clone(); + let path = previous_path.clone(); tasks.insert( msg_id, smol::spawn(async move { @@ -148,6 +160,7 @@ impl BroadcastHandler { &BroadcastBody::Broadcast { msg_id: Some(msg_id), message, + path: Some(path.clone()), }, ) .await; diff --git a/broadcast/src/msg.rs b/broadcast/src/msg.rs index 81abb2d..c252394 100644 --- a/broadcast/src/msg.rs +++ b/broadcast/src/msg.rs @@ -11,6 +11,7 @@ pub enum BroadcastBody { Broadcast { msg_id: Option<MessageID>, message: BroadcastTarget, + path: Option<Vec<String>>, }, #[serde(rename = "broadcast_ok")] |