diff --git a/Cargo.toml b/Cargo.toml index cf43635..76c2c20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,11 +7,15 @@ description = "BufReader adapter for nom parsers" license = "MIT" keywords = ["parser", "parser-combinators", "parsing", "streaming", "async"] categories = ["parsing"] +repository = "https://github.com/rust-bakery/nom-bufreader" +readme = "README.md" +documentation = "https://docs.rs/nom-bufreader" [dependencies] nom = "7.0.0" async-trait = { version = "0.1.51", optional = true } -futures = { version = "0.3.16", optional = true } +futures-io = { version = "0.3.16", default-features = false, optional = true } +futures-util = { version = "0.3.16", default-features = false, features = ["io"], optional = true } pin-project-lite = { version = "0.2.7", optional = true } [dev-dependencies] @@ -21,4 +25,4 @@ tokio-util = { version = "0.6.7", features = ["compat"] } [features] default = ["async"] -async = ["futures", "async-trait", "pin-project-lite"] +async = ["futures-io", "futures-util", "async-trait", "pin-project-lite"] diff --git a/examples/async_std_http.rs b/examples/async_std_http.rs index dd15fbe..fee120d 100644 --- a/examples/async_std_http.rs +++ b/examples/async_std_http.rs @@ -1,3 +1,4 @@ +use futures_util::io::BufReader as IoBufReader; use nom::{ branch::alt, bytes::streaming::{tag, take_until}, @@ -27,7 +28,7 @@ fn space(i: &[u8]) -> IResult<&[u8], (), ()> { #[async_std::main] async fn main() -> Result<(), Error<()>> { let listener = async_std::net::TcpListener::bind("127.0.0.1:8080").await?; - let mut i = BufReader::new(listener.accept().await?.0); + let mut i = BufReader::new(IoBufReader::new(listener.accept().await?.0)); let m = i.parse(method).await?; let _ = i.parse(space).await?; diff --git a/examples/tokio_http.rs b/examples/tokio_http.rs index 194996e..e7bc330 100644 --- a/examples/tokio_http.rs +++ b/examples/tokio_http.rs @@ -1,3 +1,4 @@ +use futures_util::io::BufReader as IoBufReader; use nom::{ branch::alt, bytes::streaming::{tag, take_until}, @@ -28,7 +29,7 @@ fn space(i: &[u8]) -> IResult<&[u8], (), ()> { #[tokio::main] async fn main() -> Result<(), Error<()>> { let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?; - let mut i = BufReader::new(listener.accept().await?.0.compat()); + let mut i = BufReader::new(IoBufReader::new(listener.accept().await?.0.compat())); let m = i.parse(method).await?; let _ = i.parse(space).await?; diff --git a/src/async_bufreader.rs b/src/async_bufreader.rs index add34a7..a6afb60 100644 --- a/src/async_bufreader.rs +++ b/src/async_bufreader.rs @@ -1,11 +1,13 @@ use super::bufreader::DEFAULT_BUF_SIZE; -use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; -use futures::ready; -use futures::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; +use futures_util::ready; +use futures_util::AsyncReadExt; use pin_project_lite::pin_project; -use std::io::{self, Read}; +use std::collections::VecDeque; +use std::fmt; +use std::io; use std::pin::Pin; -use std::{cmp, fmt}; +use std::task::{Context, Poll}; pin_project! { /// The `BufReader` struct adds buffering to any reader. @@ -24,22 +26,17 @@ pin_project! { /// discarded. Creating multiple instances of a `BufReader` on the same /// stream can cause data loss. /// - /// **Note: this is a fork from `std::io::BufReader` that reads more data in - /// `fill_buf` even if there is already some data in the buffer** - /// /// [`AsyncRead`]: futures_io::AsyncRead /// // TODO: Examples pub struct BufReader { #[pin] inner: R, - buffer: Vec, - pos: usize, - cap: usize, + buffer: VecDeque, } } -impl BufReader { +impl BufReader { /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, /// but may change in the future. pub fn new(inner: R) -> Self { @@ -48,13 +45,8 @@ impl BufReader { /// Creates a new `BufReader` with the specified buffer capacity. pub fn with_capacity(capacity: usize, inner: R) -> Self { - let buffer = vec![0; capacity]; - Self { - inner, - buffer, - pos: 0, - cap: 0, - } + let buffer = VecDeque::with_capacity(capacity); + Self { inner, buffer } } /// Acquires a reference to the underlying sink or stream that this combinator is @@ -92,53 +84,74 @@ impl BufReader { /// Returns a reference to the internally buffered data. /// /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. - pub fn buffer(&self) -> &[u8] { - &self.buffer[self.pos..self.cap] + pub fn buffer(&mut self) -> &[u8] { + self.buffer.make_contiguous() } - /// Invalidates all data in the internal buffer. - #[inline] - fn discard_buffer(self: Pin<&mut Self>) { - let this = self.project(); - *this.pos = 0; - *this.cap = 0; + pub fn consume(&mut self, nread: usize) { + for _ in 0..nread { + self.buffer.pop_front(); + } + } + + pub async fn my_fill_buf( + &mut self, + nread: Option, + ) -> Result { + let mut buf = match nread { + Some(x) => vec![0; x.get().next_power_of_two()], + None => vec![0; 4096], + }; + + let i = self.inner.read(&mut buf).await?; + buf.truncate(i); + + self.buffer.append(&mut buf.into()); + + Ok(i) } } -impl AsyncRead for BufReader { +impl AsyncRead for BufReader { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { + let this = self.project(); // If we don't have any buffered data and we're doing a massive read // (larger than our internal buffer), bypass our internal buffer // entirely. - if self.pos == self.cap && buf.len() >= self.buffer.len() { - let res = ready!(self.as_mut().project().inner.poll_read(cx, buf)); - self.discard_buffer(); + if buf.len() >= this.buffer.len() { + let res = ready!(this.inner.poll_read(cx, buf)); + this.buffer.clear(); return Poll::Ready(res); } - let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; - let nread = rem.read(buf)?; - self.consume(nread); + let mut rem = ready!(this.inner.poll_fill_buf(cx))?; + let nread = std::io::Read::read(&mut rem, buf)?; + for _ in 0..nread { + this.buffer.pop_front(); + } Poll::Ready(Ok(nread)) } fn poll_read_vectored( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { + let this = self.project(); let total_len = bufs.iter().map(|b| b.len()).sum::(); - if self.pos == self.cap && total_len >= self.buffer.len() { - let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs)); - self.discard_buffer(); + if total_len >= this.buffer.len() { + let res = ready!(this.inner.poll_read_vectored(cx, bufs)); + this.buffer.clear(); return Poll::Ready(res); } - let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; - let nread = rem.read_vectored(bufs)?; - self.consume(nread); + let mut rem = ready!(this.inner.poll_fill_buf(cx))?; + let nread = std::io::Read::read_vectored(&mut rem, bufs)?; + for _ in 0..nread { + this.buffer.pop_front(); + } Poll::Ready(Ok(nread)) } @@ -149,39 +162,6 @@ impl AsyncRead for BufReader { } } -impl AsyncBufRead for BufReader { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - if *this.cap == this.buffer.len() { - if *this.pos == 0 { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Interrupted, - "buffer completely filled", - ))); - } else { - // reset buffer position - if *this.cap - *this.pos > 0 { - for i in 0..(*this.cap - *this.pos) { - this.buffer[i] = this.buffer[*this.pos + i]; - } - } - *this.cap = *this.cap - *this.pos; - *this.pos = 0; - } - } - - let read = ready!(this.inner.poll_read(cx, this.buffer))?; - *this.cap += read; - - Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap])) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - *self.project().pos = cmp::min(self.pos + amt, self.cap); - } -} - impl AsyncWrite for BufReader { fn poll_write( self: core::pin::Pin<&mut Self>, @@ -215,10 +195,7 @@ impl fmt::Debug for BufReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("reader", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()), - ) + .field("buffer", &format_args!("{}", self.buffer.len())) .finish() } } @@ -249,7 +226,7 @@ impl AsyncSeek for BufReader { ) -> Poll> { let result: u64; if let SeekFrom::Current(n) = pos { - let remainder = (self.cap - self.pos) as i64; + let remainder = self.as_mut().project().buffer.len() as i64; // it should be safe to assume that remainder fits within an i64 as the alternative // means we managed to allocate 8 exbibytes and that's absurd. // But it's not out of the realm of possibility for some weird underlying reader to @@ -268,7 +245,7 @@ impl AsyncSeek for BufReader { .project() .inner .poll_seek(cx, SeekFrom::Current(-remainder)))?; - self.as_mut().discard_buffer(); + self.as_mut().project().buffer.clear(); result = ready!(self .as_mut() .project() @@ -279,7 +256,7 @@ impl AsyncSeek for BufReader { // Seeking with Start/End doesn't care about our buffer length. result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?; } - self.discard_buffer(); + self.as_mut().project().buffer.clear(); Poll::Ready(Ok(result)) } } diff --git a/src/lib.rs b/src/lib.rs index 6fdca82..2d38a50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,16 +72,16 @@ //! Ok(()) //! } //! ``` -use nom::{Err, Offset, Parser}; +use futures_io::AsyncBufRead; +use nom::{Err, Needed, Offset, Parser}; use std::io::{self, BufRead, Read}; #[cfg(feature = "async")] use async_trait::async_trait; #[cfg(feature = "async")] -use futures::{ - io::{AsyncBufReadExt, BufReader}, - AsyncRead, -}; +use futures_io::AsyncRead; +#[cfg(feature = "async")] +use futures_util::io::{AsyncBufReadExt, BufReader}; #[cfg(feature = "async")] pub mod async_bufreader; @@ -235,7 +235,7 @@ impl AsyncParse for BufRead #[cfg(feature = "async")] #[async_trait] -impl AsyncParse +impl AsyncParse for async_bufreader::BufReader { async fn parse(&mut self, mut p: P) -> Result> @@ -243,29 +243,22 @@ impl AsyncParse for<'a> P: Parser<&'a [u8], O, E> + Send + 'async_trait, { loop { - let opt = - //match p(input.buffer()) { - match p.parse(self.buffer()) { - Err(Err::Error(e)) => return Err(Error::Error(e)), - Err(Err::Failure(e)) => return Err(Error::Failure(e)), - Err(Err::Incomplete(_)) => { - None - }, - Ok((i, o)) => { - let offset = self.buffer().offset(i); - Some((offset, o)) - }, - }; - - match opt { - Some((sz, o)) => { - self.consume_unpin(sz); - return Ok(o); + let buffer = self.buffer(); + match p.parse(buffer) { + Err(Err::Error(e)) => return Err(Error::Error(e)), + Err(Err::Failure(e)) => return Err(Error::Failure(e)), + Err(Err::Incomplete(Needed::Unknown)) => { + self.my_fill_buf(None).await?; } - None => { - self.fill_buf().await?; + Err(Err::Incomplete(Needed::Size(x))) => { + self.my_fill_buf(Some(x)).await?; } - } + Ok((i, o)) => { + let offset = buffer.offset(i); + self.consume(offset); + return Ok(o); + } + }; } } }