// path: root/incria/src/lazy_mapping.rs
// blob: bef3a3dfad0673a1511ce5d85311edabccb77e54
use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc};

use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard};

use crate::{
    deps::{self, NodeId},
    Mapper,
};

/// Backend used by [`LazyMapper`] to produce values and to decide whether a
/// value it already holds may still be reused.
pub trait LazyMappingBackend<K, V> {
    /// Reports whether the value previously computed for `key` is stale.
    ///
    /// [`LazyMapper`] consults this for keys it already holds a value for;
    /// when it resolves to `true` the held value is discarded and
    /// [`compute`](Self::compute) is called again.
    async fn is_dirty(&self, key: &K) -> bool;
    /// Computes the value for `key`.
    ///
    /// The key is passed by value. The returned future must be `Send` and may
    /// borrow from `self`.
    fn compute(&self, key: K) -> impl Future<Output = V> + Send + '_;
}

/// A mapper that computes values on demand through a backend `B` and keeps
/// each result together with the [`NodeId`] it was computed under.
#[derive(Debug, Default)]
pub struct LazyMapper<K, V, B> {
    /// The backend we use to compute values and check dirtiness
    backend: B,

    /// Map of all values we've calculated already, each paired with the
    /// dependency node it was computed under
    calculated: RwLock<HashMap<K, (V, NodeId)>>,

    /// Values we're currently calculating
    /// The Notify will be triggered when computation is done.
    waiting: Mutex<HashMap<K, Arc<Notify>>>,
}

impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, B: LazyMappingBackend<K, V>>
    LazyMapper<K, V, B>
{
    /// Create a new instance of the lazy mapper.
    pub fn new(backend: B) -> Self {
        Self {
            backend,
            calculated: RwLock::default(),
            waiting: Mutex::default(),
        }
    }

    /// Returns a reference to the backend of this [`LazyMapper<K, V, B>`].
    pub fn backend(&self) -> &B {
        &self.backend
    }

    /// Get the node ID for the given key
    pub async fn dep_id(&self, key: &K) -> Option<NodeId> {
        self.calculated.read().await.get(key).map(|(_, node)| *node)
    }
}

impl<K, V, B> Mapper for LazyMapper<K, V, B>
where
    K: Clone + Eq + Hash + Send + Sync,
    V: Send + Sync,
    B: LazyMappingBackend<K, V>,
{
    type Key = K;
    type Value = V;
    type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a;

    /// Returns the value for `key`, computing it through the backend when no
    /// clean cached value exists.
    ///
    /// * A cached value that is not dirty is returned directly.
    /// * A dirty cached value is evicted and recomputed; the task that evicts
    ///   it reuses the entry's node ID for the recomputation.
    /// * If another task is already computing `key`, this call waits for that
    ///   computation and returns its result.
    ///
    /// In every case the node ID of the returned value is recorded through
    /// `deps::add_dep`.
    ///
    /// The returned guard is a read lock on the whole cache. Computing a value
    /// needs the write lock to store its result, so a guard that is kept alive
    /// delays computations for every key until it is dropped.
    async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> {
        // Attempt to reuse or evict the existing value
        let mut reuse_dep_id = None;
        {
            let finished = self.calculated.read().await;
            // Copy the node ID out so the borrow of the map ends before the
            // guard is either returned or dropped below.
            if let Some(node) = finished.get(key).map(|(_, node)| *node) {
                let dirty = deps::is_dirty(node) || self.backend.is_dirty(key).await;
                if !dirty {
                    deps::add_dep(node);
                    return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
                }

                // Dirty, so we'll recompute below but we should remove it now
                drop(finished);
                if self.calculated.write().await.remove(key).is_some() {
                    reuse_dep_id = Some(node);
                }
                // Otherwise another task evicted the entry first. That task
                // keeps the old node ID; we fall through and either wait for
                // its recomputation or start one under a fresh ID.
            }
        }

        loop {
            // The lookup and the registration share one critical section so
            // that two tasks cannot both register as the computer of a key.
            let mut waiting = self.waiting.lock().await;
            let barrier = waiting.get(key).cloned();
            if let Some(barrier) = barrier {
                // Create the `Notified` future while the entry is still
                // registered: the computing task must take this mutex to
                // unregister before calling `notify_waiters`, so the wake-up
                // cannot happen before this future exists and be missed.
                let notified = barrier.notified();
                drop(waiting);
                notified.await;

                let finished = self.calculated.read().await;
                if let Some(node) = finished.get(key).map(|(_, node)| *node) {
                    deps::add_dep(node);
                    return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
                }
                // The value was evicted again before we could read it; go
                // back and wait for (or start) the next computation.
                continue;
            }

            // Needs calculated
            let notify = Arc::new(Notify::new());
            waiting.insert(key.clone(), notify.clone());
            drop(waiting);

            let dep = if let Some(x) = reuse_dep_id {
                deps::clear(x);
                x
            } else {
                deps::next_node_id()
            };

            let val = deps::with_node_id(dep, self.backend.compute(key.clone())).await;
            deps::add_dep(dep);

            // Store the value and downgrade to a read guard without ever
            // releasing the lock, so no other task can evict the entry before
            // it is handed back to the caller.
            let mut calculated = self.calculated.write().await;
            calculated.insert(key.clone(), (val, dep));
            let finished = calculated.downgrade();

            // Unregister before notifying so that tasks arriving from now on
            // do not wait on a barrier that will never fire again.
            self.waiting.lock().await.remove(key);
            notify.notify_waiters();

            return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
        }
    }
}