From 07e2085190e30010ad595369a07842413bacd3d1 Mon Sep 17 00:00:00 2001 From: Aria Date: Sun, 15 Oct 2023 01:07:02 +0100 Subject: track path through network for broadcast --- broadcast/src/handler.rs | 25 +++++++++++++++++++------ broadcast/src/msg.rs | 1 + 2 files changed, 20 insertions(+), 6 deletions(-) (limited to 'broadcast') diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index b55cd9b..b38e2b7 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -6,7 +6,6 @@ use smol::{ }; use std::{ collections::{HashMap, HashSet}, - iter, sync::Arc, time::Duration, }; @@ -52,9 +51,10 @@ impl Handler for BroadcastHandler { BroadcastBody::Broadcast { msg_id: Some(msg_id), message, + path, } => { future::zip( - self.receive_broadcast(&header.src, message), + self.receive_broadcast(&header.src, path, message), self.send_broadcast_ok(&header.src, msg_id), ) .await; @@ -62,8 +62,9 @@ impl Handler for BroadcastHandler { BroadcastBody::Broadcast { msg_id: None, message, + path, } => { - self.receive_broadcast(&header.src, message).await; + self.receive_broadcast(&header.src, path, message).await; } BroadcastBody::Topology { msg_id, topology } => { @@ -126,18 +127,29 @@ impl BroadcastHandler { } /// Receive a given message, and broadcast it onwards if it is new - async fn receive_broadcast(self: &Arc, src: &str, message: BroadcastTarget) { + async fn receive_broadcast( + self: &Arc, + src: &str, + previous_path: Option>, + message: BroadcastTarget, + ) { let new = self.seen.write().await.insert(message); if !new { return; } // Race all send futures - let path = iter::once(src); + let mut previous_path = previous_path.unwrap_or_else(|| vec![]); + previous_path.push(src.to_string()); let mut tasks = self.attempted_broadcasts.lock().await; - for target in self.topology.targets(&self.node_id, path).await { + for target in self + .topology + .targets(&self.node_id, previous_path.iter().map(String::as_str)) + .await + { let msg_id = gen_msg_id(); let this = self.clone(); + let path = previous_path.clone(); tasks.insert( msg_id, smol::spawn(async move { @@ -148,6 +160,7 @@ impl BroadcastHandler { &BroadcastBody::Broadcast { msg_id: Some(msg_id), message, + path: Some(path.clone()), }, ) .await; diff --git a/broadcast/src/msg.rs b/broadcast/src/msg.rs index 81abb2d..c252394 100644 --- a/broadcast/src/msg.rs +++ b/broadcast/src/msg.rs @@ -11,6 +11,7 @@ pub enum BroadcastBody { Broadcast { msg_id: Option, message: BroadcastTarget, + path: Option>, }, #[serde(rename = "broadcast_ok")] -- cgit v1.2.3