Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

sc_network::NetworkService::write_notifications should have proper back-pressure #5481

Closed
tomaka opened this issue Apr 1, 2020 · 30 comments
Closed

Comments

@tomaka
Copy link
Contributor

tomaka commented Apr 1, 2020

We need an answer to the question: what happens if someone runs the code below?

for _ in 0..4000000000 {
    network_service.write_notification(some_peer, some_proto, b"hello world".to_vec());
}

Right now we would buffer up 256 of these notifications and then discard the 257th and above.
While it could be a sensible option (UDP-style), GrandPa in particular isn't robust to messages being silently dropped.

I can see a couple of solutions:

  • Turn write_notification into an asynchronous function whose future is ready once the remote has accepted our message. This is the way that this problem would normally be solved, but would require changes in GrandPa.

  • write_notification returns an error if the queue is full. Note that this might be difficult to implement because the queue of messages is actually in a background task, but if that's the solution I will accommodate for it.

  • If the buffer is full, we drop all pending messages then report on the API that we have disconnected and reconnected. GrandPa (and others) need to account for random disconnections anyway, and by making a "full buffer" look like a disconnection, we use the same code for both. This might however cause an infinite loop if we immediately send messages as a response to a reconnection.

  • Make GrandPa robust to messages being silently dropped.

  • Just increase the size of the buffer if we notice that the limit gets reached.

cc @rphmeier @andresilva @mxinden

@mxinden
Copy link
Contributor

mxinden commented Apr 1, 2020

  • Turn write_notification into an asynchronous function whose future is ready once the remote has accepted our message. This is the way that this problem would normally be solved, but would require changes in GrandPa.

While intrusive, I am in favor of making this call asynchronous. It would allow for proper backpressure. Whenever the queue is full for a given peer, we need to make sure to reduce the reputation of the peer and eventually disconnect, as otherwise a peer could halt all of Grandpa.

  • write_notification returns an error if the queue is full. Note that this might be difficult to implement because the queue of messages is actually in a background task, but if that's the solution I will accommodate for it.

This sounds like a lot more work than the suggestion above it.

  • If the buffer is full, we drop all pending messages then report on the API that we have disconnected and reconnected. GrandPa (and others) need to account for random disconnections anyway, and by making a "full buffer" look like a disconnection, we use the same code for both. This might however cause an infinite loop if we immediately send messages as a response to a reconnection.

Agree to the complexity this might cause.

  • Make GrandPa robust to messages being silently dropped.

From what I understand this is already partially the case. While probably a good property to have anyways, there is no feedback back to Grandpa, thus it has no clue that messages are being dropped and why messages are being dropped. Idea (1) would give that feedback to Grandpa.

  • Just increase the size of the buffer if we notice that the limit gets reached.

I think this could be an interim solution until (1) is in place.

@gavofyork
Copy link
Member

Isn't it the case that some later messages can supersede former messages? If so, then rather than spitting out these notifications and expecting them all to be delivered, maybe instead have some sort of channel system so that grandpa can express which messages still actually need to be delivered at any given time. So e.g. if you send a notification with a channel 42 and then at some time later, send another with the same channel 42, then networking won't bother trying to send the first.

@rphmeier
Copy link
Contributor

rphmeier commented Apr 1, 2020

Make GrandPa robust to messages being silently dropped.

This just doesn't work - the grandpa paper & the asynchronous network model can provide more information. However, it is more ok to drop messages to a single peer. The issue is when messages are completely dropped and are not delivered at all.
I'm with Max that backpressure is the right solution and we should just increase the buffer size until then.

Isn't it the case that some later messages can supersede former messages?

In some rare cases, but not for the majority of message. This would specifically apply for COMMIT messages which are fine to drop anyway.

@tomaka
Copy link
Contributor Author

tomaka commented Apr 2, 2020

While intrusive, I am in favor of making this call asynchronous.

I'm with Max that backpressure is the right solution

Maybe I'm overestimating the difficulty of the change, but I really don't see this as easy.

