0.5 release prevents going back from Bytes to BytesMut #350

Open
mqudsi opened this issue Dec 21, 2019 · 6 comments

@mqudsi

mqudsi commented Dec 21, 2019

Is the (new) inability to roundtrip between Bytes and BytesMut intentional? The inability to do so somewhat diminishes the usefulness of the reference counting feature this crate offers, if it means that freeze() renders a chunk of memory permanently immutable with no way to reclaim borrowed ranges and start over again without reallocating.

I understand that Bytes is now unaware of the existence of BytesMut, but it should still be possible to update the BytesMut API to recover memory whose ref count has reached zero. I was all set to patch BytesMut to add a version of freeze() that looked like this:

pub fn freeze(self) -> (Bytes, Unfreeze)

where Unfreeze is a struct with a single function

pub fn unfreeze(self) -> BytesMut

with Unfreeze itself containing a weak reference to the same shared memory used by Bytes instances, but then I discovered that Bytes doesn't use Arc but rather a custom implementation that does not support weak references.

(The reason for having BytesMut return an Unfreeze rather than adding, e.g., an unfreeze()/try_mut() method to Bytes is that the latter would require Bytes to keep track of whether it was instantiated from read-only memory, a distinction the current vtable mask cannot make.)

(My use case involves reading some bytes into a buffer, sending the ref-counted read bytes to another thread for processing, then reclaiming them when an event is set (and the Bytes have been dropped) to reuse the buffer for the next cycle.)
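For illustration, here is a minimal sketch of the freeze()/Unfreeze idea proposed above, built on std::sync::Arc rather than on the bytes crate. The names Frozen, Unfreeze, and the free function freeze are hypothetical stand-ins and are not part of any bytes release. One assumption differs from the proposal: the handle holds a strong reference instead of a weak one, because with std's Arc the buffer is dropped as soon as the last strong reference goes away, so a weak handle would have nothing left to reclaim.

```rust
use std::sync::Arc;

/// Read-only, cheaply clonable view of the shared buffer
/// (hypothetical stand-in for `Bytes`).
#[derive(Clone)]
pub struct Frozen(Arc<Vec<u8>>);

impl Frozen {
    pub fn as_slice(&self) -> &[u8] {
        &self.0
    }
}

/// Hypothetical reclaim handle returned alongside the frozen view.
/// It keeps the allocation alive until `unfreeze` succeeds.
pub struct Unfreeze(Arc<Vec<u8>>);

/// Hypothetical counterpart of the proposed `freeze()`: consumes the writable
/// buffer and returns a shareable view plus a handle for reclaiming it later.
pub fn freeze(buf: Vec<u8>) -> (Frozen, Unfreeze) {
    let shared = Arc::new(buf);
    (Frozen(Arc::clone(&shared)), Unfreeze(shared))
}

impl Unfreeze {
    /// Returns the original buffer (same allocation, no copy) if every
    /// `Frozen` clone has been dropped; otherwise hands the handle back so
    /// the caller can retry later.
    pub fn unfreeze(self) -> Result<Vec<u8>, Unfreeze> {
        Arc::try_unwrap(self.0).map_err(Unfreeze)
    }
}
```

In the use case described above, the processing thread would drop its Frozen when done, and the owner would call unfreeze() after the event is set. An Err result means some reader is still holding the data.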

@seanmonstar
Member

Are you dropping the BytesMut, and wishing to get one back from the Bytes you now have? Or are you splitting off frozen Bytes while still retaining the BytesMut?

If the latter, you can just try to reserve space on it, and if all frozen Bytes have been dropped, the BytesMut can notice and reuse its memory.

@mqudsi
Author

mqudsi commented Dec 21, 2019

Thanks for the quick reply.

> Are you dropping the BytesMut, and wishing to get one back from the Bytes you now have? Or are you splitting off frozen Bytes while still retaining the BytesMut?

I can structure it either way, but I was originally thinking the latter so that the original buffer size is a hard guarantee (to avoid all allocations).

> If the latter, you can just try to reserve space on it, and if all frozen Bytes have been dropped, the BytesMut can notice and reuse its memory.

Except possibly in the case where the BytesMut write pointer sits at the very beginning of the remainder after a split, wouldn't this necessarily either a) fragment a once-contiguous underlying buffer (bytes mapped to memory range 0..m would be reused after range m..n), or b) not be a zero-cost operation, if the contiguous range is detected, the disjoint buffers are unified, and the previous post-split contents of the BytesMut are copied/moved to the beginning of the reclaimed range?

Even if fragmentation or lack of contiguity were not an issue, I think there would be merit in offering an explicit API, if only for a developer's peace of mind?
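To make the cost in option b) concrete, here is a small std-only sketch of the compaction step: the still-live bytes are shifted to the front of the allocation so that the reclaimed prefix and the tail form one contiguous writable region. The function name compact and the live_start parameter are hypothetical, and this is a plain Vec<u8> illustration, not the bytes crate's actual code path.

```rust
/// Moves the live bytes in `buf[live_start..]` to the front of the buffer and
/// returns how many bytes had to be copied. The allocation itself is kept.
///
/// Panics if `live_start > buf.len()`.
pub fn compact(buf: &mut Vec<u8>, live_start: usize) -> usize {
    assert!(live_start <= buf.len(), "live_start out of bounds");
    let live_len = buf.len() - live_start;
    // Overlapping ranges are fine: `copy_within` has memmove semantics.
    buf.copy_within(live_start.., 0);
    // Shrinks the length only; the capacity is unchanged.
    buf.truncate(live_len);
    live_len
}
```

The copy is proportional to the number of pending bytes, so it is free only when nothing is pending at the time of reclamation, which matches the exception noted above.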

gakonst added a commit to interledger/interledger-rs that referenced this issue Jan 15, 2020
ILP-Packet is built using Bytes 0.4. The Futures 0.3 ecosystem's HTTP crates use Bytes 0.5.
Porting this crate to use Bytes 0.5 is non-trivial due to significant breaking changes in the
Bytes API:

tokio-rs/bytes#350
tokio-rs/bytes#288
gakonst added a commit to interledger/interledger-rs that referenced this issue Jan 29, 2020
* feat(packet): implement From<Fulfill/Reject> for bytes05::BytesMut

ILP-Packet is built using Bytes 0.4. The Futures 0.3 ecosystem's HTTP crates use Bytes 0.5.
Porting this crate to use Bytes 0.5 is non-trivial due to significant breaking changes in the
Bytes API:

tokio-rs/bytes#350
tokio-rs/bytes#288

@andrewbaxter

andrewbaxter commented Jul 16, 2022

> If the latter, you can just try to reserve space on it, and if all frozen Bytes have been dropped, the BytesMut can notice and reuse its memory.

I stumbled across this from Tokio Framed, or at least I believe this is the right thread.

AFAICT BytesMut can't be reused because freeze() consumes the BytesMut. I couldn't find another way to get a Bytes from BytesMut.

I'd expect to be able to do something like the following:

let mut my_bytes = BytesMut::with_capacity(999);
write_stuff(&mut my_bytes);
send(my_bytes.to_bytes()).await?;
my_bytes.clear();
write_stuff2(&mut my_bytes);
send(my_bytes.to_bytes()).await?;
...

I could do .split().freeze() but AFAICT there's no way to unsplit() the frozen bytes afterwards.

Edit:
Is it the case that if you drop the frozen split bytes and don't write anything new to the original BytesMut in the interim, the original BytesMut will be able to reuse the memory? Is BytesMut less like a byte array and more like a pool of byte arrays?

Edit edit:
Per #435 it looks like there's legitimately no way to reuse allocated memory. TBH for my use case I'm not sure anything more than a Vec is really required (Arc, etc) so maybe I should look for something other than LengthDelimitedCodec which appears to be what made the decision to use Bytes here.

@Darksonn
Contributor

@andrewbaxter If the reference count is equal to one (i.e. all other split Bytes/BytesMut have been dropped) when the BytesMut tries to reallocate, then it will reuse the entire allocation even if you have previously done a split+freeze.
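The "reuse when the reference count is one" rule can be sketched with std::sync::Arc, whose get_mut succeeds only when no other handle to the allocation exists. This is an analogy under stated assumptions, not the bytes crate's implementation: the function reclaim_or_alloc and the use of Arc<Vec<u8>> as the shared store are hypothetical.

```rust
use std::sync::Arc;

/// Prepares `slot` for writing a new message of up to `cap` bytes.
///
/// If `slot` is the only handle to its allocation, the existing buffer is
/// cleared and reused and `true` is returned. If readers still hold clones,
/// a fresh buffer is allocated instead and `false` is returned.
pub fn reclaim_or_alloc(slot: &mut Arc<Vec<u8>>, cap: usize) -> bool {
    if let Some(buf) = Arc::get_mut(slot) {
        // Unique owner: keep the allocation and reset the length.
        buf.clear();
        buf.reserve(cap);
        true
    } else {
        // Still shared: leave the readers' data untouched.
        *slot = Arc::new(Vec::with_capacity(cap));
        false
    }
}
```

The readers never observe a change: either they have all gone away before the buffer is overwritten, or the writer moves on to a new allocation.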

@andrewbaxter

Ah, thanks for the quick reply! Okay, so that usage is idiomatic.

I think my confusion arose from the crate documentation, which says of Bytes, "This is managed by using a reference count to track when the memory is no longer needed and can be freed," but doesn't mention writing or BytesMut, which I would expect to have different constraints. Since Bytes is read-only, I thought the reference counting was for zero-copy parsing rather than for reusing write buffers.

Reading deeper into the BytesMut documentation I guess when Bytes is obtained from BytesMut both Bytes and BytesMut are views of a single underlying data store, and the reference count in Bytes in this case is actually the reference count on the data store shared by BytesMut rather than a Bytes-specific reference count.

So this would be the ideal usage for write/read/reuse:

let mut my_bytes = BytesMut::with_capacity(999);
write_stuff(&mut my_bytes);
send(my_bytes.split().freeze()).await?;
write_stuff2(&mut my_bytes);
send(my_bytes.split().freeze()).await?;

@Darksonn
Contributor

Indeed, a single allocation can contain any number of BytesMut and Bytes pointing into it as long as each BytesMut does not overlap with the range of any other handle.
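The non-overlap rule is the same one that std enforces for mutable slices. As a rough analogy (not bytes crate code), split_at_mut yields two mutable views into one allocation that can be written independently, even from different threads, precisely because their ranges are disjoint. The function fill_halves below is a hypothetical example.

```rust
use std::thread;

/// Splits one allocation into two non-overlapping mutable views at `mid`
/// and fills each from its own thread.
///
/// Panics if `mid > buf.len()`.
pub fn fill_halves(buf: &mut [u8], mid: usize) {
    let (head, tail) = buf.split_at_mut(mid);
    // Scoped threads may borrow `head` and `tail`; the borrow checker accepts
    // this because the two views cannot alias.
    thread::scope(|s| {
        s.spawn(move || head.fill(b'a'));
        s.spawn(move || tail.fill(b'b'));
    });
}
```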
