diff options
author | Aria <me@aria.rip> | 2023-10-19 21:00:26 +0100 |
---|---|---|
committer | Aria <me@aria.rip> | 2023-10-19 21:00:26 +0100 |
commit | dba41282cac86a740c007498709e996b9fa3e59b (patch) | |
tree | 5d9309962f55ca00be6fee3b7ac24b7314e86f2a /broadcast/src/batch.rs | |
parent | 07e2085190e30010ad595369a07842413bacd3d1 (diff) |
wip: broadcast message batching
Diffstat (limited to 'broadcast/src/batch.rs')
-rw-r--r-- | broadcast/src/batch.rs | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/broadcast/src/batch.rs b/broadcast/src/batch.rs new file mode 100644 index 0000000..8c1c17d --- /dev/null +++ b/broadcast/src/batch.rs @@ -0,0 +1,81 @@ +use std::{ + collections::HashSet, + sync::Arc, + time::{Duration, Instant}, +}; + +use common::msg_id::MessageID; +use futures::Future; +use smol::Timer; + +use crate::{ + handler::BroadcastHandler, + msg::{BroadcastBody, BroadcastTarget}, +}; + +const RETRY_TIMEOUT_SECS: u64 = 1; + +#[derive(Debug, Clone)] +pub struct MessageBatch { + max_message_delay: Duration, + max_message_count: usize, + last_update: Instant, + messages: HashSet<BroadcastTarget>, +} + +impl MessageBatch { + pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self { + Self { + max_message_delay, + max_message_count, + last_update: Instant::now(), + messages: HashSet::new(), + } + } + + pub fn add(&mut self, msg: BroadcastTarget) { + self.messages.insert(msg); + self.last_update = Instant::now(); + } + + pub fn clear(&mut self) { + self.messages.clear(); + self.last_update = Instant::now(); + } + + pub fn should_broadcast(&self) -> bool { + !self.messages.is_empty() + && (self.last_update.elapsed() >= self.max_message_delay + || self.messages.len() >= self.max_message_count) + } + + pub fn sleep_time(&self) -> Duration { + self.last_update + .elapsed() + .saturating_sub(self.max_message_delay) + } + + pub fn broadcast( + &self, + this: Arc<BroadcastHandler>, + dst: String, + msg_id: MessageID, + ) -> impl Future<Output = ()> + 'static { + let messages = self.messages.clone(); + async move { + loop { + this.output + .send( + &dst, + &BroadcastBody::BroadcastBatch { + msg_id: Some(msg_id), + messages: messages.clone().into_iter().collect(), + }, + ) + .await; + + Timer::after(Duration::from_secs(RETRY_TIMEOUT_SECS)).await; + } + } + } +} |