diff options
Diffstat (limited to 'broadcast/src')
-rw-r--r-- | broadcast/src/batch.rs | 8 | ||||
-rw-r--r-- | broadcast/src/handler.rs | 41 |
2 files changed, 23 insertions, 26 deletions
diff --git a/broadcast/src/batch.rs b/broadcast/src/batch.rs index 46b9b57..d69771d 100644 --- a/broadcast/src/batch.rs +++ b/broadcast/src/batch.rs @@ -18,16 +18,14 @@ const RETRY_TIMEOUT_SECS: u64 = 1; #[derive(Debug, Clone)] pub struct MessageBatch { max_message_delay: Duration, - max_message_count: usize, first_added: Instant, messages: HashSet<BroadcastTarget>, } impl MessageBatch { - pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self { + pub fn new(max_message_delay: Duration) -> Self { Self { max_message_delay, - max_message_count, first_added: Instant::now(), messages: HashSet::new(), } @@ -45,9 +43,7 @@ impl MessageBatch { } pub fn should_broadcast(&self) -> bool { - !self.messages.is_empty() - && (self.first_added.elapsed() >= self.max_message_delay - || self.messages.len() >= self.max_message_count) + !self.messages.is_empty() && self.first_added.elapsed() >= self.max_message_delay } pub fn sleep_time(&self) -> Duration { diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs index 4ee2664..2b470fb 100644 --- a/broadcast/src/handler.rs +++ b/broadcast/src/handler.rs @@ -17,7 +17,8 @@ use common::{ msg_id::{gen_msg_id, MessageID}, Handler, }; -const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(700); + +const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(200); pub struct BroadcastHandler { node_id: String, @@ -32,7 +33,7 @@ impl Handler for BroadcastHandler { type Body = BroadcastBody; fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Arc<Self> { - let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() / node_ids.len()) as u32; + let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() as f32).sqrt() as u32; let this = Arc::new(Self { node_id, @@ -40,7 +41,7 @@ impl Handler for BroadcastHandler { seen: RwLock::new(HashSet::new()), output, attempted_broadcasts: Mutex::default(), - batch: Mutex::new(MessageBatch::new(max_message_delay, 1000)), + batch: Mutex::new(MessageBatch::new(max_message_delay)), }); smol::spawn(this.clone().poll_batch()).detach(); @@ -164,31 +165,18 @@ impl BroadcastHandler { async fn receive_broadcast_batch(self: &Arc<Self>, message: Vec<BroadcastTarget>) { let mut batch = self.batch.lock().await; let mut seen = self.seen.write().await; - let mut new = false; for message in message.into_iter() { - new |= seen.insert(message); - batch.add(message); - } - - if !new { - return; + if seen.insert(message) { + batch.add(message); + } } } async fn poll_batch(self: Arc<Self>) { loop { let mut batch = self.batch.lock().await; - if batch.should_broadcast() { - let mut tasks = self.attempted_broadcasts.lock().await; - for target in self.topology.all_targets(&self.node_id).await { - let msg_id = gen_msg_id(); - let this = self.clone(); - tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id))); - } - - batch.clear(); - } + self.do_batch_check(&mut batch).await; let time = batch.sleep_time(); drop(batch); @@ -196,4 +184,17 @@ impl BroadcastHandler { Timer::after(time).await; } } + + async fn do_batch_check(self: &Arc<Self>, batch: &mut MessageBatch) { + if batch.should_broadcast() { + let mut tasks = self.attempted_broadcasts.lock().await; + for target in self.topology.all_targets(&self.node_id).await { + let msg_id = gen_msg_id(); + let this = self.clone(); + tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id))); + } + + batch.clear(); + } + } } |