You can't just do write_notification(...).await everywhere, otherwise a single non-responsive node would totally freeze GrandPa.

However I also don't personally know enough about GP's code to know what to change to properly handle that.

@rphmeier
Copy link
Contributor

rphmeier commented Apr 2, 2020

What timeout behavior are you thinking for write_notification calls? And if a node times out, does that mean disconnect is imminent?

Grandpa relays messages using substrate-network-gossip and doesn't do write_notification directly. awaiting on single write_notification calls is probably not a good idea as you say.

For network-gossip, the send-message API could join internally all the write-notification calls under a FuturesUnordered, Grandpa doesn't care about messages to individual peers being dropped due to shaky conection, but it does care about a message to all peers being dropped. The behavior I think we want from network-gossip is to make sure that if a message is not successfully sent to a peer, that the peer's known_messages map doesn't contain that message. And then we could look into scheduling retries within network_gossip

The only exception is for things like neighbor packets where we really want them to be delivered to specific peers before we can proceed. How we handle these failures depends on what happens to a peer when they are unresponsive.

@rphmeier
Copy link
Contributor

rphmeier commented Apr 2, 2020

My last comment is mostly about how to handle failures to deliver messages. Since we don't handle this right now, what we mostly care about as an incremental change is failures to buffer messages.

So the initial .await from write_notification should probably just return when the write_notification task has been successfully enqueued (in this case, I guess sucessfully sent via a bounded channel). It should be fine to turn all write_notification into write_notification.await then, as long as the corresponding Receiver is being pulled from regularly.

@tomaka
Copy link
Contributor Author

tomaka commented Apr 6, 2020

Just to have data, here is a graph of the maximum size that the buffer reached in the past 24 hours.
Since its limit is 256, any time we reach the top of the graph we discarded some messages.

