From 96e078049160ad8d17c5ea5347c89e560201ffb4 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 21 Mar 2024 09:08:58 +0530 Subject: [PATCH] webrtc: set sctp receive buffer size to 100kB (#2745) --- p2p/test/transport/rcmgr_test.go | 4 ++++ p2p/transport/webrtc/listener.go | 4 ++++ p2p/transport/webrtc/stream.go | 21 ++++++++------------- p2p/transport/webrtc/stream_write.go | 4 ++-- p2p/transport/webrtc/transport.go | 6 ++++++ 5 files changed, 24 insertions(+), 15 deletions(-) diff --git a/p2p/test/transport/rcmgr_test.go b/p2p/test/transport/rcmgr_test.go index 20f34de799..9a58a344f0 100644 --- a/p2p/test/transport/rcmgr_test.go +++ b/p2p/test/transport/rcmgr_test.go @@ -86,6 +86,10 @@ func TestResourceManagerIsUsed(t *testing.T) { } return nil }) + if tc.Name == "WebRTC" { + // webrtc receive buffer is a fix sized buffer allocated up front + connScope.EXPECT().ReserveMemory(gomock.Any(), gomock.Any()) + } connScope.EXPECT().Done().MinTimes(1) var allStreamsDone sync.WaitGroup diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index cbf9f8fb8b..1834fc812b 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -209,6 +209,10 @@ func (l *listener) setupConnection( // in a release. settingEngine.SetReceiveMTU(udpmux.ReceiveBufSize) settingEngine.DetachDataChannels() + settingEngine.SetSCTPMaxReceiveBufferSize(sctpReceiveBufferSize) + if err := scope.ReserveMemory(sctpReceiveBufferSize, network.ReservationPriorityMedium); err != nil { + return nil, err + } w, err = newWebRTCConnection(settingEngine, l.config) if err != nil { diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 135940e4ce..733c2347c6 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -17,24 +17,19 @@ import ( const ( // maxMessageSize is the maximum message size of the Protobuf message we send / receive. maxMessageSize = 16384 - // Pion SCTP association has an internal receive buffer of 1MB (roughly, 1MB per connection). - // We can change this value in the SettingEngine before creating the peerconnection. - // https://github.com/pion/webrtc/blob/v3.1.49/sctptransport.go#L341 - maxBufferedAmount = 2 * maxMessageSize + // maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes. + // The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued + // per stream is limited to avoid a single stream monopolizing the entire connection. + maxSendBuffer = 2 * maxMessageSize + // sendBufferLowThreshold is the threshold below which we write more data on the underlying + // data channel. We want a notification as soon as we can write 1 full sized message. + sendBufferLowThreshold = maxSendBuffer - maxMessageSize // maxTotalControlMessagesSize is the maximum total size of all control messages we will // write on this stream. // 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be // exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection // send queue. maxTotalControlMessagesSize = 50 - // bufferedAmountLowThreshold and maxBufferedAmount are bound - // to a stream but congestion control is done on the whole - // SCTP association. This means that a single stream can monopolize - // the complete congestion control window (cwnd) if it does not - // read stream data and it's remote continues to send. We can - // add messages to the send buffer once there is space for 1 full - // sized message. - bufferedAmountLowThreshold = maxBufferedAmount / 2 // Proto overhead assumption is 5 bytes protoOverhead = 5 @@ -120,7 +115,7 @@ func newStream( } // released when the controlMessageReader goroutine exits s.controlMessageReaderDone.Add(1) - s.dataChannel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) + s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold) s.dataChannel.OnBufferedAmountLow(func() { s.notifyWriteStateChanged() diff --git a/p2p/transport/webrtc/stream_write.go b/p2p/transport/webrtc/stream_write.go index 82d4ac287d..0510c96c4e 100644 --- a/p2p/transport/webrtc/stream_write.go +++ b/p2p/transport/webrtc/stream_write.go @@ -112,9 +112,9 @@ func (s *stream) SetWriteDeadline(t time.Time) error { func (s *stream) availableSendSpace() int { buffered := int(s.dataChannel.BufferedAmount()) - availableSpace := maxBufferedAmount - buffered + availableSpace := maxSendBuffer - buffered if availableSpace+maxTotalControlMessagesSize < 0 { // this should never happen, but better check - log.Errorw("data channel buffered more data than the maximum amount", "max", maxBufferedAmount, "buffered", buffered) + log.Errorw("data channel buffered more data than the maximum amount", "max", maxSendBuffer, "buffered", buffered) } return availableSpace } diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index 20bfdf98eb..2309ac4d97 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -79,6 +79,8 @@ const ( DefaultDisconnectedTimeout = 20 * time.Second DefaultFailedTimeout = 30 * time.Second DefaultKeepaliveTimeout = 15 * time.Second + + sctpReceiveBufferSize = 100_000 ) type WebRTCTransport struct { @@ -314,6 +316,10 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement // If you run pion on a system with only the loopback interface UP, // it will not connect to anything. settingEngine.SetIncludeLoopbackCandidate(true) + settingEngine.SetSCTPMaxReceiveBufferSize(sctpReceiveBufferSize) + if err := scope.ReserveMemory(sctpReceiveBufferSize, network.ReservationPriorityMedium); err != nil { + return nil, err + } w, err = newWebRTCConnection(settingEngine, t.webrtcConfig) if err != nil {