-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming encoder #178
base: main
Are you sure you want to change the base?
Streaming encoder #178
Conversation
@Nemo157 I cleaned it up a bit, to make it testable with other algorithms than just gzip. Should I really add implementations for other targets than tokio? Considering that support for tokio 0.2 and 0.3 is removed in #152, the Another approach I'd like investigate is having the streaming encoder expose a handler of some kind (channel, whatever) that could be used to trigger a flush externally. The user would then decide when to flush, possibly based on timeouts, or other conditions (like the deferred responses in the graphql router). That would remove the link to the runtime, and let higher level libraries decide on the flushing strategy |
with 87bf8f1 and cc86d79 there's an implementation driven by a channel. I think that's the cleaner approach here, since it makes no assumption on the platform, and that channel can be carried in different places depending on the use case (ex: in tower-http's |
I removed the timeout based solutin, this should be enough to review now @Nemo157 |
The FlushableEncoder now accepts a Stream, which will be more flexible than forcing a channel
@Nemo157 could you activate the CI on this PR? |
Hmm, reminds me I need to find out why github has changed the default to not running CI, afaik it's supposed to be secure against malicious PRs 🤔 I'll hopefully have some time to review this week. |
@Nemo157 friendly ping! 😄 Is there any way I can make this easier to review for you? |
@Nemo157 looks like the CI is passing. Do you think anything is missing from this PR? |
@Nemo157 FYI I'll vendor part of the compression code in the router, so I can fix the bug on my end, so no pressure for you on this PR, review it when it's convenient :) |
We replace tower-http's `CompressionLayer` with a custom stream transformation. This is necessary because tower-http uses async-compression, which buffers data until the end of the stream to then write it, ensuring a better compression. This is incompatible with the multipart protocol for `@defer`, which requires chunks to be sent as soon as possible. So we need to compress them independently. This extracts parts of the codec module of async-compression, which so far is not public, and makes a streaming wrapper above it that flushes the compressed data on every response in the stream. This is expected to be temporary, as we have in flight PRs for async-compression: - Nullus157/async-compression#155 - Nullus157/async-compression#178 With Nullus157/async-compression#150 we might be able to at least remove the vendored code
@NobodyXu since this is additive, I propose we review this after the 0.4.0 release ? |
Agree, I can't see how this could result in a breaking change. |
Plus it's not ready yet, so IMO we should go ahead and release 0.4.0 |
Fix #154
this is a follow up on the proposal for a
poll_flush
implementation. It is implemented ontokio::bufread::Encoder
and in the macros. We then provide aStreamingEncoder
that wraps aAsyncRead
andAsyncFlush
implementation to flush it if it produced nothing after a configurable time.The test shows it appears to work, but I'd like to test it more using the proptest integration.
On the API side, I think it will look better as a
.flush_timeout()
method?I looked into flushing once a configurable amount of data has been passed, but that requires hooking the data producer to count the bytes passing through, pass it to the
Encoder
, then have theStreamingEncoder
monitor that count. That's a bit involved, maybe that could be a separate implementation.