use anyhow::Result; use fnv::FnvHashSet; use ipfs_embed::{ identity::ed25519::Keypair, Block, Cid, Config, DefaultParams, Ipfs, NetworkConfig, StorageConfig, }; use libipld::{cbor::DagCborCodec, multihash::Code, DagCbor}; use rand::{thread_rng, RngCore}; use std::time::{Duration, Instant}; use tracing_subscriber::fmt::format::FmtSpan; fn tracing_try_init() { tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE) .try_init() .ok(); } #[async_std::test] async fn gc() -> Result<()> { tracing_try_init(); let mut builder = DagBuilder::new().await?; let now = Instant::now(); for _ in 0..100 { builder.add_head()?; } println!("created dags in {}ms", now.elapsed().as_millis()); let now = Instant::now(); builder.check().await?; println!("checked dags in {}ms", now.elapsed().as_millis()); let heads = builder.heads(); println!(" {} heads", heads); for _ in 0..builder.heads() { let now = Instant::now(); builder.remove_head()?; builder.check().await?; println!("removed dag in {}ms", now.elapsed().as_millis()); } assert!(builder.ipfs.iter()?.next().is_none()); Ok(()) } #[derive(DagCbor)] struct Node { nonce: u64, children: Vec, } struct DagBuilder { ipfs: Ipfs, heads: FnvHashSet, } impl DagBuilder { async fn new() -> Result { let tmp = tempdir::TempDir::new("gc")?; let config = Config { storage: StorageConfig::new(Some(tmp.into_path()), None, 0, Duration::from_secs(1000)), network: NetworkConfig::new(Keypair::generate()), }; let ipfs = Ipfs::new(config).await?; Ok(Self { ipfs, heads: Default::default(), }) } fn heads(&self) -> usize { self.heads.len() } fn add_head(&mut self) -> Result<()> { let mut rng = thread_rng(); let n_children = if self.heads.is_empty() { 0 } else { rng.next_u32() as usize % self.heads.len() }; let n_children_rm = if n_children == 0 { 0 } else { rng.next_u32() as usize % n_children }; let nonce = rng.next_u64(); let mut children = Vec::with_capacity(n_children); children.extend(self.heads.iter().take(n_children)); let node = Node { nonce, children }; let block = Block::encode(DagCborCodec, Code::Blake3_256, &node)?; self.ipfs.alias(block.cid().to_bytes(), Some(block.cid()))?; let cid = *block.cid(); self.ipfs.insert(block)?; for cid in node.children.into_iter().take(n_children_rm) { self.ipfs.alias(cid.to_bytes(), None)?; self.heads.remove(&cid); } self.heads.insert(cid); Ok(()) } fn remove_head(&mut self) -> Result<()> { if let Some(cid) = self.heads.iter().next().copied() { self.ipfs.alias(cid.to_bytes(), None)?; self.heads.remove(&cid); } Ok(()) } async fn check(&self) -> Result<()> { let now = Instant::now(); self.ipfs.flush().await?; println!("flushed in {}ms", now.elapsed().as_millis()); let now = Instant::now(); self.ipfs.evict().await?; println!("gc in {}ms", now.elapsed().as_millis()); let now = Instant::now(); let mut live = FnvHashSet::default(); let mut stack = self.heads.clone(); while let Some(cid) = stack.iter().next().copied() { stack.remove(&cid); if live.contains(&cid) { continue; } self.ipfs.get(&cid)?.references(&mut stack)?; live.insert(cid); } println!("computed closure in {}ms", now.elapsed().as_millis()); Ok(()) } }