//! Traits for implementing a block store. use crate::block::{create_cbor_block, decode_cbor, decode_ipld, validate}; use crate::cid::Cid; use crate::error::Result; use crate::gc::closure; use crate::hash::{CidHashMap, CidHashSet, Hash}; use crate::ipld::Ipld; use async_std::sync::RwLock; use async_trait::async_trait; use core::ops::Deref; use dag_cbor::{ReadCbor, WriteCbor}; use futures::join; use std::collections::HashMap; use std::mem; use std::path::Path; use std::sync::Arc; /// Implementable by ipld storage backends. #[async_trait] pub trait Store: Send + Sync { /// Returns the block with cid. async fn read(&self, cid: &Cid) -> Result<Option<Box<[u8]>>>; /// Writes the block with cid. async fn write(&self, cid: &Cid, data: Box<[u8]>) -> Result<()>; /// Flushes the write buffer. async fn flush(&self) -> Result<()>; /// Gc's unused blocks. async fn gc(&self) -> Result<()>; /// Pin a block. async fn pin(&self, cid: &Cid) -> Result<()>; /// Unpin a block. async fn unpin(&self, cid: &Cid) -> Result<()>; /// Create an indirect user managed pin. async fn autopin(&self, cid: &Cid, auto_path: &Path) -> Result<()>; /// Write a link to a block. async fn write_link(&self, label: &str, cid: &Cid) -> Result<()>; /// Read a link to a block. async fn read_link(&self, label: &str) -> Result<Option<Cid>>; /// Remove link to a block. async fn remove_link(&self, label: &str) -> Result<()>; } #[async_trait] impl<TStore: Store> Store for Arc<TStore> { async fn read(&self, cid: &Cid) -> Result<Option<Box<[u8]>>> { self.deref().read(cid).await } async fn write(&self, cid: &Cid, data: Box<[u8]>) -> Result<()> { self.deref().write(cid, data).await } async fn flush(&self) -> Result<()> { self.deref().flush().await } async fn gc(&self) -> Result<()> { self.deref().gc().await } async fn pin(&self, cid: &Cid) -> Result<()> { self.deref().pin(cid).await } async fn unpin(&self, cid: &Cid) -> Result<()> { self.deref().unpin(cid).await } async fn autopin(&self, cid: &Cid, auto_path: &Path) -> Result<()> { self.deref().autopin(cid, auto_path).await } async fn write_link(&self, label: &str, cid: &Cid) -> Result<()> { self.deref().write_link(label, cid).await } async fn read_link(&self, label: &str) -> Result<Option<Cid>> { self.deref().read_link(label).await } async fn remove_link(&self, label: &str) -> Result<()> { self.deref().remove_link(label).await } } /// A store wrapper for debugging. pub struct DebugStore<TStore: Store> { prefix: &'static str, store: TStore, } fn print_cid(cid: &Cid) -> String { (&cid.to_string()[..30]).to_string() } impl<TStore: Store> DebugStore<TStore> { /// Creates a new debug store. pub fn new(store: TStore) -> Self { Self::new_with_prefix(store, "") } /// Creates a new debug store. pub fn new_with_prefix(store: TStore, prefix: &'static str) -> Self { Self { store, prefix } } } #[async_trait] impl<TStore: Store> Store for DebugStore<TStore> { async fn read(&self, cid: &Cid) -> Result<Option<Box<[u8]>>> { let res = self.store.read(cid).await?; println!( "{}read {} {:?}", self.prefix, print_cid(cid), res.as_ref().map(|d| d.len()) ); Ok(res) } async fn write(&self, cid: &Cid, data: Box<[u8]>) -> Result<()> { println!("{}write {} {}", self.prefix, print_cid(cid), data.len()); self.store.write(cid, data).await } async fn flush(&self) -> Result<()> { println!("{}flush", self.prefix); self.store.flush().await } async fn gc(&self) -> Result<()> { println!("{}gc", self.prefix); self.store.gc().await } async fn pin(&self, cid: &Cid) -> Result<()> { println!("{}pin {}", self.prefix, print_cid(cid)); self.store.pin(cid).await } async fn unpin(&self, cid: &Cid) -> Result<()> { println!("{}unpin {}", self.prefix, print_cid(cid)); self.store.unpin(cid).await } async fn autopin(&self, cid: &Cid, auto_path: &Path) -> Result<()> { println!("{}autopin {}", self.prefix, print_cid(cid)); self.store.autopin(cid, auto_path).await } async fn write_link(&self, label: &str, cid: &Cid) -> Result<()> { println!("{}write_link {} {}", self.prefix, label, print_cid(cid)); self.store.write_link(label, cid).await } async fn read_link(&self, label: &str) -> Result<Option<Cid>> { let res = self.store.read_link(label).await?; println!( "{}read_link {} {:?}", self.prefix, label, res.as_ref().map(print_cid) ); Ok(res) } async fn remove_link(&self, label: &str) -> Result<()> { println!("{}remove_link {}", self.prefix, label); self.store.remove_link(label).await } } /// Ipld extension trait. #[async_trait] pub trait StoreIpldExt { /// Reads the block with cid and decodes it to ipld. async fn read_ipld(&self, cid: &Cid) -> Result<Option<Ipld>>; } #[async_trait] impl<T: Store> StoreIpldExt for T { async fn read_ipld(&self, cid: &Cid) -> Result<Option<Ipld>> { if let Some(data) = self.read(cid).await? { let ipld = decode_ipld(cid, &data).await?; return Ok(Some(ipld)); } Ok(None) } } /// Cbor extension trait. #[async_trait] pub trait StoreCborExt { /// Reads the block with cid and decodes it to cbor. async fn read_cbor<C: ReadCbor + Send>(&self, cid: &Cid) -> Result<Option<C>>; /// Writes a block using the cbor codec. async fn write_cbor<H: Hash, C: WriteCbor + Send + Sync>(&self, c: &C) -> Result<Cid>; } #[async_trait] impl<T: Store> StoreCborExt for T { async fn read_cbor<C: ReadCbor + Send>(&self, cid: &Cid) -> Result<Option<C>> { if let Some(data) = self.read(cid).await? { let cbor = decode_cbor::<C>(cid, &data).await?; return Ok(Some(cbor)); } Ok(None) } async fn write_cbor<H: Hash, C: WriteCbor + Send + Sync>(&self, c: &C) -> Result<Cid> { let (cid, data) = create_cbor_block::<H, C>(c).await?; self.write(&cid, data).await?; Ok(cid) } } /// A memory backed store #[derive(Default)] pub struct MemStore { blocks: RwLock<CidHashMap<Box<[u8]>>>, pins: RwLock<CidHashSet>, links: RwLock<HashMap<String, Cid>>, } #[async_trait] impl Store for MemStore { async fn read(&self, cid: &Cid) -> Result<Option<Box<[u8]>>> { Ok(self.blocks.read().await.get(cid).cloned()) } async fn write(&self, cid: &Cid, data: Box<[u8]>) -> Result<()> { self.blocks.write().await.insert(cid.clone(), data); Ok(()) } async fn flush(&self) -> Result<()> { Ok(()) } async fn gc(&self) -> Result<()> { let pins = self.pins.read().await; let roots = pins.iter().map(Clone::clone).collect(); let blocks = self .blocks .read() .await .iter() .map(|(cid, _)| cid.clone()) .collect(); let dead = crate::gc::dead_paths(self, blocks, roots).await?; for cid in dead { self.blocks.write().await.remove(&cid); } Ok(()) } async fn pin(&self, cid: &Cid) -> Result<()> { self.pins.write().await.insert(cid.clone()); Ok(()) } async fn unpin(&self, cid: &Cid) -> Result<()> { self.pins.write().await.remove(&cid); Ok(()) } async fn autopin(&self, cid: &Cid, _: &Path) -> Result<()> { self.pin(cid).await } async fn write_link(&self, link: &str, cid: &Cid) -> Result<()> { self.links .write() .await .insert(link.to_string(), cid.clone()); Ok(()) } async fn read_link(&self, link: &str) -> Result<Option<Cid>> { Ok(self.links.read().await.get(link).cloned()) } async fn remove_link(&self, link: &str) -> Result<()> { self.links.write().await.remove(link); Ok(()) } } /// A buffered store. pub struct BufStore<TStore: Store = MemStore> { store: TStore, cache: RwLock<CidHashMap<Box<[u8]>>>, buffer: RwLock<CidHashMap<Box<[u8]>>>, pins: RwLock<CidHashMap<PinOp>>, } enum PinOp { Pin, Unpin, } impl<TStore: Store> BufStore<TStore> { /// Creates a new block store. pub fn new(store: TStore, _cache_cap: usize, _buffer_cap: usize) -> Self { Self { store, cache: Default::default(), buffer: Default::default(), pins: Default::default(), } } } #[async_trait] impl<TStore: Store> Store for BufStore<TStore> { async fn read(&self, cid: &Cid) -> Result<Option<Box<[u8]>>> { let cached = self.cache.read().await.get(cid).cloned(); if let Some(data) = cached { return Ok(Some(data)); } let fresh = self.store.read(cid).await?; if let Some(ref data) = fresh { validate(cid, &data)?; self.cache.write().await.insert(cid.clone(), data.clone()); } Ok(fresh) } async fn write(&self, cid: &Cid, data: Box<[u8]>) -> Result<()> { self.cache.write().await.insert(cid.clone(), data.clone()); self.buffer.write().await.insert(cid.clone(), data); Ok(()) } async fn flush(&self) -> Result<()> { let (pins, buffer) = { let (mut pins, mut buffer) = join!(self.pins.write(), self.buffer.write()); let pins = mem::replace(&mut *pins, Default::default()); let buffer = mem::replace(&mut *buffer, Default::default()); (pins, buffer) }; let mut buffer_pins: CidHashSet = Default::default(); for (cid, op) in pins.into_iter() { if buffer.contains_key(&cid) { if let PinOp::Pin = op { buffer_pins.insert(cid); } } else { match op { PinOp::Pin => self.store.pin(&cid).await?, PinOp::Unpin => self.store.unpin(&cid).await?, } } } let live = closure(self, buffer_pins.clone()).await?; for (cid, data) in buffer { if live.contains(&cid) { self.store.write(&cid, data).await?; } if buffer_pins.contains(&cid) { self.store.pin(&cid).await?; } } self.store.flush().await?; Ok(()) } async fn gc(&self) -> Result<()> { self.store.gc().await } async fn pin(&self, cid: &Cid) -> Result<()> { self.pins.write().await.insert(cid.clone(), PinOp::Pin); Ok(()) } async fn unpin(&self, cid: &Cid) -> Result<()> { self.pins.write().await.insert(cid.clone(), PinOp::Unpin); Ok(()) } async fn autopin(&self, cid: &Cid, auto_path: &Path) -> Result<()> { self.store.autopin(cid, auto_path).await } async fn write_link(&self, label: &str, cid: &Cid) -> Result<()> { self.store.write_link(label, cid).await } async fn read_link(&self, label: &str) -> Result<Option<Cid>> { self.store.read_link(label).await } async fn remove_link(&self, label: &str) -> Result<()> { self.store.remove_link(label).await } } #[cfg(test)] mod tests { use super::*; use crate::block::{create_cbor_block, create_raw_block}; use crate::DefaultHash as H; use async_std::task; use core::future::Future; use model::*; fn create_block_raw(n: usize) -> (Cid, Box<[u8]>) { let data = n.to_ne_bytes().to_vec().into_boxed_slice(); create_raw_block::<H>(data).unwrap() } async fn create_block_ipld(ipld: &Ipld) -> (Cid, Box<[u8]>) { create_cbor_block::<H, _>(ipld).await.unwrap() } #[test] fn test_obj() { let store = MemStore::default(); let _ = &store as &dyn Store; let store = Arc::new(store); let _ = &store as &dyn Store; let store = MemStore::default(); let store = BufStore::new(store, 16, 16); let _ = &store as &dyn Store; let store = Arc::new(store); let _ = &store as &dyn Store; } async fn run_gc_no_pin(store: &dyn Store) { let (cid_0, data_0) = create_block_raw(0); store.write(&cid_0, data_0).await.unwrap(); store.flush().await.unwrap(); store.gc().await.unwrap(); let data_0_2 = store.read(&cid_0).await.unwrap(); assert!(data_0_2.is_none()); } #[test] fn test_gc_no_pin() { let store = MemStore::default(); task::block_on(run_gc_no_pin(&store)); } async fn run_gc_pin(store: &dyn Store) { let (cid_0, data_0) = create_block_raw(0); store.write(&cid_0, data_0.clone()).await.unwrap(); store.pin(&cid_0).await.unwrap(); store.flush().await.unwrap(); store.gc().await.unwrap(); let data_0_2 = store.read(&cid_0).await.unwrap(); assert_eq!(data_0_2, Some(data_0)); } #[test] fn test_gc_pin() { let store = MemStore::default(); task::block_on(run_gc_pin(&store)); } async fn run_gc_pin_leaf(store: &dyn Store) { let (cid_0, data_0) = create_block_raw(0); let ipld = Ipld::Link(cid_0.clone()); let (cid_1, data_1) = create_block_ipld(&ipld).await; store.write(&cid_0, data_0.clone()).await.unwrap(); store.write(&cid_1, data_1.clone()).await.unwrap(); store.pin(&cid_1).await.unwrap(); store.flush().await.unwrap(); store.gc().await.unwrap(); let data_0_2 = store.read(&cid_0).await.unwrap(); assert_eq!(data_0_2, Some(data_0)); } #[test] fn test_gc_pin_leaf() { let store = MemStore::default(); task::block_on(run_gc_pin_leaf(&store)); } fn join<T>(f1: impl Future<Output = Result<T>>, f2: impl Future<Output = Result<T>>) -> (T, T) { task::block_on(async { let f1_u = async { f1.await.unwrap() }; let f2_u = async { f2.await.unwrap() }; join!(f1_u, f2_u) }) } #[test] fn mem_buf_store_eqv() { const LEN: usize = 4; let blocks: Vec<_> = (0..LEN).into_iter().map(create_block_raw).collect(); model! { Model => let mem_store = MemStore::default(), Implementation => let buf_store = BufStore::new(MemStore::default(), 16, 16), Read(usize)(i in 0..LEN) => { let (cid, _) = &blocks[i]; let mem = mem_store.read(cid); let buf = buf_store.read(cid); let (mem, buf) = join(mem, buf); // Element can be in cache after gc. if !(mem.is_none() && buf.is_some()) { assert_eq!(mem, buf); } }, Write(usize)(i in 0..LEN) => { let (cid, data) = &blocks[i]; let mem = mem_store.write(cid, data.clone()); let buf = buf_store.write(cid, data.clone()); join(mem, buf); }, Flush(usize)(_ in 0..LEN) => { let mem = mem_store.flush(); let buf = buf_store.flush(); join(mem, buf); }, Gc(usize)(_ in 0..LEN) => { let mem = mem_store.gc(); let buf = buf_store.gc(); join(mem, buf); }, Pin(usize)(i in 0..LEN) => { let (cid, _) = &blocks[i]; let mem = mem_store.pin(&cid); let buf = buf_store.pin(&cid); join(mem, buf); }, Unpin(usize)(i in 0..LEN) => { let (cid, _) = &blocks[i]; let mem = mem_store.unpin(&cid); let buf = buf_store.unpin(&cid); join(mem, buf); } } } macro_rules! linearizable_store { ($store:expr) => { const LEN: usize = 4; let blocks: Vec<_> = (0..LEN).into_iter().map(create_block_raw).collect(); let blocks = Shared::new(blocks); const LLEN: usize = 3; let links = Shared::new(["a", "b", "c"]); linearizable! { Implementation => let store = model::Shared::new($store), Read(usize)(i in 0..LEN) -> Option<Box<[u8]>> { let (cid, _) = &blocks[i]; task::block_on(store.read(cid)).unwrap() }, Write(usize)(i in 0..LEN) -> () { let (cid, data) = &blocks[i]; task::block_on(store.write(cid, data.clone())).unwrap() }, Flush(usize)(_ in 0..LEN) -> () { task::block_on(store.flush()).unwrap() }, Gc(usize)(_ in 0..LEN) -> () { task::block_on(store.gc()).unwrap() }, Pin(usize)(i in 0..LEN) -> () { let (cid, _) = &blocks[i]; task::block_on(store.pin(cid)).unwrap() }, Unpin(usize)(i in 0..LEN) -> () { let (cid, _) = &blocks[i]; task::block_on(store.unpin(cid)).unwrap() }, WriteLink((usize, usize))((i1, i2) in (0..LLEN, 0..LEN)) -> () { let link = &links[i1]; let (cid, _) = &blocks[i2]; task::block_on(store.write_link(link, cid)).unwrap() }, ReadLink(usize)(i in 0..LLEN) -> Option<Cid> { let link = &links[i]; task::block_on(store.read_link(link)).unwrap() }, RemoveLink(usize)(i in 0..LLEN) -> () { let link = &links[i]; task::block_on(store.remove_link(link)).unwrap() } } }; } #[test] fn mem_store_lin() { linearizable_store!(MemStore::default()); } #[test] fn buf_store_lin() { linearizable_store!(BufStore::new(MemStore::default(), 16, 16)); } }