Bug fix: Implementing futures_io::AsyncBufRead is not straightforward. #12

Open · wants to merge 6 commits into base: main
8 changes: 6 additions & 2 deletions Cargo.toml
@@ -7,11 +7,15 @@ description = "BufReader adapter for nom parsers"
license = "MIT"
keywords = ["parser", "parser-combinators", "parsing", "streaming", "async"]
categories = ["parsing"]
repository = "https://github.com/rust-bakery/nom-bufreader"
readme = "README.md"
documentation = "https://docs.rs/nom-bufreader"

[dependencies]
nom = "7.0.0"
async-trait = { version = "0.1.51", optional = true }
futures = { version = "0.3.16", optional = true }
futures-io = { version = "0.3.16", default-features = false, optional = true }
futures-util = { version = "0.3.16", default-features = false, features = ["io"], optional = true }
pin-project-lite = { version = "0.2.7", optional = true }

[dev-dependencies]
@@ -21,4 +25,4 @@ tokio-util = { version = "0.6.7", features = ["compat"] }

[features]
default = ["async"]
async = ["futures", "async-trait", "pin-project-lite"]
async = ["futures-io", "futures-util", "async-trait", "pin-project-lite"]
3 changes: 2 additions & 1 deletion examples/async_std_http.rs
@@ -1,3 +1,4 @@
use futures_util::io::BufReader as IoBufReader;
use nom::{
branch::alt,
bytes::streaming::{tag, take_until},
@@ -27,7 +28,7 @@ fn space(i: &[u8]) -> IResult<&[u8], (), ()> {
#[async_std::main]
async fn main() -> Result<(), Error<()>> {
let listener = async_std::net::TcpListener::bind("127.0.0.1:8080").await?;
let mut i = BufReader::new(listener.accept().await?.0);
let mut i = BufReader::new(IoBufReader::new(listener.accept().await?.0));

let m = i.parse(method).await?;
let _ = i.parse(space).await?;
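The example change above is needed because, with this PR, `BufReader::new` expects an inner reader that already implements `futures_io::AsyncBufRead`, while async-std's `TcpStream` only provides `AsyncRead`. A minimal sketch of the same wrapping in isolation (the helper name `accept_buffered` is illustrative, not part of the crate):

```rust
use futures_util::io::BufReader as IoBufReader;
use nom_bufreader::async_bufreader::BufReader;

// Wrap an accepted async-std connection so it satisfies the new
// AsyncBufRead + Unpin bound on nom-bufreader's BufReader.
async fn accept_buffered(
    listener: &async_std::net::TcpListener,
) -> std::io::Result<BufReader<IoBufReader<async_std::net::TcpStream>>> {
    let (stream, _peer) = listener.accept().await?;
    // futures-util's BufReader supplies AsyncBufRead on top of the raw AsyncRead stream.
    Ok(BufReader::new(IoBufReader::new(stream)))
}
```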
3 changes: 2 additions & 1 deletion examples/tokio_http.rs
@@ -1,3 +1,4 @@
use futures_util::io::BufReader as IoBufReader;
use nom::{
branch::alt,
bytes::streaming::{tag, take_until},
@@ -28,7 +29,7 @@ fn space(i: &[u8]) -> IResult<&[u8], (), ()> {
#[tokio::main]
async fn main() -> Result<(), Error<()>> {
let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
let mut i = BufReader::new(listener.accept().await?.0.compat());
let mut i = BufReader::new(IoBufReader::new(listener.accept().await?.0.compat()));

let m = i.parse(method).await?;
let _ = i.parse(space).await?;
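The tokio example follows the same pattern with one extra layer: tokio's `TcpStream` implements tokio's own `AsyncRead`, so it is first adapted to the futures-io traits via tokio-util's `.compat()` and then buffered. A hedged sketch of that adapter stack (the helper name is illustrative):

```rust
use futures_util::io::BufReader as IoBufReader;
use nom_bufreader::async_bufreader::BufReader;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

// Adapt a tokio stream to futures-io, then buffer it so it meets the
// AsyncBufRead + Unpin bound required by this PR's BufReader.
async fn accept_buffered(
    listener: &tokio::net::TcpListener,
) -> std::io::Result<BufReader<IoBufReader<Compat<tokio::net::TcpStream>>>> {
    let (stream, _peer) = listener.accept().await?;
    Ok(BufReader::new(IoBufReader::new(stream.compat())))
}
```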
141 changes: 59 additions & 82 deletions src/async_bufreader.rs
@@ -1,11 +1,13 @@
use super::bufreader::DEFAULT_BUF_SIZE;
use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
use futures::ready;
use futures::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
use futures_util::ready;
use futures_util::AsyncReadExt;
use pin_project_lite::pin_project;
use std::io::{self, Read};
use std::collections::VecDeque;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::{cmp, fmt};
use std::task::{Context, Poll};

pin_project! {
/// The `BufReader` struct adds buffering to any reader.
@@ -24,22 +26,17 @@ pin_project! {
/// discarded. Creating multiple instances of a `BufReader` on the same
/// stream can cause data loss.
///
/// **Note: this is a fork from `std::io::BufReader` that reads more data in
/// `fill_buf` even if there is already some data in the buffer**
///
/// [`AsyncRead`]: futures_io::AsyncRead
///
// TODO: Examples
pub struct BufReader<R> {
#[pin]
inner: R,
buffer: Vec<u8>,
pos: usize,
cap: usize,
buffer: VecDeque<u8>,
}
}

impl<R: AsyncRead> BufReader<R> {
impl<R: AsyncBufRead + std::marker::Unpin> BufReader<R> {
/// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
/// but may change in the future.
pub fn new(inner: R) -> Self {
@@ -48,13 +45,8 @@ impl<R: AsyncRead> BufReader<R> {

/// Creates a new `BufReader` with the specified buffer capacity.
pub fn with_capacity(capacity: usize, inner: R) -> Self {
let buffer = vec![0; capacity];
Self {
inner,
buffer,
pos: 0,
cap: 0,
}
let buffer = VecDeque::with_capacity(capacity);
Self { inner, buffer }
}

/// Acquires a reference to the underlying sink or stream that this combinator is
@@ -92,53 +84,74 @@ impl<R: AsyncRead> BufReader<R> {
/// Returns a reference to the internally buffered data.
///
/// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
pub fn buffer(&self) -> &[u8] {
&self.buffer[self.pos..self.cap]
pub fn buffer(&mut self) -> &[u8] {
self.buffer.make_contiguous()
}

/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(self: Pin<&mut Self>) {
let this = self.project();
*this.pos = 0;
*this.cap = 0;
pub fn consume(&mut self, nread: usize) {
for _ in 0..nread {
self.buffer.pop_front();
}
}

pub async fn my_fill_buf(
&mut self,
nread: Option<std::num::NonZeroUsize>,
) -> Result<usize, io::Error> {
let mut buf = match nread {
Some(x) => vec![0; x.get().next_power_of_two()],
None => vec![0; 4096],
};

let i = self.inner.read(&mut buf).await?;
buf.truncate(i);

self.buffer.append(&mut buf.into());

Ok(i)
}
}

impl<R: AsyncRead> AsyncRead for BufReader<R> {
impl<R: AsyncBufRead + std::marker::Unpin> AsyncRead for BufReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.project();
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.pos == self.cap && buf.len() >= self.buffer.len() {
let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
self.discard_buffer();
if buf.len() >= this.buffer.len() {
let res = ready!(this.inner.poll_read(cx, buf));
this.buffer.clear();
return Poll::Ready(res);
}
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = rem.read(buf)?;
self.consume(nread);
let mut rem = ready!(this.inner.poll_fill_buf(cx))?;
let nread = std::io::Read::read(&mut rem, buf)?;
for _ in 0..nread {
this.buffer.pop_front();
}
Poll::Ready(Ok(nread))
}

fn poll_read_vectored(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
let this = self.project();
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
if self.pos == self.cap && total_len >= self.buffer.len() {
let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
self.discard_buffer();
if total_len >= this.buffer.len() {
let res = ready!(this.inner.poll_read_vectored(cx, bufs));
this.buffer.clear();
return Poll::Ready(res);
}
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = rem.read_vectored(bufs)?;
self.consume(nread);
let mut rem = ready!(this.inner.poll_fill_buf(cx))?;
let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
for _ in 0..nread {
this.buffer.pop_front();
}
Poll::Ready(Ok(nread))
}

@@ -149,39 +162,6 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
}
}

impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let this = self.project();

if *this.cap == this.buffer.len() {
if *this.pos == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Interrupted,
"buffer completely filled",
)));
} else {
// reset buffer position
if *this.cap - *this.pos > 0 {
for i in 0..(*this.cap - *this.pos) {
this.buffer[i] = this.buffer[*this.pos + i];
}
}
*this.cap = *this.cap - *this.pos;
*this.pos = 0;
}
}

let read = ready!(this.inner.poll_read(cx, this.buffer))?;
*this.cap += read;

Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
}

fn consume(self: Pin<&mut Self>, amt: usize) {
*self.project().pos = cmp::min(self.pos + amt, self.cap);
}
}

impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
self: core::pin::Pin<&mut Self>,
@@ -215,10 +195,7 @@ impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader")
.field("reader", &self.inner)
.field(
"buffer",
&format_args!("{}/{}", self.cap - self.pos, self.buffer.len()),
)
.field("buffer", &format_args!("{}", self.buffer.len()))
.finish()
}
}
@@ -249,7 +226,7 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
) -> Poll<io::Result<u64>> {
let result: u64;
if let SeekFrom::Current(n) = pos {
let remainder = (self.cap - self.pos) as i64;
let remainder = self.as_mut().project().buffer.len() as i64;
// it should be safe to assume that remainder fits within an i64 as the alternative
// means we managed to allocate 8 exbibytes and that's absurd.
// But it's not out of the realm of possibility for some weird underlying reader to
@@ -268,7 +245,7 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
.project()
.inner
.poll_seek(cx, SeekFrom::Current(-remainder)))?;
self.as_mut().discard_buffer();
self.as_mut().project().buffer.clear();
result = ready!(self
.as_mut()
.project()
@@ -279,7 +256,7 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
// Seeking with Start/End doesn't care about our buffer length.
result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
}
self.discard_buffer();
self.as_mut().project().buffer.clear();
Poll::Ready(Ok(result))
}
}
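The rewrite above replaces the `pos`/`cap` byte array with a `VecDeque<u8>`: `my_fill_buf` appends newly read bytes to the back (optionally sized by a `NonZeroUsize` hint), `buffer()` exposes the queued bytes as a contiguous slice, and `consume()` pops handled bytes off the front. A minimal usage sketch under those assumptions (async-std executor; `peek_some` is an illustrative helper, not crate API):

```rust
use futures_util::io::BufReader as IoBufReader;
use nom_bufreader::async_bufreader::BufReader;

async fn peek_some(stream: async_std::net::TcpStream) -> std::io::Result<Vec<u8>> {
    let mut r = BufReader::new(IoBufReader::new(stream));
    // With no size hint, my_fill_buf reads up to ~4 KiB into the VecDeque.
    let n = r.my_fill_buf(None).await?;
    // buffer() makes the VecDeque contiguous and returns the buffered bytes.
    let head = r.buffer()[..n].to_vec();
    // consume() drops the bytes we have handled from the front of the buffer.
    r.consume(n);
    Ok(head)
}
```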
47 changes: 20 additions & 27 deletions src/lib.rs
@@ -72,16 +72,16 @@
//! Ok(())
//! }
//! ```
use nom::{Err, Offset, Parser};
use futures_io::AsyncBufRead;
use nom::{Err, Needed, Offset, Parser};
use std::io::{self, BufRead, Read};

#[cfg(feature = "async")]
use async_trait::async_trait;
#[cfg(feature = "async")]
use futures::{
io::{AsyncBufReadExt, BufReader},
AsyncRead,
};
use futures_io::AsyncRead;
#[cfg(feature = "async")]
use futures_util::io::{AsyncBufReadExt, BufReader};

#[cfg(feature = "async")]
pub mod async_bufreader;
@@ -235,37 +235,30 @@ impl<R: AsyncRead + Unpin + Send, O: Send, E, P> AsyncParse<O, E, P> for BufRead

#[cfg(feature = "async")]
#[async_trait]
impl<R: AsyncRead + Unpin + Send, O: Send, E, P> AsyncParse<O, E, P>
impl<R: AsyncBufRead + Unpin + Send, O: Send, E: Send, P> AsyncParse<O, E, P>
for async_bufreader::BufReader<R>
{
async fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
where
for<'a> P: Parser<&'a [u8], O, E> + Send + 'async_trait,
{
loop {
let opt =
//match p(input.buffer()) {
match p.parse(self.buffer()) {
Err(Err::Error(e)) => return Err(Error::Error(e)),
Err(Err::Failure(e)) => return Err(Error::Failure(e)),
Err(Err::Incomplete(_)) => {
None
},
Ok((i, o)) => {
let offset = self.buffer().offset(i);
Some((offset, o))
},
};

match opt {
Some((sz, o)) => {
self.consume_unpin(sz);
return Ok(o);
let buffer = self.buffer();
match p.parse(buffer) {
Err(Err::Error(e)) => return Err(Error::Error(e)),
Err(Err::Failure(e)) => return Err(Error::Failure(e)),
Err(Err::Incomplete(Needed::Unknown)) => {
self.my_fill_buf(None).await?;
}
None => {
self.fill_buf().await?;
Err(Err::Incomplete(Needed::Size(x))) => {
self.my_fill_buf(Some(x)).await?;
}
}
Ok((i, o)) => {
let offset = buffer.offset(i);
self.consume(offset);
return Ok(o);
}
};
}
}
}
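With these changes, `parse` retries on `Err::Incomplete`, sizing the next read from nom's `Needed` hint via `my_fill_buf`, and on success consumes exactly the parsed offset before returning. A short end-to-end sketch of calling it (assumes async-std and nom 7; `method` and `handle` are illustrative names, not part of the crate):

```rust
use futures_util::io::BufReader as IoBufReader;
use nom::bytes::streaming::tag;
use nom::IResult;
use nom_bufreader::async_bufreader::BufReader;
use nom_bufreader::{AsyncParse, Error};

// A tiny streaming parser: succeeds once "GET " has arrived, otherwise
// returns Incomplete so parse() knows to read more.
fn method(i: &[u8]) -> IResult<&[u8], Vec<u8>, ()> {
    let (i, m) = tag("GET ")(i)?;
    Ok((i, m.to_vec()))
}

async fn handle(stream: async_std::net::TcpStream) -> Result<(), Error<()>> {
    let mut reader = BufReader::new(IoBufReader::new(stream));
    // parse() loops: fill the buffer on Incomplete, consume the matched
    // prefix on success, and surface Error/Failure immediately.
    let m = reader.parse(method).await?;
    println!("parsed {} bytes of method", m.len());
    Ok(())
}
```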