Skip to content

Commit

Permalink
rtmp: support ingesting AV1, VP9, H265, MP3, PCM from other servers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Sep 9, 2024
1 parent ab85249 commit e665385
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 352 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Live streams can be published to the server with:
|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec|
|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec|
|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM|
|[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)|
|[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM|
|[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)|
|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
|[Raspberry Pi Cameras](#raspberry-pi-cameras)||H264||
Expand Down
2 changes: 1 addition & 1 deletion internal/protocols/mpegts/from_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}

// FromStream links a server stream to a MPEG-TS writer.
// FromStream maps a MediaMTX stream to a MPEG-TS writer.
func FromStream(
stream *stream.Stream,
writer *asyncwriter.Writer,
Expand Down
2 changes: 1 addition & 1 deletion internal/protocols/mpegts/to_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
var ErrNoTracks = errors.New("no supported tracks found (supported are H265, H264," +
" MPEG-4 Video, MPEG-1/2 Video, Opus, MPEG-4 Audio, MPEG-1 Audio, AC-3")

// ToStream converts a MPEG-TS stream to a server stream.
// ToStream maps a MPEG-TS stream to a MediaMTX stream.
func ToStream(r *mpegts.Reader, stream **stream.Stream) ([]*description.Media, error) {
var medias []*description.Media //nolint:prealloc

Expand Down
204 changes: 204 additions & 0 deletions internal/protocols/rtmp/from_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package rtmp

import (
"errors"
"fmt"
"net"
"time"

"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)

var errNoSupportedCodecs = errors.New(
"the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio")

func setupVideo(
stream *stream.Stream,
writer *asyncwriter.Writer,
w **Writer,
nconn net.Conn,
writeTimeout time.Duration,
) format.Format {
var videoFormatH264 *format.H264
videoMedia := stream.Desc().FindFormat(&videoFormatH264)

if videoFormatH264 != nil {
var videoDTSExtractor *h264.DTSExtractor

stream.AddReader(writer, videoMedia, videoFormatH264, func(u unit.Unit) error {
tunit := u.(*unit.H264)

if tunit.AU == nil {
return nil
}

idrPresent := false
nonIDRPresent := false

for _, nalu := range tunit.AU {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeIDR:
idrPresent = true

case h264.NALUTypeNonIDR:
nonIDRPresent = true
}
}

var dts time.Duration

// wait until we receive an IDR
if videoDTSExtractor == nil {
if !idrPresent {
return nil
}

videoDTSExtractor = h264.NewDTSExtractor()

var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
} else {
if !idrPresent && !nonIDRPresent {
return nil
}

var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
}

nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU)
})

return videoFormatH264
}

return nil
}

func setupAudio(
stream *stream.Stream,
writer *asyncwriter.Writer,
w **Writer,
nconn net.Conn,
writeTimeout time.Duration,
) format.Format {
var audioFormatMPEG4Audio *format.MPEG4Audio
audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Audio)

if audioMedia != nil {
stream.AddReader(writer, audioMedia, audioFormatMPEG4Audio, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)

if tunit.AUs == nil {
return nil
}

for i, au := range tunit.AUs {
nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteMPEG4Audio(
tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()),
au,
)
if err != nil {
return err
}
}

return nil
})

return audioFormatMPEG4Audio
}

var audioFormatMPEG1 *format.MPEG1Audio
audioMedia = stream.Desc().FindFormat(&audioFormatMPEG1)

if audioMedia != nil {
stream.AddReader(writer, audioMedia, audioFormatMPEG1, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)

pts := tunit.PTS

for _, frame := range tunit.Frames {
var h mpeg1audio.FrameHeader
err := h.Unmarshal(frame)
if err != nil {
return err
}

if !(!h.MPEG2 && h.Layer == 3) {
return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio")
}

nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err = (*w).WriteMPEG1Audio(pts, &h, frame)
if err != nil {
return err
}

pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}

return nil
})

return audioFormatMPEG1
}

return nil
}

// FromStream maps a MediaMTX stream to a RTMP stream.
func FromStream(
stream *stream.Stream,
writer *asyncwriter.Writer,
conn *Conn,
nconn net.Conn,
writeTimeout time.Duration,
) error {
var w *Writer

videoFormat := setupVideo(
stream,
writer,
&w,
nconn,
writeTimeout,
)

audioFormat := setupAudio(
stream,
writer,
&w,
nconn,
writeTimeout,
)

if videoFormat == nil && audioFormat == nil {
return errNoSupportedCodecs
}

var err error
w, err = NewWriter(conn, videoFormat, audioFormat)
if err != nil {
return err
}

return nil
}
134 changes: 134 additions & 0 deletions internal/protocols/rtmp/to_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package rtmp

import (
"fmt"
"time"

"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)

// ToStream maps a RTMP stream to a MediaMTX stream.
func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) {
videoFormat, audioFormat := r.Tracks()

var medias []*description.Media

if videoFormat != nil {
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{videoFormat},
}
medias = append(medias, medi)

switch videoFormat.(type) {
case *format.AV1:
r.OnDataAV1(func(pts time.Duration, tu [][]byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.AV1{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
TU: tu,
})
})

case *format.VP9:
r.OnDataVP9(func(pts time.Duration, frame []byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.VP9{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Frame: frame,
})
})

case *format.H265:
r.OnDataH265(func(pts time.Duration, au [][]byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.H265{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AU: au,
})
})

case *format.H264:
r.OnDataH264(func(pts time.Duration, au [][]byte) {
(*stream).WriteUnit(medi, videoFormat, &unit.H264{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AU: au,
})
})

default:
return nil, fmt.Errorf("unsupported video codec: %T", videoFormat)
}
}

if audioFormat != nil {
medi := &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{audioFormat},
}
medias = append(medias, medi)

switch audioFormat.(type) {
case *format.MPEG4Audio:
r.OnDataMPEG4Audio(func(pts time.Duration, au []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
AUs: [][]byte{au},
})
})

case *format.MPEG1Audio:
r.OnDataMPEG1Audio(func(pts time.Duration, frame []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.MPEG1Audio{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Frames: [][]byte{frame},
})
})

case *format.G711:
r.OnDataG711(func(pts time.Duration, samples []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.G711{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})

case *format.LPCM:
r.OnDataLPCM(func(pts time.Duration, samples []byte) {
(*stream).WriteUnit(medi, audioFormat, &unit.LPCM{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})

default:
return nil, fmt.Errorf("unsupported audio codec: %T", audioFormat)
}
}

return medias, nil
}
2 changes: 1 addition & 1 deletion internal/protocols/webrtc/peer_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func TestPeerConnectionPublishRead(t *testing.T) {
}
}

// test that an audio codec is present regardless of the fact that an audio track is not.
// test that an audio codec is present regardless of the fact that an audio track is.
func TestPeerConnectionFallbackCodecs(t *testing.T) {
pc1 := &PeerConnection{
HandshakeTimeout: conf.StringDuration(10 * time.Second),
Expand Down
Loading

0 comments on commit e665385

Please sign in to comment.