1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
use std::{
any::type_name, collections::HashMap, fmt::Debug, future::Future, hash::Hash, mem::transmute,
sync::Arc,
};
use blink_alloc::{Blink, SyncBlinkAlloc};
use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard};
use tracing::{debug_span, instrument, trace_span, Instrument};
use crate::{
deps::{self, NodeId},
Mapper,
};
/// Backend driving a [`LazyMapper`]: it produces the value for a key and reports whether a
/// previously produced value should be considered dirty.
pub trait LazyMappingBackend<K, V> {
/// Reports whether the value associated with `key` is dirty.
///
/// [`LazyMapper`]'s `get` may call this for a key that already has a cached value; a `true`
/// result causes the cached value to be discarded and computed again.
async fn is_dirty(&self, key: &K) -> bool;
/// Computes the value for `key`.
///
/// The key is taken by value. The returned future must be `Send` and is allowed to borrow
/// from `self`.
fn compute(&self, key: K) -> impl Future<Output = V> + Send + '_;
}
/// A [`Mapper`] that computes values on demand through a [`LazyMappingBackend`] and keeps the
/// results so later lookups of the same key can reuse them.
#[derive(Default)]
pub struct LazyMapper<K, V: 'static, B> {
/// The backend we use to compute values and check dirtiness
backend: B,
/// Allocator that computed values are placed into. The `&'static V` references stored in
/// `calculated` point at values that were put here.
values: SyncBlinkAlloc,
/// Map of all values we've calculated already
/// Each entry pairs a reference to the value with the dependency node ID it was computed under.
calculated: RwLock<HashMap<K, (&'static V, NodeId)>>,
/// Values we're currently calculating
/// The Notify will be triggered when computation is done.
waiting: Mutex<HashMap<K, Arc<Notify>>>,
}
impl<K, V, B> LazyMapper<K, V, B>
where
    K: Clone + Eq + Hash + Send + Sync,
    V: Send + Sync,
    B: LazyMappingBackend<K, V>,
{
    /// Builds an empty lazy mapper that delegates computation and dirtiness checks to `backend`.
    ///
    /// The cache, the in-progress table and the value allocator all start out empty.
    pub fn new(backend: B) -> Self {
        let values = SyncBlinkAlloc::new();
        let calculated = RwLock::default();
        let waiting = Mutex::default();
        Self {
            backend,
            values,
            calculated,
            waiting,
        }
    }

    /// Borrows the backend this [`LazyMapper<K, V, B>`] was constructed with.
    pub fn backend(&self) -> &B {
        &self.backend
    }

    /// Looks up the dependency node ID recorded for `key`.
    ///
    /// Returns `None` when no value for `key` is currently present in the cache.
    pub async fn dep_id(&self, key: &K) -> Option<NodeId> {
        let cache = self.calculated.read().await;
        let (_, node) = cache.get(key)?;
        Some(*node)
    }
}
impl<K, V, B> Mapper for LazyMapper<K, V, B>
where
    K: Clone + Eq + Hash + Send + Sync,
    V: Send + Sync + 'static,
    B: LazyMappingBackend<K, V>,
{
    type Key = K;
    type Value = V;
    type Wrapper<'a> = &'a V where V: 'a;

    /// Returns the value for `key`, computing it through the backend when necessary.
    ///
    /// The lookup proceeds in three stages:
    ///
    /// 1. If a cached value exists and neither `deps::is_dirty` nor the backend reports it as
    ///    dirty, the cached reference is returned and its node is registered via
    ///    `deps::add_dep`.
    /// 2. Otherwise, if another task is already computing this key, this call waits for that
    ///    task's notification and returns the value it stored.
    /// 3. Otherwise this call registers itself as the computing task, runs
    ///    `backend.compute` under the key's dependency node (reusing the node of an evicted
    ///    dirty value when there was one), stores the result, and wakes all waiters.
    ///
    /// # Panics
    ///
    /// Panics if, after being woken by the computing task, the value is no longer present in
    /// the cache (for example because it was evicted again in the meantime).
    ///
    /// # Cancellation
    ///
    /// NOTE(review): if the computing call is dropped or panics before it removes its entry
    /// from `waiting`, tasks waiting on that entry are never notified.
    #[instrument(level = "debug", skip_all, fields(key_type = type_name::<K>(), value_type = type_name::<V>()))]
    async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> {
        // Attempt to reuse or evict the existing value. The read guard is a temporary of this
        // statement, so it is released before any dirtiness check is awaited.
        let cached = self
            .calculated
            .read()
            .instrument(trace_span!("calculated.read"))
            .await
            .get(key)
            .copied();

        let mut reuse_dep_id = None;
        if let Some((val, node)) = cached {
            let dirty = deps::is_dirty(node)
                || self
                    .backend
                    .is_dirty(key)
                    .instrument(trace_span!("backend.is_dirty"))
                    .await;
            if !dirty {
                deps::add_dep(node);
                return val;
            }

            // Dirty, so we'll recompute below under the same node, but evict the entry now.
            reuse_dep_id = Some(node);
            // Another task may have noticed the dirtiness and evicted the entry before us. That
            // is fine: the removal is then a no-op and the `waiting` table below decides which
            // of us performs the recomputation and which one waits for it.
            async {
                self.calculated.write().await.remove(key);
            }
            .instrument(trace_span!("remove dirty value"))
            .await;
        }

        // Check for an in-flight computation and, if there is none, register ourselves as the
        // computing task within a single critical section. Doing both under one lock
        // acquisition prevents two tasks from both deciding to compute the same key.
        let notify = {
            let mut waiting = self
                .waiting
                .lock()
                .instrument(trace_span!("waiting.lock"))
                .await;

            if let Some(barrier) = waiting.get(key).cloned() {
                // Create the `Notified` future while the lock is still held. The computing
                // task removes its entry under this lock before calling `notify_waiters`, and a
                // `Notified` future receives `notify_waiters` wakeups from the moment it is
                // created, so the wakeup cannot be missed.
                let notified = barrier.notified();
                drop(waiting);

                // Waiting for completion
                notified.instrument(trace_span!("completion_wait")).await;

                let (value, node_id) = *RwLockReadGuard::map(
                    self.calculated
                        .read()
                        .instrument(trace_span!("calculated.read"))
                        .await,
                    |hm| hm.get(key).unwrap(),
                );
                deps::add_dep(node_id);
                return value;
            }

            // Needs calculated
            let notify = Arc::new(Notify::new());
            waiting.insert(key.clone(), Arc::clone(&notify));
            notify
        };

        // Reuse the node of the evicted value (after clearing it) or allocate a fresh one.
        let dep = match reuse_dep_id {
            Some(node) => {
                deps::clear(node);
                node
            }
            None => deps::next_node_id(),
        };

        let val = deps::with_node_id(dep, self.backend.compute(key.clone()))
            .instrument(debug_span!("backend.compute"))
            .await;

        // SAFETY: NOTE(review) - the reference is stretched to `'static` so it can be stored in
        // `calculated`, but callers only ever receive it with the lifetime of `&self`. This
        // assumes the memory handed out by `self.values` stays valid for as long as `self` is
        // alive, and that dropping the temporary `Blink` at the end of this statement does not
        // run `V`'s destructor - confirm both against the blink_alloc documentation.
        let val_ref: &'static V = unsafe { transmute(Blink::new_in(&self.values).put(val)) };

        deps::add_dep(dep);

        // Publish the value before unregistering, so that woken waiters can find it.
        self.calculated
            .write()
            .instrument(trace_span!("calculated.insert"))
            .await
            .insert(key.clone(), (val_ref, dep));
        self.waiting
            .lock()
            .instrument(trace_span!("waiting.remove"))
            .await
            .remove(key);
        notify.notify_waiters();

        val_ref
    }
}
impl<K, V, B> Debug for LazyMapper<K, V, B>
where
    K: Debug,
    V: Debug,
    B: Debug,
{
    /// Formats the mapper as `LazyMapper { backend: .., .. }`.
    ///
    /// Only the backend is printed; the remaining fields are omitted and the output is marked
    /// as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("LazyMapper");
        builder.field("backend", &self.backend);
        builder.finish_non_exhaustive()
    }
}
|