aboutsummaryrefslogtreecommitdiff
path: root/incria/src
diff options
context:
space:
mode:
authortcmal <tcmal>2023-06-16 14:19:58 +0000
committerAria <me@aria.rip>2023-10-01 17:31:30 +0100
commitad55a4b05ac35cc21c64c1dd62bff9faec775eb6 (patch)
treec590e0584e2fb301d0e6c84f60d1d6b42d0c9912 /incria/src
parent3f2937c15aec73120dd4d57ddd245ac9ab789090 (diff)
factor out common code for lazy mappings
Diffstat (limited to 'incria/src')
-rw-r--r--incria/src/lazy_mapping.rs123
-rw-r--r--incria/src/lib.rs12
-rw-r--r--incria/src/thunk.rs114
3 files changed, 152 insertions, 97 deletions
diff --git a/incria/src/lazy_mapping.rs b/incria/src/lazy_mapping.rs
new file mode 100644
index 0000000..bef3a3d
--- /dev/null
+++ b/incria/src/lazy_mapping.rs
@@ -0,0 +1,123 @@
+use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc};
+
+use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard};
+
+use crate::{
+ deps::{self, NodeId},
+ Mapper,
+};
+
+pub trait LazyMappingBackend<K, V> {
+ async fn is_dirty(&self, key: &K) -> bool;
+ fn compute(&self, key: K) -> impl Future<Output = V> + Send + '_;
+}
+
+#[derive(Debug, Default)]
+pub struct LazyMapper<K, V, B> {
+ /// The backend we use to compute values and check dirtiness
+ backend: B,
+
+ /// Map of all values we've calculated already
+ calculated: RwLock<HashMap<K, (V, NodeId)>>,
+
+ /// Values we're currently calculating
+ /// The Notify will be triggered when computation is done.
+ waiting: Mutex<HashMap<K, Arc<Notify>>>,
+}
+
+impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, B: LazyMappingBackend<K, V>>
+ LazyMapper<K, V, B>
+{
+ /// Create a new instance of the lazy mapper.
+ pub fn new(backend: B) -> Self {
+ Self {
+ backend,
+ calculated: RwLock::default(),
+ waiting: Mutex::default(),
+ }
+ }
+
+ /// Returns a reference to the backend of this [`LazyMapper<K, V, B>`].
+ pub fn backend(&self) -> &B {
+ &self.backend
+ }
+
+ /// Get the node ID for the given key
+ pub async fn dep_id(&self, key: &K) -> Option<NodeId> {
+ self.calculated.read().await.get(key).map(|(_, node)| *node)
+ }
+}
+
+impl<K, V, B> Mapper for LazyMapper<K, V, B>
+where
+ K: Clone + Eq + Hash + Send + Sync,
+ V: Send + Sync,
+ B: LazyMappingBackend<K, V>,
+{
+ type Key = K;
+ type Value = V;
+ type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a;
+
+ async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> {
+ // Attempt to reuse or evict the existing value
+ let mut reuse_dep_id = None;
+ {
+ let finished = self.calculated.read().await;
+ if let Some((_, node)) = finished.get(key) {
+ let dirty = deps::is_dirty(*node) || self.backend.is_dirty(key).await;
+ if !dirty {
+ deps::add_dep(*node);
+ return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
+ } else {
+ reuse_dep_id = Some(*node);
+ drop(finished);
+ // Dirty, so we'll recompute below but we should remove it now
+ if self.calculated.write().await.remove(key).is_none() {
+ // Someone else already noticed it was dirty and removed it before us, so we need to deal with that
+ todo!("dirty value removed between us noticing and us doing something")
+ }
+ }
+ }
+ }
+
+ let barrier = self.waiting.lock().await.get(key).cloned();
+ if let Some(barrier) = barrier {
+ // Waiting for completion
+ barrier.notified().await;
+
+ let val = RwLockReadGuard::map(self.calculated.read().await, |hm| hm.get(key).unwrap());
+ deps::add_dep(val.1);
+
+ return RwLockReadGuard::map(val, |inf| &inf.0);
+ } else {
+ // Needs calculated
+ let notify = Arc::new(Notify::new());
+ self.waiting
+ .lock()
+ .await
+ .insert(key.clone(), notify.clone());
+
+ let dep = if let Some(x) = reuse_dep_id {
+ deps::clear(x);
+ x
+ } else {
+ deps::next_node_id()
+ };
+
+ let val = deps::with_node_id(dep, self.backend.compute(key.clone())).await;
+ deps::add_dep(dep);
+
+ self.calculated
+ .write()
+ .await
+ .insert(key.clone(), (val, dep));
+ self.waiting.lock().await.remove(key);
+
+ notify.notify_waiters();
+
+ return RwLockReadGuard::map(self.calculated.read().await, |hm| {
+ &hm.get(key).unwrap().0
+ });
+ }
+ }
+}
diff --git a/incria/src/lib.rs b/incria/src/lib.rs
index 24fa23d..6acae57 100644
--- a/incria/src/lib.rs
+++ b/incria/src/lib.rs
@@ -1,4 +1,8 @@
-#![feature(async_fn_in_trait, thread_id_value)]
+#![feature(
+ async_fn_in_trait,
+ thread_id_value,
+ return_position_impl_trait_in_trait
+)]
/*!
Incria is a library for incremental computation.
It lets you record what a calculation depends on and then only re-run that calculation once one of those dependencies has changed.
@@ -39,8 +43,9 @@ To memoise computations, you can normally use a [`ThunkMapper`](`self::thunk::Th
This memoises a given 'thunk', which is simply a function with one input, one output, and that is pure except for its use of other mappers.
```rust
-# use incria::{thunk::{Thunk, ThunkMapper}, Mapper};
+use incria::{thunk::{Thunk, ThunkMapper}, Mapper};
# use std::{future::Future, pin::Pin};
+#[derive(Default)]
struct ExampleThunk;
impl Thunk<usize, usize> for ExampleThunk {
fn compute(&self, key: usize) -> Pin<Box<dyn Future<Output = usize> + Send + '_>> {
@@ -52,7 +57,7 @@ impl Thunk<usize, usize> for ExampleThunk {
type ExampleMapper = ThunkMapper<usize, usize, ExampleThunk>;
# async fn example() {
-assert_eq!(*ExampleMapper::new(ExampleThunk).get(&1).await, 1);
+assert_eq!(*ExampleMapper::default().get(&1).await, 1);
# }
```
@@ -65,6 +70,7 @@ Normally, you will keep track of dependency IDs alongside some other information
*/
pub mod deps;
+mod lazy_mapping;
mod mapping;
pub mod thunk;
diff --git a/incria/src/thunk.rs b/incria/src/thunk.rs
index f076ba1..937f109 100644
--- a/incria/src/thunk.rs
+++ b/incria/src/thunk.rs
@@ -1,111 +1,37 @@
//! A mapper based on a function from keys to values.
-use std::{collections::HashMap, hash::Hash, pin::Pin, sync::Arc};
+use std::pin::Pin;
-use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard};
+use crate::lazy_mapping::{LazyMapper, LazyMappingBackend};
-use crate::{
- deps::{self, NodeId},
- Mapper,
-};
+/// A function from keys to values.
+///
+/// Should be pure, except for use of other mappings. This ensures recomputation is done when needed.
+pub trait Thunk<K, V>: Send + 'static {
+ fn compute(&self, key: K) -> Pin<Box<dyn std::future::Future<Output = V> + Send + '_>>;
+}
-/// A mapping that lazily computes values with a given async thunk and memoizes them.
#[derive(Debug, Default)]
-pub struct ThunkMapper<K, V, T> {
- /// The thunk we use to compute values
+pub struct ThunkBackend<T> {
thunk: T,
-
- /// Map of all values we've calculated already
- calculated: RwLock<HashMap<K, (V, NodeId)>>,
-
- /// Values we're currently calculating
- /// The Notify will be triggered when computation is done.
- waiting: Mutex<HashMap<K, Arc<Notify>>>,
}
-impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, T: Thunk<K, V>> ThunkMapper<K, V, T> {
- /// Create a new instance of the computing store.
+impl<T> ThunkBackend<T> {
pub fn new(thunk: T) -> Self {
- Self {
- thunk,
- calculated: RwLock::default(),
- waiting: Mutex::default(),
- }
+ Self { thunk }
}
}
-impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, C: Thunk<K, V>> Mapper
- for ThunkMapper<K, V, C>
+impl<K, V: 'static, T> LazyMappingBackend<K, V> for ThunkBackend<T>
+where
+ T: Thunk<K, V>,
{
- type Key = K;
- type Value = V;
- type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a;
-
- async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> {
- // Attempt to reuse or evict the existing value
- let mut reuse_dep_id = None;
- {
- let finished = self.calculated.read().await;
- if let Some((_, dep)) = finished.get(key) {
- if !deps::is_dirty(*dep) {
- deps::add_dep(*dep);
- return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0);
- } else {
- reuse_dep_id = Some(*dep);
- drop(finished);
- // Dirty, so we'll recompute below but we should remove it now
- if self.calculated.write().await.remove(key).is_none() {
- // Someone else already noticed it was dirty and removed it before us, so we need to deal with that
- todo!("dirty value removed between us noticing and us doing something")
- }
- }
- }
- }
-
- let barrier = self.waiting.lock().await.get(key).cloned();
- if let Some(barrier) = barrier {
- // Waiting for completion
- barrier.notified().await;
-
- let val = RwLockReadGuard::map(self.calculated.read().await, |hm| hm.get(key).unwrap());
- deps::add_dep(val.1);
-
- return RwLockReadGuard::map(val, |inf| &inf.0);
- } else {
- // Needs calculated
- let notify = Arc::new(Notify::new());
- self.waiting
- .lock()
- .await
- .insert(key.clone(), notify.clone());
-
- let dep = if let Some(x) = reuse_dep_id {
- deps::clear(x);
- x
- } else {
- deps::next_node_id()
- };
-
- let val = deps::with_node_id(dep, self.thunk.compute(key.clone())).await;
- deps::add_dep(dep);
-
- self.calculated
- .write()
- .await
- .insert(key.clone(), (val, dep));
- self.waiting.lock().await.remove(key);
-
- notify.notify_waiters();
+ async fn is_dirty(&self, _: &K) -> bool {
+ false // only dirty if marked dirty by someting else
+ }
- return RwLockReadGuard::map(self.calculated.read().await, |hm| {
- &hm.get(key).unwrap().0
- });
- }
+ fn compute(&self, key: K) -> impl std::future::Future<Output = V> + Send + '_ {
+ self.thunk.compute(key)
}
}
-/// A function from keys to values.
-///
-/// Should be pure, except for use of other mappings. This ensures recomputation is done when needed.
-pub trait Thunk<K, V>: Send + 'static {
- fn compute(&self, key: K) -> Pin<Box<dyn std::future::Future<Output = V> + Send + '_>>;
-}
+pub type ThunkMapper<K, V, T> = LazyMapper<K, V, ThunkBackend<T>>;