support publishing AV1 tracks with Enhanced RTMP
aler9 committed May 4, 2023
1 parent 87bb6ca commit a76a15b
Showing 14 changed files with 262 additions and 39 deletions.
31 changes: 16 additions & 15 deletions README.md
@@ -8,24 +8,24 @@

 Live streams can be published to the server with:

-|protocol|variants|codecs|
-|--------|--------|------|
-|RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 Video, M-JPEG, MPEG-4 Video (H263, Xvid), MPEG-2 Audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
-|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 Video, M-JPEG, MPEG-4 Video (H263, Xvid), MPEG-2 Audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
-|RTMP clients (OBS Studio)|RTMP, RTMPS, Enhanced RTMP|H264, H265, MPEG-2 Audio (MP3), MPEG-4 Audio (AAC)|
-|RTMP servers and cameras|RTMP, RTMPS, Enhanced RTMP|H264, MPEG-2 Audio (MP3), MPEG-4 Audio (AAC)|
-|HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus|
-|UDP/MPEG-TS streams|Unicast, broadcast, multicast|H264, H265, MPEG-4 Audio (AAC), Opus|
-|Raspberry Pi Cameras||H264|
+|protocol|variants|video codecs|audio codecs|
+|--------|--------|------------|------------|
+|RTSP clients (FFmpeg, GStreamer)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec|
+|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec|
+|RTMP clients (OBS Studio)|RTMP, RTMPS, Enhanced RTMP|AV1, H265, H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)|
+|RTMP servers and cameras|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)|
+|HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)|
+|UDP/MPEG-TS streams|Unicast, broadcast, multicast|H265, H264|Opus, MPEG-4 Audio (AAC)|
+|Raspberry Pi Cameras||H264||

 And can be read from the server with:

-|protocol|variants|codecs|
-|--------|--------|------|
-|RTSP|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 Video, M-JPEG, MPEG-4 Video (H263, Xvid), MPEG-2 Audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
-|RTMP|RTMP, RTMPS, Enhanced RTMP|H264, MPEG-2 Audio (MP3), MPEG-4 Audio (AAC)|
-|HLS|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus|
-|WebRTC||H264, VP8, VP9, Opus, G711, G722|
+|protocol|variants|video codecs|audio codecs|
+|--------|--------|------------|------------|
+|RTSP|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec|
+|RTMP|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)|
+|HLS|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)|
+|WebRTC||VP9, VP8, H264|Opus, G722, G711|

 Features:

@@ -1201,6 +1201,7 @@ For more advanced options, you can create and serve a custom web page by startin

 * [RTSP/RTP/RTCP standards](https://github.com/bluenviron/gortsplib#standards)
 * [HLS standards](https://github.com/bluenviron/gohlslib#standards)
+* [Codec standards](https://github.com/bluenviron/mediacommon#standards)
 * [RTMP specification](https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf)
 * [Enhanced RTMP](https://raw.githubusercontent.com/veovera/enhanced-rtmp/main/enhanced-rtmp-v1.pdf)
 * [Golang project layout](https://github.com/golang-standards/project-layout)
6 changes: 3 additions & 3 deletions go.mod
@@ -4,11 +4,12 @@ go 1.20

 require (
 	code.cloudfoundry.org/bytefmt v0.0.0
+	github.com/abema/go-mp4 v0.10.1
 	github.com/alecthomas/kong v0.7.1
 	github.com/asticode/go-astits v1.11.0
 	github.com/bluenviron/gohlslib v0.2.3
-	github.com/bluenviron/gortsplib/v3 v3.4.0
-	github.com/bluenviron/mediacommon v0.4.2
+	github.com/bluenviron/gortsplib/v3 v3.5.0
+	github.com/bluenviron/mediacommon v0.5.0
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/gin-gonic/gin v1.9.0
 	github.com/google/uuid v1.3.0
@@ -27,7 +28,6 @@ require (
 )

 require (
-	github.com/abema/go-mp4 v0.10.1 // indirect
 	github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 // indirect
 	github.com/asticode/go-astikit v0.30.0 // indirect
 	github.com/bytedance/sonic v1.8.0 // indirect
8 changes: 4 additions & 4 deletions go.sum
@@ -14,10 +14,10 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z
 github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
 github.com/bluenviron/gohlslib v0.2.3 h1:vZmpjh2qWHaCvwwha04tgu8Kz9p4CuSBRLayD2yf89A=
 github.com/bluenviron/gohlslib v0.2.3/go.mod h1:loD97sTtBh/nBcw8yZJgXc71A6XQb0FsDWXFRkl7Yj4=
-github.com/bluenviron/gortsplib/v3 v3.4.0 h1:N4ticlV5YqRFDNvU52CRJgBQ0hHnxerDLfsd5wf5GI0=
-github.com/bluenviron/gortsplib/v3 v3.4.0/go.mod h1:Th3S/suqfnpV81y31YpE1hcOP9odMqvIjOB7RV1+2lU=
-github.com/bluenviron/mediacommon v0.4.2 h1:rdghY3g70+fdviapO2hL6CHpOGeTd7KbH1aEZnMwh88=
-github.com/bluenviron/mediacommon v0.4.2/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
+github.com/bluenviron/gortsplib/v3 v3.5.0 h1:8d6DYcwVhghObgBFOnoJwK6xf1ZiAQ8Vi7DRv6DGLdw=
+github.com/bluenviron/gortsplib/v3 v3.5.0/go.mod h1:gc6Z8pBUMC9QBqYxcOY9eVxjDPOrmFcwVH61Xs3Gu2A=
+github.com/bluenviron/mediacommon v0.5.0 h1:YsVFlEknaXWhZGfz+Y1QbuzXLMVSmHODc7OnRqZoITY=
+github.com/bluenviron/mediacommon v0.5.0/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
 github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
 github.com/bytedance/sonic v1.8.0 h1:ea0Xadu+sHlu7x5O3gKhRpQ1IKiMrSiHttPF0ybECuA=
 github.com/bytedance/sonic v1.8.0/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
19 changes: 19 additions & 0 deletions internal/core/rtmp_conn.go
@@ -13,6 +13,7 @@ import (
 	"github.com/bluenviron/gortsplib/v3/pkg/formats"
 	"github.com/bluenviron/gortsplib/v3/pkg/media"
 	"github.com/bluenviron/gortsplib/v3/pkg/ringbuffer"
+	"github.com/bluenviron/mediacommon/pkg/codecs/av1"
 	"github.com/bluenviron/mediacommon/pkg/codecs/h264"
 	"github.com/bluenviron/mediacommon/pkg/codecs/mpeg2audio"
 	"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
@@ -126,6 +127,24 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
 			return nil
 		}

+	case *formats.AV1:
+		return func(msg interface{}) error {
+			if tmsg, ok := msg.(*message.ExtendedCodedFrames); ok {
+				obus, err := av1.BitstreamUnmarshal(tmsg.Payload, true)
+				if err != nil {
+					return fmt.Errorf("unable to decode bitstream: %v", err)
+				}
+
+				stream.writeUnit(medi, format, &formatprocessor.UnitAV1{
+					PTS:  tmsg.DTS,
+					OBUs: obus,
+					NTP:  time.Now(),
+				})
+			}
+
+			return nil
+		}
+
 	case *formats.MPEG2Audio:
 		return func(msg interface{}) error {
 			tmsg := msg.(*message.Audio)
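
Review note: the AV1 branch above relies on `av1.BitstreamUnmarshal` to turn the RTMP payload into a list of OBUs. To illustrate what that kind of splitting involves, here is a minimal standard-library sketch. It is not mediacommon's implementation: `splitOBUs`, `readLEB128` and `obuType` are hypothetical names, and the sketch assumes the payload is a plain concatenation of OBUs that all carry a size field.

```go
package main

import (
	"errors"
	"fmt"
)

// readLEB128 decodes an unsigned LEB128 value as used by AV1 for OBU sizes.
// It returns the value and the number of bytes consumed.
func readLEB128(buf []byte) (uint64, int, error) {
	var v uint64
	for i := 0; i < 8; i++ {
		if i >= len(buf) {
			return 0, 0, errors.New("truncated LEB128 value")
		}
		b := buf[i]
		v |= uint64(b&0x7f) << (7 * uint(i))
		if b&0x80 == 0 {
			return v, i + 1, nil
		}
	}
	return 0, 0, errors.New("LEB128 value is too long")
}

// splitOBUs (hypothetical helper) walks a buffer of concatenated OBUs and
// returns each one with its header and size field still attached.
func splitOBUs(buf []byte) ([][]byte, error) {
	var obus [][]byte
	for len(buf) > 0 {
		header := buf[0]
		if header&0x80 != 0 {
			return nil, errors.New("forbidden bit is set")
		}
		if header&0x02 == 0 {
			return nil, errors.New("OBU has no size field")
		}
		pos := 1
		if header&0x04 != 0 { // extension flag: one extra header byte
			pos++
		}
		if pos > len(buf) {
			return nil, errors.New("truncated OBU header")
		}
		size, n, err := readLEB128(buf[pos:])
		if err != nil {
			return nil, err
		}
		pos += n
		if size > uint64(len(buf)-pos) {
			return nil, errors.New("OBU payload exceeds buffer")
		}
		end := pos + int(size)
		obus = append(obus, buf[:end])
		buf = buf[end:]
	}
	return obus, nil
}

// obuType extracts the 4-bit OBU type from the first header byte.
func obuType(obu []byte) byte { return (obu[0] >> 3) & 0x0f }

func main() {
	// A temporal delimiter (type 2, empty payload) followed by a
	// padding OBU (type 15) with a 3-byte payload.
	buf := []byte{0x12, 0x00, 0x7a, 0x03, 0xaa, 0xbb, 0xcc}
	obus, err := splitOBUs(buf)
	if err != nil {
		panic(err)
	}
	for _, obu := range obus {
		fmt.Printf("type=%d len=%d\n", obuType(obu), len(obu))
	}
}
```

The sketch rejects malformed input instead of slicing past the end of the buffer, which is the main property a parser fed from the network needs.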
5 changes: 3 additions & 2 deletions internal/core/rtmp_source.go
@@ -113,8 +113,9 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
 				return err
 			}

-			if _, ok := videoFormat.(*formats.H265); ok {
-				return fmt.Errorf("proxying H265 streams with RTMP is not supported")
+			switch videoFormat.(type) {
+			case *formats.H265, *formats.AV1:
+				return fmt.Errorf("proxying H265 or AV1 tracks with RTMP is not supported")
 			}

 			var medias media.Medias
3 changes: 2 additions & 1 deletion internal/core/webrtc_conn.go
@@ -46,6 +46,7 @@ func newPeerConnection(configuration webrtc.Configuration,
 	options ...func(*webrtc.API),
 ) (*webrtc.PeerConnection, error) {
 	m := &webrtc.MediaEngine{}
+
 	if err := m.RegisterDefaultCodecs(); err != nil {
 		return nil, err
 	}
@@ -328,7 +329,7 @@ func (c *webRTCConn) runInner(ctx context.Context) error {

 	if tracks == nil {
 		return fmt.Errorf(
-			"the stream doesn't contain any supported codec (which are currently VP9, VP8, H264, Opus, G722, G711)")
+			"the stream doesn't contain any supported codec, which are currently H264, VP8, VP9, G711, G722, Opus")
 	}

 	err = c.wsconn.WriteJSON(c.genICEServers())
133 changes: 133 additions & 0 deletions internal/formatprocessor/av1.go
@@ -0,0 +1,133 @@
+package formatprocessor
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/bluenviron/gortsplib/v3/pkg/formats"
+	"github.com/bluenviron/gortsplib/v3/pkg/formats/rtpav1"
+	"github.com/bluenviron/mediacommon/pkg/codecs/av1"
+	"github.com/pion/rtp"
+
+	"github.com/aler9/mediamtx/internal/logger"
+)
+
+// UnitAV1 is an AV1 data unit.
+type UnitAV1 struct {
+	RTPPackets []*rtp.Packet
+	NTP        time.Time
+	PTS        time.Duration
+	OBUs       [][]byte
+}
+
+// GetRTPPackets implements Unit.
+func (d *UnitAV1) GetRTPPackets() []*rtp.Packet {
+	return d.RTPPackets
+}
+
+// GetNTP implements Unit.
+func (d *UnitAV1) GetNTP() time.Time {
+	return d.NTP
+}
+
+type formatProcessorAV1 struct {
+	udpMaxPayloadSize int
+	format            *formats.AV1
+	log               logger.Writer
+
+	encoder              *rtpav1.Encoder
+	decoder              *rtpav1.Decoder
+	lastKeyFrameReceived time.Time
+}
+
+func newAV1(
+	udpMaxPayloadSize int,
+	forma *formats.AV1,
+	generateRTPPackets bool,
+	log logger.Writer,
+) (*formatProcessorAV1, error) {
+	t := &formatProcessorAV1{
+		udpMaxPayloadSize: udpMaxPayloadSize,
+		format:            forma,
+		log:               log,
+	}
+
+	if generateRTPPackets {
+		t.encoder = &rtpav1.Encoder{
+			PayloadMaxSize: t.udpMaxPayloadSize - 12,
+		}
+		t.encoder.Init()
+		t.lastKeyFrameReceived = time.Now()
+	}
+
+	return t, nil
+}
+
+func (t *formatProcessorAV1) checkKeyFrameInterval(containsKeyFrame bool) {
+	if containsKeyFrame {
+		t.lastKeyFrameReceived = time.Now()
+	} else {
+		now := time.Now()
+		if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
+			t.lastKeyFrameReceived = now
+			t.log.Log(logger.Warn, "no AV1 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
+		}
+	}
+}
+
+func (t *formatProcessorAV1) checkOBUs(obus [][]byte) {
+	containsKeyFrame, _ := av1.ContainsKeyFrame(obus)
+	t.checkKeyFrameInterval(containsKeyFrame)
+}
+
+func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
+	tunit := unit.(*UnitAV1)
+
+	if tunit.RTPPackets != nil {
+		pkt := tunit.RTPPackets[0]
+
+		// remove padding
+		pkt.Header.Padding = false
+		pkt.PaddingSize = 0
+
+		if pkt.MarshalSize() > t.udpMaxPayloadSize {
+			return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
+				pkt.MarshalSize(), t.udpMaxPayloadSize)
+		}
+
+		// decode from RTP
+		if hasNonRTSPReaders {
+			if t.decoder == nil {
+				t.decoder = t.format.CreateDecoder()
+				t.lastKeyFrameReceived = time.Now()
+			}
+
+			// DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups
+			obus, pts, err := t.decoder.DecodeUntilMarker(pkt)
+			if err != nil {
+				if err == rtpav1.ErrNonStartingPacketAndNoPrevious || err == rtpav1.ErrMorePacketsNeeded {
+					return nil
+				}
+				return err
+			}
+
+			tunit.OBUs = obus
+			t.checkOBUs(obus)
+			tunit.PTS = pts
+		}
+
+		// route packet as is
+		return nil
+	}
+
+	t.checkOBUs(tunit.OBUs)
+
+	// encode into RTP
+	pkts, err := t.encoder.Encode(tunit.OBUs, tunit.PTS)
+	if err != nil {
+		return err
+	}
+	tunit.RTPPackets = pkts
+
+	return nil
+}
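
Review note: the key-frame check in `checkKeyFrameInterval` can be reasoned about in isolation. The sketch below restates it with the clock passed in as a parameter so that the behaviour is reproducible without sleeping. `keyFrameWatchdog`, `observe` and `replay` are hypothetical names, and the 10-second value of `maxKeyFrameInterval` is an assumption, since the real constant is defined outside this diff.

```go
package main

import (
	"fmt"
	"time"
)

// maxKeyFrameInterval is an assumed value for illustration only.
const maxKeyFrameInterval = 10 * time.Second

// keyFrameWatchdog (hypothetical) tracks when a key frame was last seen.
type keyFrameWatchdog struct {
	lastKeyFrame time.Time
}

// observe records one unit and reports whether a warning should be logged.
func (w *keyFrameWatchdog) observe(now time.Time, containsKeyFrame bool) bool {
	if containsKeyFrame {
		w.lastKeyFrame = now
		return false
	}
	if now.Sub(w.lastKeyFrame) >= maxKeyFrameInterval {
		// Reset the reference so the warning fires at most once per interval.
		w.lastKeyFrame = now
		return true
	}
	return false
}

// replay feeds a fixed sequence of observations to a fresh watchdog and
// returns the warning decision for each one.
func replay() []bool {
	start := time.Date(2023, 5, 4, 0, 0, 0, 0, time.UTC)
	w := &keyFrameWatchdog{lastKeyFrame: start}
	return []bool{
		w.observe(start.Add(5*time.Second), false),  // still within the interval
		w.observe(start.Add(10*time.Second), false), // interval elapsed
		w.observe(start.Add(12*time.Second), false), // reference was just reset
		w.observe(start.Add(13*time.Second), true),  // key frame arrived
	}
}

func main() {
	fmt.Println(replay())
}
```

Resetting the reference time after a warning is what keeps the log from being flooded on every unit once the interval has elapsed.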
2 changes: 1 addition & 1 deletion internal/formatprocessor/h264.go
@@ -180,7 +180,7 @@ func (t *formatProcessorH264) checkKeyFrameInterval(isKeyFrame bool) {
 		now := time.Now()
 		if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
 			t.lastKeyFrameReceived = now
-			t.log.Log(logger.Warn, "no key frames received in %v, stream can't be decoded")
+			t.log.Log(logger.Warn, "no H264 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
 		}
 	}
 }
2 changes: 1 addition & 1 deletion internal/formatprocessor/h265.go
@@ -200,7 +200,7 @@ func (t *formatProcessorH265) checkKeyFrameInterval(isKeyFrame bool) {
 		now := time.Now()
 		if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
 			t.lastKeyFrameReceived = now
-			t.log.Log(logger.Warn, "no key frames received in %v, stream can't be decoded")
+			t.log.Log(logger.Warn, "no H265 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
 		}
 	}
 }
3 changes: 3 additions & 0 deletions internal/formatprocessor/processor.go
@@ -39,6 +39,9 @@ func New(
 	case *formats.VP9:
 		return newVP9(udpMaxPayloadSize, forma, generateRTPPackets, log)

+	case *formats.AV1:
+		return newAV1(udpMaxPayloadSize, forma, generateRTPPackets, log)
+
 	case *formats.MPEG2Audio:
 		return newMPEG2Audio(udpMaxPayloadSize, forma, generateRTPPackets, log)

32 changes: 24 additions & 8 deletions internal/rtmp/message/extended_coded_frames.go
@@ -27,23 +27,39 @@ func (m *ExtendedCodedFrames) Unmarshal(raw *rawmessage.Message) error {
 	m.DTS = raw.Timestamp
 	m.MessageStreamID = raw.MessageStreamID
 	copy(m.FourCC[:], raw.Body[1:5])
-	m.PTSDelta = time.Duration(uint32(raw.Body[5])<<16|uint32(raw.Body[6])<<8|uint32(raw.Body[7])) * time.Millisecond
-	m.Payload = raw.Body[8:]
+
+	if m.FourCC == FourCCHEVC {
+		m.PTSDelta = time.Duration(uint32(raw.Body[5])<<16|uint32(raw.Body[6])<<8|uint32(raw.Body[7])) * time.Millisecond
+		m.Payload = raw.Body[8:]
+	} else {
+		m.Payload = raw.Body[5:]
+	}

 	return nil
 }

 // Marshal implements Message.
 func (m ExtendedCodedFrames) Marshal() (*rawmessage.Message, error) {
-	body := make([]byte, 8+len(m.Payload))
+	var l int
+	if m.FourCC == FourCCHEVC {
+		l = 8 + len(m.Payload)
+	} else {
+		l = 5 + len(m.Payload)
+	}
+	body := make([]byte, l)

 	body[0] = 0b10000000 | byte(ExtendedTypeCodedFrames)
 	copy(body[1:5], m.FourCC[:])
-	tmp := uint32(m.PTSDelta / time.Millisecond)
-	body[5] = uint8(tmp >> 16)
-	body[6] = uint8(tmp >> 8)
-	body[7] = uint8(tmp)
-	copy(body[8:], m.Payload)
+
+	if m.FourCC == FourCCHEVC {
+		tmp := uint32(m.PTSDelta / time.Millisecond)
+		body[5] = uint8(tmp >> 16)
+		body[6] = uint8(tmp >> 8)
+		body[7] = uint8(tmp)
+		copy(body[8:], m.Payload)
+	} else {
+		copy(body[5:], m.Payload)
+	}

 	return &rawmessage.Message{
 		ChunkStreamID: m.ChunkStreamID,
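
Review note: the change above makes the body layout depend on the FourCC, with a 3-byte PTS delta present only for HEVC. The following standalone round trip is a sketch of that layout using the standard library only. The names `codedFrames`, `marshalBody`, `unmarshalBody` and `roundTrips` are hypothetical, the FourCC values `hvc1` and `av01` and the packet type value `1` are assumptions (the real constants are defined outside this diff), and the length checks are an addition that the diff itself does not contain.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"time"
)

// Assumed FourCC values; the real constants live outside this diff.
var (
	fourCCHEVC = [4]byte{'h', 'v', 'c', '1'}
	fourCCAV1  = [4]byte{'a', 'v', '0', '1'}
)

// packetTypeCodedFrames is an assumed value for the coded-frames packet type.
const packetTypeCodedFrames = 1

// codedFrames (hypothetical) holds the fields that affect the body layout.
type codedFrames struct {
	FourCC   [4]byte
	PTSDelta time.Duration
	Payload  []byte
}

// marshalBody writes a 5-byte header, plus a 24-bit big-endian PTS delta
// in milliseconds when the codec is HEVC, followed by the payload.
func marshalBody(m codedFrames) []byte {
	headerLen := 5
	if m.FourCC == fourCCHEVC {
		headerLen = 8
	}
	body := make([]byte, headerLen+len(m.Payload))
	body[0] = 0b10000000 | packetTypeCodedFrames
	copy(body[1:5], m.FourCC[:])
	if m.FourCC == fourCCHEVC {
		ms := uint32(m.PTSDelta / time.Millisecond)
		body[5] = byte(ms >> 16)
		body[6] = byte(ms >> 8)
		body[7] = byte(ms)
	}
	copy(body[headerLen:], m.Payload)
	return body
}

// unmarshalBody is the inverse of marshalBody and validates lengths first.
func unmarshalBody(body []byte) (codedFrames, error) {
	var m codedFrames
	if len(body) < 5 {
		return m, errors.New("body is too short")
	}
	copy(m.FourCC[:], body[1:5])
	if m.FourCC != fourCCHEVC {
		m.Payload = body[5:]
		return m, nil
	}
	if len(body) < 8 {
		return m, errors.New("body is too short for an HEVC header")
	}
	ms := uint32(body[5])<<16 | uint32(body[6])<<8 | uint32(body[7])
	m.PTSDelta = time.Duration(ms) * time.Millisecond
	m.Payload = body[8:]
	return m, nil
}

// roundTrips reports whether m survives a marshal/unmarshal cycle.
func roundTrips(m codedFrames) bool {
	got, err := unmarshalBody(marshalBody(m))
	return err == nil && got.FourCC == m.FourCC &&
		got.PTSDelta == m.PTSDelta && bytes.Equal(got.Payload, m.Payload)
}

func main() {
	hevc := codedFrames{FourCC: fourCCHEVC, PTSDelta: 40 * time.Millisecond, Payload: []byte{1, 2, 3}}
	av1 := codedFrames{FourCC: fourCCAV1, Payload: []byte{4, 5}}
	fmt.Println(len(marshalBody(hevc)), roundTrips(hevc))
	fmt.Println(len(marshalBody(av1)), roundTrips(av1))
}
```

Checking `len(body)` before indexing matters because the body comes from the network; without it, a short message would cause a slice-bounds panic.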
37 changes: 37 additions & 0 deletions internal/rtmp/tracks/boxes_av1.go
@@ -0,0 +1,37 @@
+package tracks
+
+import (
+	gomp4 "github.com/abema/go-mp4"
+)
+
+// BoxTypeAv1C returns the box type.
+func BoxTypeAv1C() gomp4.BoxType { return gomp4.StrToBoxType("av1C") }
+
+func init() { //nolint:gochecknoinits
+	gomp4.AddBoxDef(&Av1C{})
+}
+
+// Av1C is an Av1C ISO-BMFF box.
+type Av1C struct {
+	gomp4.Box
+	Marker                           uint8   `mp4:"0,size=1,const=1"`
+	Version                          uint8   `mp4:"1,size=7,const=1"`
+	SeqProfile                       uint8   `mp4:"2,size=3"`
+	SeqLevelIdx0                     uint8   `mp4:"3,size=5"`
+	SeqTier0                         uint8   `mp4:"4,size=1"`
+	HighBitdepth                     uint8   `mp4:"5,size=1"`
+	TwelveBit                        uint8   `mp4:"6,size=1"`
+	Monochrome                       uint8   `mp4:"7,size=1"`
+	ChromaSubsamplingX               uint8   `mp4:"8,size=1"`
+	ChromaSubsamplingY               uint8   `mp4:"9,size=1"`
+	ChromaSamplePosition             uint8   `mp4:"10,size=2"`
+	Reserved                         uint8   `mp4:"11,size=3,const=0"`
+	InitialPresentationDelayPresent  uint8   `mp4:"12,size=1"`
+	InitialPresentationDelayMinusOne uint8   `mp4:"13,size=4"`
+	ConfigOBUs                       []uint8 `mp4:"14,size=8"`
+}
+
+// GetType returns the box type.
+func (Av1C) GetType() gomp4.BoxType {
+	return BoxTypeAv1C()
+}
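
Review note: the bit sizes in the `Av1C` struct tags add up to a fixed 4-byte prefix followed by the configuration OBUs. As a cross-check of that layout, here is a hand-rolled decoder written against the standard library only. It does not use go-mp4, `av1CHeader` and `parseAV1CHeader` are hypothetical names, and the sample bytes in `main` are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// av1CHeader (hypothetical) mirrors the fixed 4-byte prefix of the box payload.
type av1CHeader struct {
	Version                          uint8
	SeqProfile                       uint8
	SeqLevelIdx0                     uint8
	SeqTier0                         uint8
	HighBitdepth                     uint8
	TwelveBit                        uint8
	Monochrome                       uint8
	ChromaSubsamplingX               uint8
	ChromaSubsamplingY               uint8
	ChromaSamplePosition             uint8
	InitialPresentationDelayPresent  uint8
	InitialPresentationDelayMinusOne uint8
}

// parseAV1CHeader decodes the bit fields and returns the remaining bytes,
// which correspond to ConfigOBUs in the struct above.
func parseAV1CHeader(b []byte) (av1CHeader, []byte, error) {
	var h av1CHeader
	if len(b) < 4 {
		return h, nil, errors.New("av1C payload is too short")
	}
	if b[0]>>7 != 1 {
		return h, nil, errors.New("marker bit is not set")
	}
	h.Version = b[0] & 0x7f
	if h.Version != 1 {
		return h, nil, errors.New("unsupported version")
	}
	h.SeqProfile = b[1] >> 5               // 3 bits
	h.SeqLevelIdx0 = b[1] & 0x1f           // 5 bits
	h.SeqTier0 = b[2] >> 7                 // 1 bit
	h.HighBitdepth = (b[2] >> 6) & 1       // 1 bit
	h.TwelveBit = (b[2] >> 5) & 1          // 1 bit
	h.Monochrome = (b[2] >> 4) & 1         // 1 bit
	h.ChromaSubsamplingX = (b[2] >> 3) & 1 // 1 bit
	h.ChromaSubsamplingY = (b[2] >> 2) & 1 // 1 bit
	h.ChromaSamplePosition = b[2] & 0x03   // 2 bits
	// The top 3 bits of b[3] are reserved.
	h.InitialPresentationDelayPresent = (b[3] >> 4) & 1
	h.InitialPresentationDelayMinusOne = b[3] & 0x0f
	return h, b[4:], nil
}

func main() {
	// Made-up sample: version 1, profile 0, level index 5, 4:2:0 subsampling,
	// followed by two placeholder bytes standing in for ConfigOBUs.
	h, rest, err := parseAV1CHeader([]byte{0x81, 0x05, 0x0c, 0x00, 0x0a, 0x0b})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v rest=%d\n", h, len(rest))
}
```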