diff --git a/pkg/media/samplebuilder/samplebuilder.go b/pkg/media/samplebuilder/samplebuilder.go index dc1643ac812..170c00f068d 100644 --- a/pkg/media/samplebuilder/samplebuilder.go +++ b/pkg/media/samplebuilder/samplebuilder.go @@ -8,7 +8,6 @@ import ( "math" "time" - "github.com/pion/interceptor/pkg/jitterbuffer" "github.com/pion/rtp" "github.com/pion/webrtc/v4/pkg/media" ) @@ -17,7 +16,7 @@ import ( type SampleBuilder struct { maxLate uint16 // how many packets to wait until we get a valid Sample maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets - buffer *jitterbuffer.JitterBuffer + buffer [math.MaxUint16 + 1]*rtp.Packet preparedSamples [math.MaxUint16 + 1]*media.Sample // Interface that allows us to take RTP packets to samples @@ -61,7 +60,7 @@ type SampleBuilder struct { // The depacketizer extracts media samples from RTP packets. // Several depacketizers are available in package github.com/pion/rtp/codecs. func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder { - s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))} + s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate} for _, o := range opts { o(s) } @@ -77,7 +76,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool { var foundTail *rtp.Packet for i := location.head; i != location.tail; i++ { - if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { + if packet := s.buffer[i]; packet != nil { foundHead = packet break } @@ -88,7 +87,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool { } for i := location.tail - 1; i != location.head; i-- { - if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { + if packet := s.buffer[i]; packet != nil { foundTail = packet break } @@ -106,8 +105,8 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta if location.empty() { return 0, false } - packet, err := s.buffer.PeekAtSequence(location.head) - if packet == nil || err != nil { + packet := s.buffer[location.head] + if packet == nil { return 0, false } return packet.Timestamp, true @@ -115,7 +114,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta func (s *SampleBuilder) releasePacket(i uint16) { var p *rtp.Packet - p, _ = s.buffer.PopAtSequence(i) + p, s.buffer[i] = s.buffer[i], nil if p != nil && s.packetReleaseHandler != nil { s.packetReleaseHandler(p) } @@ -179,7 +178,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) { // Push does not copy the input. If you wish to reuse // this memory make sure to copy before calling Push func (s *SampleBuilder) Push(p *rtp.Packet) { - s.buffer.Push(p) + s.buffer[p.SequenceNumber] = p switch s.filled.compare(p.SequenceNumber) { case slCompareVoid: @@ -221,19 +220,14 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { var consume sampleSequenceLocation - for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ { - pkt, err := s.buffer.PeekAtSequence(i) - if pkt == nil || err != nil { - break - } - - if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) { + for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ { + if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) { consume.head = s.active.head consume.tail = i + 1 break } headTimestamp, hasData := s.fetchTimestamp(s.active) - if hasData && pkt.Timestamp != headTimestamp { + if hasData && s.buffer[i].Timestamp != headTimestamp { consume.head = s.active.head consume.tail = i break @@ -243,8 +237,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { if consume.empty() { return nil } - pkt, _ := s.buffer.PeekAtSequence(consume.tail) - if !purgingBuffers && pkt == nil { + + if !purgingBuffers && s.buffer[consume.tail] == nil { // wait for the next packet after this set of packets to arrive // to ensure at least one post sample timestamp is known // (unless we have to release right now) @@ -256,10 +250,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { // scan for any packet after the current and use that time stamp as the diff point for i := consume.tail; i < s.active.tail; i++ { - pkt, _ = s.buffer.PeekAtSequence(i) - - if pkt != nil { - afterTimestamp = pkt.Timestamp + if s.buffer[i] != nil { + afterTimestamp = s.buffer[i].Timestamp break } } @@ -269,11 +261,10 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { // prior to decoding all the packets, check if this packet // would end being disposed anyway - pkt, err := s.buffer.PeekAtSequence(consume.head) - if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) { + if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) { isPadding := false for i := consume.head; i != consume.tail; i++ { - if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 { + if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 { isPadding = true } } @@ -291,11 +282,7 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { var metadata interface{} var rtpHeaders []*rtp.Header for i := consume.head; i != consume.tail; i++ { - pkt, err := s.buffer.PeekAtSequence(i) - if err != nil { - return nil - } - p, err := s.depacketizer.Unmarshal(pkt.Payload) + p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload) if err != nil { return nil } @@ -303,10 +290,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { metadata = s.packetHeadHandler(s.depacketizer) } if s.returnRTPHeaders { - if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { - h := pkt.Header.Clone() - rtpHeaders = append(rtpHeaders, &h) - } + h := s.buffer[i].Header.Clone() + rtpHeaders = append(rtpHeaders, &h) } data = append(data, p...) @@ -404,11 +389,3 @@ func WithRTPHeaders(enable bool) Option { o.returnRTPHeaders = enable } } - -// WithJitterBufferMinimumLength sets the minimum number of packets which must first -// be received before starting any playback -func WithJitterBufferMinimumLength(length uint16) Option { - return func(o *SampleBuilder) { - o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length)) - } -} diff --git a/pkg/media/samplebuilder/samplebuilder_test.go b/pkg/media/samplebuilder/samplebuilder_test.go index c20a6797eef..655ecb85d4c 100644 --- a/pkg/media/samplebuilder/samplebuilder_test.go +++ b/pkg/media/samplebuilder/samplebuilder_test.go @@ -396,18 +396,14 @@ func TestSampleBuilderCleanReference(t *testing.T) { s.Push(pkt5) for i := 0; i < 3; i++ { - pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000)) - - if pkt != nil || err == nil { + if s.buffer[(i+int(seqStart))%0x10000] != nil { t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i) } } - pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000)) - if pkt != pkt4 || err != nil { + if s.buffer[(14+int(seqStart))%0x10000] != pkt4 { t.Error("New packet must be referenced after jump") } - pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000)) - if pkt != pkt5 || err != nil { + if s.buffer[(12+int(seqStart))%0x10000] != pkt5 { t.Error("New packet must be referenced after jump") } })