//! A mapper based on a function from keys to values. use std::{collections::HashMap, hash::Hash, pin::Pin, sync::Arc}; use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard}; use crate::{ deps::{self, NodeId}, Mapper, }; /// A mapping that lazily computes values with a given async thunk and memoizes them. #[derive(Debug, Default)] pub struct ThunkMapper { /// The thunk we use to compute values thunk: T, /// Map of all values we've calculated already calculated: RwLock>, /// Values we're currently calculating /// The Notify will be triggered when computation is done. waiting: Mutex>>, } impl> ThunkMapper { /// Create a new instance of the computing store. pub fn new(thunk: T) -> Self { Self { thunk, calculated: RwLock::default(), waiting: Mutex::default(), } } } impl> Mapper for ThunkMapper { type Key = K; type Value = V; type Wrapper<'a> = RwLockReadGuard<'a, V> where V: 'a; async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> { // Attempt to reuse or evict the existing value let mut reuse_dep_id = None; { let finished = self.calculated.read().await; if let Some((_, dep)) = finished.get(key) { if !deps::is_dirty(*dep) { deps::add_dep(*dep); return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0); } else { reuse_dep_id = Some(*dep); drop(finished); // Dirty, so we'll recompute below but we should remove it now if self.calculated.write().await.remove(key).is_none() { // Someone else already noticed it was dirty and removed it before us, so we need to deal with that todo!("dirty value removed between us noticing and us doing something") } } } } let barrier = self.waiting.lock().await.get(key).cloned(); if let Some(barrier) = barrier { // Waiting for completion barrier.notified().await; let val = RwLockReadGuard::map(self.calculated.read().await, |hm| hm.get(key).unwrap()); deps::add_dep(val.1); return RwLockReadGuard::map(val, |inf| &inf.0); } else { // Needs calculated let notify = Arc::new(Notify::new()); self.waiting .lock() .await .insert(key.clone(), notify.clone()); let dep = if let Some(x) = reuse_dep_id { deps::clear(x); x } else { deps::next_node_id() }; let val = deps::with_node_id(dep, self.thunk.compute(key.clone())).await; deps::add_dep(dep); self.calculated .write() .await .insert(key.clone(), (val, dep)); self.waiting.lock().await.remove(key); notify.notify_waiters(); return RwLockReadGuard::map(self.calculated.read().await, |hm| { &hm.get(key).unwrap().0 }); } } } /// A function from keys to values. /// /// Should be pure, except for use of other mappings. This ensures recomputation is done when needed. pub trait Thunk: Send + 'static { fn compute(&self, key: K) -> Pin + Send + '_>>; }