From 897ff13239a19205710303f4cc8eb7d535ca5ce7 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 9 Jun 2024 00:25:40 +0200 Subject: [PATCH] webrtc: support reading, publishing, proxying LPCM tracks --- internal/formatprocessor/generic.go | 2 +- internal/protocols/webrtc/incoming_track.go | 51 ++- internal/protocols/webrtc/outgoing_track.go | 28 +- .../protocols/webrtc/peer_connection_test.go | 45 +++ internal/servers/webrtc/read_index.html | 22 ++ internal/servers/webrtc/server_test.go | 291 ++++++++++++------ internal/servers/webrtc/session.go | 40 +++ 7 files changed, 375 insertions(+), 104 deletions(-) diff --git a/internal/formatprocessor/generic.go b/internal/formatprocessor/generic.go index 0140aa80b2e..1faba6a8f50 100644 --- a/internal/formatprocessor/generic.go +++ b/internal/formatprocessor/generic.go @@ -20,7 +20,7 @@ func newGeneric( generateRTPPackets bool, ) (*formatProcessorGeneric, error) { if generateRTPPackets { - return nil, fmt.Errorf("we don't know how to generate RTP packets of format %+v", forma) + return nil, fmt.Errorf("we don't know how to generate RTP packets of format %T", forma) } return &formatProcessorGeneric{ diff --git a/internal/protocols/webrtc/incoming_track.go b/internal/protocols/webrtc/incoming_track.go index 785a432753f..fa5fd8786fc 100644 --- a/internal/protocols/webrtc/incoming_track.go +++ b/internal/protocols/webrtc/incoming_track.go @@ -19,6 +19,11 @@ const ( keyFrameInterval = 2 * time.Second ) +const ( + mimeTypeMultiopus = "audio/multiopus" + mimeTypeL16 = "audio/L16" +) + var incomingVideoCodecs = []webrtc.RTPCodecParameters{ { RTPCodecCapability: webrtc.RTPCodecCapability{ @@ -95,7 +100,7 @@ var incomingVideoCodecs = []webrtc.RTPCodecParameters{ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ { RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: mimeMultiopus, + MimeType: mimeTypeMultiopus, ClockRate: 48000, Channels: 3, SDPFmtpLine: "channel_mapping=0,2,1;num_streams=2;coupled_streams=1", @@ -104,7 +109,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ }, { RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: mimeMultiopus, + MimeType: mimeTypeMultiopus, ClockRate: 48000, Channels: 4, SDPFmtpLine: "channel_mapping=0,1,2,3;num_streams=2;coupled_streams=2", @@ -113,7 +118,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ }, { RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: mimeMultiopus, + MimeType: mimeTypeMultiopus, ClockRate: 48000, Channels: 5, SDPFmtpLine: "channel_mapping=0,4,1,2,3;num_streams=3;coupled_streams=2", @@ -122,7 +127,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ }, { RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: mimeMultiopus, + MimeType: mimeTypeMultiopus, ClockRate: 48000, Channels: 6, SDPFmtpLine: "channel_mapping=0,4,1,2,3,5;num_streams=4;coupled_streams=2", @@ -131,7 +136,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ }, { RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: mimeMultiopus, + MimeType: mimeTypeMultiopus, ClockRate: 48000, Channels: 7, SDPFmtpLine: "channel_mapping=0,4,1,2,3,5,6;num_streams=4;coupled_streams=4", @@ -140,7 +145,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ }, { RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: mimeMultiopus, + MimeType: mimeTypeMultiopus, ClockRate: 48000, Channels: 8, SDPFmtpLine: "channel_mapping=0,6,1,4,5,2,3,7;num_streams=5;coupled_streams=4", @@ -193,6 +198,30 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ }, PayloadType: 8, }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeTypeL16, + ClockRate: 8000, + Channels: 2, + }, + PayloadType: 120, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeTypeL16, + ClockRate: 16000, + Channels: 2, + }, + PayloadType: 121, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeTypeL16, + ClockRate: 48000, + Channels: 2, + }, + PayloadType: 122, + }, } // IncomingTrack is an incoming track. @@ -245,7 +274,7 @@ func newIncomingTrack( PacketizationMode: 1, } - case strings.ToLower(mimeMultiopus): + case strings.ToLower(mimeTypeMultiopus): t.format = &format.Opus{ PayloadTyp: uint8(track.PayloadType()), ChannelCount: int(track.Codec().Channels), @@ -301,6 +330,14 @@ func newIncomingTrack( ChannelCount: int(channels), } + case strings.ToLower(mimeTypeL16): + t.format = &format.LPCM{ + PayloadTyp: uint8(track.PayloadType()), + BitDepth: 16, + SampleRate: int(track.Codec().ClockRate), + ChannelCount: int(track.Codec().Channels), + } + default: return nil, fmt.Errorf("unsupported codec: %+v", track.Codec()) } diff --git a/internal/protocols/webrtc/outgoing_track.go b/internal/protocols/webrtc/outgoing_track.go index 3f790ba19a2..efd79cb6555 100644 --- a/internal/protocols/webrtc/outgoing_track.go +++ b/internal/protocols/webrtc/outgoing_track.go @@ -8,10 +8,6 @@ import ( "github.com/pion/webrtc/v3" ) -const ( - mimeMultiopus = "audio/multiopus" -) - // OutgoingTrack is a WebRTC outgoing track type OutgoingTrack struct { Format format.Format @@ -62,7 +58,7 @@ func (t *OutgoingTrack) codecParameters() (webrtc.RTPCodecParameters, error) { if forma.ChannelCount > 2 { return webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: mimeMultiopus, + MimeType: mimeTypeMultiopus, ClockRate: 48000, Channels: uint16(forma.ChannelCount), }, @@ -140,6 +136,28 @@ func (t *OutgoingTrack) codecParameters() (webrtc.RTPCodecParameters, error) { PayloadType: 8, }, nil + case *format.LPCM: + if forma.BitDepth != 16 { + return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported LPCM bit depth: %d", forma.BitDepth) + } + + if forma.ClockRate() != 8000 && forma.ClockRate() != 16000 && forma.ClockRate() != 48000 { + return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported clock rate: %d", forma.ClockRate()) + } + + if forma.ChannelCount != 1 && forma.ChannelCount != 2 { + return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported channel count: %d", forma.ChannelCount) + } + + return webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeTypeL16, + ClockRate: uint32(forma.ClockRate()), + Channels: uint16(forma.ChannelCount), + }, + PayloadType: 96, + }, nil + default: return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported track type: %T", forma) } diff --git a/internal/protocols/webrtc/peer_connection_test.go b/internal/protocols/webrtc/peer_connection_test.go index adf467f3ee2..8c265dbd820 100644 --- a/internal/protocols/webrtc/peer_connection_test.go +++ b/internal/protocols/webrtc/peer_connection_test.go @@ -169,6 +169,51 @@ func TestPeerConnectionPublishRead(t *testing.T) { ChannelCount: 1, }, }, + { + "l16 8000 stereo", + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 8000, + ChannelCount: 2, + }, + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 8000, + ChannelCount: 2, + }, + }, + { + "l16 16000 stereo", + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 16000, + ChannelCount: 2, + }, + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 16000, + ChannelCount: 2, + }, + }, + { + "l16 48khz stereo", + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 48000, + ChannelCount: 2, + }, + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 48000, + ChannelCount: 2, + }, + }, } { t.Run(ca.name, func(t *testing.T) { pc1 := &PeerConnection{ diff --git a/internal/servers/webrtc/read_index.html b/internal/servers/webrtc/read_index.html index 55a2409652d..f3052d59a4e 100644 --- a/internal/servers/webrtc/read_index.html +++ b/internal/servers/webrtc/read_index.html @@ -158,6 +158,24 @@ return lines.join('\r\n'); }; +const enableL16 = (section) => { + let lines = section.split('\r\n'); + + lines[0] += " 120"; + lines.splice(lines.length - 1, 0, "a=rtpmap:120 L16/8000/2"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:120 transport-cc"); + + lines[0] += " 121"; + lines.splice(lines.length - 1, 0, "a=rtpmap:121 L16/16000/2"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:121 transport-cc"); + + lines[0] += " 122"; + lines.splice(lines.length - 1, 0, "a=rtpmap:122 L16/48000/2"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:122 transport-cc"); + + return lines.join('\r\n'); +}; + const enableStereoOpus = (section) => { let opusPayloadFormat = ''; let lines = section.split('\r\n'); @@ -202,6 +220,10 @@ sections[i] = enableMultichannelOpus(sections[i]); } + if (nonAdvertisedCodecs.includes('L16/48000/2')) { + sections[i] = enableL16(sections[i]); + } + break; } } diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index b87e63c01ae..74e41f0016d 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -5,10 +5,12 @@ import ( "context" "net/http" "net/url" + "reflect" "testing" "time" "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" @@ -336,107 +338,214 @@ func TestServerPublish(t *testing.T) { } func TestServerRead(t *testing.T) { - desc := &description.Session{Medias: []*description.Media{test.MediaH264}} - - stream, err := stream.New( - 1460, - desc, - true, - test.NilLogger, - ) - require.NoError(t, err) - - path := &dummyPath{stream: stream} + for _, ca := range []struct { + name string + medias []*description.Media + unit unit.Unit + outRTPPayload []byte + }{ + { + "av1", + []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.AV1{ + PayloadTyp: 96, + }}, + }}, + &unit.AV1{ + TU: [][]byte{{1, 2}}, + }, + []byte{0, 2, 1, 2}, + }, + // TODO: check why this doesn't work + /*{ + "vp9", + []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.VP9{ + PayloadTyp: 96, + }}, + }}, + &unit.VP9{ + Frame: []byte{1, 2}, + }, + []byte{1, 2}, + },*/ + { + "vp8", + []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.VP8{ + PayloadTyp: 96, + }}, + }}, + &unit.VP8{ + Frame: []byte{1, 2}, + }, + []byte{0x10, 1, 2}, + }, + { + "h264", + []*description.Media{test.MediaH264}, + &unit.H264{ + AU: [][]byte{ + {5, 1}, + }, + }, + []byte{ + 0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, + 0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00, + 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0, + 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, + 0x07, 0x08, 0x00, 0x02, 0x05, 0x01, + }, + }, + { + "opus", + []*description.Media{{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.Opus{ + PayloadTyp: 96, + ChannelCount: 2, + }}, + }}, + &unit.Opus{ + Packets: [][]byte{{1, 2}}, + }, + []byte{1, 2}, + }, + // TODO: check why this doesn't work + /*{ + "g722", + []*description.Media{{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.G722{}}, + }}, + &unit.Generic{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{{ + Header: rtp.Header{}, + Payload: []byte{1, 2}, + }}, + }, + }, + []byte{1, 2}, + },*/ + { + "g711", + []*description.Media{{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.G711{ + MULaw: true, + SampleRate: 8000, + ChannelCount: 1, + }}, + }}, + &unit.G711{ + Samples: []byte{1, 2, 3}, + }, + []byte{1, 2, 3}, + }, + { + "lpcm", + []*description.Media{{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 48000, + ChannelCount: 2, + }}, + }}, + &unit.LPCM{ + Samples: []byte{1, 2, 3, 4}, + }, + []byte{1, 2, 3, 4}, + }, + } { + t.Run(ca.name, func(t *testing.T) { + desc := &description.Session{Medias: ca.medias} + + stream, err := stream.New( + 1460, + desc, + true, + test.NilLogger, + ) + require.NoError(t, err) - pathManager := &dummyPathManager{path: path} + path := &dummyPath{stream: stream} + + pathManager := &dummyPathManager{path: path} + + s := &Server{ + Address: "127.0.0.1:8886", + Encryption: false, + ServerKey: "", + ServerCert: "", + AllowOrigin: "", + TrustedProxies: conf.IPNetworks{}, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + LocalUDPAddress: "127.0.0.1:8887", + LocalTCPAddress: "127.0.0.1:8887", + IPsFromInterfaces: true, + IPsFromInterfacesList: []string{}, + AdditionalHosts: []string{}, + ICEServers: []conf.WebRTCICEServer{}, + HandshakeTimeout: conf.StringDuration(10 * time.Second), + TrackGatherTimeout: conf.StringDuration(2 * time.Second), + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: test.NilLogger, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() - s := &Server{ - Address: "127.0.0.1:8886", - Encryption: false, - ServerKey: "", - ServerCert: "", - AllowOrigin: "", - TrustedProxies: conf.IPNetworks{}, - ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, - LocalUDPAddress: "127.0.0.1:8887", - LocalTCPAddress: "127.0.0.1:8887", - IPsFromInterfaces: true, - IPsFromInterfacesList: []string{}, - AdditionalHosts: []string{}, - ICEServers: []conf.WebRTCICEServer{}, - HandshakeTimeout: conf.StringDuration(10 * time.Second), - TrackGatherTimeout: conf.StringDuration(2 * time.Second), - ExternalCmdPool: nil, - PathManager: pathManager, - Parent: test.NilLogger, - } - err = s.Initialize() - require.NoError(t, err) - defer s.Close() + u, err := url.Parse("http://myuser:mypass@localhost:8886/teststream/whep?param=value") + require.NoError(t, err) - u, err := url.Parse("http://myuser:mypass@localhost:8886/teststream/whep?param=value") - require.NoError(t, err) + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} + wc := &webrtc.WHIPClient{ + HTTPClient: hc, + URL: u, + Log: test.NilLogger, + } - wc := &webrtc.WHIPClient{ - HTTPClient: hc, - URL: u, - Log: test.NilLogger, - } + writerDone := make(chan struct{}) + defer func() { <-writerDone }() - writerDone := make(chan struct{}) - defer func() { <-writerDone }() + writerTerminate := make(chan struct{}) + defer close(writerTerminate) - writerTerminate := make(chan struct{}) - defer close(writerTerminate) + go func() { + defer close(writerDone) + for { + select { + case <-time.After(100 * time.Millisecond): + case <-writerTerminate: + return + } - go func() { - defer close(writerDone) - for { - select { - case <-time.After(100 * time.Millisecond): - case <-writerTerminate: - return - } - stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ - Base: unit.Base{ - NTP: time.Time{}, - }, - AU: [][]byte{ - {5, 1}, - }, - }) - } - }() + r := reflect.New(reflect.TypeOf(ca.unit).Elem()) + r.Elem().Set(reflect.ValueOf(ca.unit).Elem()) + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit)) + } + }() - tracks, err := wc.Read(context.Background()) - require.NoError(t, err) - defer checkClose(t, wc.Close) + tracks, err := wc.Read(context.Background()) + require.NoError(t, err) + defer checkClose(t, wc.Close) - pkt, err := tracks[0].ReadRTP() - require.NoError(t, err) - require.Equal(t, &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 104, - SequenceNumber: pkt.SequenceNumber, - Timestamp: pkt.Timestamp, - SSRC: pkt.SSRC, - CSRC: []uint32{}, - }, - Payload: []byte{ - 0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, - 0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00, - 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0, - 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, - 0x07, 0x08, 0x00, 0x02, 0x05, 0x01, - }, - }, pkt) + pkt, err := tracks[0].ReadRTP() + require.NoError(t, err) + require.Equal(t, ca.outRTPPayload, pkt.Payload) + }) + } } func TestServerPostNotFound(t *testing.T) { diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 5a7fc416a56..ed6a20664f8 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -14,6 +14,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1" "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" "github.com/bluenviron/gortsplib/v4/pkg/rtptime" @@ -259,6 +260,45 @@ func findAudioTrack( } } + var lpcmFormat *format.LPCM + media = stream.Desc().FindFormat(&lpcmFormat) + + if lpcmFormat != nil { + return lpcmFormat, func(track *webrtc.OutgoingTrack) error { + encoder := &rtplpcm.Encoder{ + PayloadType: 96, + BitDepth: 16, + ChannelCount: lpcmFormat.ChannelCount, + PayloadMaxSize: webrtcPayloadMaxSize, + } + err := encoder.Init() + if err != nil { + return err + } + + stream.AddReader(writer, media, lpcmFormat, func(u unit.Unit) error { + tunit := u.(*unit.LPCM) + + if tunit.Samples == nil { + return nil + } + + packets, err := encoder.Encode(tunit.Samples) + if err != nil { + return nil //nolint:nilerr + } + + for _, pkt := range packets { + pkt.Timestamp += tunit.RTPPackets[0].Timestamp + track.WriteRTP(pkt) //nolint:errcheck + } + + return nil + }) + return nil + } + } + return nil, nil }