Skip to content

Commit

Permalink
Don't use JitterBuffer in SampleBuilder
Browse files Browse the repository at this point in the history
The performance of the SampleBuilder is significantly worse when using
the SampleBuilder. It would be good to evaluate improving the
performance of the JitterBuffer. However for the time being we are just
going to revert.

Resolve #2778
  • Loading branch information
Sean-Der committed Oct 11, 2024
1 parent dc1f8ff commit bb41f23
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 50 deletions.
63 changes: 20 additions & 43 deletions pkg/media/samplebuilder/samplebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"time"

"github.com/pion/interceptor/pkg/jitterbuffer"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4/pkg/media"
)
Expand All @@ -17,7 +16,7 @@ import (
type SampleBuilder struct {
maxLate uint16 // how many packets to wait until we get a valid Sample
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
buffer *jitterbuffer.JitterBuffer
buffer [math.MaxUint16 + 1]*rtp.Packet
preparedSamples [math.MaxUint16 + 1]*media.Sample

// Interface that allows us to take RTP packets to samples
Expand Down Expand Up @@ -61,7 +60,7 @@ type SampleBuilder struct {
// The depacketizer extracts media samples from RTP packets.
// Several depacketizers are available in package github.com/pion/rtp/codecs.
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))}
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
for _, o := range opts {
o(s)
}
Expand All @@ -77,7 +76,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
var foundTail *rtp.Packet

for i := location.head; i != location.tail; i++ {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
if packet := s.buffer[i]; packet != nil {
foundHead = packet
break
}
Expand All @@ -88,7 +87,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
}

for i := location.tail - 1; i != location.head; i-- {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
if packet := s.buffer[i]; packet != nil {
foundTail = packet
break
}
Expand All @@ -106,16 +105,16 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
if location.empty() {
return 0, false
}
packet, err := s.buffer.PeekAtSequence(location.head)
if packet == nil || err != nil {
packet := s.buffer[location.head]
if packet == nil {
return 0, false
}
return packet.Timestamp, true
}

func (s *SampleBuilder) releasePacket(i uint16) {
var p *rtp.Packet
p, _ = s.buffer.PopAtSequence(i)
p, s.buffer[i] = s.buffer[i], nil
if p != nil && s.packetReleaseHandler != nil {
s.packetReleaseHandler(p)
}
Expand Down Expand Up @@ -179,7 +178,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) {
// Push does not copy the input. If you wish to reuse
// this memory make sure to copy before calling Push
func (s *SampleBuilder) Push(p *rtp.Packet) {
s.buffer.Push(p)
s.buffer[p.SequenceNumber] = p

switch s.filled.compare(p.SequenceNumber) {
case slCompareVoid:
Expand Down Expand Up @@ -221,19 +220,14 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {

var consume sampleSequenceLocation

for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ {
pkt, err := s.buffer.PeekAtSequence(i)
if pkt == nil || err != nil {
break
}

if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
consume.head = s.active.head
consume.tail = i + 1
break
}
headTimestamp, hasData := s.fetchTimestamp(s.active)
if hasData && pkt.Timestamp != headTimestamp {
if hasData && s.buffer[i].Timestamp != headTimestamp {
consume.head = s.active.head
consume.tail = i
break
Expand All @@ -243,8 +237,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
if consume.empty() {
return nil
}
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
if !purgingBuffers && pkt == nil {

if !purgingBuffers && s.buffer[consume.tail] == nil {
// wait for the next packet after this set of packets to arrive
// to ensure at least one post sample timestamp is known
// (unless we have to release right now)
Expand All @@ -256,10 +250,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {

// scan for any packet after the current and use that time stamp as the diff point
for i := consume.tail; i < s.active.tail; i++ {
pkt, _ = s.buffer.PeekAtSequence(i)

if pkt != nil {
afterTimestamp = pkt.Timestamp
if s.buffer[i] != nil {
afterTimestamp = s.buffer[i].Timestamp
break
}
}
Expand All @@ -269,11 +261,10 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {

// prior to decoding all the packets, check if this packet
// would end being disposed anyway
pkt, err := s.buffer.PeekAtSequence(consume.head)
if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
isPadding := false
for i := consume.head; i != consume.tail; i++ {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 {
isPadding = true
}
}
Expand All @@ -291,22 +282,16 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
var metadata interface{}
var rtpHeaders []*rtp.Header
for i := consume.head; i != consume.tail; i++ {
pkt, err := s.buffer.PeekAtSequence(i)
if err != nil {
return nil
}
p, err := s.depacketizer.Unmarshal(pkt.Payload)
p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
if err != nil {
return nil
}
if i == consume.head && s.packetHeadHandler != nil {
metadata = s.packetHeadHandler(s.depacketizer)
}
if s.returnRTPHeaders {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
h := pkt.Header.Clone()
rtpHeaders = append(rtpHeaders, &h)
}
h := s.buffer[i].Header.Clone()
rtpHeaders = append(rtpHeaders, &h)
}

data = append(data, p...)
Expand Down Expand Up @@ -404,11 +389,3 @@ func WithRTPHeaders(enable bool) Option {
o.returnRTPHeaders = enable
}
}

// WithJitterBufferMinimumLength sets the minimum number of packets which must first
// be received before starting any playback
func WithJitterBufferMinimumLength(length uint16) Option {
return func(o *SampleBuilder) {
o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length))
}
}
10 changes: 3 additions & 7 deletions pkg/media/samplebuilder/samplebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,18 +396,14 @@ func TestSampleBuilderCleanReference(t *testing.T) {
s.Push(pkt5)

for i := 0; i < 3; i++ {
pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000))

if pkt != nil || err == nil {
if s.buffer[(i+int(seqStart))%0x10000] != nil {
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
}
}
pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000))
if pkt != pkt4 || err != nil {
if s.buffer[(14+int(seqStart))%0x10000] != pkt4 {
t.Error("New packet must be referenced after jump")
}
pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000))
if pkt != pkt5 || err != nil {
if s.buffer[(12+int(seqStart))%0x10000] != pkt5 {
t.Error("New packet must be referenced after jump")
}
})
Expand Down

0 comments on commit bb41f23

Please sign in to comment.