From bb54fd5b450ea1b126f7c31845f12893bf061058 Mon Sep 17 00:00:00 2001 From: Aria Date: Thu, 19 Oct 2023 23:40:51 +0100 Subject: don't check max batch size --- broadcast/src/batch.rs | 8 ++------ broadcast/src/handler.rs | 41 +++++++++++++++++++++-------------------- 2 files changed, 23 insertions(+), 26 deletions(-) (limited to 'broadcast/src') diff --git a/broadcast/src/batch.rs b/broadcast/src/batch.rs index 46b9b57..d69771d 100644 --- a/broadcast/src/batch.rs +++ b/broadcast/src/batch.rs @@ -18,16 +18,14 @@ const RETRY_TIMEOUT_SECS: u64 = 1; #[derive(Debug, Clone)] pub struct MessageBatch { max_message_delay: Duration, - max_message_count: usize, first_added: Instant, messages: HashSet, } impl MessageBatch { - pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self { + pub fn new(max_message_delay: Duration) -> Self { Self { max_message_delay, - max_message_count, first_added: Instant::now(), messages: HashSet::new(), } @@ -45,9 +43,7 @@ impl MessageBatch { } pub fn should_broadcast(&self) -> bool { - !self.messages.is_empty() - && (self.first_added.elapsed() >= self.max_message_delay - || self.messages.len() >= self.max_message_count) + !self.messages.is_empty() && self.first_added.elapsed() >= self.max_message_delay } pub fn sleep_time(&self) -> Duration { diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index 4ee2664..2b470fb 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -17,7 +17,8 @@ use common::{ msg_id::{gen_msg_id, MessageID}, Handler, }; -const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(700); + +const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(200); pub struct BroadcastHandler { node_id: String, @@ -32,7 +33,7 @@ impl Handler for BroadcastHandler { type Body = BroadcastBody; fn init(node_id: String, node_ids: Vec, output: Output) -> Arc { - let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() / node_ids.len()) as u32; + let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() as f32).sqrt() as u32; let this = Arc::new(Self { node_id, @@ -40,7 +41,7 @@ impl Handler for BroadcastHandler { seen: RwLock::new(HashSet::new()), output, attempted_broadcasts: Mutex::default(), - batch: Mutex::new(MessageBatch::new(max_message_delay, 1000)), + batch: Mutex::new(MessageBatch::new(max_message_delay)), }); smol::spawn(this.clone().poll_batch()).detach(); @@ -164,31 +165,18 @@ impl BroadcastHandler { async fn receive_broadcast_batch(self: &Arc, message: Vec) { let mut batch = self.batch.lock().await; let mut seen = self.seen.write().await; - let mut new = false; for message in message.into_iter() { - new |= seen.insert(message); - batch.add(message); - } - - if !new { - return; + if seen.insert(message) { + batch.add(message); + } } } async fn poll_batch(self: Arc) { loop { let mut batch = self.batch.lock().await; - if batch.should_broadcast() { - let mut tasks = self.attempted_broadcasts.lock().await; - for target in self.topology.all_targets(&self.node_id).await { - let msg_id = gen_msg_id(); - let this = self.clone(); - tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id))); - } - - batch.clear(); - } + self.do_batch_check(&mut batch).await; let time = batch.sleep_time(); drop(batch); @@ -196,4 +184,17 @@ impl BroadcastHandler { Timer::after(time).await; } } + + async fn do_batch_check(self: &Arc, batch: &mut MessageBatch) { + if batch.should_broadcast() { + let mut tasks = self.attempted_broadcasts.lock().await; + for target in self.topology.all_targets(&self.node_id).await { + let msg_id = gen_msg_id(); + let this = self.clone(); + tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id))); + } + + batch.clear(); + } + } } -- cgit v1.2.3