#![cfg(all(feature = "std", feature = "nom-adapters"))]
// Integration tests that split `test.txt` into whitespace-separated words,
// feeding the same nom-based parser through different `streamparser` sources.

use futures_core::Stream;
use nom::{
    bytes::{complete, streaming},
    combinator::{map, opt},
    error::ErrorKind,
    sequence::terminated,
};
use pin_project_lite::pin_project;
use std::{
    path::{Path, PathBuf},
    pin::Pin,
    task::{Context, Poll},
};
use streamparser::{
    iter::IterSource,
    nom::{nom_finish, nom_stream},
    read::{AsyncRead, Read, ReadSource},
    utf8::Utf8Adapter,
    Parse, Parsed, StreamParser, Streaming,
};

// NOTE(review): `Error` is not referenced anywhere in this file; kept in case
// another part of the test suite relies on it.
#[derive(Debug)]
struct Error;

/// Parser that yields one whitespace-delimited word per call.
struct WordsParser;

// NOTE(review): the generic arguments of the two return types below were
// reconstructed (`Streaming<Parsed<_>>` for `parse`, `Parsed<_>` for
// `parse_eof`); confirm them against the `streamparser::Parse` trait.
impl<'a> Parse<'a> for WordsParser {
    type Input = &'a str;
    type Output = &'a str;
    type Error = (&'a str, ErrorKind);

    /// Parses one word while more input may still arrive.
    ///
    /// Uses nom's *streaming* combinators: a run of one or more non-whitespace
    /// characters followed by one or more whitespace characters. The word is
    /// wrapped in `Some` before being handed to `nom_stream`.
    fn parse(
        &mut self,
        input: Self::Input,
    ) -> Result<Streaming<Parsed<Self::Output>>, Self::Error> {
        nom_stream(map(
            terminated(
                streaming::take_till1(|c: char| c.is_whitespace()),
                streaming::take_while1(|c: char| c.is_whitespace()),
            ),
            Some,
        ))(input)
    }

    /// Parses the remaining input once the source is exhausted.
    ///
    /// Uses nom's *complete* combinators: an optional final word followed by
    /// zero or more whitespace characters, passed through `nom_finish`.
    fn parse_eof(&mut self, input: Self::Input) -> Result<Parsed<Self::Output>, Self::Error> {
        nom_finish(terminated(
            opt(complete::take_till1(|c: char| c.is_whitespace())),
            complete::take_while(|c: char| c.is_whitespace()),
        ))(input)
    }
}

pin_project! {
    // Adapts a reader into an `Iterator` (for `Read`) or a `Stream` (for
    // `AsyncRead`) of byte chunks, each at most `read_size` bytes long.
    struct IterAdapter<S> {
        // The wrapped reader; pinned so `poll_read` can be called on it.
        #[pin]
        underlying: S,
        // Size of the buffer offered to the reader on every read.
        read_size: usize,
    }
}

impl<S> IterAdapter<S> {
    /// Wraps `underlying`, reading up to `read_size` bytes per chunk.
    fn new(underlying: S, read_size: usize) -> Self {
        IterAdapter {
            underlying,
            read_size,
        }
    }
}

impl<S> Iterator for IterAdapter<S>
where
    S: Read,
{
    type Item = Result<Vec<u8>, S::Error>;

    /// Reads the next chunk; a zero-length read ends the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        // Zero-filled scratch buffer, trimmed to the bytes actually read.
        let mut buf = vec![0; self.read_size];
        match self.underlying.read(&mut buf) {
            Ok(0) => None,
            Ok(read) => {
                buf.truncate(read);
                Some(Ok(buf))
            }
            Err(err) => Some(Err(err)),
        }
    }
}

impl<S> Stream for IterAdapter<S>
where
    S: AsyncRead,
{
    type Item = Result<Vec<u8>, S::Error>;

    /// Polls the reader for the next chunk; a zero-length read ends the stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // A fresh buffer is allocated on every poll, including polls that end
        // up returning `Pending`.
        let mut buf = vec![0; *this.read_size];
        match this.underlying.poll_read(cx, &mut buf) {
            Poll::Ready(Ok(0)) => Poll::Ready(None),
            Poll::Ready(Ok(read)) => {
                buf.truncate(read);
                Poll::Ready(Some(Ok(buf)))
            }
            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Returns the path of `test.txt` in the same directory as this source file,
/// derived from `file!()`.
fn test_file() -> PathBuf {
    let file = Path::new(file!());
    file.parent()
        .expect("file!() names a file, so it has a parent directory")
        .join("test.txt")
}

/// The words the tests expect to parse from the fixture, in order.
fn expected_words() -> Vec<String> {
    [
        "some",
        "words",
        "in",
        "a",
        "file",
        "separated",
        "by",
        "whitespace",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect()
}

/// Parses the fixture through a `ReadSource` with a 5-byte read size.
#[test]
fn test_sync_read() {
    let file = std::fs::File::open(test_file()).unwrap();
    let mut parser = StreamParser::new(Utf8Adapter::new(WordsParser), ReadSource::new(file, 5));
    let mut results = Vec::new();
    while let Some(result) = parser.get().unwrap() {
        match result {
            Streaming::Item(word) => {
                results.push(word.to_string());
            }
            // Not enough buffered input for a full word yet: pull more.
            Streaming::Incomplete => parser.advance().unwrap(),
        }
    }
    assert_eq!(results, expected_words());
}

// Disabled async counterpart of `test_sync_read`, kept as in the original.
/*#[async_std::test]
async fn test_async_read() {
    let file = async_std::fs::File::open(&test_file()).await.unwrap();
    let mut parser = StreamParser::new_read(Utf8Adapter::new(WordsParser), file, 5);
    let mut results = Vec::new();
    while let Some(result) = parser.get().unwrap() {
        match result {
            Streaming::Item(word) => {
                results.push(word.to_string());
            }
            Streaming::Incomplete => parser.advance_async().await.unwrap(),
        }
    }
    assert_eq!(results, expected_words());
}*/

/// Parses the fixture through an `IterSource` fed by `IterAdapter` chunks of
/// at most 5 bytes.
#[test]
fn test_sync_iter() {
    let file = std::fs::File::open(test_file()).unwrap();
    let mut parser = StreamParser::new(
        Utf8Adapter::new(WordsParser),
        IterSource::new(IterAdapter::new(file, 5)),
    );
    let mut results = Vec::new();
    while let Some(result) = parser.get().unwrap() {
        match result {
            Streaming::Item(word) => {
                results.push(word.to_string());
            }
            // Not enough buffered input for a full word yet: pull more.
            Streaming::Incomplete => parser.advance().unwrap(),
        }
    }
    assert_eq!(results, expected_words());
}

// Disabled async counterpart of `test_sync_iter`, kept as in the original.
/*#[async_std::test]
async fn test_async_iter() {
    let file = async_std::fs::File::open(&test_file()).await.unwrap();
    let mut parser = StreamParser::new_iter(Utf8Adapter::new(WordsParser), IterAdapter::new(file, 5));
    let mut results = Vec::new();
    while let Some(result) = parser.get().unwrap() {
        match result {
            Streaming::Item(word) => {
                results.push(word.to_string());
            }
            Streaming::Incomplete => parser.advance_async().await.unwrap(),
        }
    }
    assert_eq!(results, expected_words());
}*/