summaryrefslogtreecommitdiff
path: root/broadcast/src/batch.rs
diff options
context:
space:
mode:
authorAria <me@aria.rip>2023-10-19 23:32:39 +0100
committerAria <me@aria.rip>2023-10-19 23:32:39 +0100
commit1ec89fafc0afc27758924ae9f0102c9871918514 (patch)
tree2ffbf8046eab93495935bb4e23129215fe1267c3 /broadcast/src/batch.rs
parent01b13b3ab8850f926f81347a7149e92649083dae (diff)
delay batch based on first message timing, not last
Diffstat (limited to 'broadcast/src/batch.rs')
-rw-r--r--broadcast/src/batch.rs13
1 files changed, 7 insertions, 6 deletions
diff --git a/broadcast/src/batch.rs b/broadcast/src/batch.rs
index 8c1c17d..46b9b57 100644
--- a/broadcast/src/batch.rs
+++ b/broadcast/src/batch.rs
@@ -19,7 +19,7 @@ const RETRY_TIMEOUT_SECS: u64 = 1;
pub struct MessageBatch {
max_message_delay: Duration,
max_message_count: usize,
- last_update: Instant,
+ first_added: Instant,
messages: HashSet<BroadcastTarget>,
}
@@ -28,29 +28,30 @@ impl MessageBatch {
Self {
max_message_delay,
max_message_count,
- last_update: Instant::now(),
+ first_added: Instant::now(),
messages: HashSet::new(),
}
}
pub fn add(&mut self, msg: BroadcastTarget) {
+ if self.messages.is_empty() {
+ self.first_added = Instant::now();
+ }
self.messages.insert(msg);
- self.last_update = Instant::now();
}
pub fn clear(&mut self) {
self.messages.clear();
- self.last_update = Instant::now();
}
pub fn should_broadcast(&self) -> bool {
!self.messages.is_empty()
- && (self.last_update.elapsed() >= self.max_message_delay
+ && (self.first_added.elapsed() >= self.max_message_delay
|| self.messages.len() >= self.max_message_count)
}
pub fn sleep_time(&self) -> Duration {
- self.last_update
+ self.first_added
.elapsed()
.saturating_sub(self.max_message_delay)
}