diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go index 56cb6c9e..e7a9f43c 100644 --- a/pkg/jitterbuffer/jitter_buffer.go +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -91,10 +91,18 @@ type Stats struct { // New will initialize a jitter buffer and its associated statistics func New(opts ...Option) *JitterBuffer { - jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, minStartCount: 50, packets: NewQueue(), listeners: make(map[Event][]EventListener)} + jb := &JitterBuffer{ + state: Buffering, + stats: Stats{0, 0, 0}, + minStartCount: 50, + packets: NewQueue(), + listeners: make(map[Event][]EventListener), + } + for _, o := range opts { o(jb) } + return jb } @@ -113,6 +121,23 @@ func (jb *JitterBuffer) Listen(event Event, cb EventListener) { jb.listeners[event] = append(jb.listeners[event], cb) } +// PlayoutHead returns the SequenceNumber that will be attempted to Pop next +func (jb *JitterBuffer) PlayoutHead() uint16 { + jb.mutex.Lock() + defer jb.mutex.Unlock() + + return jb.playoutHead +} + +// SetPlayoutHead allows you to manually specify the packet you wish to pop next +// If you have encountered a packet that hasn't resolved you can skip it +func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + + jb.playoutHead = playoutHead +} + func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) { // If we have at least one packet, and the next packet being pushed in is not // at the expected sequence number increment the out of order count diff --git a/pkg/jitterbuffer/jitter_buffer_test.go b/pkg/jitterbuffer/jitter_buffer_test.go index b326cd2a..0ed73023 100644 --- a/pkg/jitterbuffer/jitter_buffer_test.go +++ b/pkg/jitterbuffer/jitter_buffer_test.go @@ -13,6 +13,7 @@ import ( func TestJitterBuffer(t *testing.T) { assert := assert.New(t) + t.Run("Appends packets in order", func(*testing.T) { jb := New() assert.Equal(jb.lastSequence, uint16(0)) @@ -29,6 +30,7 @@ func TestJitterBuffer(t *testing.T) { assert.Equal(jb.packets.Length(), uint16(4)) assert.Equal(jb.lastSequence, uint16(5012)) }) + t.Run("Appends packets and begins playout", func(*testing.T) { jb := New() for i := 0; i < 100; i++ { @@ -53,6 +55,7 @@ func TestJitterBuffer(t *testing.T) { assert.Equal(head.SequenceNumber, uint16(5012)) assert.Equal(err, nil) }) + t.Run("Wraps playout correctly", func(*testing.T) { jb := New() for i := 0; i < 100; i++ { @@ -75,6 +78,7 @@ func TestJitterBuffer(t *testing.T) { } } }) + t.Run("Pops at timestamp correctly", func(*testing.T) { jb := New() for i := 0; i < 100; i++ { @@ -94,6 +98,7 @@ func TestJitterBuffer(t *testing.T) { assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) assert.Equal(err, nil) }) + t.Run("Can peek at a packet", func(*testing.T) { jb := New() jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) @@ -110,6 +115,7 @@ func TestJitterBuffer(t *testing.T) { assert.Equal(pkt.SequenceNumber, uint16(5000)) assert.Equal(err, nil) }) + t.Run("Pops at sequence with an invalid sequence number", func(*testing.T) { jb := New() for i := 0; i < 50; i++ { @@ -124,6 +130,7 @@ func TestJitterBuffer(t *testing.T) { assert.Equal(head, (*rtp.Packet)(nil)) assert.NotEqual(err, nil) }) + t.Run("Pops at timestamp with multiple packets", func(*testing.T) { jb := New() for i := 0; i < 50; i++ { @@ -145,6 +152,7 @@ func TestJitterBuffer(t *testing.T) { assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) assert.Equal(err, nil) }) + t.Run("Peeks at timestamp with multiple packets", func(*testing.T) { jb := New() for i := 0; i < 50; i++ { @@ -166,4 +174,44 @@ func TestJitterBuffer(t *testing.T) { assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) assert.Equal(err, nil) }) + + t.Run("SetPlayoutHead", func(*testing.T) { + jb := New(WithMinimumPacketCount(1)) + + // Push packets 0-9, but no packet 4 + for i := uint16(0); i < 10; i++ { + if i == 4 { + continue + } + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: i, Timestamp: uint32(512 + i)}, Payload: []byte{0x00}}) + } + + // The first 3 packets will be able to popped + for i := 0; i < 4; i++ { + pkt, err := jb.Pop() + assert.NoError(err) + assert.NotNil(pkt) + } + + // The next pop will fail because of gap + pkt, err := jb.Pop() + assert.ErrorIs(err, ErrNotFound) + assert.Nil(pkt) + assert.Equal(jb.PlayoutHead(), uint16(4)) + + // Assert that PlayoutHead isn't modified with pushing/popping again + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 10, Timestamp: uint32(522)}, Payload: []byte{0x00}}) + pkt, err = jb.Pop() + assert.ErrorIs(err, ErrNotFound) + assert.Nil(pkt) + assert.Equal(jb.PlayoutHead(), uint16(4)) + + // Increment the PlayoutHead and popping will work again + jb.SetPlayoutHead(jb.PlayoutHead() + 1) + for i := 0; i < 6; i++ { + pkt, err := jb.Pop() + assert.NoError(err) + assert.NotNil(pkt) + } + }) }