1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc};
use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard};
use crate::{
deps::{self, NodeId},
Mapper,
};
/// Backend that a [`LazyMapper`] delegates to for producing values and for
/// deciding whether a previously produced value is still usable.
pub trait LazyMappingBackend<K, V> {
/// Reports whether the value for `key` should be treated as out of date.
///
/// `LazyMapper::get` consults this only for keys it already holds a value
/// for; a `true` result makes it discard that value and compute a new one.
async fn is_dirty(&self, key: &K) -> bool;
/// Produces the value for `key`.
///
/// The returned future must be `Send` and may borrow from `self`.
fn compute(&self, key: K) -> impl Future<Output = V> + Send + '_;
}
/// Mapper that computes values on demand through a backend `B` and keeps the
/// results, each stored together with the [`NodeId`] it was computed under.
#[derive(Debug, Default)]
pub struct LazyMapper<K, V, B> {
/// The backend we use to compute values and check dirtiness.
backend: B,
/// Every value calculated so far, paired with the dependency node it was
/// computed under.
calculated: RwLock<HashMap<K, (V, NodeId)>>,
/// Keys whose values are currently being calculated.
/// The `Notify` is triggered when that key's computation is done.
waiting: Mutex<HashMap<K, Arc<Notify>>>,
}
impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, B: LazyMappingBackend<K, V>>
LazyMapper<K, V, B>
{
/// Create a new instance of the lazy mapper.
pub fn new(backend: B) -> Self {
Self {
backend,
calculated: RwLock::default(),
waiting: Mutex::default(),
}
}
/// Returns a reference to the backend of this [`LazyMapper<K, V, B>`].
pub fn backend(&self) -> &B {
&self.backend
}
/// Get the node ID for the given key
pub async fn dep_id(&self, key: &K) -> Option<NodeId> {
self.calculated.read().await.get(key).map(|(_, node)| *node)
}
}
impl<K, V, B> Mapper for LazyMapper<K, V, B>
where
    K: Clone + Eq + Hash + Send + Sync,
    V: Send + Sync,
    B: LazyMappingBackend<K, V>,
{
    type Key = K;
    type Value = V;
    type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a;

    /// Returns a read guard over the value for `key`, computing it first if
    /// no usable value is cached.
    ///
    /// * A cached value that is not dirty (according to both `deps::is_dirty`
    ///   and the backend) is returned directly.
    /// * A dirty cached value is evicted and recomputed, reusing its node ID.
    /// * If another task is already computing the value, this call waits for
    ///   it to finish and returns the shared result.
    ///
    /// In every case the node ID of the returned value is passed to
    /// `deps::add_dep` before returning.
    ///
    /// The returned guard holds a read lock on the whole cache, so keep it
    /// only as long as needed.
    async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> {
        // Attempt to reuse or evict the existing value.
        let mut reuse_dep_id = None;
        {
            let finished = self.calculated.read().await;
            if let Some((_, node)) = finished.get(key) {
                let node = *node;
                let dirty = deps::is_dirty(node) || self.backend.is_dirty(key).await;
                if !dirty {
                    deps::add_dep(node);
                    return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
                }
                reuse_dep_id = Some(node);
                // The read guard must be released before asking for the write
                // lock, otherwise this task would wait on itself.
                drop(finished);
                // Another task may have noticed the dirtiness and evicted the
                // entry first. That is fine: the check-and-insert on `waiting`
                // below decides which task recomputes, so the result of the
                // removal is deliberately not inspected.
                self.calculated.write().await.remove(key);
            }
        }

        loop {
            // Keep the `waiting` lock held across the lookup and the insert
            // so that exactly one task becomes the computer for `key`.
            let mut waiting = self.waiting.lock().await;
            let in_flight = waiting.get(key).cloned();

            if let Some(barrier) = in_flight {
                // Create the `Notified` future while still holding the lock.
                // The computing task removes the entry under this same lock
                // before calling `notify_waiters()`, and `notify_waiters()`
                // only reaches futures that already exist, so creating it
                // here rules out a missed wakeup.
                let notified = barrier.notified();
                drop(waiting);
                notified.await;

                let finished = self.calculated.read().await;
                if let Some((_, node)) = finished.get(key) {
                    deps::add_dep(*node);
                    return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
                }
                // The fresh value was evicted before we could read it; go
                // round again to either wait on the next computation or
                // perform it ourselves.
                continue;
            }

            // Nobody is computing this key: register ourselves, then release
            // the lock so other tasks can queue up behind the barrier.
            let notify = Arc::new(Notify::new());
            waiting.insert(key.clone(), Arc::clone(&notify));
            drop(waiting);

            let dep = match reuse_dep_id {
                Some(existing) => {
                    // NOTE(review): presumably resets what was recorded for
                    // this node before it is recomputed — confirm in `deps`.
                    deps::clear(existing);
                    existing
                }
                None => deps::next_node_id(),
            };
            let val = deps::with_node_id(dep, self.backend.compute(key.clone())).await;
            deps::add_dep(dep);

            // Insert under the write lock and downgrade it atomically, so no
            // other task can evict the entry between storing and reading it.
            let mut finished = self.calculated.write().await;
            finished.insert(key.clone(), (val, dep));
            let finished = finished.downgrade();

            // Publish completion only after the value is visible in the cache.
            self.waiting.lock().await.remove(key);
            notify.notify_waiters();

            // NOTE(review): a task that saw the cache empty just before the
            // insert above and reaches `waiting` only after the removal will
            // still recompute the value. That costs extra work but does not
            // break anything, and is not addressed here.
            return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
        }
    }
}
|