summaryrefslogtreecommitdiff
path: root/broadcast/src/topology.rs
blob: bb104667b3da3dbba8875135cb7d74a25f76bb25 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use std::collections::{HashMap, HashSet};

use smol::lock::RwLock;

/// Identifier of a node in the topology.
pub type NodeId = String;
/// Adjacency description of a topology: maps each node id to the set of
/// node ids recorded as its neighbours.
pub type TopologyDesc = HashMap<NodeId, HashSet<NodeId>>;

/// A [`TopologyDesc`] guarded by an async read-write lock, so that it can be
/// queried and swapped out through a shared reference.
pub struct Topology(RwLock<TopologyDesc>);

impl Topology {
    /// Create a new topology in which all nodes are connected to each other.
    ///
    /// Every id in `node_ids` gets all *other* ids as neighbours; a node is
    /// never listed as its own neighbour. Duplicate ids collapse into a
    /// single entry.
    pub fn dense(node_ids: Vec<String>) -> Self {
        let top: TopologyDesc = node_ids
            .iter()
            .map(|node_id| {
                // Build the neighbour set without the node itself, so no
                // separate self-loop stripping pass is needed afterwards.
                let neighbours: HashSet<String> = node_ids
                    .iter()
                    .filter(|other| *other != node_id)
                    .cloned()
                    .collect();
                (node_id.clone(), neighbours)
            })
            .collect();

        Topology(RwLock::new(top))
    }

    /// Replace the current topology with a new one.
    ///
    /// Self-loops (a node listed among its own neighbours) are removed from
    /// `new` before it is stored.
    pub async fn replace(&self, mut new: TopologyDesc) {
        Self::filter_desc(&mut new);
        *self.0.write().await = new;
    }

    /// Remove every node from its own neighbour set.
    fn filter_desc(desc: &mut TopologyDesc) {
        for (node_id, neighbours) in desc.iter_mut() {
            neighbours.remove(node_id);
        }
    }

    /// Get the next targets for a message which is now at `node_id` and was
    /// handed over by `last_node_id`.
    ///
    /// The result is the neighbour set of `node_id` minus the neighbour set
    /// of `last_node_id` (the nodes the last node sent the message to).
    ///
    /// * If `last_node_id` is not part of the topology, all neighbours of
    ///   `node_id` are returned.
    /// * If `node_id` is not part of the topology, an empty set is returned.
    pub async fn targets(&self, node_id: &String, last_node_id: &String) -> HashSet<String> {
        // The read guard lives until the end of this function. Nothing below
        // awaits, and an owned set is returned, so callers never hold the lock.
        let topology = self.0.read().await;

        // A node missing from the current description has nobody to forward to.
        let neighbours = match topology.get(node_id) {
            Some(neighbours) => neighbours,
            None => return HashSet::new(),
        };

        // NOTE(review): `last_node_id` itself is not removed from the result,
        // so a message may be sent back to the node it came from. Confirm
        // whether callers rely on that before changing it.
        match topology.get(last_node_id) {
            // Skip all nodes the last node sent it to.
            Some(visited) => neighbours
                .difference(visited)
                // Self-loops are already stripped when a description is
                // stored; this is only a cheap safeguard. Filtering before
                // cloning avoids cloning ids that get discarded.
                .filter(|n| *n != node_id)
                .cloned()
                .collect(),
            None => neighbours.clone(),
        }
    }
}