diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 75c8502d3..d808528c5 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -397,11 +397,14 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) { } func (b *Buffer) buildNACKPacket() []rtcp.Packet { - if nacks, askKeyframe := b.nacker.pairs(b.cycles | uint32(b.maxSeqNo)); nacks != nil && len(nacks) > 0 { - pkts := []rtcp.Packet{&rtcp.TransportLayerNack{ - MediaSSRC: b.mediaSSRC, - Nacks: nacks, - }} + if nacks, askKeyframe := b.nacker.pairs(b.cycles | uint32(b.maxSeqNo)); (nacks != nil && len(nacks) > 0) || askKeyframe { + var pkts []rtcp.Packet + if len(nacks) > 0 { + pkts = []rtcp.Packet{&rtcp.TransportLayerNack{ + MediaSSRC: b.mediaSSRC, + Nacks: nacks, + }} + } if askKeyframe { pkts = append(pkts, &rtcp.PictureLossIndication{ diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go index a3762679a..891d1d52f 100644 --- a/pkg/buffer/buffer_test.go +++ b/pkg/buffer/buffer_test.go @@ -44,6 +44,63 @@ func CreateTestListPackets(snsAndTSs []SequenceNumberAndTimeStamp) (packetList [ return packetList } +func TestNack(t *testing.T) { + pool := &sync.Pool{ + New: func() interface{} { + return make([]byte, 1500) + }, + } + buff := NewBuffer(123, pool, pool) + buff.codecType = webrtc.RTPCodecTypeVideo + assert.NotNil(t, buff) + var wg sync.WaitGroup + // 3 nacks 1 Pli + wg.Add(4) + buff.OnFeedback(func(fb []rtcp.Packet) { + for _, pkt := range fb { + switch p := pkt.(type) { + case *rtcp.TransportLayerNack: + if p.Nacks[0].PacketList()[0] == 2 && p.MediaSSRC == 123 { + wg.Done() + } + case *rtcp.PictureLossIndication: + if p.MediaSSRC == 123 { + wg.Done() + } + } + } + }) + buff.Bind(webrtc.RTPParameters{ + HeaderExtensions: nil, + Codecs: []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: "video/vp8", + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{ + Type: "nack", + }}, + }, + PayloadType: 96, + }, + }, + }, Options{}) + for i := 0; i < 15; i++ { + if i == 2 { + continue + } + pkt := rtp.Packet{ + Header: rtp.Header{SequenceNumber: uint16(i), Timestamp: uint32(i)}, + Payload: []byte{0xff, 0xff, 0xff, 0xfd, 0xb4, 0x9f, 0x94, 0x1}, + } + b, err := pkt.Marshal() + assert.NoError(t, err) + _, err = buff.Write(b) + assert.NoError(t, err) + } + wg.Wait() +} + func TestNewBuffer(t *testing.T) { type args struct { options Options diff --git a/pkg/buffer/nack.go b/pkg/buffer/nack.go index 4a6ff90f9..46d634413 100644 --- a/pkg/buffer/nack.go +++ b/pkg/buffer/nack.go @@ -55,16 +55,11 @@ func (n *nackQueue) pairs(headSN uint32) ([]rtcp.NackPair, bool) { if len(n.nacks) == 0 { return nil, false } - i := 0 askKF := false var np rtcp.NackPair var nps []rtcp.NackPair for _, nck := range n.nacks { - if nck.sn >= headSN-2 { - continue - } - if nck.nacked >= maxNackTimes { if nck.sn > n.kfSN { n.kfSN = nck.sn @@ -72,6 +67,11 @@ func (n *nackQueue) pairs(headSN uint32) ([]rtcp.NackPair, bool) { } continue } + if nck.sn >= headSN-2 { + n.nacks[i] = nck + i++ + continue + } n.nacks[i] = nack{ sn: nck.sn, nacked: nck.nacked + 1,