summaryrefslogtreecommitdiff
path: root/broadcast/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'broadcast/src/main.rs')
-rw-r--r--broadcast/src/main.rs136
1 files changed, 4 insertions, 132 deletions
diff --git a/broadcast/src/main.rs b/broadcast/src/main.rs
index 09c0268..bdc8413 100644
--- a/broadcast/src/main.rs
+++ b/broadcast/src/main.rs
@@ -1,140 +1,12 @@
#![feature(return_position_impl_trait_in_trait)]
-use smol::{lock::RwLock, prelude::*};
-use std::collections::{HashMap, HashSet};
+use common::run_server;
-use common::{
- msg::{MessageHeader, Output},
- run_server, Handler,
-};
-use serde::{Deserialize, Serialize};
+mod handler;
+mod msg;
-type BroadcastTarget = usize;
+use handler::BroadcastHandler;
fn main() {
run_server::<BroadcastHandler>();
}
-
-#[derive(Debug, Deserialize, Serialize, Clone)]
-#[serde(tag = "type")]
-pub enum BroadcastBody {
- #[serde(rename = "broadcast")]
- Broadcast {
- msg_id: Option<usize>,
- message: BroadcastTarget,
- },
-
- #[serde(rename = "broadcast_ok")]
- BroadcastOk { in_reply_to: usize },
-
- #[serde(rename = "topology")]
- Topology {
- msg_id: Option<usize>,
- topology: HashMap<String, Vec<String>>,
- },
-
- #[serde(rename = "topology_ok")]
- TopologyOk { in_reply_to: usize },
-
- #[serde(rename = "read")]
- Read { msg_id: usize },
-
- #[serde(rename = "read_ok")]
- ReadOk {
- in_reply_to: usize,
- messages: HashSet<BroadcastTarget>,
- },
-}
-
-pub struct BroadcastHandler {
- node_id: String,
- seen: RwLock<HashSet<BroadcastTarget>>,
- broadcast_targets: RwLock<Vec<String>>,
- output: Output<BroadcastBody>,
-}
-
-impl Handler for BroadcastHandler {
- type Body = BroadcastBody;
-
- fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
- node_ids.retain(|x| *x != node_id);
-
- BroadcastHandler {
- node_id,
- broadcast_targets: RwLock::new(node_ids),
- seen: RwLock::new(HashSet::new()),
- output,
- }
- }
-
- fn handle(
- &self,
- header: MessageHeader,
- body: BroadcastBody,
- ) -> impl Future<Output = ()> + Send {
- async move {
- match body {
- BroadcastBody::Broadcast { msg_id, message } => {
- self.seen.write().await.insert(message);
- if let Some(msg_id) = msg_id {
- self.output
- .send(
- &header.src,
- &BroadcastBody::BroadcastOk {
- in_reply_to: msg_id,
- },
- )
- .await;
- }
-
- for target in self.broadcast_targets.read().await.iter() {
- self.output
- .send(
- target,
- &BroadcastBody::Broadcast {
- msg_id: None,
- message,
- },
- )
- .await;
- }
- }
- BroadcastBody::Topology {
- msg_id,
- mut topology,
- } => {
- if let Some(broadcast_targets) = topology.remove(&self.node_id) {
- *self.broadcast_targets.write().await = broadcast_targets;
- }
-
- if let Some(msg_id) = msg_id {
- self.output
- .send(
- &header.src,
- &BroadcastBody::TopologyOk {
- in_reply_to: msg_id,
- },
- )
- .await;
- }
- }
-
- BroadcastBody::Read { msg_id } => {
- self.output
- .send(
- &header.src,
- &BroadcastBody::ReadOk {
- in_reply_to: msg_id,
- messages: self.seen.read().await.clone(),
- },
- )
- .await
- }
-
- BroadcastBody::BroadcastOk { .. } => {}
- BroadcastBody::TopologyOk { .. } => {}
- BroadcastBody::ReadOk { .. } => {}
- }
- }
- }
-}