use std::{ any::type_name, collections::HashMap, fmt::Debug, future::Future, hash::Hash, mem::transmute, sync::Arc, }; use blink_alloc::{Blink, SyncBlinkAlloc}; use tokio::sync::{Mutex, Notify, RwLock, RwLockReadGuard}; use tracing::{debug_span, instrument, trace_span, Instrument}; use crate::{ deps::{self, NodeId}, Mapper, }; pub trait LazyMappingBackend { async fn is_dirty(&self, key: &K) -> bool; fn compute(&self, key: K) -> impl Future + Send + '_; } #[derive(Default)] pub struct LazyMapper { /// The backend we use to compute values and check dirtiness backend: B, values: SyncBlinkAlloc, /// Map of all values we've calculated already calculated: RwLock>, /// Values we're currently calculating /// The Notify will be triggered when computation is done. waiting: Mutex>>, } impl> LazyMapper { /// Create a new instance of the lazy mapper. pub fn new(backend: B) -> Self { Self { backend, calculated: RwLock::default(), waiting: Mutex::default(), values: SyncBlinkAlloc::new(), } } /// Returns a reference to the backend of this [`LazyMapper`]. pub fn backend(&self) -> &B { &self.backend } /// Get the node ID for the given key pub async fn dep_id(&self, key: &K) -> Option { self.calculated.read().await.get(key).map(|(_, node)| *node) } } impl Mapper for LazyMapper where K: Clone + Eq + Hash + Send + Sync, V: Send + Sync + 'static, B: LazyMappingBackend, { type Key = K; type Value = V; type Wrapper<'a> = &'a V where V: 'a; #[instrument(level = "debug", skip_all, fields(key_type = type_name::(), value_type = type_name::()))] async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> { // Attempt to reuse or evict the existing value let mut reuse_dep_id = None; { let finished = self .calculated .read() .instrument(trace_span!("calculated.read")) .await; if let Some((val, node)) = finished.get(key).copied() { drop(finished); let dirty = deps::is_dirty(node) || self .backend .is_dirty(key) .instrument(trace_span!("backend.is_dirty")) .await; if !dirty { deps::add_dep(node); return val; } else { reuse_dep_id = Some(node); // Dirty, so we'll recompute below but we should remove it now let removed = async { self.calculated.write().await.remove(key).is_none() } .instrument(trace_span!("remove dirty value")) .await; if removed { // Someone else already noticed it was dirty and removed it before us, so we need to deal with that todo!("dirty value removed between us noticing and us doing something") } } } }; let barrier = self .waiting .lock() .instrument(trace_span!("waiting.lock")) .await .get(key) .cloned(); if let Some(barrier) = barrier { // Waiting for completion barrier .notified() .instrument(trace_span!("completion_wait")) .await; let (value, node_id) = *RwLockReadGuard::map( self.calculated .read() .instrument(trace_span!("calculated.read")) .await, |hm| hm.get(key).unwrap(), ); deps::add_dep(node_id); value } else { // Needs calculated let notify = Arc::new(Notify::new()); self.waiting .lock() .instrument(trace_span!("waiting.insert")) .await .insert(key.clone(), notify.clone()); let dep = if let Some(x) = reuse_dep_id { deps::clear(x); x } else { deps::next_node_id() }; let val = deps::with_node_id(dep, self.backend.compute(key.clone())) .instrument(debug_span!("backend.compute")) .await; let val_ref: &'static V = unsafe { transmute(Blink::new_in(&self.values).put(val)) }; deps::add_dep(dep); self.calculated .write() .instrument(trace_span!("calculated.insert")) .await .insert(key.clone(), (val_ref, dep)); self.waiting .lock() .instrument(trace_span!("waiting.remove")) .await .remove(key); notify.notify_waiters(); val_ref } } } impl Debug for LazyMapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LazyMapper") .field("backend", &self.backend) .finish_non_exhaustive() } }