aboutsummaryrefslogtreecommitdiff
path: root/incria/src/lazy_mapping.rs
diff options
context:
space:
mode:
Diffstat (limited to 'incria/src/lazy_mapping.rs')
-rw-r--r--incria/src/lazy_mapping.rs123
1 files changed, 123 insertions, 0 deletions
diff --git a/incria/src/lazy_mapping.rs b/incria/src/lazy_mapping.rs
new file mode 100644
index 0000000..bef3a3d
--- /dev/null
+++ b/incria/src/lazy_mapping.rs
@@ -0,0 +1,123 @@
+use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc};
+
+use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard};
+
+use crate::{
+ deps::{self, NodeId},
+ Mapper,
+};
+
+pub trait LazyMappingBackend<K, V> {
+ async fn is_dirty(&self, key: &K) -> bool;
+ fn compute(&self, key: K) -> impl Future<Output = V> + Send + '_;
+}
+
+#[derive(Debug, Default)]
+pub struct LazyMapper<K, V, B> {
+ /// The backend we use to compute values and check dirtiness
+ backend: B,
+
+ /// Map of all values we've calculated already
+ calculated: RwLock<HashMap<K, (V, NodeId)>>,
+
+ /// Values we're currently calculating
+ /// The Notify will be triggered when computation is done.
+ waiting: Mutex<HashMap<K, Arc<Notify>>>,
+}
+
+impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, B: LazyMappingBackend<K, V>>
+ LazyMapper<K, V, B>
+{
+ /// Create a new instance of the lazy mapper.
+ pub fn new(backend: B) -> Self {
+ Self {
+ backend,
+ calculated: RwLock::default(),
+ waiting: Mutex::default(),
+ }
+ }
+
+ /// Returns a reference to the backend of this [`LazyMapper<K, V, B>`].
+ pub fn backend(&self) -> &B {
+ &self.backend
+ }
+
+ /// Get the node ID for the given key
+ pub async fn dep_id(&self, key: &K) -> Option<NodeId> {
+ self.calculated.read().await.get(key).map(|(_, node)| *node)
+ }
+}
+
+impl<K, V, B> Mapper for LazyMapper<K, V, B>
+where
+ K: Clone + Eq + Hash + Send + Sync,
+ V: Send + Sync,
+ B: LazyMappingBackend<K, V>,
+{
+ type Key = K;
+ type Value = V;
+ type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a;
+
+ async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> {
+ // Attempt to reuse or evict the existing value
+ let mut reuse_dep_id = None;
+ {
+ let finished = self.calculated.read().await;
+ if let Some((_, node)) = finished.get(key) {
+ let dirty = deps::is_dirty(*node) || self.backend.is_dirty(key).await;
+ if !dirty {
+ deps::add_dep(*node);
+ return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
+ } else {
+ reuse_dep_id = Some(*node);
+ drop(finished);
+ // Dirty, so we'll recompute below but we should remove it now
+ if self.calculated.write().await.remove(key).is_none() {
+ // Someone else already noticed it was dirty and removed it before us, so we need to deal with that
+ todo!("dirty value removed between us noticing and us doing something")
+ }
+ }
+ }
+ }
+
+ let barrier = self.waiting.lock().await.get(key).cloned();
+ if let Some(barrier) = barrier {
+ // Waiting for completion
+ barrier.notified().await;
+
+ let val = RwLockReadGuard::map(self.calculated.read().await, |hm| hm.get(key).unwrap());
+ deps::add_dep(val.1);
+
+ return RwLockReadGuard::map(val, |inf| &inf.0);
+ } else {
+ // Needs calculated
+ let notify = Arc::new(Notify::new());
+ self.waiting
+ .lock()
+ .await
+ .insert(key.clone(), notify.clone());
+
+ let dep = if let Some(x) = reuse_dep_id {
+ deps::clear(x);
+ x
+ } else {
+ deps::next_node_id()
+ };
+
+ let val = deps::with_node_id(dep, self.backend.compute(key.clone())).await;
+ deps::add_dep(dep);
+
+ self.calculated
+ .write()
+ .await
+ .insert(key.clone(), (val, dep));
+ self.waiting.lock().await.remove(key);
+
+ notify.notify_waiters();
+
+ return RwLockReadGuard::map(self.calculated.read().await, |hm| {
+ &hm.get(key).unwrap().0
+ });
+ }
+ }
+}