diff options
Diffstat (limited to 'incria/src/thunk.rs')
-rw-r--r-- | incria/src/thunk.rs | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/incria/src/thunk.rs b/incria/src/thunk.rs new file mode 100644 index 0000000..6aa44dd --- /dev/null +++ b/incria/src/thunk.rs @@ -0,0 +1,104 @@ +//! A mapper based on a function from keys to values. +use std::{collections::HashMap, hash::Hash, pin::Pin, sync::Arc}; + +use parking_lot::{MappedRwLockReadGuard, Mutex, RwLock, RwLockReadGuard}; +use tokio::sync::Notify; + +use crate::{ + deps::{self, NodeId}, + Mapper, +}; + +/// A mapping that lazily computes values with a given async thunk and memoizes them. +#[derive(Debug, Default)] +pub struct ThunkMapper<K, V, T> { + /// The thunk we use to compute values + thunk: T, + + /// Map of all values we've calculated already + calculated: RwLock<HashMap<K, (V, NodeId)>>, + + /// Values we're currently calculating + /// The Notify will be triggered when computation is done. + waiting: Mutex<HashMap<K, Arc<Notify>>>, +} + +impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, T: Thunk<K, V>> ThunkMapper<K, V, T> { + /// Create a new instance of the computing store. + pub fn new(thunk: T) -> Self { + Self { + thunk, + calculated: RwLock::default(), + waiting: Mutex::default(), + } + } +} + +impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, C: Thunk<K, V>> Mapper + for ThunkMapper<K, V, C> +{ + type Key = K; + type Value = V; + type Wrapper<'a> = MappedRwLockReadGuard<'a, V> where V: 'a; + + async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> { + // Attempt to reuse or evict the existing value + let mut reuse_dep_id = None; + { + let finished = self.calculated.read(); + if let Some((_, dep)) = finished.get(key) { + if !deps::is_dirty(*dep) { + deps::add_dep(*dep); + return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0); + } else { + reuse_dep_id = Some(*dep); + drop(finished); + // Dirty, so we'll recompute below but we should remove it now + if self.calculated.write().remove(key).is_none() { + // Someone else already noticed it was dirty and removed it before us, so we need to deal with that + todo!("dirty value removed between us noticing and us doing something") + } + } + } + } + + let barrier = self.waiting.lock().get(key).cloned(); + if let Some(barrier) = barrier { + // Waiting for completion + barrier.notified().await; + + let val = RwLockReadGuard::map(self.calculated.read(), |hm| hm.get(key).unwrap()); + deps::add_dep(val.1); + + return MappedRwLockReadGuard::map(val, |inf| &inf.0); + } else { + // Needs calculated + let notify = Arc::new(Notify::new()); + self.waiting.lock().insert(key.clone(), notify.clone()); + + let dep = if let Some(x) = reuse_dep_id { + deps::clear(x); + x + } else { + deps::next_node_id() + }; + + let val = deps::with_node_id(dep, self.thunk.compute(key.clone())).await; + deps::add_dep(dep); + + self.calculated.write().insert(key.clone(), (val, dep)); + self.waiting.lock().remove(key); + + notify.notify_waiters(); + + return RwLockReadGuard::map(self.calculated.read(), |hm| &hm.get(key).unwrap().0); + } + } +} + +/// A function from keys to values. +/// +/// Should be pure, except for use of other mappings. This ensures recomputation is done when needed. +pub trait Thunk<K, V>: Send + 'static { + fn compute(&self, key: K) -> Pin<Box<dyn std::future::Future<Output = V> + Send + '_>>; +} |