summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--broadcast/src/handler.rs51
-rw-r--r--broadcast/src/msg.rs2
2 files changed, 32 insertions, 21 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs
index 09c66b9..5ebed27 100644
--- a/broadcast/src/handler.rs
+++ b/broadcast/src/handler.rs
@@ -1,4 +1,5 @@
use smol::{
+ future,
lock::{Mutex, RwLock},
prelude::*,
Task, Timer,
@@ -17,12 +18,12 @@ use common::{
Handler,
};
-const RETRY_TIMEOUT: u64 = 2;
+const RETRY_TIMEOUT_SECS: u64 = 1;
pub struct BroadcastHandler {
node_id: String,
seen: RwLock<HashSet<BroadcastTarget>>,
- broadcast_targets: RwLock<Vec<String>>,
+ topology: RwLock<HashMap<String, HashSet<String>>>,
output: Output<BroadcastBody>,
attempted_broadcasts: Mutex<HashMap<MessageID, Task<()>>>,
}
@@ -30,12 +31,16 @@ pub struct BroadcastHandler {
impl Handler for BroadcastHandler {
type Body = BroadcastBody;
- fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
- node_ids.retain(|x| *x != node_id);
+ fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
+ // Initial topology assumes all nodes are neighbours
+ let mut topology = HashMap::new();
+ for id in node_ids.iter() {
+ topology.insert(id.clone(), node_ids.iter().cloned().collect());
+ }
BroadcastHandler {
node_id,
- broadcast_targets: RwLock::new(node_ids),
+ topology: RwLock::new(topology),
seen: RwLock::new(HashSet::new()),
output,
attempted_broadcasts: Mutex::default(),
@@ -53,8 +58,11 @@ impl Handler for BroadcastHandler {
msg_id: Some(msg_id),
message,
} => {
- self.receive_broadcast(&header.src, message).await;
- self.send_broadcast_ok(&header.src, msg_id).await;
+ future::zip(
+ self.receive_broadcast(&header.src, message),
+ self.send_broadcast_ok(&header.src, msg_id),
+ )
+ .await;
}
BroadcastBody::Broadcast {
msg_id: None,
@@ -63,14 +71,9 @@ impl Handler for BroadcastHandler {
self.receive_broadcast(&header.src, message).await;
}
- BroadcastBody::Topology {
- msg_id,
- mut topology,
- } => {
+ BroadcastBody::Topology { msg_id, topology } => {
// Start using the new topology
- if let Some(broadcast_targets) = topology.remove(&self.node_id) {
- *self.broadcast_targets.write().await = broadcast_targets;
- }
+ *self.topology.write().await = topology;
// Send reply if needed
if let Some(msg_id) = msg_id {
@@ -133,13 +136,21 @@ impl BroadcastHandler {
}
// Ensure we don't keep holding the read lock
- let targets = self.broadcast_targets.read().await.clone();
+ let mut targets = self.topology.read().await.clone();
+
+ // Only send to neighbours that the source has not sent to.
+ // This isn't technically optimal, but its as close as we can get without
+ // tracking the path of each broadcast message.
+ let our_targets = targets.remove(&self.node_id).unwrap();
+ let their_targets = targets
+ .remove(&src.to_string())
+ .unwrap_or_else(|| HashSet::new());
// Race all send futures
let mut tasks = self.attempted_broadcasts.lock().await;
- for target in targets.into_iter() {
- if &target == &src {
- return;
+ for target in our_targets.into_iter() {
+ if &target == &src || &target == &self.node_id || their_targets.contains(&target) {
+ continue;
}
let msg_id = gen_msg_id();
@@ -152,13 +163,13 @@ impl BroadcastHandler {
.send(
&target,
&BroadcastBody::Broadcast {
- msg_id: None,
+ msg_id: Some(msg_id),
message,
},
)
.await;
- Timer::after(Duration::from_secs(RETRY_TIMEOUT)).await;
+ Timer::after(Duration::from_secs(RETRY_TIMEOUT_SECS)).await;
}
}),
);
diff --git a/broadcast/src/msg.rs b/broadcast/src/msg.rs
index 6433982..81abb2d 100644
--- a/broadcast/src/msg.rs
+++ b/broadcast/src/msg.rs
@@ -19,7 +19,7 @@ pub enum BroadcastBody {
#[serde(rename = "topology")]
Topology {
msg_id: Option<MessageID>,
- topology: HashMap<String, Vec<String>>,
+ topology: HashMap<String, HashSet<String>>,
},
#[serde(rename = "topology_ok")]