diff options
Diffstat (limited to 'broadcast/src/handler.rs')
-rw-r--r-- | broadcast/src/handler.rs | 41 |
1 files changed, 21 insertions, 20 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index 4ee2664..2b470fb 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -17,7 +17,8 @@ use common::{ msg_id::{gen_msg_id, MessageID}, Handler, }; -const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(700); + +const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(200); pub struct BroadcastHandler { node_id: String, @@ -32,7 +33,7 @@ impl Handler for BroadcastHandler { type Body = BroadcastBody; fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Arc<Self> { - let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() / node_ids.len()) as u32; + let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() as f32).sqrt() as u32; let this = Arc::new(Self { node_id, @@ -40,7 +41,7 @@ impl Handler for BroadcastHandler { seen: RwLock::new(HashSet::new()), output, attempted_broadcasts: Mutex::default(), - batch: Mutex::new(MessageBatch::new(max_message_delay, 1000)), + batch: Mutex::new(MessageBatch::new(max_message_delay)), }); smol::spawn(this.clone().poll_batch()).detach(); @@ -164,31 +165,18 @@ impl BroadcastHandler { async fn receive_broadcast_batch(self: &Arc<Self>, message: Vec<BroadcastTarget>) { let mut batch = self.batch.lock().await; let mut seen = self.seen.write().await; - let mut new = false; for message in message.into_iter() { - new |= seen.insert(message); - batch.add(message); - } - - if !new { - return; + if seen.insert(message) { + batch.add(message); + } } } async fn poll_batch(self: Arc<Self>) { loop { let mut batch = self.batch.lock().await; - if batch.should_broadcast() { - let mut tasks = self.attempted_broadcasts.lock().await; - for target in self.topology.all_targets(&self.node_id).await { - let msg_id = gen_msg_id(); - let this = self.clone(); - tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id))); - } - - batch.clear(); - } + self.do_batch_check(&mut batch).await; let time = batch.sleep_time(); drop(batch); @@ -196,4 +184,17 @@ impl BroadcastHandler { Timer::after(time).await; } } + + async fn do_batch_check(self: &Arc<Self>, batch: &mut MessageBatch) { + if batch.should_broadcast() { + let mut tasks = self.attempted_broadcasts.lock().await; + for target in self.topology.all_targets(&self.node_id).await { + let msg_id = gen_msg_id(); + let this = self.clone(); + tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id))); + } + + batch.clear(); + } + } } |