summaryrefslogtreecommitdiff
path: root/broadcast
diff options
context:
space:
mode:
authorAria <me@aria.rip>2023-10-15 01:07:02 +0100
committerAria <me@aria.rip>2023-10-15 01:07:12 +0100
commit07e2085190e30010ad595369a07842413bacd3d1 (patch)
tree8a7e1a42f894a75f35e315729d93c439f015958d /broadcast
parent42123efe8fd92d6d81b6d5d10ae86866ea9b6a3c (diff)
track path through network for broadcast
Diffstat (limited to 'broadcast')
-rw-r--r--broadcast/src/handler.rs25
-rw-r--r--broadcast/src/msg.rs1
2 files changed, 20 insertions, 6 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs
index b55cd9b..b38e2b7 100644
--- a/broadcast/src/handler.rs
+++ b/broadcast/src/handler.rs
@@ -6,7 +6,6 @@ use smol::{
};
use std::{
collections::{HashMap, HashSet},
- iter,
sync::Arc,
time::Duration,
};
@@ -52,9 +51,10 @@ impl Handler for BroadcastHandler {
BroadcastBody::Broadcast {
msg_id: Some(msg_id),
message,
+ path,
} => {
future::zip(
- self.receive_broadcast(&header.src, message),
+ self.receive_broadcast(&header.src, path, message),
self.send_broadcast_ok(&header.src, msg_id),
)
.await;
@@ -62,8 +62,9 @@ impl Handler for BroadcastHandler {
BroadcastBody::Broadcast {
msg_id: None,
message,
+ path,
} => {
- self.receive_broadcast(&header.src, message).await;
+ self.receive_broadcast(&header.src, path, message).await;
}
BroadcastBody::Topology { msg_id, topology } => {
@@ -126,18 +127,29 @@ impl BroadcastHandler {
}
/// Receive a given message, and broadcast it onwards if it is new
- async fn receive_broadcast(self: &Arc<Self>, src: &str, message: BroadcastTarget) {
+ async fn receive_broadcast(
+ self: &Arc<Self>,
+ src: &str,
+ previous_path: Option<Vec<String>>,
+ message: BroadcastTarget,
+ ) {
let new = self.seen.write().await.insert(message);
if !new {
return;
}
// Race all send futures
- let path = iter::once(src);
+ let mut previous_path = previous_path.unwrap_or_else(|| vec![]);
+ previous_path.push(src.to_string());
let mut tasks = self.attempted_broadcasts.lock().await;
- for target in self.topology.targets(&self.node_id, path).await {
+ for target in self
+ .topology
+ .targets(&self.node_id, previous_path.iter().map(String::as_str))
+ .await
+ {
let msg_id = gen_msg_id();
let this = self.clone();
+ let path = previous_path.clone();
tasks.insert(
msg_id,
smol::spawn(async move {
@@ -148,6 +160,7 @@ impl BroadcastHandler {
&BroadcastBody::Broadcast {
msg_id: Some(msg_id),
message,
+ path: Some(path.clone()),
},
)
.await;
diff --git a/broadcast/src/msg.rs b/broadcast/src/msg.rs
index 81abb2d..c252394 100644
--- a/broadcast/src/msg.rs
+++ b/broadcast/src/msg.rs
@@ -11,6 +11,7 @@ pub enum BroadcastBody {
Broadcast {
msg_id: Option<MessageID>,
message: BroadcastTarget,
+ path: Option<Vec<String>>,
},
#[serde(rename = "broadcast_ok")]