1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
use common::msg_id::MessageID;
use futures::Future;
use smol::Timer;
use crate::{
handler::BroadcastHandler,
msg::{BroadcastBody, BroadcastTarget},
};
const RETRY_TIMEOUT_SECS: u64 = 1;
#[derive(Debug, Clone)]
pub struct MessageBatch {
max_message_delay: Duration,
max_message_count: usize,
last_update: Instant,
messages: HashSet<BroadcastTarget>,
}
impl MessageBatch {
pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self {
Self {
max_message_delay,
max_message_count,
last_update: Instant::now(),
messages: HashSet::new(),
}
}
pub fn add(&mut self, msg: BroadcastTarget) {
self.messages.insert(msg);
self.last_update = Instant::now();
}
pub fn clear(&mut self) {
self.messages.clear();
self.last_update = Instant::now();
}
pub fn should_broadcast(&self) -> bool {
!self.messages.is_empty()
&& (self.last_update.elapsed() >= self.max_message_delay
|| self.messages.len() >= self.max_message_count)
}
pub fn sleep_time(&self) -> Duration {
self.last_update
.elapsed()
.saturating_sub(self.max_message_delay)
}
pub fn broadcast(
&self,
this: Arc<BroadcastHandler>,
dst: String,
msg_id: MessageID,
) -> impl Future<Output = ()> + 'static {
let messages = self.messages.clone();
async move {
loop {
this.output
.send(
&dst,
&BroadcastBody::BroadcastBatch {
msg_id: Some(msg_id),
messages: messages.clone().into_iter().collect(),
},
)
.await;
Timer::after(Duration::from_secs(RETRY_TIMEOUT_SECS)).await;
}
}
}
}
|