summaryrefslogtreecommitdiff
path: root/broadcast
diff options
context:
space:
mode:
Diffstat (limited to 'broadcast')
-rw-r--r--broadcast/src/batch.rs8
-rw-r--r--broadcast/src/handler.rs41
2 files changed, 23 insertions, 26 deletions
diff --git a/broadcast/src/batch.rs b/broadcast/src/batch.rs
index 46b9b57..d69771d 100644
--- a/broadcast/src/batch.rs
+++ b/broadcast/src/batch.rs
@@ -18,16 +18,14 @@ const RETRY_TIMEOUT_SECS: u64 = 1;
#[derive(Debug, Clone)]
pub struct MessageBatch {
max_message_delay: Duration,
- max_message_count: usize,
first_added: Instant,
messages: HashSet<BroadcastTarget>,
}
impl MessageBatch {
- pub fn new(max_message_delay: Duration, max_message_count: usize) -> Self {
+ pub fn new(max_message_delay: Duration) -> Self {
Self {
max_message_delay,
- max_message_count,
first_added: Instant::now(),
messages: HashSet::new(),
}
@@ -45,9 +43,7 @@ impl MessageBatch {
}
pub fn should_broadcast(&self) -> bool {
- !self.messages.is_empty()
- && (self.first_added.elapsed() >= self.max_message_delay
- || self.messages.len() >= self.max_message_count)
+ !self.messages.is_empty() && self.first_added.elapsed() >= self.max_message_delay
}
pub fn sleep_time(&self) -> Duration {
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs
index 4ee2664..2b470fb 100644
--- a/broadcast/src/handler.rs
+++ b/broadcast/src/handler.rs
@@ -17,7 +17,8 @@ use common::{
msg_id::{gen_msg_id, MessageID},
Handler,
};
-const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(700);
+
+const MAX_STABLE_DELAY_MS: Duration = Duration::from_millis(200);
pub struct BroadcastHandler {
node_id: String,
@@ -32,7 +33,7 @@ impl Handler for BroadcastHandler {
type Body = BroadcastBody;
fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Arc<Self> {
- let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() / node_ids.len()) as u32;
+ let max_message_delay = MAX_STABLE_DELAY_MS / (node_ids.len() as f32).sqrt() as u32;
let this = Arc::new(Self {
node_id,
@@ -40,7 +41,7 @@ impl Handler for BroadcastHandler {
seen: RwLock::new(HashSet::new()),
output,
attempted_broadcasts: Mutex::default(),
- batch: Mutex::new(MessageBatch::new(max_message_delay, 1000)),
+ batch: Mutex::new(MessageBatch::new(max_message_delay)),
});
smol::spawn(this.clone().poll_batch()).detach();
@@ -164,31 +165,18 @@ impl BroadcastHandler {
async fn receive_broadcast_batch(self: &Arc<Self>, message: Vec<BroadcastTarget>) {
let mut batch = self.batch.lock().await;
let mut seen = self.seen.write().await;
- let mut new = false;
for message in message.into_iter() {
- new |= seen.insert(message);
- batch.add(message);
- }
-
- if !new {
- return;
+ if seen.insert(message) {
+ batch.add(message);
+ }
}
}
async fn poll_batch(self: Arc<Self>) {
loop {
let mut batch = self.batch.lock().await;
- if batch.should_broadcast() {
- let mut tasks = self.attempted_broadcasts.lock().await;
- for target in self.topology.all_targets(&self.node_id).await {
- let msg_id = gen_msg_id();
- let this = self.clone();
- tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id)));
- }
-
- batch.clear();
- }
+ self.do_batch_check(&mut batch).await;
let time = batch.sleep_time();
drop(batch);
@@ -196,4 +184,17 @@ impl BroadcastHandler {
Timer::after(time).await;
}
}
+
+ async fn do_batch_check(self: &Arc<Self>, batch: &mut MessageBatch) {
+ if batch.should_broadcast() {
+ let mut tasks = self.attempted_broadcasts.lock().await;
+ for target in self.topology.all_targets(&self.node_id).await {
+ let msg_id = gen_msg_id();
+ let this = self.clone();
+ tasks.insert(msg_id, smol::spawn(batch.broadcast(this, target, msg_id)));
+ }
+
+ batch.clear();
+ }
+ }
}