Skip to content

Commit

Permalink
Reduce memory allocated in DataChannel.readLoop
Browse files Browse the repository at this point in the history
See pion#1516

This patch preserves the semantics of the OnMessage handler and is
more safe but less efficient than the patch first described in pion#1516.

$ git checkout origin/master datachannel.go && \
  go test -bench=. -run=XXX -benchmem -count=10 > original.txt
$ git checkout datachannel.go && git apply pool.patch && \
  go test -bench=. -run=XXX -benchmem -count=10 > option1.txt

$ benchstat original.txt option1.txt
name                 old time/op    new time/op    delta
DSend2-8     20.3µs ±51%     3.7µs ± 6%   -81.74%  (p=0.000 n=10+10)
DSend4-8     23.5µs ±34%     3.6µs ± 8%   -84.80%  (p=0.000 n=10+8)
DSend8-8     18.9µs ±35%     5.8µs ±68%   -69.45%  (p=0.000 n=9+10)
DSend16-8    16.8µs ±30%    10.0µs ±24%   -40.77%  (p=0.000 n=10+10)
DSend32-8    710ms ±100%       0ms ±81%  -100.00%  (p=0.035 n=10+9)

name                 old alloc/op   new alloc/op   delta
DSend2-8     15.3kB ±89%     1.4kB ± 0%   -90.59%  (p=0.000 n=9+10)
DSend4-8     41.7kB ±63%     1.4kB ± 1%   -96.58%  (p=0.000 n=10+10)
DSend8-8     45.0kB ±33%     1.4kB ± 2%   -96.83%  (p=0.000 n=9+10)
DSend16-8    34.0kB ±69%     1.4kB ± 1%   -95.77%  (p=0.000 n=10+10)
DSend32-8   37.4MB ±388%     0.0MB ± 4%  -100.00%  (p=0.000 n=10+7)

name                 old allocs/op  new allocs/op  delta
DSend2-8       15.8 ±46%      38.6 ± 2%  +144.30%  (p=0.000 n=10+10)
DSend4-8       27.1 ±48%      38.0 ± 0%   +40.22%  (p=0.000 n=10+9)
DSend8-8       29.3 ±16%      38.0 ± 0%   +29.55%  (p=0.000 n=9+8)
DSend16-8      23.6 ±41%      37.0 ± 0%   +56.78%  (p=0.000 n=10+9)
DSend32-8    19.3k ±100%      0.0k ± 0%      ~     (p=0.178 n=10+7)
  • Loading branch information
bshimc committed Nov 13, 2020
1 parent ae5c004 commit 56d9428
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
run: |
go test \
-coverprofile=cover.out -covermode=atomic \
-bench=. \
-tags quic \
-v -race ${TEST_PACKAGES}
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contribu
* [Assad Obaid](https://github.com/assadobaid)
* [Jamie Good](https://github.com/jamiegood) - *Bug fix in jsfiddle example*
* [Artur Shellunts](https://github.com/ashellunts)
* [Bo Shi](https://github.com/bshimc)

### License
MIT License - see [LICENSE](LICENSE) for full text
17 changes: 15 additions & 2 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,18 @@ func (d *DataChannel) onError(err error) {
}
}

// See https://github.com/pion/webrtc/issues/1516
// nolint:gochecknoglobals
var rlBufPool = sync.Pool{New: func() interface{} {
return make([]byte, dataChannelBufferSize)
}}

func (d *DataChannel) readLoop() {
for {
buffer := make([]byte, dataChannelBufferSize)
buffer := rlBufPool.Get().([]byte)
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
if err != nil {
rlBufPool.Put(buffer)
d.setReadyState(DataChannelStateClosed)
if err != io.EOF {
d.onError(err)
Expand All @@ -324,7 +331,13 @@ func (d *DataChannel) readLoop() {
return
}

d.onMessage(DataChannelMessage{Data: buffer[:n], IsString: isString})
m := DataChannelMessage{Data: make([]byte, n), IsString: isString}
copy(m.Data, buffer[:n])

// NB: Why was DataChannelMessage not passed as a pointer value? The
// pragma for Put() is a false positive on the part of the CI linter.
d.onMessage(m) // nolint:staticcheck
rlBufPool.Put(buffer) // nolint:staticcheck
}
}

Expand Down
51 changes: 50 additions & 1 deletion datachannel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package webrtc

import (
"fmt"
"io"
"sync"
"testing"
Expand All @@ -15,7 +16,7 @@ import (
// bindings this is a requirement).
const expectedLabel = "data"

func closePairNow(t *testing.T, pc1, pc2 io.Closer) {
func closePairNow(t testing.TB, pc1, pc2 io.Closer) {
var fail bool
if err := pc1.Close(); err != nil {
t.Errorf("Failed to close PeerConnection: %v", err)
Expand Down Expand Up @@ -63,6 +64,54 @@ func closeReliabilityParamTest(t *testing.T, pc1, pc2 *PeerConnection, done chan
closePair(t, pc1, pc2, done)
}

func BenchmarkDataChannelSend2(b *testing.B) { benchmarkDataChannelSend(b, 2) }
func BenchmarkDataChannelSend4(b *testing.B) { benchmarkDataChannelSend(b, 4) }
func BenchmarkDataChannelSend8(b *testing.B) { benchmarkDataChannelSend(b, 8) }
func BenchmarkDataChannelSend16(b *testing.B) { benchmarkDataChannelSend(b, 16) }
func BenchmarkDataChannelSend32(b *testing.B) { benchmarkDataChannelSend(b, 32) }

// See https://github.com/pion/webrtc/issues/1516
func benchmarkDataChannelSend(b *testing.B, numChannels int) {
offerPC, answerPC, err := newPair()
if err != nil {
b.Fatalf("Failed to create a PC pair for testing")
}

open := make(map[string]chan bool)
answerPC.OnDataChannel(func(d *DataChannel) {
if _, ok := open[d.Label()]; !ok {
// Ignore anything unknown channel label.
return
}
d.OnOpen(func() { open[d.Label()] <- true })
})

var wg sync.WaitGroup
for i := 0; i < numChannels; i++ {
label := fmt.Sprintf("dc-%d", i)
open[label] = make(chan bool)
wg.Add(1)
go func() {
dc, err := offerPC.CreateDataChannel(label, nil)
assert.NoError(b, err)

dc.OnOpen(func() {
<-open[label]
for n := 0; n < b.N/numChannels; n++ {
if err := dc.SendText("Ping"); err != nil {
b.Fatalf("Unexpected error sending data (label=%q): %v", label, err)
}
}
wg.Done()
})
}()
}

assert.NoError(b, signalPair(offerPC, answerPC))
wg.Wait()
closePairNow(b, offerPC, answerPC)
}

func TestDataChannel_Open(t *testing.T) {
t.Run("handler should be called once", func(t *testing.T) {
report := test.CheckRoutines(t)
Expand Down

0 comments on commit 56d9428

Please sign in to comment.