aboutsummaryrefslogtreecommitdiff
path: root/incria/src/lazy_mapping.rs
blob: 27bd76e0bd429842a7e073694083c8369f2a3afb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::{
    collections::HashMap, fmt::Debug, future::Future, hash::Hash, mem::transmute, sync::Arc,
};

use blink_alloc::{Blink, SyncBlinkAlloc};
use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard};

use crate::{
    deps::{self, NodeId},
    Mapper,
};

/// Supplies the two operations a [`LazyMapper`] needs from its owner: deciding
/// whether a cached value is stale, and producing a value for a key.
pub trait LazyMappingBackend<K, V> {
    /// Reports whether the value previously computed for `key` is out of date.
    ///
    /// [`LazyMapper`]'s `get` consults this for keys that already have a
    /// cached value (in addition to the dependency tracker's own dirtiness
    /// check); a `true` result makes it discard the cached value and call
    /// [`compute`](Self::compute) again.
    async fn is_dirty(&self, key: &K) -> bool;
    /// Produces the value for `key`.
    ///
    /// The returned future borrows `self` and must be `Send`. [`LazyMapper`]
    /// drives it inside `deps::with_node_id`, using the node ID it associates
    /// with `key`.
    fn compute(&self, key: K) -> impl Future<Output = V> + Send + '_;
}

/// A [`Mapper`] that computes values on demand through a
/// [`LazyMappingBackend`] and caches the results per key.
///
/// Each cached value is stored together with the [`NodeId`] it was computed
/// under, which is what the dependency-tracking calls in `get` operate on.
#[derive(Default)]
pub struct LazyMapper<K, V: 'static, B> {
    /// The backend we use to compute values and check dirtiness
    backend: B,

    /// Allocator that computed values are placed into. The `&'static V`
    /// references held in `calculated` point at values stored here; the
    /// `'static` lifetime is produced by a `transmute` in `get`, so those
    /// references must never be used after this allocator is gone.
    values: SyncBlinkAlloc,

    /// Map of all values we've calculated already
    /// Each entry pairs the value with the node ID it was computed under.
    calculated: RwLock<HashMap<K, (&'static V, NodeId)>>,

    /// Values we're currently calculating
    /// The Notify will be triggered when computation is done.
    waiting: Mutex<HashMap<K, Arc<Notify>>>,
}

impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, B: LazyMappingBackend<K, V>>
    LazyMapper<K, V, B>
{
    /// Builds an empty mapper around `backend`: nothing cached, nothing in
    /// flight, and a fresh allocator for the values computed later.
    pub fn new(backend: B) -> Self {
        let values = SyncBlinkAlloc::new();
        let calculated = RwLock::default();
        let waiting = Mutex::default();

        Self {
            backend,
            values,
            calculated,
            waiting,
        }
    }

    /// Borrows the backend this [`LazyMapper<K, V, B>`] was created with.
    pub fn backend(&self) -> &B {
        let Self { backend, .. } = self;
        backend
    }

    /// Looks up the node ID stored alongside the cached value for `key`.
    ///
    /// Returns `None` when there is currently no cached entry for `key`.
    pub async fn dep_id(&self, key: &K) -> Option<NodeId> {
        let finished = self.calculated.read().await;
        let (_, node) = finished.get(key)?;
        Some(*node)
    }
}

impl<K, V, B> Mapper for LazyMapper<K, V, B>
where
    K: Clone + Eq + Hash + Send + Sync + Debug,
    V: Send + Sync + 'static,
    B: LazyMappingBackend<K, V>,
{
    type Key = K;
    type Value = V;
    type Wrapper<'a> = &'a V where V: 'a;

    async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> {
        // Attempt to reuse or evict the existing value
        let mut reuse_dep_id = None;
        {
            let finished = self.calculated.read().await;
            if let Some((val, node)) = finished.get(key) {
                let dirty = deps::is_dirty(*node) || self.backend.is_dirty(key).await;
                if !dirty {
                    deps::add_dep(*node);
                    return val;
                } else {
                    reuse_dep_id = Some(*node);
                    drop(finished);
                    // Dirty, so we'll recompute below but we should remove it now
                    if self.calculated.write().await.remove(key).is_none() {
                        // Someone else already noticed it was dirty and removed it before us, so we need to deal with that
                        todo!("dirty value removed between us noticing and us doing something")
                    }
                }
            }
        }

        let barrier = self.waiting.lock().await.get(key).cloned();
        if let Some(barrier) = barrier {
            // Waiting for completion
            barrier.notified().await;

            let (value, node_id) =
                *RwLockReadGuard::map(self.calculated.read().await, |hm| hm.get(key).unwrap());

            deps::add_dep(node_id);

            value
        } else {
            // Needs calculated
            let notify = Arc::new(Notify::new());
            self.waiting
                .lock()
                .await
                .insert(key.clone(), notify.clone());

            let dep = if let Some(x) = reuse_dep_id {
                deps::clear(x);
                x
            } else {
                deps::next_node_id()
            };

            let val = deps::with_node_id(dep, self.backend.compute(key.clone())).await;
            let val_ref: &'static V = unsafe { transmute(Blink::new_in(&self.values).put(val)) };
            deps::add_dep(dep);

            self.calculated
                .write()
                .await
                .insert(key.clone(), (val_ref, dep));
            self.waiting.lock().await.remove(key);

            notify.notify_waiters();

            val_ref
        }
    }
}

impl<K: Debug, V: Debug, B: Debug> Debug for LazyMapper<K, V, B> {
    /// Formats the mapper as a `LazyMapper` struct showing only its backend;
    /// the remaining fields are elided and the output is marked
    /// non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("LazyMapper");
        repr.field("backend", &self.backend);
        repr.finish_non_exhaustive()
    }
}