diff options
Diffstat (limited to 'broadcast')
-rw-r--r-- | broadcast/Cargo.toml | 12 | ||||
-rw-r--r-- | broadcast/src/main.rs | 140 |
2 files changed, 152 insertions, 0 deletions
diff --git a/broadcast/Cargo.toml b/broadcast/Cargo.toml new file mode 100644 index 0000000..f3e4cc7 --- /dev/null +++ b/broadcast/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "broadcast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +smol = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +common = { path = "../common/" }
\ No newline at end of file diff --git a/broadcast/src/main.rs b/broadcast/src/main.rs new file mode 100644 index 0000000..09c0268 --- /dev/null +++ b/broadcast/src/main.rs @@ -0,0 +1,140 @@ +#![feature(return_position_impl_trait_in_trait)] + +use smol::{lock::RwLock, prelude::*}; +use std::collections::{HashMap, HashSet}; + +use common::{ + msg::{MessageHeader, Output}, + run_server, Handler, +}; +use serde::{Deserialize, Serialize}; + +type BroadcastTarget = usize; + +fn main() { + run_server::<BroadcastHandler>(); +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(tag = "type")] +pub enum BroadcastBody { + #[serde(rename = "broadcast")] + Broadcast { + msg_id: Option<usize>, + message: BroadcastTarget, + }, + + #[serde(rename = "broadcast_ok")] + BroadcastOk { in_reply_to: usize }, + + #[serde(rename = "topology")] + Topology { + msg_id: Option<usize>, + topology: HashMap<String, Vec<String>>, + }, + + #[serde(rename = "topology_ok")] + TopologyOk { in_reply_to: usize }, + + #[serde(rename = "read")] + Read { msg_id: usize }, + + #[serde(rename = "read_ok")] + ReadOk { + in_reply_to: usize, + messages: HashSet<BroadcastTarget>, + }, +} + +pub struct BroadcastHandler { + node_id: String, + seen: RwLock<HashSet<BroadcastTarget>>, + broadcast_targets: RwLock<Vec<String>>, + output: Output<BroadcastBody>, +} + +impl Handler for BroadcastHandler { + type Body = BroadcastBody; + + fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self { + node_ids.retain(|x| *x != node_id); + + BroadcastHandler { + node_id, + broadcast_targets: RwLock::new(node_ids), + seen: RwLock::new(HashSet::new()), + output, + } + } + + fn handle( + &self, + header: MessageHeader, + body: BroadcastBody, + ) -> impl Future<Output = ()> + Send { + async move { + match body { + BroadcastBody::Broadcast { msg_id, message } => { + self.seen.write().await.insert(message); + if let Some(msg_id) = msg_id { + self.output + .send( + &header.src, + &BroadcastBody::BroadcastOk { + in_reply_to: msg_id, + }, + ) + .await; + } + + for target in self.broadcast_targets.read().await.iter() { + self.output + .send( + target, + &BroadcastBody::Broadcast { + msg_id: None, + message, + }, + ) + .await; + } + } + BroadcastBody::Topology { + msg_id, + mut topology, + } => { + if let Some(broadcast_targets) = topology.remove(&self.node_id) { + *self.broadcast_targets.write().await = broadcast_targets; + } + + if let Some(msg_id) = msg_id { + self.output + .send( + &header.src, + &BroadcastBody::TopologyOk { + in_reply_to: msg_id, + }, + ) + .await; + } + } + + BroadcastBody::Read { msg_id } => { + self.output + .send( + &header.src, + &BroadcastBody::ReadOk { + in_reply_to: msg_id, + messages: self.seen.read().await.clone(), + }, + ) + .await + } + + BroadcastBody::BroadcastOk { .. } => {} + BroadcastBody::TopologyOk { .. } => {} + BroadcastBody::ReadOk { .. } => {} + } + } + } +} |