use std::{ collections::HashSet, sync::Arc, time::{Duration, Instant}, }; use common::msg_id::MessageID; use futures::Future; use smol::Timer; use crate::{ handler::BroadcastHandler, msg::{BroadcastBody, BroadcastTarget}, }; const RETRY_TIMEOUT_SECS: u64 = 1; #[derive(Debug, Clone)] pub struct MessageBatch { max_message_delay: Duration, max_message_count: usize, first_added: Instant, messages: HashSet, } impl MessageBatch { pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self { Self { max_message_delay, max_message_count, first_added: Instant::now(), messages: HashSet::new(), } } pub fn add(&mut self, msg: BroadcastTarget) { if self.messages.is_empty() { self.first_added = Instant::now(); } self.messages.insert(msg); } pub fn clear(&mut self) { self.messages.clear(); } pub fn should_broadcast(&self) -> bool { !self.messages.is_empty() && (self.first_added.elapsed() >= self.max_message_delay || self.messages.len() >= self.max_message_count) } pub fn sleep_time(&self) -> Duration { self.first_added .elapsed() .saturating_sub(self.max_message_delay) } pub fn broadcast( &self, this: Arc, dst: String, msg_id: MessageID, ) -> impl Future + 'static { let messages = self.messages.clone(); async move { loop { this.output .send( &dst, &BroadcastBody::BroadcastBatch { msg_id: Some(msg_id), messages: messages.clone().into_iter().collect(), }, ) .await; Timer::after(Duration::from_secs(RETRY_TIMEOUT_SECS)).await; } } } }