(the first peak corresponds to an era change and the second to a session change, but that's irrelevant for this issue)

Screenshot from 2020-04-06 10-07-29

@arkpar
Copy link
Member

arkpar commented Jun 5, 2020

We need an answer to the question: what happens if someone runs the code below?

for _ in 0..4000000000 {
network_service.write_notification(some_peer, some_proto, b"hello world".to_vec());
}

My opinion is that write_notification should result in synchronous call to send on the peer socket. And return an error if the transport can't handle it. Leving the callling code with the option to handle backpressure. There's zero reason for introducing additional async buffer here. OS already has one internally.

@infinity0
Copy link
Contributor

There's zero reason for introducing additional async buffer here. OS already has one internally.

One major reason for introducing an application-side send buffer is that only the application knows how to prioritise buffered messages and drop obsolete messages, as I describe here. Then you'd set the OS-level send buffer to a small value (to hold one single message) with this approach.

Making use of the OS-level TCP send buffer in a significant way, is only really appropriate if the data you're sending has a strict linear ordering, which is the case when you are uploading a single large file, or using SSH, but is not the case for p2p gossip protocols.

@arkpar
Copy link
Member

arkpar commented Jul 6, 2020

@infinity0
This is correct, though I'd still argue that this should be done at the QoS or multiplexing layer, and not at the application level.

Application level should adapt to limited bandwidth by producing less messages in the first place, and not by generating a lot, and then dropping some of them from the queue. E.g. if the peer can't handle GRANPA traffic, start sending very other round. Drop them if there are too many rounds skipped.

@infinity0
Copy link
Contributor

Grandpa, and many other applications, cannot avoid producing messages in the first place - since these are broadcast messages generated independently of the receiving peer, with no knowledge of the specific peers that might be receiving the messages. The cleanest way to implement flow control therefore, is to drop the messages that become obsolete before each peer can process them. There is no tenet of network engineering that says applications "should adapt to limited bandwidth by producing less messages in the first place", I do not know why you think that is somehow better or "should".

@mxinden
Copy link
Contributor

mxinden commented Jul 6, 2020

Following up on a discussion in Riot.

Question: What is the difficulty of introducing backpressure into client/finality-grandpa:

Finality-grandpa depends on the Network trait which in turn depends on the client/network-gossip trait. All methods on these traits e.g. write_notification are synchronous. In order to introduce backpressure these would need to be made asynchronous in order to be able to tell the caller to cooperatively yield for now. Thus wherever finality-grandpa calls any of the methods of the Network trait or of the Validator trait, or on the GossipEngine it would need to be able to yield (i.e. be in an async context and await the returned future). As most of these call-sites don't have access to either a Context or are within an async function, calling asynchronous methods is impossible.

@infinity0
Copy link
Contributor

In order to introduce backpressure these would need to be made asynchronous in order to be able to tell the caller to cooperatively yield for now.

Instead of an asynchronous call, you can perform a synchronous call that writes synchronously into a bounded priority buffer, as I describe here. That also avoids the need to perform refactorings to cope with the added asynchronity.

@mxinden
Copy link
Contributor

mxinden commented Jul 6, 2020

Yes, introducing a bounded priority buffer would be a second option. That way the methods on Network could stay synchronous and thus the needed refactorings would be less intrusive (limited in scope).

There are a couple of problems that come to my mind which is why we have not pursued this approach:

  • Managing this queue is yet another peace of logic to write, maintain and test.

  • While this might fix the immediate network I/O there will be other asynchronous APIs to be called within finality-grandpa. E.g. there is a current ongoing effort to allow various crypto operations to happen in an external process (e.g. an HSM on another node). Those operations would also need to be able to yield (be awaited).

  • The Rust task model does not only allow the callee to push back, but also allow the callee to notify the caller whenever it is ready again via the Waker concept. Using a synchronous priority queue forces the caller to check the status of the priority queue in a loop waiting for a message to be taken off by the network. While this can probably made efficient via a CondVar I doubt it is worth reinventing the wheel.

@infinity0
Copy link
Contributor

Using a synchronous priority queue forces the caller to check the status of the priority queue in a loop waiting for a message to be taken off by the network.

By caller, do you mean the producer (upper layer)? The consumer (network layer, sending side) is also going to be calling the queue to pop items off it.

The producer should not need to be checking the status of the queue in a loop - they simply push into it synchronously, and synchronously perform any producer-specific logic needed to keep the queue within bounds. The consumer (the per-peer sending thread, AIUI) reads synchronously from the queue, blocking until it is non-empty.

The key point is that dropping is unavoidable in our scenario. Any solution that does not involve dropping messages is doomed to fail - it is physically not possible to send X+Y bandwidth when your peer is only receiving Y bandwidth, without dropping things. Making things async won't solve the actual problem described in the OP - at some point you will need dropping logic somewhere. Doing this synchronously with a bounded priority buffer is the simplest, cleanest and well-known solution.

In a more simple scenario such as a simple proxy where A is sending to you and you are forwarding this to B, you may use backpressure from B to in turn apply the same backpressure to A, and then you don't need to drop anything explicitly. But we are dealing with a complex p2p protocol with multiple senders and multiple recipients, and so it is unrealistic to attempt to "pass through" the backpressure from our recipient peers to our sending peers. Dropping is far simpler.

@rphmeier
Copy link
Contributor

rphmeier commented Jul 6, 2020

@mxinden

The network-gossip issue

finality-grandpa is primarily asynchronous code written with futures so I see that as removing the primary barriers to integrating other asynchronous work.

The location where finality-grandpa interacts with the network is the communication module. The main difficulty with making this fully asynchronous is the dependence on client-network-gossip as you mention.

There is an implementation of a network-gossip::Validator here:

impl<Block: BlockT> sc_network_gossip::Validator<Block> for GossipValidator<Block> {

The functions on this trait do not return futures, and furthermore accept an &mut dyn ValidatorContext<Block> which allows for these synchronous functions to trigger underlying network events as seen here:

context.send_message(who, packet_data);

or

https://github.com/paritytech/substrate/blob/master/client/finality-grandpa/src/communication/gossip.rs#L1486

or in other places.

Approaches to the network-gossip issue

The first approach is to modify both the network_gossip::Validator and network_gossip::ValidatorContext to be asynchronous traits.

This may be a more robust approach in the long term. Since the Validator is being triggered in an asynchronous context, it makes sense to give the Validator access to that asynchrony.

Note that within the implementation of Validator for finality-grandpa, that the context member is only used at the top-level of each of those functions, with most of the logic occurring in synchronous subroutines. It would be trivial to alter each call to a method of the ValidatorContext to carry a .await behind it.

However, there is also another approach: alter the practical implementation of ValidatorContext to be synchronous by storing a list of Actions internally to be applied after the scope of the method call on the Validator. This is a pattern in Rust development: separate determining what to do from actually doing it.

communication and the Environment.

The finality-grandpa also has an overarching Communication module that serves the role of bridging the low-level Substrate NetworkService and the actual logic of finality-grandpa, which is implemented in an asynchrony-friendly way.

fn round_communication is meant to produce a sink and a stream of round-localized messages to and from the voter.

fn global_communication is similar, but for messages that are relevant across all rounds of a specific voter-set.

There is, in principle, nothing in this module that could not be made into an async fn.

All start-up logic for the network bridge is within (fn run_grandpa_voter)[https://github.com/paritytech/substrate/blob/master/client/finality-grandpa/src/lib.rs#L686] which could trivially be made async without affecting its primary consumer (the Service) substantially.

fn global_communication is already run within a Future called VoterWork.

The bottleneck is mostly about how some other functions are used. round_communication is used within the Environment implementation. Environment is not an async-trait, but it is run within a Voter which is a Future exposed by an underlying crate which performs consensus logic.

The primary change that would need to be made would be to make some Environment methods asynchronous. This would not have significant fallout in the underlying crate as it is already set up for asynchrony.

Conclusion

In short, the finality-grandpa code is already well set up for asynchrony, and although it has leaned on bad APIs exposed by NetworkService and network-gossip in the past, these issues are all rectifiable, as long as we maintain the existing behavior that, from the higher-level networking perspective, no messages should be dropped and we can just send(x).await everything. However, all of this logic can be abstracted behind refactors within network-gossip

The actual hard problem is altering the behavior of GRANDPA or other networking protocols to handle co-operative yielding in a smarter way. However I view this as orthogonal to introducing backpressure - those are further issues about increasing our tolerance to pressure and the necessity and approach of those will be best determined by observing how the simple backpressure-friendly adjustments perform.

@mxinden
Copy link
Contributor

mxinden commented Jul 6, 2020

Following up on #5481 (comment) and expanding on the question Why it is hard to make the call-sites async or have them have access to a Context:

Let's e.g. look at the finality-grandpa Validator implementation:

/// A validator for GRANDPA gossip messages.
pub(super) struct GossipValidator<Block: BlockT> {
inner: parking_lot::RwLock<Inner<Block>>,
set_state: environment::SharedVoterSetState<Block>,
report_sender: TracingUnboundedSender<PeerReport>,
metrics: Option<Metrics>,
}

Within its impl Validator for GossipValidator implementation it has the validate method:

fn validate(&self, context: &mut dyn ValidatorContext<Block>, who: &PeerId, data: &[u8])

This function gets as a parameter a ValidatorContext which is basically the same as a Network.

When introducing backpressure into Network by making all its methods async we would also have to make all methods on ValidatorContext async. In order for the above referernced validate function to call any of those async methods, it itself would need to be async. Problem is that GossipValidator is using a RWLock to synchronize access to the underlying inner struct. Locks and async functions don't play well. (1) While waiting for a lock the underlying OS thread blocks which might prevent any other Task on the same OS thread from making any progress which in the worst case could turn into a deadlock. (2) The lock guard is not Send (see rust-lang/rust#23465 for details), thereby they can not be used across await points. (3) ...

Thus in order to achieve our goal we would (1) need to get rid of the Mutex and (2) make all the callers of any method on GossipValidator also async.

@infinity0
Copy link
Contributor

infinity0 commented Jul 6, 2020

altering the behavior of GRANDPA or other networking protocols to handle co-operative yielding in a smarter way

This is another benefit of the bounded priority queue approach, whether it is implemented in an async or sync way - you do not need to modify the existing application logic, only add to it, to specify:

  1. the priority of how buffered messages should be popped from the buffer
  2. how to drop messages when the buffer gets too full

In the meantime, Grandpa can still produce messages in the same way as it did before, i.e. in response to messages we receive, without worrying about who might be receiving the messages we send.

As opposed to, folding what is effectively the same logic as above (i.e. 1, 2) into the generation of Grandpa's responses to specific outgoing peers, that would be much more complex, since Grandpa would then have to be aware of which specific peers we are sending to, which it really does not need to be aware of.

@rphmeier
Copy link
Contributor

rphmeier commented Jul 6, 2020

Referencing #5481 (comment)

When introducing backpressure into Network by making all its methods async we would also have to make all methods on ValidatorContext async.

My above comment contends against this; see the second approach I mention.

Thus in order to achieve our goal we would (1) need to get rid of the Mutex and (2) make all the callers of any method on GossipValidator also async.

And this again doesn't seem too bad as finality-grandpa is largely futures code - the point I make in the latter half of my comment

@rphmeier
Copy link
Contributor

rphmeier commented Jul 6, 2020

The last open question raised by my comment is "what should the semantic of write_notification_async be?".

I argue that we should emulate whatever the network code currently does if the unbounded send of write_notification encounters an overfull buffer on the UnboundedReceiver.

@mxinden
Copy link
Contributor

mxinden commented Jul 6, 2020

However, there is also another approach: alter the practical implementation of ValidatorContext to be synchronous by storing a list of Actions internally to be applied after the scope of the method call on the Validator. This is a pattern in Rust development: separate determining what to do from actually doing it.

Can you expand on how that would work @rphmeier? How would that list be bounded? Who is making sure all items from the list are send down to the asynchronous network?

@arkpar
Copy link
Member

arkpar commented Jul 6, 2020

Grandpa, and many other applications, cannot avoid producing messages in the first place - since these are broadcast messages generated independently of the receiving peer, with no knowledge of the specific peers that might be receiving the messages. The cleanest way to implement flow control therefore, is to drop the messages that become obsolete before each peer can process them.

I believe GRANDPA gossip validator already maintains some network-related context for each peer.
The gossip protocol is still point to point. Some of the messages are indeed broadcasted to all peers and some are not. You might call them "broadcast messages" but unlike the broadcast protocols, such as UDP they are still sent out individually. And each peer queue would hold a copy of each such message.

I do not know why you think that is somehow better or "should".

It is a simple efficiency principle. Don't waste CPU and memory. Adding something to a queue only to be dropped later sounds like a waste if it can be avoided.

@infinity0
Copy link
Contributor

infinity0 commented Jul 6, 2020

I believe GRANDPA gossip validator already maintains some network-related context for each peer. The gossip protocol is still point to point. [..]

The logic for reifying the abstract broadcast protocol as a point-to-point protocol, can be completely generic of what Grandpa does as a consensus protocol. It can, and should be written in a way that works for any possible application protocol, and the interface I outlined involving the bounded priority buffer achieves this separation.

It is a simple efficiency principle. Don't waste CPU and memory. Adding something to a queue only to be dropped later sounds like a waste if it can be avoided.

In general, you should prefer simplicity over efficiency, especially if the efficiency is theoretical - other overheads involved in the complexity of doing what you're suggesting can totally wipe out any potential efficiency.

In this case, the messages must be generated at the earliest possible opportunity anyway, and must reside in memory until all peers have consumed them - so adding references to the message to the queue of each peer, then dropping them later, is not a heavy cost. Simply put, the efficiency gain in the alternative approach, is not worth it.

@rphmeier
Copy link
Contributor

rphmeier commented Jul 6, 2020

Can you expand on how that would work @rphmeier? How would that list be bounded? Who is making sure all items from the list are send down to the asynchronous network?

It is pretty simple. Imagine that the ValidatorContext is just an &mut Vec<Action>:

async fn do_work_with_gossip_validator(val: &mut impl Validator, net: &mut NetworkHandle) {
    loop {
        let mut actions = Vec::new();
        val.validate(net.recv().await, &mut actions); // synchronous.
        for Action::SendMessage(to, msg) in actions {
            // assume only one action type, in practice do a `match`.
            net.send(to, msg).await;
        }
    }
}

So it's basically this but the ValidatorContext is a thin wrapper around the &mut Vec<Action> to hide some of the details.

The Vec<Action> is not bounded except by the fact that we aren't writing gossip-validators that don't request an unbounded amount of actions and only request actions that they expect to occur. The network-gossip logic is responsible for ensuring that actions get sent down to the underlying network, and should do whatever the network code currently does when encountering an overfull peer, although this does depend on how write_notification_async works. This is a first step moving this question one layer higher, that's all.

I do want to say that this is only one approach. And I also went into why your suggested approach (which I tend to agree with) of making all ValidatorContext methods async is not likely to be a big problem.

@rphmeier
Copy link
Contributor

rphmeier commented Jul 6, 2020

@infinity0

Instead of an asynchronous call, you can perform a synchronous call that writes synchronously into a bounded priority buffer, as I describe here. That also avoids the need to perform refactorings to cope with the added asynchronity.

In practice we are going to have a buffer full of messages from different components. Take a look at the "network bridge" architecture of Polkadot's networking code: https://github.com/paritytech/polkadot/blob/ff708f3/roadmap/implementors-guide/src/node/utility/network-bridge.md

I don't see any way where we can expose a synchronous API of a peer's sending buffer in a meaningful way to other subsystems, if there is only one buffer per peer and this buffer needs to be shared across all application logic.

Alternatively, we can allow creating multiple buffers per peer and give one of those to each subsystem. Then each subsystem would be inspecting messages only originating from its corner of the application-specific logic. I'm still not sure exactly what the API would be or if this is user-friendly at all.

Maybe something like this?

struct PeerSendingSet {
    // a fixed-size buffer for each peer.
}

impl PeerSendingSet {
    /// Push a message onto the buffer for a peer, returning the message if the buffer is full.
    /// On an `Err` we'd expect a `peer_buffer_retain` call or `flush_until_space` call.
    fn push_message(&mut self, peer: PeerId, msg: Vec<u8>) -> Result<(), Vec<u8>> {
        ...
    }

    /// Inspect each item from the buffer in order, retaining only those which pass the predicate.
    // Annoyance: this struct stores encoded messages we'd have to decode all the time.
    fn peer_buffer_retain(&mut self, peer: PeerId, pred: impl Fn(&[u8]) -> bool) { ... }

    /// Asynchronously drive network flushes for all peers until 
    //// there is space for at least `n` messages in this specific peer's buffer. 
    /// If the buffer length is less than `n`, only wait until the buffer is empty.
    // Annoyance: frame this in terms of bytes instead?
    async fn flush_until_space(&mut self, peer: PeerId, n: usize) { ... }

    /// Asynchronously works to flush messages. Once a message leaves the buffer and gets to the network, it can
    /// can be assumed to have been successfully sent.
    ///
    /// This future is an infinite loop, so it should be `selected` with some other event producers.
    async fn flush(&mut self) { ... }
}

So each subsystem gets one of these, with the ability to add and remove peer connections. It uses some kind of underlying network API (write_notification_async) where .await means that the message will eventually be flushed to the peer and will not be dropped unless the peer disconnects.

Used like this

loop {
    // For subsystems, we might obtain this from the network bridge.
    let mut peer_channel_set = PeerChannelSet::new(buffer_size_for_this_protocol);
    select! {
        // unless we have something better to do, try to flush messages to the network all the time.
        _ = peer_channel_set.flush() => {},
        x = network_events.next() => match x {
            NetworkEvent::PeerConnected(peer) => { peer_channel_set.add_peer(peer); }
            NetworkEvent::PeerDisconnected(peer) => { peer_channel_set.remove_peer(peer); }
            _ => { } // ignore incoming messages
        }
        x = logic_events.next() => match handle_logic(x) {
            Action::SendMessage(peer, msg) => {
                if let Err(msg) = peer_channel_set.push_message(peer, msg) {
                    // decide whether to `flush_until_space`, `peer_buffer_retain`, or drop
                    // the peer. I'm unsure how to handle this general situation where we
                    // don't want one peer to slow down our handling of all other peer
                    // connections. It seems like choosing peers to disconnect should happen
                    // at a lower level, in which case `flush_until_space` needs to resolve
                    // shortly after the peer is disconnected so we don't deadlock here.
                }  
            }
        }
    }
}

This is set up so that we can ensure that we are still trying to flush messages to other peers even while waiting on a slow/overloaded peer.

The long comment I have about choosing when to move on and say "fuck it, this peer is too slow" is my main open question. I think this occurs in some way whether or not we take this "application does buffering" approach. This will clearly need to vary from protocol to protocol. I think the PeerChannelSet could be implemented using FuturesUnordered or join_all or something like that along with a NetworkService handle. It's also not clear how we choose the buffer size that each specific application gets. Probably the buffers should be denominated in bytes as opposed to number of messages.

So furthermore, on the bandwidth question, @infinity0 your initial proposal was to do bandwidth management based on the type of peer that each peer is. Whereas this buffering approach (which I am aware is my own tangled improvisation on top of misinterpretation and not something that you are proposing) does bandwidth management based on which protocols each peer is part of and how much bandwidth should be expected for each peer in those protocols. This naturally becomes peer-type based bandwidth management when we start to restrict the types of peers that are members of particular protocols.

@infinity0
Copy link
Contributor

infinity0 commented Jul 6, 2020

Alternatively, we can allow creating multiple buffers per peer and give one of those to each subsystem. Then each subsystem would be inspecting messages only originating from its corner of the application-specific logic. I'm still not sure exactly what the API would be or if this is user-friendly at all.

With multiple subsystems, the simplest solution is to just send the messages over distinct streams - QUIC streams, or TCP connections, etc (yamux is faking it and we'd expect worse performance, but the API usage would "look similar" at least to the good versions). Each stream/subsystem would have its own outgoing buffer, managed by the subsystem according to its own logic. This should be the go-to option if the messages between the subsystems do not depend on each other and can be processed in any order by the recipient.

If the messages between the subsystems do depend on each other, and you really need to ensure that this ordering is preserved when sending (as opposed to, having the recipient buffer out-of-order messages and re-create the correct order on their side), then there is no general solution and you'd need to pick a specific solution depending on how the subsystems compose together, and the dropping logic might have to look at all the outgoing buffers. However for a blockchain protocol this might not be so bad in practise. Concrete example:

  • subsystem A has outgoing buffer [ (9, mainA), (9, aux-A1), (9, aux-A2), (9, aux-A3) ]
  • subsystem B has outgoing buffer [ (9, mainB), (9, aux-B1), (9, aux-B2) ]

where (9, XXX) means "some message at round 9".

When subsystem A wants to send (10, mainA) indicating the start of round 10, it can drop all the (9, aux-*) messages from both buffers. On top of this, you'd need to define whether you should send (9, mainA) before or after (9, mainB), since the network sending logic needs to pick one when the receiving peer unblocks their connection.

I'd say that the distinct-streams solution is sufficient for a first implementation and we can see if it actually causes problems before attempting the second more sophisticated solution.

In terms of a rust interface, since every application needs to specify its own drop/priority logic, IMO the very high-level API should just be something like:

trait PeerSendQueueWriter {
    type Msg;
    /// Push a message onto the buffer for a peer, dropping any obsolete messages to maintain the size bound.
    /// If this is not possible (e.g. because all existing messages are critical to the application) then returns an Error.
    /// In this case you should probably log an error, alert the node operator, and perhaps disconnect the peer.
    fn push_message(&mut self, msg: Msg) -> Result<SomeError, ()>;
}

trait PeerSendQueueReader {
    type Msg;
    /// Pop the most urgent message from the buffer for a peer, blocking until one is available.
    fn pop_message(&mut self) -> Msg;
}

You need an associated abstract type Msg not u8 since the drop/priority logic will likely need to examine the message. Variations are of course possible, such as an async/nonblocking version of pop_message.

In order to implement this trait, you might use the struct PeerSendingSet that you sketched in your post within the implementation, passing it different predicates. The network sending code doesn't need to see any of this however, it just needs to call pop_message, probably with a trait bound that constraints Msg to be serialisable.

@infinity0
Copy link
Contributor

Also, a simple naive implementation can simply drop the least-recently-pushed message (or even have the queue be unbounded, and never drop anything), and have the priority ordering be random (or select the least-recently-pushed message again). That can be a test to see that any refactoring was successful, before we think about how Grandpa should actually define the dropping/priorities.

@infinity0
Copy link
Contributor

we are still trying to flush messages to other peers even while waiting on a slow/overloaded peer.

I think flushing is a bit of a red herring in this scenario. Instead of flushing, we can just set our own TCP send-buffers to be relatively small, possibly even 0 if that works. From upper layers' perspective, I don't see an inherent difference between local TCP send buffers, vs all the in-flight network traffic already out there - they are all part of the same overall pipe, flushing is not going to help the overall process be quicker, send() will still block once the pipe is full. Calling flushed data "sent" vs calling unflushed data "not sent" really makes no difference to the application.

@infinity0 your initial proposal was to do bandwidth management based on the type of peer that each peer is. Whereas this buffering approach (which I am aware is my own tangled improvisation on top of misinterpretation and not something that you are proposing) does bandwidth management based on which protocols each peer is part of and how much bandwidth should be expected for each peer in those protocols.

Yes, this all sounds fine, per-stream control implies per-peer control.

@rphmeier
Copy link
Contributor

rphmeier commented Jul 7, 2020

If the messages between the subsystems do depend on each other, and you really need to ensure that this ordering is preserved when sending (as opposed to, having the recipient buffer out-of-order messages and re-create the correct order on their side), then there is no general solution and you'd need to pick a specific solution depending on how the subsystems compose together

For the higher level use-cases that I am thinking of, there is some of this, but only a small amount. In particular, there is one protocol for neighbor view updates that all other network protocols will hinge on to some extent. Still, off the top of my head I don't see any obvious ways that reordering across subsystems could cause issues in the set of subsystems that we are planning to implement.

Having a separate low-level substream per-peer makes sense to me but I still don't understand how it would fit in with bandwidth control. What I've referred to as "flushing" is probably better referred to as "completing" the sends from application-space into libp2p space as opposed to flushing on any kind of socket. Basically just freeing up the source buffer that we use in some subsystem.

I actually think my proposed PeerSendingSet is the logical conclusion of your PeerSendQueueWriter once you account for how we need to manage multiple peers, certain details of how async Rust works, and the fact that libp2p expects Vec<u8>/Bytes messages. (as an aside: it would be cool to have the underlying network worker expect Box<dyn Encode>). For inspection of the buffers, it might be nice to move the encoding further down the call-stack as you say.

However, it might not be that easy to get new substreams for each peer on-demand as that is what's required here. @mxinden and @tomaka could answer that better than me.

@tomaka
Copy link
Contributor Author

tomaka commented Jul 29, 2020

Since #6692, there is now an alternative NetworkService::notification_sender API that permits proper back-pressure.

As such, I'm going to close this issue in favour of paritytech/polkadot-sdk#551

@tomaka tomaka closed this as completed Jul 29, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants