//! Small wrapper crate for `ipfs_api`. //! Mostly exists because I do not want any part of the `failure` crate to appear in my code. //! //! Quick example: //! ```no_run //! # use mfs::Mfs; //! # use std::path::Path; //! # async { //! let mfs = Mfs::new("http://127.0.0.1:5001").unwrap(); //! mfs.put(Path::new("/demo"), futures::io::Cursor::new("Hello ipfs files")).await.ok(); //! # }; //! ``` //! //! The relevant functions are exposted through the main `Mfs` struct. use futures::stream::StreamExt; use futures_util::io::AsyncReadExt; use ipfs_api::request::*; use ipfs_api::response::Error as IpfsApiError; use ipfs_api::response::FilesStatResponse; use ipfs_api::IpfsClient; use std::future::Future; use std::io::Cursor; use std::path::{Path, PathBuf}; use std::pin::Pin; use thiserror::Error as ThisError; #[derive(ThisError, Debug)] pub enum MfsError { #[error("mfs: {msg}")] OpError { msg: String, #[source] source: failure::Compat, }, #[error("mfs: {msg}")] LayeredError { msg: String, #[source] source: Box, }, #[error("mfs: could not read non-ipfs input")] InputError { #[source] source: std::io::Error, }, } type MfsResult = Result; trait OpText { fn with_context(self, f: F) -> Result where C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C; } impl OpText for std::result::Result { fn with_context(self, f: F) -> Result where C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C, { match self { Ok(ok) => Ok(ok), Err(e) => Err(MfsError::OpError { msg: f().to_string(), source: failure::Fail::compat(e), }), } } } impl OpText for std::result::Result { fn with_context(self, f: F) -> Result where C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C, { match self { Ok(ok) => Ok(ok), Err(e) => Err(MfsError::LayeredError { msg: f().to_string(), source: Box::new(e), }), } } } trait Unpath { fn unpath(&self) -> &str; } impl Unpath for T where T: AsRef, { fn unpath(&self) -> &str { self.as_ref().to_str().unwrap() } } /// The main struct /// /// All of its functions will panic if the passed paths are not valid unicode. #[derive(Clone)] pub struct Mfs { ipfs: IpfsClient, /// Default hash function to use for write / create operations. pub hash_default: String, /// Default cid version to request on write / create operations. Defaults to `1`. pub cid_default: i32, /// Whether to write files with `raw-leaves` pub raw_leaves_default: bool, } impl Mfs { pub fn new(api: &str) -> Result { Ok(Mfs { ipfs: ipfs_api::TryFromUri::from_str(api)?, hash_default: "sha2-256".to_owned(), cid_default: 1, raw_leaves_default: true, }) } /// Remove file or folder (possibly non-empty) pub async fn rm_r>(&self, p: P) -> MfsResult<()> { Ok(self .ipfs .files_rm(p.unpath(), true) .await .with_context(|| format!("rm -r {:?}", p.as_ref()))?) } /// Remove file pub async fn rm>(&self, p: P) -> MfsResult<()> { Ok(self .ipfs .files_rm(p.unpath(), false) .await .with_context(|| format!("rm {:?}", p.as_ref()))?) } /// Create directory `p` and parents as needed. pub async fn mkdirs>(&self, p: P) -> MfsResult<()> { Ok(self .ipfs .files_mkdir_with_options( FilesMkdir::builder() .path(p.unpath()) .parents(true) .cid_version(self.cid_default) .hash(&self.hash_default) .build(), ) .await .with_context(|| format!("mkdir -p {:?}", p.as_ref()))?) } /// Create directory `p`. Requires that its parent already exist. pub async fn mkdir>(&self, p: P) -> MfsResult<()> { Ok(self .ipfs .files_mkdir_with_options( FilesMkdir::builder() .path(p.unpath()) .parents(false) .cid_version(self.cid_default) .hash(&self.hash_default) .build(), ) .await .with_context(|| format!("mkdir {:?}", p.as_ref()))?) } /// Rename / move `s` to `d`. /// pub async fn mv, PD: AsRef>(&self, s: PS, d: PD) -> MfsResult<()> { Ok(self .ipfs .files_mv(s.unpath(), d.unpath()) .await .with_context(|| format!("mv {:?} {:?}", s.as_ref(), d.as_ref()))?) } /// Copy path `s` to path `d`. Beware of `s` starting with `/ipfs` or `/ipns`. pub async fn cp, PD: AsRef>(&self, s: PS, d: PD) -> MfsResult<()> { Ok(self .ipfs .files_cp(s.unpath(), d.unpath()) .await .with_context(|| format!("cp {:?} {:?}", s.as_ref(), d.as_ref()))?) } /// List files in folder pub async fn ls>(&self, p: P) -> MfsResult> { Ok(self .ipfs .files_ls(Some(p.unpath())) .await .with_context(|| format!("ls {:?}", p.as_ref()))? .entries .into_iter() .map(|e| e.name.into()) .collect()) } /// Read file at `s`. pub fn get<'a, P: AsRef>( &self, s: P, ) -> impl futures_core::stream::Stream> { self.ipfs .files_read(s.unpath()) .map(move |e| Ok(e.with_context(|| format!("reading {:?}", s.as_ref()))?)) } /// Read file at `s` into in-memory buffer. pub async fn get_fully>(&self, s: P) -> MfsResult> { use futures_util::stream::TryStreamExt; self.get(s).map_ok(|chunk| chunk.to_vec()).try_concat().await // Optimally, I'd have a version of this that returns a Read or similar… } /// Flush folder pub async fn flush>(&self, p: P) -> MfsResult<()> { Ok(self .ipfs .files_flush(Some(p.unpath())) .await .with_context(|| format!("flush {:?}", p.as_ref()))?) } /// Write file to `d`. /// /// Internally buffers chunks of 8 MiB and writes them with separate calls to IPFS /// due to limitations of `multipart`. pub async fn put, R: 'static + futures::AsyncRead + Send + Sync + Unpin>( &self, d: P, mut data: R, ) -> MfsResult<()> { let d = d.unpath(); let mut firstwrite = true; let mut total = 0; let mut finished = false; let mut pending: Option> + Send>>> = None; while !finished { let mut buf = Vec::new(); buf.resize(1 << 23, 0); let mut offset = 0; while offset < buf.len() && !finished { let read = data .read(&mut buf[offset..]) .await .map_err(|e| MfsError::InputError { source: e })?; offset += read; finished = read == 0; } buf.truncate(offset); if offset == 0 && !firstwrite { break; } let req = self.ipfs.files_write_with_options( FilesWrite::builder() .path(d) .create(firstwrite) .truncate(firstwrite) .parents(true) .offset(total as i64) .count(offset as i64) .raw_leaves(self.raw_leaves_default) .cid_version(self.cid_default) .hash(&self.hash_default) .flush(true) .build(), Cursor::new(buf), ); if let Some(req) = pending { req.await.with_context(|| format!("write {:?}: chunk", d))?; } pending = Some(Box::pin(req)); total += offset; firstwrite = false; } if let Some(req) = pending { req.await.with_context(|| format!("write {:?}: final chunk", d))?; } let stat = self.stat(d).await.with_context(|| format!("write {:?}: confirm", d))?; if stat.map(|stat| stat.size != total as u64).unwrap_or(false) { self.rm(d).await.ok(); todo!( "write {:?}: read/write sizes do not match - lost bytes :( TODO: don't panic", d ); } Ok(()) } /// Request hash, type, sizes, and block count of `p`. Returns `None` if the file does not exist pub async fn stat>(&self, p: P) -> MfsResult> { match self.ipfs.files_stat(p.unpath()).await { Ok(r) => return Ok(Some(r)), Err(ipfs_api::response::Error::Api(ipfs_api::response::ApiError { code: 0, .. })) => return Ok(None), e @ Err(_) => e.with_context(|| format!("stat {:?}", p.as_ref()))?, }; unreachable!(""); } } #[cfg(test)] mod tests { use super::*; #[tokio::test] #[ignore] async fn eierlegendewollmilchsau() { let mfs = Mfs::new("http://127.0.0.1:5001").expect("create client"); let basedir = Path::new("/test-rust-ipfs-mfs"); if mfs .stat(basedir) .await .expect("Test preparation: check existing files at test path") .is_some() { mfs.rm_r(basedir) .await .expect("Test preparation: remove already existing test files"); } mfs.mkdir(basedir) .await .expect("Test preparation: make working directory"); mfs.mkdir(basedir.join("a/b")) .await .err() .expect("mkdir does not create parents"); mfs.mkdirs(basedir.join("a/b")).await.expect("mkdirs creates parents"); let stat1 = mfs .stat(basedir) .await .expect("Statting working directory") .expect("Working directory exists"); mfs.cp(basedir, basedir.join("a/c")).await.expect("cp succeeds"); assert_eq!( mfs.stat(basedir.join("a/c")).await.unwrap().unwrap().hash, stat1.hash, "After cp is before cp (the hash)" ); mfs.mv(basedir.join("a/b"), basedir.join("a/d")) .await .expect("mv succeeds"); let mut ls1 = mfs.ls(basedir.join("a")).await.expect("Listing a"); ls1.sort(); assert_eq!( vec![PathBuf::from("c"), PathBuf::from("d")], ls1, "Directory listing matches expected" ); for size in vec![0 as usize, 10, 1, 8 << 20, 9 << 20] { let f = &basedir.join("f"); let data = (0..size).map(|_| rand::random::()).collect::>(); mfs.put(f, futures::io::Cursor::new(data.clone())) .await .expect(&format!("Write file of size {}", size)); let redata = mfs.get_fully(f).await.expect("Read file"); assert_eq!(data.len(), redata.len(), "Read size matches written size"); assert_eq!(data, redata, "Read matches written"); } mfs.rm_r(basedir).await.expect("cleanup"); } // Compile only test #[allow(dead_code)] fn good_little_future() { fn good(_: T) {} let mfs = Mfs::new("http://127.0.0.1:5001").expect("create client"); let a = Path::new(""); let b = a; let dat = futures::io::Cursor::new("asdf".as_bytes()); good(mfs.put(a, dat)); good(mfs.cp(a, b)); good(mfs.mv(a, b)); good(mfs.get(a)); good(mfs.ls(a)); good(mfs.mkdir(a)); good(mfs.mkdirs(a)); good(mfs.get_fully(a)); } }