diff options
author | tcmal <tcmal> | 2023-06-16 14:19:58 +0000 |
---|---|---|
committer | Aria <me@aria.rip> | 2023-10-01 17:31:30 +0100 |
commit | ad55a4b05ac35cc21c64c1dd62bff9faec775eb6 (patch) | |
tree | c590e0584e2fb301d0e6c84f60d1d6b42d0c9912 /incria/src | |
parent | 3f2937c15aec73120dd4d57ddd245ac9ab789090 (diff) |
factor out common code for lazy mappings
Diffstat (limited to 'incria/src')
-rw-r--r-- | incria/src/lazy_mapping.rs | 123 | ||||
-rw-r--r-- | incria/src/lib.rs | 12 | ||||
-rw-r--r-- | incria/src/thunk.rs | 114 |
3 files changed, 152 insertions, 97 deletions
diff --git a/incria/src/lazy_mapping.rs b/incria/src/lazy_mapping.rs new file mode 100644 index 0000000..bef3a3d --- /dev/null +++ b/incria/src/lazy_mapping.rs @@ -0,0 +1,123 @@ +use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc}; + +use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard}; + +use crate::{ + deps::{self, NodeId}, + Mapper, +}; + +pub trait LazyMappingBackend<K, V> { + async fn is_dirty(&self, key: &K) -> bool; + fn compute(&self, key: K) -> impl Future<Output = V> + Send + '_; +} + +#[derive(Debug, Default)] +pub struct LazyMapper<K, V, B> { + /// The backend we use to compute values and check dirtiness + backend: B, + + /// Map of all values we've calculated already + calculated: RwLock<HashMap<K, (V, NodeId)>>, + + /// Values we're currently calculating + /// The Notify will be triggered when computation is done. + waiting: Mutex<HashMap<K, Arc<Notify>>>, +} + +impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, B: LazyMappingBackend<K, V>> + LazyMapper<K, V, B> +{ + /// Create a new instance of the lazy mapper. + pub fn new(backend: B) -> Self { + Self { + backend, + calculated: RwLock::default(), + waiting: Mutex::default(), + } + } + + /// Returns a reference to the backend of this [`LazyMapper<K, V, B>`]. + pub fn backend(&self) -> &B { + &self.backend + } + + /// Get the node ID for the given key + pub async fn dep_id(&self, key: &K) -> Option<NodeId> { + self.calculated.read().await.get(key).map(|(_, node)| *node) + } +} + +impl<K, V, B> Mapper for LazyMapper<K, V, B> +where + K: Clone + Eq + Hash + Send + Sync, + V: Send + Sync, + B: LazyMappingBackend<K, V>, +{ + type Key = K; + type Value = V; + type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a; + + async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> { + // Attempt to reuse or evict the existing value + let mut reuse_dep_id = None; + { + let finished = self.calculated.read().await; + if let Some((_, node)) = finished.get(key) { + let dirty = deps::is_dirty(*node) || self.backend.is_dirty(key).await; + if !dirty { + deps::add_dep(*node); + return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0); + } else { + reuse_dep_id = Some(*node); + drop(finished); + // Dirty, so we'll recompute below but we should remove it now + if self.calculated.write().await.remove(key).is_none() { + // Someone else already noticed it was dirty and removed it before us, so we need to deal with that + todo!("dirty value removed between us noticing and us doing something") + } + } + } + } + + let barrier = self.waiting.lock().await.get(key).cloned(); + if let Some(barrier) = barrier { + // Waiting for completion + barrier.notified().await; + + let val = RwLockReadGuard::map(self.calculated.read().await, |hm| hm.get(key).unwrap()); + deps::add_dep(val.1); + + return RwLockReadGuard::map(val, |inf| &inf.0); + } else { + // Needs calculated + let notify = Arc::new(Notify::new()); + self.waiting + .lock() + .await + .insert(key.clone(), notify.clone()); + + let dep = if let Some(x) = reuse_dep_id { + deps::clear(x); + x + } else { + deps::next_node_id() + }; + + let val = deps::with_node_id(dep, self.backend.compute(key.clone())).await; + deps::add_dep(dep); + + self.calculated + .write() + .await + .insert(key.clone(), (val, dep)); + self.waiting.lock().await.remove(key); + + notify.notify_waiters(); + + return RwLockReadGuard::map(self.calculated.read().await, |hm| { + &hm.get(key).unwrap().0 + }); + } + } +} diff --git a/incria/src/lib.rs b/incria/src/lib.rs index 24fa23d..6acae57 100644 --- a/incria/src/lib.rs +++ b/incria/src/lib.rs @@ -1,4 +1,8 @@ -#![feature(async_fn_in_trait, thread_id_value)] +#![feature( + async_fn_in_trait, + thread_id_value, + return_position_impl_trait_in_trait +)] /*! Incria is a library for incremental computation. It lets you record what a calculation depends on and then only re-run that calculation once one of those dependencies has changed. @@ -39,8 +43,9 @@ To memoise computations, you can normally use a [`ThunkMapper`](`self::thunk::Th This memoises a given 'thunk', which is simply a function with one input, one output, and that is pure except for its use of other mappers. ```rust -# use incria::{thunk::{Thunk, ThunkMapper}, Mapper}; +use incria::{thunk::{Thunk, ThunkMapper}, Mapper}; # use std::{future::Future, pin::Pin}; +#[derive(Default)] struct ExampleThunk; impl Thunk<usize, usize> for ExampleThunk { fn compute(&self, key: usize) -> Pin<Box<dyn Future<Output = usize> + Send + '_>> { @@ -52,7 +57,7 @@ impl Thunk<usize, usize> for ExampleThunk { type ExampleMapper = ThunkMapper<usize, usize, ExampleThunk>; # async fn example() { -assert_eq!(*ExampleMapper::new(ExampleThunk).get(&1).await, 1); +assert_eq!(*ExampleMapper::default().get(&1).await, 1); # } ``` @@ -65,6 +70,7 @@ Normally, you will keep track of dependency IDs alongside some other information */ pub mod deps; +mod lazy_mapping; mod mapping; pub mod thunk; diff --git a/incria/src/thunk.rs b/incria/src/thunk.rs index f076ba1..937f109 100644 --- a/incria/src/thunk.rs +++ b/incria/src/thunk.rs @@ -1,111 +1,37 @@ //! A mapper based on a function from keys to values. -use std::{collections::HashMap, hash::Hash, pin::Pin, sync::Arc}; +use std::pin::Pin; -use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard}; +use crate::lazy_mapping::{LazyMapper, LazyMappingBackend}; -use crate::{ - deps::{self, NodeId}, - Mapper, -}; +/// A function from keys to values. +/// +/// Should be pure, except for use of other mappings. This ensures recomputation is done when needed. +pub trait Thunk<K, V>: Send + 'static { + fn compute(&self, key: K) -> Pin<Box<dyn std::future::Future<Output = V> + Send + '_>>; +} -/// A mapping that lazily computes values with a given async thunk and memoizes them. #[derive(Debug, Default)] -pub struct ThunkMapper<K, V, T> { - /// The thunk we use to compute values +pub struct ThunkBackend<T> { thunk: T, - - /// Map of all values we've calculated already - calculated: RwLock<HashMap<K, (V, NodeId)>>, - - /// Values we're currently calculating - /// The Notify will be triggered when computation is done. - waiting: Mutex<HashMap<K, Arc<Notify>>>, } -impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, T: Thunk<K, V>> ThunkMapper<K, V, T> { - /// Create a new instance of the computing store. +impl<T> ThunkBackend<T> { pub fn new(thunk: T) -> Self { - Self { - thunk, - calculated: RwLock::default(), - waiting: Mutex::default(), - } + Self { thunk } } } -impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, C: Thunk<K, V>> Mapper - for ThunkMapper<K, V, C> +impl<K, V: 'static, T> LazyMappingBackend<K, V> for ThunkBackend<T> +where + T: Thunk<K, V>, { - type Key = K; - type Value = V; - type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a; - - async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> { - // Attempt to reuse or evict the existing value - let mut reuse_dep_id = None; - { - let finished = self.calculated.read().await; - if let Some((_, dep)) = finished.get(key) { - if !deps::is_dirty(*dep) { - deps::add_dep(*dep); - return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0); - } else { - reuse_dep_id = Some(*dep); - drop(finished); - // Dirty, so we'll recompute below but we should remove it now - if self.calculated.write().await.remove(key).is_none() { - // Someone else already noticed it was dirty and removed it before us, so we need to deal with that - todo!("dirty value removed between us noticing and us doing something") - } - } - } - } - - let barrier = self.waiting.lock().await.get(key).cloned(); - if let Some(barrier) = barrier { - // Waiting for completion - barrier.notified().await; - - let val = RwLockReadGuard::map(self.calculated.read().await, |hm| hm.get(key).unwrap()); - deps::add_dep(val.1); - - return RwLockReadGuard::map(val, |inf| &inf.0); - } else { - // Needs calculated - let notify = Arc::new(Notify::new()); - self.waiting - .lock() - .await - .insert(key.clone(), notify.clone()); - - let dep = if let Some(x) = reuse_dep_id { - deps::clear(x); - x - } else { - deps::next_node_id() - }; - - let val = deps::with_node_id(dep, self.thunk.compute(key.clone())).await; - deps::add_dep(dep); - - self.calculated - .write() - .await - .insert(key.clone(), (val, dep)); - self.waiting.lock().await.remove(key); - - notify.notify_waiters(); + async fn is_dirty(&self, _: &K) -> bool { + false // only dirty if marked dirty by someting else + } - return RwLockReadGuard::map(self.calculated.read().await, |hm| { - &hm.get(key).unwrap().0 - }); - } + fn compute(&self, key: K) -> impl std::future::Future<Output = V> + Send + '_ { + self.thunk.compute(key) } } -/// A function from keys to values. -/// -/// Should be pure, except for use of other mappings. This ensures recomputation is done when needed. -pub trait Thunk<K, V>: Send + 'static { - fn compute(&self, key: K) -> Pin<Box<dyn std::future::Future<Output = V> + Send + '_>>; -} +pub type ThunkMapper<K, V, T> = LazyMapper<K, V, ThunkBackend<T>>; |