
AsyncWrite support #15

Open · Alexei-Kornienko opened this issue Jun 1, 2021 · 8 comments

Alexei-Kornienko commented Jun 1, 2021

Would be good to add an impl of FrameEncoder/FrameDecoder where W: AsyncWrite (futures::io::AsyncWrite) and R: AsyncRead.

This would give users the ability to use compression in async data streams.

PSeitz (Owner) commented Jun 6, 2021

Thanks for the issue, yeah I also thought about that. It would be nice to have, but I'm not sure how well supporting both can be managed in terms of code duplication.

Alexei-Kornienko (Author) commented

One solution that I could propose is:

  1. Refactor the implementation so that it always works against the AsyncWrite trait.
  2. Add a thin wrapper that takes any type implementing Write and impls AsyncWrite for it without ever returning Poll::Pending (a sketch of such a wrapper is shown after the pros/cons below).

As a result, the async version would be used as is.
The sync version could internally use block_on (https://docs.rs/futures/0.3.15/futures/executor/fn.block_on.html) to drive the async calls to completion.

Pros:

  • only one impl of the algorithm, no copy/paste
  • for the sync version the compiler should be able to optimize out all the await points and make them zero cost (needs benchmarking)

Cons:

  • hard dependency on the futures crate
  • slightly more complicated code using async/await syntax
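
A minimal sketch of the wrapper from point 2, assuming the traits come from the futures crate; the name BlockingWriter is illustrative and not part of lz4_flex:

use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::io::AsyncWrite;

/// Adapts any blocking `Write` into an `AsyncWrite` that never returns
/// `Poll::Pending`, because the inner writer always completes synchronously.
struct BlockingWriter<W>(W);

impl<W: Write + Unpin> AsyncWrite for BlockingWriter<W> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Poll::Ready(self.0.write(buf))
    }

    fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(self.0.flush())
    }

    fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(self.0.flush())
    }
}

The sync frame encoder could then wrap its inner writer in BlockingWriter and drive the shared async implementation with futures::executor::block_on; since nothing ever returns Poll::Pending, block_on would never park the thread.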

arthurprs (Contributor) commented Jun 9, 2021

One significant problem right now is that the ecosystem is split regarding AsyncRead/AsyncWrite. Every runtime (tokio, ...) has its own set of traits, and the futures crate has another one.

PSeitz (Owner) commented Jan 24, 2023

A good idea may be to add lz4_flex support to https://github.com/Nemo157/async-compression

cpick commented Sep 25, 2024

The guts of my ugly workaround on the AsyncRead side are:

const CHUNK_LENGTH: usize =  /* 1 arbitrary * */ 1024 /* KiB */ * 1024 /* MiB */ ;
let compressed_chunk_max_length = lz4_flex::block::get_maximum_output_size(CHUNK_LENGTH);

let mut reader = lz4_flex::frame::FrameDecoder::new(ChunkReader::new());

loop {
    while reader.get_ref().len() < compressed_chunk_max_length {
        let Some(received) = stream.recv_data().await.context("receive data")? else {
            break;
        };
        reader.get_mut().append(received);
    }

    let read_length = {
        // FIXME: ideally wouldn't initialize
        let buffer = if buffer.get_ref().remaining() > CHUNK_LENGTH {
            buffer.get_mut().initialize_unfilled_to(CHUNK_LENGTH)
        } else {
            buffer.get_mut().initialize_unfilled()
        };

        reader.read(buffer).context("read chunk")?
    };
    buffer.get_mut().advance(read_length);

    // break at end of stream
    if read_length == 0 {
        break;
    }
}

Where buffer is a &mut buf::Writer<ReadBuf<'_>>, stream is an h3::client::RequestStream, and ChunkReader is a custom type that implements bytes::Buf, built around an inner VecDeque<buf::Reader<B>>.

This attempts to read chunks off the connection asynchronously and decompress them piecemeal without ever blocking.

Likewise on the AsyncWrite side:

let mut writer = lz4_flex::frame::FrameEncoder::new(vec![].writer());

for chunk in buffer.chunks(
    /* 1 arbitrary * */ 1024 /* KiB */ * 1024, /* MiB */
) {
    writer.write_all(chunk).context("encoder write all")?;
    let buffer = mem::take(writer.get_mut().get_mut());
    stream.send_data(buffer.into()).await.context("send data")?;
}

let buffer = writer.finish().context("encoder finish")?.into_inner();
stream
    .send_data(buffer.into())
    .await
    .context("send finished data")?;

Where buffer is a &[u8] and stream is an h3::client::RequestStream.

This attempts to periodically steal the compressed data from the encoder and asynchronously write it to the connection.

I don't really like how these turned out, and I don't think they're a clean solution, but I wanted to share what has been working for me in case it helps anyone else.

link2xt commented Oct 13, 2024

> Thanks for the issue, yeah I also thought about that. It would be nice to have, but I'm not sure how well supporting both can be managed in terms of code duplication.

There should be some API that does not depend on std::io::Read, tokio::io::AsyncRead, or futures::io::AsyncRead. This is usually referred to as a sans-I/O API.

Then everyone can implement their own Read, Write, AsyncRead, and AsyncWrite traits on top of it as a thin layer, e.g. async-compression can provide async implementations for futures and tokio.

I looked into the code a bit, but it seems that a lot of the logic currently lives inside lz4_flex::frame::FrameDecoder and lz4_flex::frame::FrameEncoder. It should be factored out before attempting to implement async support, otherwise the async-compression crate will end up copying a lot of code from this crate.

A good example is the flate2 crate: it has Compress and Decompress structures that take input and output buffers, but don't manage the buffering and I/O themselves. A rough sketch of that style follows.
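
To illustrate, here is a minimal sketch of the sans-I/O style using flate2's Compress type; the caller owns every buffer and does all I/O, while the codec only transforms bytes (the deflate_all helper and buffer sizes are arbitrary choices for this example):

use flate2::{Compress, Compression, FlushCompress, Status};

/// Compress `input` into a freshly allocated Vec, driving the codec
/// purely over in-memory buffers.
fn deflate_all(input: &[u8]) -> Vec<u8> {
    let mut codec = Compress::new(Compression::default(), true);
    let mut output = Vec::new();
    let mut chunk = [0u8; 4096];
    let mut consumed = 0usize;

    loop {
        let in_before = codec.total_in();
        let out_before = codec.total_out();
        // All remaining input is handed over each time, so FlushCompress::Finish is safe here.
        let status = codec
            .compress(&input[consumed..], &mut chunk, FlushCompress::Finish)
            .expect("compress");
        consumed += (codec.total_in() - in_before) as usize;
        output.extend_from_slice(&chunk[..(codec.total_out() - out_before) as usize]);
        if matches!(status, Status::StreamEnd) {
            break;
        }
    }
    output
}

A hypothetical lz4_flex equivalent of such an API could back the existing Read/Write frontends as well as AsyncRead/AsyncWrite ones.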

brandonros commented

https://docs.rs/flate2/1.0.34/src/flate2/deflate/bufread.rs.html#116-118

flate2 encoders for sure just use std::io::Write and not this cool sans I/O thing you are talking about, right?

link2xt commented Nov 1, 2024

> flate2 encoders for sure just use std::io::Write and not this cool sans I/O thing you are talking about, right?

async-compression does not use std::io::Write, but the Compress API instead:
https://github.com/Nullus157/async-compression/blob/f0a5c7420de645bbb76898866c11cb5a50ecf89d/src/codec/flate/encoder.rs#L33-L35

The same goes for the sync compressor you linked to: it also uses the sans-I/O Compress API internally.

Once you have something like a compress function that operates on memory buffers, you can implement both std::io::Write and AsyncWrite on top of it. But lz4_flex implements the compressor directly inside its std::io::Write implementation, so writing AsyncWrite is not possible without copy-pasting the code from the sync implementation, or wrapping the sync implementation by running it on a separate thread (this is what e.g. the tokio::fs module does to wrap std::fs, but that's hacky and means the underlying file is owned by a thread, or you need to add synchronization on top). A sketch of that layering follows.
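
As an illustration only, here is a hypothetical sketch of layering AsyncWrite on top of a buffer-to-buffer compression step; the Codec trait and AsyncFrameEncoder type are invented for this example (lz4_flex has no such API today), and real frame handling (headers, checksums, end mark on close) is omitted:

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::io::AsyncWrite;
use futures::ready;

/// Placeholder for a sans-I/O compressor: it consumes an input block and
/// returns compressed bytes, leaving all I/O to the caller.
trait Codec {
    fn compress_block(&mut self, input: &[u8]) -> Vec<u8>;
}

struct AsyncFrameEncoder<C, W> {
    codec: C,
    inner: W,
    pending: Vec<u8>, // compressed bytes not yet written to `inner`
}

impl<C: Codec + Unpin, W: AsyncWrite + Unpin> AsyncFrameEncoder<C, W> {
    /// Push buffered compressed output into the inner writer.
    fn poll_drain(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        while !self.pending.is_empty() {
            let n = ready!(Pin::new(&mut self.inner).poll_write(cx, &self.pending))?;
            if n == 0 {
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
            }
            self.pending.drain(..n);
        }
        Poll::Ready(Ok(()))
    }
}

impl<C: Codec + Unpin, W: AsyncWrite + Unpin> AsyncWrite for AsyncFrameEncoder<C, W> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        ready!(this.poll_drain(cx))?;
        // Compression is pure CPU work on memory buffers; it never pends.
        this.pending = this.codec.compress_block(buf);
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        ready!(this.poll_drain(cx))?;
        Pin::new(&mut this.inner).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        ready!(this.poll_drain(cx))?;
        Pin::new(&mut this.inner).poll_close(cx)
    }
}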
