diff options
Diffstat (limited to 'broadcast/src/batch.rs')
-rw-r--r-- | broadcast/src/batch.rs | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/broadcast/src/batch.rs b/broadcast/src/batch.rs new file mode 100644 index 0000000..8c1c17d --- /dev/null +++ b/broadcast/src/batch.rs @@ -0,0 +1,81 @@ +use std::{ + collections::HashSet, + sync::Arc, + time::{Duration, Instant}, +}; + +use common::msg_id::MessageID; +use futures::Future; +use smol::Timer; + +use crate::{ + handler::BroadcastHandler, + msg::{BroadcastBody, BroadcastTarget}, +}; + +const RETRY_TIMEOUT_SECS: u64 = 1; + +#[derive(Debug, Clone)] +pub struct MessageBatch { + max_message_delay: Duration, + max_message_count: usize, + last_update: Instant, + messages: HashSet<BroadcastTarget>, +} + +impl MessageBatch { + pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self { + Self { + max_message_delay, + max_message_count, + last_update: Instant::now(), + messages: HashSet::new(), + } + } + + pub fn add(&mut self, msg: BroadcastTarget) { + self.messages.insert(msg); + self.last_update = Instant::now(); + } + + pub fn clear(&mut self) { + self.messages.clear(); + self.last_update = Instant::now(); + } + + pub fn should_broadcast(&self) -> bool { + !self.messages.is_empty() + && (self.last_update.elapsed() >= self.max_message_delay + || self.messages.len() >= self.max_message_count) + } + + pub fn sleep_time(&self) -> Duration { + self.last_update + .elapsed() + .saturating_sub(self.max_message_delay) + } + + pub fn broadcast( + &self, + this: Arc<BroadcastHandler>, + dst: String, + msg_id: MessageID, + ) -> impl Future<Output = ()> + 'static { + let messages = self.messages.clone(); + async move { + loop { + this.output + .send( + &dst, + &BroadcastBody::BroadcastBatch { + msg_id: Some(msg_id), + messages: messages.clone().into_iter().collect(), + }, + ) + .await; + + Timer::after(Duration::from_secs(RETRY_TIMEOUT_SECS)).await; + } + } + } +} |