diff options
author | tcmal <tcmal> | 2023-05-24 18:05:53 +0000 |
---|---|---|
committer | Aria <me@aria.rip> | 2023-10-01 17:31:29 +0100 |
commit | f31fcefd85cb69f0c7aa8922f3a579a86eac415d (patch) | |
tree | e0019fbf3383a2cc638d7d0365263c0ae9047347 /incria/src | |
parent | 3903363da90e127454365695364b778f45c59199 (diff) |
initial commit
Diffstat (limited to 'incria/src')
-rw-r--r-- | incria/src/deps.rs | 182 | ||||
-rw-r--r-- | incria/src/lib.rs | 7 | ||||
-rw-r--r-- | incria/src/mapping.rs | 25 | ||||
-rw-r--r-- | incria/src/thunk.rs | 104 |
4 files changed, 318 insertions, 0 deletions
diff --git a/incria/src/deps.rs b/incria/src/deps.rs new file mode 100644 index 0000000..84411c0 --- /dev/null +++ b/incria/src/deps.rs @@ -0,0 +1,182 @@ +//! The main code used for tracking dependencies (mostly) invisibly to the user. +//! +//! Each mapping that could be memoised is represented by a Node. Each Node can have one or more dependencies, forming a directed acyclic graph. +//! +//! [`self::add_dep`] should be called whenever any mapping is retrieved, to record the dependency. +//! +//! [`self::next_node_id`] and [`self::with_node_id`] should be used to create new nodes in the graph when a new mapping is being evaluated. +//! +//! [`self::mark_dirty`] should be called from outside the evaluation of any mapping. In many cases, this requires the dependency ID to be stored externally alongside the key, which can be accomplished with [`self::current_node_id`] +//! +//! [`self::is_dirty`] should be used when a mapping is retrieved to determine if it needs to be re-evaluated. If it does, [`self::clear`] should be used to reset the mapping's dependencies and dirty state. +use std::{ + collections::{HashMap, VecDeque}, + fmt::Write, + future::Future, +}; + +use once_cell::sync::OnceCell; +use parking_lot::Mutex; + +/// Identifier of a node, unique across a program run. +pub type NodeId = usize; + +tokio::task_local! { + /// The ID of the node in the dependency tree for the currently running computation + static NODE_ID: NodeId; +} + +/// Responsible for tracking dependencies. +/// There is one global instance of this, [`DEP_TRACKER`], with convenience functions for most of its functionality. +#[derive(Default)] +struct DepTracker { + /// Neighbour-List representation of dependency tree + deps: Mutex<HashMap<NodeId, NodeInfo>>, + + /// Next node ID to issue + next_node_id: Mutex<NodeId>, +} + +impl DepTracker { + /// See [`self::add_dep`] + fn add_dep(&self, dep: NodeId) { + self.deps + .lock() + .entry(NODE_ID.get()) + .and_modify(|v| v.deps.push(dep)) + .or_insert(NodeInfo { + deps: vec![dep], + dirty: false, + }); + } + + /// See [`self::next_node_id`] + fn next_node_id(&self) -> usize { + let mut lock = self.next_node_id.lock(); + *lock += 1; + *lock - 1 + } + + /// See [`self::mark_dirty`] + fn mark_dirty(&self, node: NodeId) { + let mut lock = self.deps.lock(); + let mut frontier = VecDeque::new(); + frontier.push_back(node); + while let Some(node_id) = frontier.pop_front() { + let node = match lock.get_mut(&node_id) { + Some(x) => x, + None => continue, + }; + + if node.dirty { + continue; + } + node.dirty = true; + + for (dependent, dependent_info) in lock.iter() { + if dependent_info.deps.contains(&node_id) { + frontier.push_back(*dependent); + } + } + } + } + + /// See [`self::is_dirty`] + fn is_dirty(&self, node: NodeId) -> bool { + self.deps + .lock() + .get(&node) + .map(|v| v.dirty) + .unwrap_or(false) + } + + /// See [`self::clear`] + fn clear(&self, node: NodeId) { + let mut lock = self.deps.lock(); + let node = match lock.get_mut(&node) { + Some(x) => x, + None => return, + }; + + node.dirty = false; + node.deps = vec![]; + } +} + +/// Info about a single dependency node +struct NodeInfo { + deps: Vec<NodeId>, + dirty: bool, +} + +/// The global instance of the dependency tracker +static DEP_TRACKER: OnceCell<DepTracker> = OnceCell::new(); +fn dep_tracker() -> &'static DepTracker { + DEP_TRACKER.get_or_init(DepTracker::default) +} + +/// Register a dependency for the current node +pub fn add_dep(dep: NodeId) { + dep_tracker().add_dep(dep); +} + +/// Get the next node id to use +pub fn next_node_id() -> NodeId { + dep_tracker().next_node_id() +} + +/// Get the ID of the current node +pub fn current_node_id() -> NodeId { + NODE_ID.get() +} + +/// Mark the given node and all nodes that depend on it 'dirty'. +/// +/// This should be used in conjunction with [`self::is_dirty`] to recompute values when necessary. +pub fn mark_dirty(dep: NodeId) { + dep_tracker().mark_dirty(dep) +} + +/// Check if the given node is dirty, indicating it should be recomputed. +pub fn is_dirty(dep: NodeId) -> bool { + dep_tracker().is_dirty(dep) +} + +/// Clear all information about a given node. +/// +/// This should be used when a node is recomputed, to ensure stale dependencies are removed. +pub fn clear(dep: NodeId) { + dep_tracker().clear(dep) +} + +/// Generate a graphviz representation of the current dependency graph, for debugging. +/// +/// Dirty nodes will be coloured grey. +pub fn dep_graphviz() -> String { + let mut out = String::new(); + let deps = dep_tracker().deps.lock(); + writeln!(&mut out, "digraph G {{").unwrap(); + for (id, info) in deps.iter() { + if *id == 0 { + writeln!(&mut out, "\t{} [label=Root]", id).unwrap(); + } else if info.dirty { + writeln!(&mut out, "\t{} [style=filled,color=lightgrey]", id).unwrap(); + } + for dep in info.deps.iter() { + writeln!(&mut out, "\t{} -> {}", id, dep).unwrap(); + } + } + + writeln!(&mut out, "}}").unwrap(); + + out +} + +/// Decorate the given future to have the given dependency ID. +pub async fn with_node_id<F>(dep: NodeId, f: F) -> F::Output +where + F: Future + Send, + F::Output: Send, +{ + NODE_ID.scope(dep, f).await +} diff --git a/incria/src/lib.rs b/incria/src/lib.rs new file mode 100644 index 0000000..2f52fbb --- /dev/null +++ b/incria/src/lib.rs @@ -0,0 +1,7 @@ +#![feature(async_fn_in_trait)] + +pub mod deps; +mod mapping; +pub mod thunk; + +pub use mapping::Mapper; diff --git a/incria/src/mapping.rs b/incria/src/mapping.rs new file mode 100644 index 0000000..6cfac67 --- /dev/null +++ b/incria/src/mapping.rs @@ -0,0 +1,25 @@ +use std::ops::Deref; + +/// A mapper of keys to values, that memoises the results and re-evaluates when necessary. +/// +/// This mapping can be: +/// * Storage or access of externally-modifiable values like files or resources shared with other threads (cells) +/// * Computations that are pure, except for their use of cells above (thunks) +/// +/// Mappers should be mainly responsible for tracking and re-performing new computations as appropriate, using the API in [`crate::deps`]. +pub trait Mapper { + /// The key or input type + type Key; + + /// The result + type Value; + + /// A wrapper around the result. + /// This allows things like read guards or Arcs to be returned, so long as they dereference to the correct type. + type Wrapper<'a>: Deref<Target = Self::Value> + 'a + where + Self::Value: 'a; + + /// Get a value by the given key, calculating it if necessary. + async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a>; +} diff --git a/incria/src/thunk.rs b/incria/src/thunk.rs new file mode 100644 index 0000000..6aa44dd --- /dev/null +++ b/incria/src/thunk.rs @@ -0,0 +1,104 @@ +//! A mapper based on a function from keys to values. +use std::{collections::HashMap, hash::Hash, pin::Pin, sync::Arc}; + +use parking_lot::{MappedRwLockReadGuard, Mutex, RwLock, RwLockReadGuard}; +use tokio::sync::Notify; + +use crate::{ + deps::{self, NodeId}, + Mapper, +}; + +/// A mapping that lazily computes values with a given async thunk and memoizes them. +#[derive(Debug, Default)] +pub struct ThunkMapper<K, V, T> { + /// The thunk we use to compute values + thunk: T, + + /// Map of all values we've calculated already + calculated: RwLock<HashMap<K, (V, NodeId)>>, + + /// Values we're currently calculating + /// The Notify will be triggered when computation is done. + waiting: Mutex<HashMap<K, Arc<Notify>>>, +} + +impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, T: Thunk<K, V>> ThunkMapper<K, V, T> { + /// Create a new instance of the computing store. + pub fn new(thunk: T) -> Self { + Self { + thunk, + calculated: RwLock::default(), + waiting: Mutex::default(), + } + } +} + +impl<K: Clone + Eq + Hash + Send + Sync, V: Send + Sync, C: Thunk<K, V>> Mapper + for ThunkMapper<K, V, C> +{ + type Key = K; + type Value = V; + type Wrapper<'a> = MappedRwLockReadGuard<'a, V> where V: 'a; + + async fn get<'a>(&'a self, key: &Self::Key) -> Self::Wrapper<'a> { + // Attempt to reuse or evict the existing value + let mut reuse_dep_id = None; + { + let finished = self.calculated.read(); + if let Some((_, dep)) = finished.get(key) { + if !deps::is_dirty(*dep) { + deps::add_dep(*dep); + return RwLockReadGuard::map(finished, |hm| &hm.get(key).unwrap().0); + } else { + reuse_dep_id = Some(*dep); + drop(finished); + // Dirty, so we'll recompute below but we should remove it now + if self.calculated.write().remove(key).is_none() { + // Someone else already noticed it was dirty and removed it before us, so we need to deal with that + todo!("dirty value removed between us noticing and us doing something") + } + } + } + } + + let barrier = self.waiting.lock().get(key).cloned(); + if let Some(barrier) = barrier { + // Waiting for completion + barrier.notified().await; + + let val = RwLockReadGuard::map(self.calculated.read(), |hm| hm.get(key).unwrap()); + deps::add_dep(val.1); + + return MappedRwLockReadGuard::map(val, |inf| &inf.0); + } else { + // Needs calculated + let notify = Arc::new(Notify::new()); + self.waiting.lock().insert(key.clone(), notify.clone()); + + let dep = if let Some(x) = reuse_dep_id { + deps::clear(x); + x + } else { + deps::next_node_id() + }; + + let val = deps::with_node_id(dep, self.thunk.compute(key.clone())).await; + deps::add_dep(dep); + + self.calculated.write().insert(key.clone(), (val, dep)); + self.waiting.lock().remove(key); + + notify.notify_waiters(); + + return RwLockReadGuard::map(self.calculated.read(), |hm| &hm.get(key).unwrap().0); + } + } +} + +/// A function from keys to values. +/// +/// Should be pure, except for use of other mappings. This ensures recomputation is done when needed. +pub trait Thunk<K, V>: Send + 'static { + fn compute(&self, key: K) -> Pin<Box<dyn std::future::Future<Output = V> + Send + '_>>; +} |