summaryrefslogtreecommitdiff
path: root/broadcast/src/handler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'broadcast/src/handler.rs')
-rw-r--r--broadcast/src/handler.rs41
1 files changed, 21 insertions, 20 deletions
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs
index 4ee2664..2b470fb 100644
--- a/broadcast/src/handler.rs
+++ b/broadcast/src/handler.rs
@@ -17,7 +17,8 @@ use common::{
msg_id::{gen_msg_id, MessageID},
Handler,
};
-const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(700);
+
+const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(200);
pub struct BroadcastHandler {
node_id: String,
@@ -32,7 +33,7 @@ impl Handler for BroadcastHandler {
type Body = BroadcastBody;
fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Arc<Self> {
- let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() / node_ids.len()) as u32;
+ let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() as f32).sqrt() as u32;
let this = Arc::new(Self {
node_id,
@@ -40,7 +41,7 @@ impl Handler for BroadcastHandler {
seen: RwLock::new(HashSet::new()),
output,
attempted_broadcasts: Mutex::default(),
- batch: Mutex::new(MessageBatch::new(max_message_delay, 1000)),
+ batch: Mutex::new(MessageBatch::new(max_message_delay)),
});
smol::spawn(this.clone().poll_batch()).detach();
@@ -164,31 +165,18 @@ impl BroadcastHandler {
async fn receive_broadcast_batch(self: &Arc<Self>, message: Vec<BroadcastTarget>) {
let mut batch = self.batch.lock().await;
let mut seen = self.seen.write().await;
- let mut new = false;
for message in message.into_iter() {
- new |= seen.insert(message);
- batch.add(message);
- }
-
- if !new {
- return;
+ if seen.insert(message) {
+ batch.add(message);
+ }
}
}
async fn poll_batch(self: Arc<Self>) {
loop {
let mut batch = self.batch.lock().await;
- if batch.should_broadcast() {
- let mut tasks = self.attempted_broadcasts.lock().await;
- for target in self.topology.all_targets(&self.node_id).await {
- let msg_id = gen_msg_id();
- let this = self.clone();
- tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id)));
- }
-
- batch.clear();
- }
+ self.do_batch_check(&mut batch).await;
let time = batch.sleep_time();
drop(batch);
@@ -196,4 +184,17 @@ impl BroadcastHandler {
Timer::after(time).await;
}
}
+
+ async fn do_batch_check(self: &Arc<Self>, batch: &mut MessageBatch) {
+ if batch.should_broadcast() {
+ let mut tasks = self.attempted_broadcasts.lock().await;
+ for target in self.topology.all_targets(&self.node_id).await {
+ let msg_id = gen_msg_id();
+ let this = self.clone();
+ tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id)));
+ }
+
+ batch.clear();
+ }
+ }
}