summaryrefslogtreecommitdiff
path: root/broadcast/src/batch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'broadcast/src/batch.rs')
-rw-r--r--broadcast/src/batch.rs81
1 files changed, 81 insertions, 0 deletions
diff --git a/broadcast/src/batch.rs b/broadcast/src/batch.rs
new file mode 100644
index 0000000..8c1c17d
--- /dev/null
+++ b/broadcast/src/batch.rs
@@ -0,0 +1,81 @@
+use std::{
+ collections::HashSet,
+ sync::Arc,
+ time::{Duration, Instant},
+};
+
+use common::msg_id::MessageID;
+use futures::Future;
+use smol::Timer;
+
+use crate::{
+ handler::BroadcastHandler,
+ msg::{BroadcastBody, BroadcastTarget},
+};
+
+const RETRY_TIMEOUT_SECS: u64 = 1;
+
+#[derive(Debug, Clone)]
+pub struct MessageBatch {
+ max_message_delay: Duration,
+ max_message_count: usize,
+ last_update: Instant,
+ messages: HashSet<BroadcastTarget>,
+}
+
+impl MessageBatch {
+ pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self {
+ Self {
+ max_message_delay,
+ max_message_count,
+ last_update: Instant::now(),
+ messages: HashSet::new(),
+ }
+ }
+
+ pub fn add(&mut self, msg: BroadcastTarget) {
+ self.messages.insert(msg);
+ self.last_update = Instant::now();
+ }
+
+ pub fn clear(&mut self) {
+ self.messages.clear();
+ self.last_update = Instant::now();
+ }
+
+ pub fn should_broadcast(&self) -> bool {
+ !self.messages.is_empty()
+ && (self.last_update.elapsed() >= self.max_message_delay
+ || self.messages.len() >= self.max_message_count)
+ }
+
+ pub fn sleep_time(&self) -> Duration {
+ self.last_update
+ .elapsed()
+ .saturating_sub(self.max_message_delay)
+ }
+
+ pub fn broadcast(
+ &self,
+ this: Arc<BroadcastHandler>,
+ dst: String,
+ msg_id: MessageID,
+ ) -> impl Future<Output = ()> + 'static {
+ let messages = self.messages.clone();
+ async move {
+ loop {
+ this.output
+ .send(
+ &dst,
+ &BroadcastBody::BroadcastBatch {
+ msg_id: Some(msg_id),
+ messages: messages.clone().into_iter().collect(),
+ },
+ )
+ .await;
+
+ Timer::after(Duration::from_secs(RETRY_TIMEOUT_SECS)).await;
+ }
+ }
+ }
+}