summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock60
-rw-r--r--Justfile4
-rw-r--r--broadcast/Cargo.toml3
-rw-r--r--broadcast/src/handler.rs167
-rw-r--r--broadcast/src/main.rs136
-rw-r--r--broadcast/src/msg.rs36
-rw-r--r--common/src/lib.rs6
7 files changed, 276 insertions, 136 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7c56de2..191c81a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -165,6 +165,7 @@ name = "broadcast"
version = "0.1.0"
dependencies = [
"common",
+ "futures",
"serde",
"serde_json",
"smol",
@@ -256,6 +257,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
+name = "futures"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
name = "futures-core"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -283,6 +308,35 @@ dependencies = [
]
[[package]]
+name = "futures-sink"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
+
+[[package]]
+name = "futures-task"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
+
+[[package]]
+name = "futures-util"
+version = "0.3.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+]
+
+[[package]]
name = "getrandom"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -368,6 +422,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
+name = "pin-utils"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+
+[[package]]
name = "piper"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Justfile b/Justfile
index b324d00..3f7fd2d 100644
--- a/Justfile
+++ b/Justfile
@@ -14,4 +14,8 @@ test-broadcast-multi:
cargo build --bin broadcast
maelstrom test -w broadcast --bin target/debug/broadcast --node-count 5 --time-limit 20 --rate 10
+test-broadcast-fault:
+ cargo build --bin broadcast
+ maelstrom test -w broadcast --bin target/debug/broadcast --node-count 5 --time-limit 20 --rate 10 --nemesis partition
+
test: test-echo test-unique-ids test-broadcast-single test-broadcast-multi \ No newline at end of file
diff --git a/broadcast/Cargo.toml b/broadcast/Cargo.toml
index f3e4cc7..06e4426 100644
--- a/broadcast/Cargo.toml
+++ b/broadcast/Cargo.toml
@@ -9,4 +9,5 @@ edition = "2021"
smol = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
-common = { path = "../common/" } \ No newline at end of file
+common = { path = "../common/" }
+futures = { version = "0.3.28", default_features = false, features = ["std"] } \ No newline at end of file
diff --git a/broadcast/src/handler.rs b/broadcast/src/handler.rs
new file mode 100644
index 0000000..09c66b9
--- /dev/null
+++ b/broadcast/src/handler.rs
@@ -0,0 +1,167 @@
+use smol::{
+ lock::{Mutex, RwLock},
+ prelude::*,
+ Task, Timer,
+};
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+ time::Duration,
+};
+
+use crate::msg::*;
+
+use common::{
+ msg::{MessageHeader, Output},
+ msg_id::{gen_msg_id, MessageID},
+ Handler,
+};
+
+const RETRY_TIMEOUT: u64 = 2;
+
+pub struct BroadcastHandler {
+ node_id: String,
+ seen: RwLock<HashSet<BroadcastTarget>>,
+ broadcast_targets: RwLock<Vec<String>>,
+ output: Output<BroadcastBody>,
+ attempted_broadcasts: Mutex<HashMap<MessageID, Task<()>>>,
+}
+
+impl Handler for BroadcastHandler {
+ type Body = BroadcastBody;
+
+ fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
+ node_ids.retain(|x| *x != node_id);
+
+ BroadcastHandler {
+ node_id,
+ broadcast_targets: RwLock::new(node_ids),
+ seen: RwLock::new(HashSet::new()),
+ output,
+ attempted_broadcasts: Mutex::default(),
+ }
+ }
+
+ fn handle<'a>(
+ self: &'a Arc<Self>,
+ header: MessageHeader,
+ body: BroadcastBody,
+ ) -> impl Future<Output = ()> + Send + 'a {
+ async move {
+ match body {
+ BroadcastBody::Broadcast {
+ msg_id: Some(msg_id),
+ message,
+ } => {
+ self.receive_broadcast(&header.src, message).await;
+ self.send_broadcast_ok(&header.src, msg_id).await;
+ }
+ BroadcastBody::Broadcast {
+ msg_id: None,
+ message,
+ } => {
+ self.receive_broadcast(&header.src, message).await;
+ }
+
+ BroadcastBody::Topology {
+ msg_id,
+ mut topology,
+ } => {
+ // Start using the new topology
+ if let Some(broadcast_targets) = topology.remove(&self.node_id) {
+ *self.broadcast_targets.write().await = broadcast_targets;
+ }
+
+ // Send reply if needed
+ if let Some(msg_id) = msg_id {
+ self.output
+ .send(
+ &header.src,
+ &BroadcastBody::TopologyOk {
+ in_reply_to: msg_id,
+ },
+ )
+ .await;
+ }
+ }
+
+ BroadcastBody::Read { msg_id } => {
+ // Send all received messages back
+ self.output
+ .send(
+ &header.src,
+ &BroadcastBody::ReadOk {
+ in_reply_to: msg_id,
+ messages: self.seen.read().await.clone(),
+ },
+ )
+ .await
+ }
+
+ // Ignore OK messages - we never actually request them
+ BroadcastBody::BroadcastOk { in_reply_to } => {
+ if let Some(task) = self.attempted_broadcasts.lock().await.remove(&in_reply_to)
+ {
+ task.cancel().await;
+ }
+ }
+ BroadcastBody::TopologyOk { .. } => {}
+ BroadcastBody::ReadOk { .. } => {}
+ }
+ }
+ }
+}
+
+impl BroadcastHandler {
+ /// Reply with a broadcast OK message
+ async fn send_broadcast_ok(&self, src: &str, msg_id: MessageID) {
+ self.output
+ .send(
+ &src,
+ &BroadcastBody::BroadcastOk {
+ in_reply_to: msg_id,
+ },
+ )
+ .await;
+ }
+
+ /// Receive a given message, and broadcast it onwards if it is new
+ async fn receive_broadcast(self: &Arc<Self>, src: &str, message: BroadcastTarget) {
+ let new = self.seen.write().await.insert(message);
+ if !new {
+ return;
+ }
+
+ // Ensure we don't keep holding the read lock
+ let targets = self.broadcast_targets.read().await.clone();
+
+ // Race all send futures
+ let mut tasks = self.attempted_broadcasts.lock().await;
+ for target in targets.into_iter() {
+ if &target == &src {
+ return;
+ }
+
+ let msg_id = gen_msg_id();
+ let this = self.clone();
+ tasks.insert(
+ msg_id,
+ smol::spawn(async move {
+ loop {
+ this.output
+ .send(
+ &target,
+ &BroadcastBody::Broadcast {
+ msg_id: None,
+ message,
+ },
+ )
+ .await;
+
+ Timer::after(Duration::from_secs(RETRY_TIMEOUT)).await;
+ }
+ }),
+ );
+ }
+ }
+}
diff --git a/broadcast/src/main.rs b/broadcast/src/main.rs
index 09c0268..bdc8413 100644
--- a/broadcast/src/main.rs
+++ b/broadcast/src/main.rs
@@ -1,140 +1,12 @@
#![feature(return_position_impl_trait_in_trait)]
-use smol::{lock::RwLock, prelude::*};
-use std::collections::{HashMap, HashSet};
+use common::run_server;
-use common::{
- msg::{MessageHeader, Output},
- run_server, Handler,
-};
-use serde::{Deserialize, Serialize};
+mod handler;
+mod msg;
-type BroadcastTarget = usize;
+use handler::BroadcastHandler;
fn main() {
run_server::<BroadcastHandler>();
}
-
-#[derive(Debug, Deserialize, Serialize, Clone)]
-#[serde(tag = "type")]
-pub enum BroadcastBody {
- #[serde(rename = "broadcast")]
- Broadcast {
- msg_id: Option<usize>,
- message: BroadcastTarget,
- },
-
- #[serde(rename = "broadcast_ok")]
- BroadcastOk { in_reply_to: usize },
-
- #[serde(rename = "topology")]
- Topology {
- msg_id: Option<usize>,
- topology: HashMap<String, Vec<String>>,
- },
-
- #[serde(rename = "topology_ok")]
- TopologyOk { in_reply_to: usize },
-
- #[serde(rename = "read")]
- Read { msg_id: usize },
-
- #[serde(rename = "read_ok")]
- ReadOk {
- in_reply_to: usize,
- messages: HashSet<BroadcastTarget>,
- },
-}
-
-pub struct BroadcastHandler {
- node_id: String,
- seen: RwLock<HashSet<BroadcastTarget>>,
- broadcast_targets: RwLock<Vec<String>>,
- output: Output<BroadcastBody>,
-}
-
-impl Handler for BroadcastHandler {
- type Body = BroadcastBody;
-
- fn init(node_id: String, mut node_ids: Vec<String>, output: Output<Self::Body>) -> Self {
- node_ids.retain(|x| *x != node_id);
-
- BroadcastHandler {
- node_id,
- broadcast_targets: RwLock::new(node_ids),
- seen: RwLock::new(HashSet::new()),
- output,
- }
- }
-
- fn handle(
- &self,
- header: MessageHeader,
- body: BroadcastBody,
- ) -> impl Future<Output = ()> + Send {
- async move {
- match body {
- BroadcastBody::Broadcast { msg_id, message } => {
- self.seen.write().await.insert(message);
- if let Some(msg_id) = msg_id {
- self.output
- .send(
- &header.src,
- &BroadcastBody::BroadcastOk {
- in_reply_to: msg_id,
- },
- )
- .await;
- }
-
- for target in self.broadcast_targets.read().await.iter() {
- self.output
- .send(
- target,
- &BroadcastBody::Broadcast {
- msg_id: None,
- message,
- },
- )
- .await;
- }
- }
- BroadcastBody::Topology {
- msg_id,
- mut topology,
- } => {
- if let Some(broadcast_targets) = topology.remove(&self.node_id) {
- *self.broadcast_targets.write().await = broadcast_targets;
- }
-
- if let Some(msg_id) = msg_id {
- self.output
- .send(
- &header.src,
- &BroadcastBody::TopologyOk {
- in_reply_to: msg_id,
- },
- )
- .await;
- }
- }
-
- BroadcastBody::Read { msg_id } => {
- self.output
- .send(
- &header.src,
- &BroadcastBody::ReadOk {
- in_reply_to: msg_id,
- messages: self.seen.read().await.clone(),
- },
- )
- .await
- }
-
- BroadcastBody::BroadcastOk { .. } => {}
- BroadcastBody::TopologyOk { .. } => {}
- BroadcastBody::ReadOk { .. } => {}
- }
- }
- }
-}
diff --git a/broadcast/src/msg.rs b/broadcast/src/msg.rs
new file mode 100644
index 0000000..6433982
--- /dev/null
+++ b/broadcast/src/msg.rs
@@ -0,0 +1,36 @@
+use common::msg_id::MessageID;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+
+pub type BroadcastTarget = usize;
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+#[serde(tag = "type")]
+pub enum BroadcastBody {
+ #[serde(rename = "broadcast")]
+ Broadcast {
+ msg_id: Option<MessageID>,
+ message: BroadcastTarget,
+ },
+
+ #[serde(rename = "broadcast_ok")]
+ BroadcastOk { in_reply_to: MessageID },
+
+ #[serde(rename = "topology")]
+ Topology {
+ msg_id: Option<MessageID>,
+ topology: HashMap<String, Vec<String>>,
+ },
+
+ #[serde(rename = "topology_ok")]
+ TopologyOk { in_reply_to: MessageID },
+
+ #[serde(rename = "read")]
+ Read { msg_id: MessageID },
+
+ #[serde(rename = "read_ok")]
+ ReadOk {
+ in_reply_to: MessageID,
+ messages: HashSet<BroadcastTarget>,
+ },
+}
diff --git a/common/src/lib.rs b/common/src/lib.rs
index d58d10b..69a872b 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -22,11 +22,11 @@ pub trait Handler: Send + Sync + 'static {
type Body: Serialize + for<'a> Deserialize<'a> + Send + Clone;
fn init(node_id: String, node_ids: Vec<String>, output: Output<Self::Body>) -> Self;
- fn handle(
- &self,
+ fn handle<'a>(
+ self: &'a Arc<Self>,
header: MessageHeader,
body: Self::Body,
- ) -> impl Future<Output = ()> + Send + '_;
+ ) -> impl Future<Output = ()> + Send + 'a;
}
pub fn run_server<H: Handler>() {