From 237d25da73e3d0f30b2ac274aa57cecfa830d825 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 14 Oct 2023 22:24:24 +0200 Subject: [PATCH] support recording to MPEG-TS --- README.md | 1 + internal/conf/conf.go | 2 +- internal/conf/conf_test.go | 2 +- internal/conf/path.go | 10 +- internal/conf/record_format.go | 56 ++ internal/conf/rtsp_range_type.go | 2 +- internal/core/core.go | 1 + internal/core/hls_muxer.go | 2 +- internal/core/mpegts.go | 252 ++++- internal/core/path.go | 3 +- internal/core/reader.go | 9 + internal/core/rtmp_conn.go | 27 +- internal/core/rtsp_session.go | 2 +- internal/core/source.go | 2 +- internal/core/source_static.go | 2 +- internal/core/srt_conn.go | 243 +---- internal/core/srt_source.go | 2 +- internal/core/udp_source.go | 2 +- internal/core/webrtc_session.go | 10 +- internal/record/agent.go | 862 +----------------- internal/record/agent_test.go | 282 +++--- internal/record/cleaner.go | 13 +- internal/record/cleaner_test.go | 3 + internal/record/rec_format.go | 5 + internal/record/rec_format_fmp4.go | 821 +++++++++++++++++ .../{part.go => rec_format_fmp4_part.go} | 48 +- ...{segment.go => rec_format_fmp4_segment.go} | 42 +- internal/record/rec_format_fmp4_track.go | 57 ++ internal/record/rec_format_mpegts.go | 332 +++++++ internal/record/rec_format_mpegts_segment.go | 73 ++ internal/record/track.go | 57 -- internal/stream/stream.go | 19 + mediamtx.yml | 3 +- 33 files changed, 1911 insertions(+), 1336 deletions(-) create mode 100644 internal/conf/record_format.go create mode 100644 internal/record/rec_format.go create mode 100644 internal/record/rec_format_fmp4.go rename internal/record/{part.go => rec_format_fmp4_part.go} (57%) rename internal/record/{segment.go => rec_format_fmp4_segment.go} (52%) create mode 100644 internal/record/rec_format_fmp4_track.go create mode 100644 internal/record/rec_format_mpegts.go create mode 100644 internal/record/rec_format_mpegts_segment.go delete mode 100644 
internal/record/track.go diff --git a/README.md b/README.md index 2cc2af86ebd..1d32042b1ee 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ And can be recorded with: |format|video codecs|audio codecs| |------|------------|------------| |[fMP4](#record-streams-to-disk)|AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, LPCM| +|[MPEG-TS](#record-streams-to-disk)|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| **Features** diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 0d6401a1295..4525da81382 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -165,7 +165,7 @@ type Conf struct { // Record Record *bool `json:"record,omitempty"` // deprecated RecordPath *string `json:"recordPath,omitempty"` // deprecated - RecordFormat *string `json:"recordFormat,omitempty"` // deprecated + RecordFormat *RecordFormat `json:"recordFormat,omitempty"` // deprecated RecordPartDuration *StringDuration `json:"recordPartDuration,omitempty"` // deprecated RecordSegmentDuration *StringDuration `json:"recordSegmentDuration,omitempty"` // deprecated RecordDeleteAfter *StringDuration `json:"recordDeleteAfter,omitempty"` // deprecated diff --git a/internal/conf/conf_test.go b/internal/conf/conf_test.go index 7b9eed2ceec..1d9fdb4c321 100644 --- a/internal/conf/conf_test.go +++ b/internal/conf/conf_test.go @@ -52,7 +52,7 @@ func TestConfFromFile(t *testing.T) { SourceOnDemandStartTimeout: 10 * StringDuration(time.Second), SourceOnDemandCloseAfter: 10 * StringDuration(time.Second), RecordPath: "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f", - RecordFormat: "fmp4", + RecordFormat: RecordFormatFMP4, RecordPartDuration: 100000000, RecordSegmentDuration: 3600000000000, RecordDeleteAfter: 86400000000000, diff --git a/internal/conf/path.go b/internal/conf/path.go index b84b25ec558..fedc7a3d00b 100644 --- a/internal/conf/path.go +++ 
b/internal/conf/path.go @@ -63,7 +63,7 @@ type Path struct { // Record Record bool `json:"record"` RecordPath string `json:"recordPath"` - RecordFormat string `json:"recordFormat"` + RecordFormat RecordFormat `json:"recordFormat"` RecordPartDuration StringDuration `json:"recordPartDuration"` RecordSegmentDuration StringDuration `json:"recordSegmentDuration"` RecordDeleteAfter StringDuration `json:"recordDeleteAfter"` @@ -150,7 +150,7 @@ func (pconf *Path) setDefaults() { // Record pconf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f" - pconf.RecordFormat = "fmp4" + pconf.RecordFormat = RecordFormatFMP4 pconf.RecordPartDuration = 100 * StringDuration(time.Millisecond) pconf.RecordSegmentDuration = 3600 * StringDuration(time.Second) pconf.RecordDeleteAfter = 24 * 3600 * StringDuration(time.Second) @@ -400,12 +400,6 @@ func (pconf *Path) check(conf *Conf, name string) error { } } - // Record - - if pconf.RecordFormat != "fmp4" { - return fmt.Errorf("unsupported record format '%s'", pconf.RecordFormat) - } - // Publisher if pconf.DisablePublisherOverride != nil { diff --git a/internal/conf/record_format.go b/internal/conf/record_format.go new file mode 100644 index 00000000000..e9e39f81dfe --- /dev/null +++ b/internal/conf/record_format.go @@ -0,0 +1,56 @@ +package conf + +import ( + "encoding/json" + "fmt" +) + +// RecordFormat is the recordFormat parameter. +type RecordFormat int + +// supported values. +const ( + RecordFormatFMP4 RecordFormat = iota + RecordFormatMPEGTS +) + +// MarshalJSON implements json.Marshaler. +func (d RecordFormat) MarshalJSON() ([]byte, error) { + var out string + + switch d { + case RecordFormatMPEGTS: + out = "mpegts" + + default: + out = "fmp4" + } + + return json.Marshal(out) +} + +// UnmarshalJSON implements json.Unmarshaler. 
+func (d *RecordFormat) UnmarshalJSON(b []byte) error { + var in string + if err := json.Unmarshal(b, &in); err != nil { + return err + } + + switch in { + case "mpegts": + *d = RecordFormatMPEGTS + + case "fmp4": + *d = RecordFormatFMP4 + + default: + return fmt.Errorf("invalid record format '%s'", in) + } + + return nil +} + +// UnmarshalEnv implements env.Unmarshaler. +func (d *RecordFormat) UnmarshalEnv(_ string, v string) error { + return d.UnmarshalJSON([]byte(`"` + v + `"`)) +} diff --git a/internal/conf/rtsp_range_type.go b/internal/conf/rtsp_range_type.go index 168f306d9ca..2a6decd2bb3 100644 --- a/internal/conf/rtsp_range_type.go +++ b/internal/conf/rtsp_range_type.go @@ -8,7 +8,7 @@ import ( // RTSPRangeType is the type used in the Range header. type RTSPRangeType int -// supported rtsp range types. +// supported values. const ( RTSPRangeTypeUndefined RTSPRangeType = iota RTSPRangeTypeClock diff --git a/internal/core/core.go b/internal/core/core.go index ce96b63c577..1f7449bc039 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -41,6 +41,7 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry { if pa.Record { entry := record.CleanerEntry{ RecordPath: pa.RecordPath, + RecordFormat: pa.RecordFormat, RecordDeleteAfter: time.Duration(pa.RecordDeleteAfter), } out[entry] = struct{}{} diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index fdc73f7a531..61b71e46378 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -302,7 +302,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) innerReady <- struct{}{} m.Log(logger.Info, "is converting into HLS, %s", - sourceMediaInfo(medias)) + mediaInfo(medias)) m.writer.Start() diff --git a/internal/core/mpegts.go b/internal/core/mpegts.go index 32182fa791a..a5045739d6f 100644 --- a/internal/core/mpegts.go +++ b/internal/core/mpegts.go @@ -1,18 +1,32 @@ package core import ( + "bufio" + "errors" "fmt" "time" 
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/ac3" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/datarhei/gosrt" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" ) -func mpegtsSetupTracks(r *mpegts.Reader, stream **stream.Stream) ([]*description.Media, error) { +var errMPEGTSNoTracks = errors.New("no supported tracks found (supported are H265, H264," + + " MPEG-4 Video, MPEG-1/2 Video, Opus, MPEG-4 Audio, MPEG-1 Audio, AC-3)") + +func durationGoToMPEGTS(v time.Duration) int64 { + return int64(v.Seconds() * 90000) +} + +func mpegtsSetupRead(r *mpegts.Reader, stream **stream.Stream) ([]*description.Media, error) { var medias []*description.Media //nolint:prealloc var td *mpegts.TimeDecoder @@ -191,8 +205,242 @@ func mpegtsSetupTracks(r *mpegts.Reader, stream **stream.Stream) ([]*description } if len(medias) == 0 { - return nil, fmt.Errorf("no supported tracks found") + return nil, errMPEGTSNoTracks } return medias, nil } + +func mpegtsSetupWrite(stream *stream.Stream, + writer *asyncwriter.Writer, bw *bufio.Writer, sconn srt.Conn, + writeTimeout time.Duration, +) error { + var w *mpegts.Writer + var tracks []*mpegts.Track + + addTrack := func(codec mpegts.Codec) *mpegts.Track { + track := &mpegts.Track{ + Codec: codec, + } + tracks = append(tracks, track) + return track + } + + for _, medi := range stream.Desc().Medias { + for _, forma := range medi.Formats { + switch forma := forma.(type) { + case *format.H265: //nolint:dupl + track := addTrack(&mpegts.CodecH265{}) + + var dtsExtractor *h265.DTSExtractor + + stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + }
+ + randomAccess := h265.IsRandomAccess(tunit.AU) + + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h265.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err = (*w).WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.H264: //nolint:dupl + track := addTrack(&mpegts.CodecH264{}) + + var dtsExtractor *h264.DTSExtractor + + stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } + + idrPresent := h264.IDRPresent(tunit.AU) + + if dtsExtractor == nil { + if !idrPresent { + return nil + } + dtsExtractor = h264.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err = (*w).WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.MPEG4Video: + track := addTrack(&mpegts.CodecMPEG4Video{}) + + firstReceived := false + var lastPTS time.Duration + + stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.MPEG1Video: + track := addTrack(&mpegts.CodecMPEG1Video{}) + + firstReceived := false + var lastPTS time.Duration + + 
stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.Opus: + track := addTrack(&mpegts.CodecOpus{ + ChannelCount: func() int { + if forma.IsStereo { + return 2 + } + return 1 + }(), + }) + + stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.MPEG4Audio: + track := addTrack(&mpegts.CodecMPEG4Audio{ + Config: *forma.GetConfig(), + }) + + stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.MPEG1Audio: + track := addTrack(&mpegts.CodecMPEG1Audio{}) + + stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.AC3: + track := addTrack(&mpegts.CodecAC3{}) + + sampleRate := time.Duration(forma.SampleRate) + + 
stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil + } + + for i, frame := range tunit.Frames { + framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* + time.Second/sampleRate + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + if err != nil { + return err + } + } + return bw.Flush() + }) + } + } + } + + if len(tracks) == 0 { + return errMPEGTSNoTracks + } + + w = mpegts.NewWriter(bw, tracks) + + return nil +} diff --git a/internal/core/path.go b/internal/core/path.go index 4005bdc40df..daee050a90f 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -649,7 +649,7 @@ func (pa *path) doStartPublisher(req pathStartPublisherReq) { req.author.Log(logger.Info, "is publishing to path '%s', %s", pa.name, - sourceMediaInfo(req.desc.Medias)) + mediaInfo(req.desc.Medias)) if pa.conf.HasOnDemandPublisher() { pa.onDemandPublisherReadyTimer.Stop() @@ -953,6 +953,7 @@ func (pa *path) startRecording() { pa.recordAgent = record.NewAgent( pa.writeQueueSize, pa.conf.RecordPath, + pa.conf.RecordFormat, time.Duration(pa.conf.RecordPartDuration), time.Duration(pa.conf.RecordSegmentDuration), pa.name, diff --git a/internal/core/reader.go b/internal/core/reader.go index f03bcb0bfdb..c97a017861b 100644 --- a/internal/core/reader.go +++ b/internal/core/reader.go @@ -1,7 +1,16 @@ package core +import ( + "github.com/bluenviron/mediamtx/internal/asyncwriter" + "github.com/bluenviron/mediamtx/internal/stream" +) + // reader is an entity that can read a stream. 
type reader interface { close() apiReaderDescribe() apiPathSourceOrReader } + +func readerMediaInfo(r *asyncwriter.Writer, stream *stream.Stream) string { + return mediaInfo(stream.MediasForReader(r)) +} diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index f7dca5bf259..9de46c03016 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -233,24 +233,17 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { defer res.stream.RemoveReader(writer) - var medias []*description.Media var w *rtmp.Writer - videoMedia, videoFormat := c.setupVideo( + videoFormat := c.setupVideo( &w, res.stream, writer) - if videoMedia != nil { - medias = append(medias, videoMedia) - } - audioMedia, audioFormat := c.setupAudio( + audioFormat := c.setupAudio( &w, res.stream, writer) - if audioFormat != nil { - medias = append(medias, audioMedia) - } if videoFormat == nil && audioFormat == nil { return fmt.Errorf( @@ -258,7 +251,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { } c.Log(logger.Info, "is reading from path '%s', %s", - res.path.name, sourceMediaInfo(medias)) + res.path.name, readerMediaInfo(writer, res.stream)) pathConf := res.path.safeConf() @@ -325,7 +318,7 @@ func (c *rtmpConn) setupVideo( w **rtmp.Writer, stream *stream.Stream, writer *asyncwriter.Writer, -) (*description.Media, format.Format) { +) format.Format { var videoFormatH264 *format.H264 videoMedia := stream.Desc().FindFormat(&videoFormatH264) @@ -384,17 +377,17 @@ func (c *rtmpConn) setupVideo( return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU) }) - return videoMedia, videoFormatH264 + return videoFormatH264 } - return nil, nil + return nil } func (c *rtmpConn) setupAudio( w **rtmp.Writer, stream *stream.Stream, writer *asyncwriter.Writer, -) (*description.Media, format.Format) { +) format.Format { var audioFormatMPEG4Audio *format.MPEG4Audio audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Audio) @@ -421,7 +414,7 @@ func (c 
*rtmpConn) setupAudio( return nil }) - return audioMedia, audioFormatMPEG4Audio + return audioFormatMPEG4Audio } var audioFormatMPEG1 *format.MPEG1Audio @@ -457,10 +450,10 @@ func (c *rtmpConn) setupAudio( return nil }) - return audioMedia, audioFormatMPEG1 + return audioFormatMPEG1 } - return nil, nil + return nil } func (c *rtmpConn) runPublish(conn *rtmp.Conn, u *url.URL) error { diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 28b0fe9b1b7..838bb704cbe 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -308,7 +308,7 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons s.Log(logger.Info, "is reading from path '%s', with %s, %s", s.path.name, s.session.SetuppedTransport(), - sourceMediaInfo(s.session.SetuppedMedias())) + mediaInfo(s.session.SetuppedMedias())) pathConf := s.path.safeConf() diff --git a/internal/core/source.go b/internal/core/source.go index 03137761298..f27ada71b7f 100644 --- a/internal/core/source.go +++ b/internal/core/source.go @@ -35,7 +35,7 @@ func mediasDescription(medias []*description.Media) []string { return ret } -func sourceMediaInfo(medias []*description.Media) string { +func mediaInfo(medias []*description.Media) string { return fmt.Sprintf("%d %s (%s)", len(medias), func() string { diff --git a/internal/core/source_static.go b/internal/core/source_static.go index 3fba226ef36..5e4446c4d8e 100644 --- a/internal/core/source_static.go +++ b/internal/core/source_static.go @@ -228,7 +228,7 @@ func (s *sourceStatic) setReady(req pathSourceStaticSetReadyReq) pathSourceStati res := <-req.res if res.err == nil { - s.impl.Log(logger.Info, "ready: %s", sourceMediaInfo(req.desc.Medias)) + s.impl.Log(logger.Info, "ready: %s", mediaInfo(req.desc.Medias)) } return res diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index 860c994df5b..aa10a978495 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -11,10 +11,6 @@ 
import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediacommon/pkg/codecs/ac3" - "github.com/bluenviron/mediacommon/pkg/codecs/h264" - "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/datarhei/gosrt" "github.com/google/uuid" @@ -24,13 +20,8 @@ import ( "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" - "github.com/bluenviron/mediamtx/internal/unit" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) -} - func srtCheckPassphrase(connReq srt.ConnRequest, passphrase string) error { if passphrase == "" { return nil @@ -287,7 +278,7 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { var stream *stream.Stream - medias, err := mpegtsSetupTracks(r, &stream) + medias, err := mpegtsSetupRead(r, &stream) if err != nil { return err } @@ -357,237 +348,15 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass defer res.stream.RemoveReader(writer) - var w *mpegts.Writer - var tracks []*mpegts.Track - var medias []*description.Media bw := bufio.NewWriterSize(sconn, srtMaxPayloadSize(c.udpMaxPayloadSize)) - addTrack := func(medi *description.Media, codec mpegts.Codec) *mpegts.Track { - track := &mpegts.Track{ - Codec: codec, - } - tracks = append(tracks, track) - medias = append(medias, medi) - return track - } - - for _, medi := range res.stream.Desc().Medias { - for _, forma := range medi.Formats { - switch forma := forma.(type) { - case *format.H265: //nolint:dupl - track := addTrack(medi, &mpegts.CodecH265{}) - - var dtsExtractor *h265.DTSExtractor - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.H265) - if tunit.AU == nil { - return nil - } - - randomAccess := 
h265.IsRandomAccess(tunit.AU) - - if dtsExtractor == nil { - if !randomAccess { - return nil - } - dtsExtractor = h265.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) - if err != nil { - return err - } - return bw.Flush() - }) - - case *format.H264: //nolint:dupl - track := addTrack(medi, &mpegts.CodecH264{}) - - var dtsExtractor *h264.DTSExtractor - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } - - idrPresent := h264.IDRPresent(tunit.AU) - - if dtsExtractor == nil { - if !idrPresent { - return nil - } - dtsExtractor = h264.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) - if err != nil { - return err - } - return bw.Flush() - }) - - case *format.MPEG4Video: - track := addTrack(medi, &mpegts.CodecMPEG4Video{}) - - firstReceived := false - var lastPTS time.Duration - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Video) - if tunit.Frame == nil { - return nil - } - - if !firstReceived { - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) - if err != nil { - return err - } - return bw.Flush() - }) - - case *format.MPEG1Video: - track := addTrack(medi, &mpegts.CodecMPEG1Video{}) - - 
firstReceived := false - var lastPTS time.Duration - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Video) - if tunit.Frame == nil { - return nil - } - - if !firstReceived { - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) - if err != nil { - return err - } - return bw.Flush() - }) - - case *format.MPEG4Audio: - track := addTrack(medi, &mpegts.CodecMPEG4Audio{ - Config: *forma.GetConfig(), - }) - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { - return nil - } - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) - if err != nil { - return err - } - return bw.Flush() - }) - - case *format.Opus: - track := addTrack(medi, &mpegts.CodecOpus{ - ChannelCount: func() int { - if forma.IsStereo { - return 2 - } - return 1 - }(), - }) - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.Opus) - if tunit.Packets == nil { - return nil - } - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) - if err != nil { - return err - } - return bw.Flush() - }) - - case *format.MPEG1Audio: - track := addTrack(medi, &mpegts.CodecMPEG1Audio{}) - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Audio) - if tunit.Frames == nil { - return nil - } - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) - if err != nil { - return err - } - return 
bw.Flush() - }) - - case *format.AC3: - track := addTrack(medi, &mpegts.CodecAC3{}) - - sampleRate := time.Duration(forma.SampleRate) - - res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.AC3) - if tunit.Frames == nil { - return nil - } - - for i, frame := range tunit.Frames { - framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* - time.Second/sampleRate - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) - if err != nil { - return err - } - } - return bw.Flush() - }) - } - } - } - - if len(tracks) == 0 { - return true, fmt.Errorf( - "the stream doesn't contain any supported codec, which are currently H265, H264, Opus, MPEG-4 Audio") + err = mpegtsSetupWrite(res.stream, writer, bw, sconn, time.Duration(c.writeTimeout)) + if err != nil { + return true, err } c.Log(logger.Info, "is reading from path '%s', %s", - res.path.name, sourceMediaInfo(medias)) + res.path.name, readerMediaInfo(writer, res.stream)) pathConf := res.path.safeConf() @@ -629,8 +398,6 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }() } - w = mpegts.NewWriter(bw, tracks) - // disable read deadline sconn.SetReadDeadline(time.Time{}) diff --git a/internal/core/srt_source.go b/internal/core/srt_source.go index cd882a06ab4..be203a55b33 100644 --- a/internal/core/srt_source.go +++ b/internal/core/srt_source.go @@ -96,7 +96,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error { var stream *stream.Stream - medias, err := mpegtsSetupTracks(r, &stream) + medias, err := mpegtsSetupRead(r, &stream) if err != nil { return err } diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index cd21477b5e3..987200c6746 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -129,7 +129,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { var stream *stream.Stream - medias, err := mpegtsSetupTracks(r, 
&stream) + medias, err := mpegtsSetupRead(r, &stream) if err != nil { return err } diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index 937b3e66e03..a194db20fdf 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -27,14 +27,6 @@ type trackRecvPair struct { receiver *webrtc.RTPReceiver } -func webrtcMediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) []*description.Media { - ret := make([]*description.Media, len(tracks)) - for i, track := range tracks { - ret[i] = track.media - } - return ret -} - func webrtcMediasOfIncomingTracks(tracks []*webRTCIncomingTrack) []*description.Media { ret := make([]*description.Media, len(tracks)) for i, track := range tracks { @@ -525,7 +517,7 @@ func (s *webRTCSession) runRead() (int, error) { } s.Log(logger.Info, "is reading from path '%s', %s", - res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks))) + res.path.name, readerMediaInfo(writer, res.stream)) pathConf := res.path.safeConf() diff --git a/internal/record/agent.go b/internal/record/agent.go index 36bf83f2cf3..f4b37abdcbc 100644 --- a/internal/record/agent.go +++ b/internal/record/agent.go @@ -1,108 +1,21 @@ package record import ( - "bytes" "context" - "fmt" "strings" "time" - "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediacommon/pkg/codecs/ac3" - "github.com/bluenviron/mediacommon/pkg/codecs/av1" - "github.com/bluenviron/mediacommon/pkg/codecs/h264" - "github.com/bluenviron/mediacommon/pkg/codecs/h265" - "github.com/bluenviron/mediacommon/pkg/codecs/jpeg" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" - "github.com/bluenviron/mediacommon/pkg/codecs/opus" - "github.com/bluenviron/mediacommon/pkg/codecs/vp9" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediamtx/internal/asyncwriter" + 
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" - "github.com/bluenviron/mediamtx/internal/unit" ) // OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete type OnSegmentFunc = func(string) -func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { - timeScale64 := uint64(timeScale) - secs := v / time.Second - dec := v % time.Second - return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second) -} - -func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int { - switch cm { - case mpeg1audio.ChannelModeStereo, - mpeg1audio.ChannelModeJointStereo, - mpeg1audio.ChannelModeDualChannel: - return 2 - - default: - return 1 - } -} - -func jpegExtractSize(image []byte) (int, int, error) { - l := len(image) - if l < 2 || image[0] != 0xFF || image[1] != jpeg.MarkerStartOfImage { - return 0, 0, fmt.Errorf("invalid header") - } - - image = image[2:] - - for { - if len(image) < 2 { - return 0, 0, fmt.Errorf("not enough bits") - } - - h0, h1 := image[0], image[1] - image = image[2:] - - if h0 != 0xFF { - return 0, 0, fmt.Errorf("invalid image") - } - - switch h1 { - case 0xE0, 0xE1, 0xE2, // JFIF - jpeg.MarkerDefineHuffmanTable, - jpeg.MarkerComment, - jpeg.MarkerDefineQuantizationTable, - jpeg.MarkerDefineRestartInterval: - mlen := int(image[0])<<8 | int(image[1]) - if len(image) < mlen { - return 0, 0, fmt.Errorf("not enough bits") - } - image = image[mlen:] - - case jpeg.MarkerStartOfFrame1: - mlen := int(image[0])<<8 | int(image[1]) - if len(image) < mlen { - return 0, 0, fmt.Errorf("not enough bits") - } - - var sof jpeg.StartOfFrame1 - err := sof.Unmarshal(image[2:mlen]) - if err != nil { - return 0, 0, err - } - - return sof.Width, sof.Height, nil - - case jpeg.MarkerStartOfScan: - return 0, 0, fmt.Errorf("SOF not found") - - default: - return 0, 0, fmt.Errorf("unknown marker: 0x%.2x", h1) - } - } -} - type 
sample struct { *fmp4.PartSample dts time.Duration @@ -118,13 +31,10 @@ type Agent struct { onSegmentComplete OnSegmentFunc parent logger.Writer - ctx context.Context - ctxCancel func() - writer *asyncwriter.Writer - tracks []*track - hasVideo bool - currentSegment *segment - nextSequenceNumber uint32 + ctx context.Context + ctxCancel func() + writer *asyncwriter.Writer + format recFormat done chan struct{} } @@ -132,7 +42,8 @@ type Agent struct { // NewAgent allocates an Agent. func NewAgent( writeQueueSize int, - recordPath string, + path string, + format conf.RecordFormat, partDuration time.Duration, segmentDuration time.Duration, pathName string, @@ -141,13 +52,20 @@ func NewAgent( onSegmentComplete OnSegmentFunc, parent logger.Writer, ) *Agent { - recordPath = strings.ReplaceAll(recordPath, "%path", pathName) - recordPath += ".mp4" + path = strings.ReplaceAll(path, "%path", pathName) + + switch format { + case conf.RecordFormatMPEGTS: + path += ".ts" + + default: + path += ".mp4" + } ctx, ctxCancel := context.WithCancel(context.Background()) - r := &Agent{ - path: recordPath, + a := &Agent{ + path: path, partDuration: partDuration, segmentDuration: segmentDuration, stream: stream, @@ -159,744 +77,48 @@ func NewAgent( done: make(chan struct{}), } - r.writer = asyncwriter.New(writeQueueSize, r) - - nextID := 1 - - addTrack := func(codec fmp4.Codec) *track { - initTrack := &fmp4.InitTrack{ - TimeScale: 90000, - Codec: codec, - } - initTrack.ID = nextID - nextID++ - - track := newTrack(r, initTrack) - r.tracks = append(r.tracks, track) - - return track - } - - for _, media := range stream.Desc().Medias { - for _, forma := range media.Formats { - switch forma := forma.(type) { - case *format.AV1: - codec := &fmp4.CodecAV1{ - SequenceHeader: []byte{ - 8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64, - }, - } - track := addTrack(codec) - - firstReceived := false - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.AV1) - if tunit.TU 
== nil { - return nil - } - - randomAccess := false - - for _, obu := range tunit.TU { - var h av1.OBUHeader - err := h.Unmarshal(obu) - if err != nil { - return err - } - - if h.Type == av1.OBUTypeSequenceHeader { - if !bytes.Equal(codec.SequenceHeader, obu) { - codec.SequenceHeader = obu - r.updateCodecs() - } - randomAccess = true - } - } - - if !firstReceived { - if !randomAccess { - return nil - } - firstReceived = true - } - - sampl, err := fmp4.NewPartSampleAV1( - randomAccess, - tunit.TU) - if err != nil { - return err - } - - return track.record(&sample{ - PartSample: sampl, - dts: tunit.PTS, - }) - }) - - case *format.VP9: - codec := &fmp4.CodecVP9{ - Width: 1280, - Height: 720, - Profile: 1, - BitDepth: 8, - ChromaSubsampling: 1, - ColorRange: false, - } - track := addTrack(codec) - - firstReceived := false - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.VP9) - if tunit.Frame == nil { - return nil - } - - var h vp9.Header - err := h.Unmarshal(tunit.Frame) - if err != nil { - return err - } - - randomAccess := false - - if h.FrameType == vp9.FrameTypeKeyFrame { - randomAccess = true - - if w := h.Width(); codec.Width != w { - codec.Width = w - r.updateCodecs() - } - if h := h.Width(); codec.Height != h { - codec.Height = h - r.updateCodecs() - } - if codec.Profile != h.Profile { - codec.Profile = h.Profile - r.updateCodecs() - } - if codec.BitDepth != h.ColorConfig.BitDepth { - codec.BitDepth = h.ColorConfig.BitDepth - r.updateCodecs() - } - if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c { - codec.ChromaSubsampling = c - r.updateCodecs() - } - if codec.ColorRange != h.ColorConfig.ColorRange { - codec.ColorRange = h.ColorConfig.ColorRange - r.updateCodecs() - } - } - - if !firstReceived { - if !randomAccess { - return nil - } - firstReceived = true - } - - return track.record(&sample{ - PartSample: &fmp4.PartSample{ - IsNonSyncSample: !randomAccess, - Payload: tunit.Frame, - }, - dts: tunit.PTS, - }) 
- }) - - case *format.VP8: - // TODO - - case *format.H265: - vps, sps, pps := forma.SafeParams() - - if vps == nil || sps == nil || pps == nil { - vps = []byte{ - 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, - 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, - } - - sps = []byte{ - 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, - 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, - 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, - 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, - 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, - 0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a, - 0x02, 0x02, 0x02, 0x01, - } - - pps = []byte{ - 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, - } - } - - codec := &fmp4.CodecH265{ - VPS: vps, - SPS: sps, - PPS: pps, - } - track := addTrack(codec) - - var dtsExtractor *h265.DTSExtractor - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.H265) - if tunit.AU == nil { - return nil - } - - randomAccess := false - - for _, nalu := range tunit.AU { - typ := h265.NALUType((nalu[0] >> 1) & 0b111111) - - switch typ { - case h265.NALUType_VPS_NUT: - if !bytes.Equal(codec.VPS, nalu) { - codec.VPS = nalu - r.updateCodecs() - } - - case h265.NALUType_SPS_NUT: - if !bytes.Equal(codec.SPS, nalu) { - codec.SPS = nalu - r.updateCodecs() - } - - case h265.NALUType_PPS_NUT: - if !bytes.Equal(codec.PPS, nalu) { - codec.PPS = nalu - r.updateCodecs() - } - - case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT: - randomAccess = true - } - } - - if dtsExtractor == nil { - if !randomAccess { - return nil - } - dtsExtractor = h265.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - sampl, err := fmp4.NewPartSampleH26x( - int32(durationGoToMp4(tunit.PTS-dts, 90000)), - randomAccess, - tunit.AU) - if err != nil { - return err - } - - 
return track.record(&sample{ - PartSample: sampl, - dts: dts, - }) - }) - - case *format.H264: - sps, pps := forma.SafeParams() - - if sps == nil || pps == nil { - sps = []byte{ - 0x67, 0x42, 0xc0, 0x1f, 0xd9, 0x00, 0xf0, 0x11, - 0x7e, 0xf0, 0x11, 0x00, 0x00, 0x03, 0x00, 0x01, - 0x00, 0x00, 0x03, 0x00, 0x30, 0x8f, 0x18, 0x32, - 0x48, - } - - pps = []byte{ - 0x68, 0xcb, 0x8c, 0xb2, - } - } - - codec := &fmp4.CodecH264{ - SPS: sps, - PPS: pps, - } - track := addTrack(codec) - - var dtsExtractor *h264.DTSExtractor - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } + a.writer = asyncwriter.New(writeQueueSize, a) - randomAccess := false + switch format { + case conf.RecordFormatMPEGTS: + a.format = newRecFormatMPEGTS(a) - for _, nalu := range tunit.AU { - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeSPS: - if !bytes.Equal(codec.SPS, nalu) { - codec.SPS = nalu - r.updateCodecs() - } - - case h264.NALUTypePPS: - if !bytes.Equal(codec.PPS, nalu) { - codec.PPS = nalu - r.updateCodecs() - } - - case h264.NALUTypeIDR: - randomAccess = true - } - } - - if dtsExtractor == nil { - if !randomAccess { - return nil - } - dtsExtractor = h264.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - sampl, err := fmp4.NewPartSampleH26x( - int32(durationGoToMp4(tunit.PTS-dts, 90000)), - randomAccess, - tunit.AU) - if err != nil { - return err - } - - return track.record(&sample{ - PartSample: sampl, - dts: dts, - }) - }) - - case *format.MPEG4Video: - config := forma.SafeParams() - - if config == nil { - config = []byte{ - 0x00, 0x00, 0x01, 0xb0, 0x01, 0x00, 0x00, 0x01, - 0xb5, 0x89, 0x13, 0x00, 0x00, 0x01, 0x00, 0x00, - 0x00, 0x01, 0x20, 0x00, 0xc4, 0x8d, 0x88, 0x00, - 0xf5, 0x3c, 0x04, 0x87, 0x14, 0x63, 0x00, 0x00, - 0x01, 0xb2, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38, - 0x2e, 0x31, 0x33, 0x34, 0x2e, 0x31, 0x30, 
0x30, - } - } - - codec := &fmp4.CodecMPEG4Video{ - Config: config, - } - track := addTrack(codec) - - firstReceived := false - var lastPTS time.Duration - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Video) - if tunit.Frame == nil { - return nil - } - - randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) - - if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { - end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) - if end >= 0 { - config := tunit.Frame[:end+4] - - if !bytes.Equal(codec.Config, config) { - codec.Config = config - r.updateCodecs() - } - } - } - - if !firstReceived { - if !randomAccess { - return nil - } - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - return track.record(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Frame, - IsNonSyncSample: !randomAccess, - }, - dts: tunit.PTS, - }) - }) - - case *format.MPEG1Video: - codec := &fmp4.CodecMPEG1Video{ - Config: []byte{ - 0x00, 0x00, 0x01, 0xb3, 0x78, 0x04, 0x38, 0x35, - 0xff, 0xff, 0xe0, 0x18, 0x00, 0x00, 0x01, 0xb5, - 0x14, 0x4a, 0x00, 0x01, 0x00, 0x00, - }, - } - track := addTrack(codec) - - firstReceived := false - var lastPTS time.Duration - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Video) - if tunit.Frame == nil { - return nil - } - - randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) - - if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) { - end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8}) - if end >= 0 { - config := tunit.Frame[:end+4] - - if !bytes.Equal(codec.Config, config) { - codec.Config = config - r.updateCodecs() - } - } - } - - if !firstReceived { - if !randomAccess { - return nil - } - 
firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - return track.record(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Frame, - IsNonSyncSample: !randomAccess, - }, - dts: tunit.PTS, - }) - }) - - case *format.MJPEG: - codec := &fmp4.CodecMJPEG{ - Width: 800, - Height: 600, - } - track := addTrack(codec) - - parsed := false - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MJPEG) - if tunit.Frame == nil { - return nil - } - - if !parsed { - parsed = true - width, height, err := jpegExtractSize(tunit.Frame) - if err != nil { - return err - } - codec.Width = width - codec.Height = height - r.updateCodecs() - } - - return track.record(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Frame, - }, - dts: tunit.PTS, - }) - }) - - case *format.Opus: - codec := &fmp4.CodecOpus{ - ChannelCount: func() int { - if forma.IsStereo { - return 2 - } - return 1 - }(), - } - track := addTrack(codec) - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.Opus) - if tunit.Packets == nil { - return nil - } - - pts := tunit.PTS - - for _, packet := range tunit.Packets { - err := track.record(&sample{ - PartSample: &fmp4.PartSample{ - Payload: packet, - }, - dts: pts, - }) - if err != nil { - return err - } - - pts += opus.PacketDuration(packet) - } - - return nil - }) - - case *format.MPEG4Audio: - codec := &fmp4.CodecMPEG4Audio{ - Config: *forma.GetConfig(), - } - track := addTrack(codec) - - sampleRate := time.Duration(forma.ClockRate()) - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { - return nil - } - - for i, au := range tunit.AUs { - auPTS := tunit.PTS + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* - time.Second/sampleRate - - err := track.record(&sample{ - PartSample: &fmp4.PartSample{ - 
Payload: au, - }, - dts: auPTS, - }) - if err != nil { - return err - } - } - - return nil - }) - - case *format.MPEG1Audio: - codec := &fmp4.CodecMPEG1Audio{ - SampleRate: 32000, - ChannelCount: 2, - } - track := addTrack(codec) - - parsed := false - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Audio) - if tunit.Frames == nil { - return nil - } - - pts := tunit.PTS - - for _, frame := range tunit.Frames { - var h mpeg1audio.FrameHeader - err := h.Unmarshal(frame) - if err != nil { - return err - } - - if !parsed { - parsed = true - codec.SampleRate = h.SampleRate - codec.ChannelCount = mpeg1audioChannelCount(h.ChannelMode) - r.updateCodecs() - } - - err = track.record(&sample{ - PartSample: &fmp4.PartSample{ - Payload: frame, - }, - dts: pts, - }) - if err != nil { - return err - } - - pts += time.Duration(h.SampleCount()) * - time.Second / time.Duration(h.SampleRate) - } - - return nil - }) - - case *format.AC3: - codec := &fmp4.CodecAC3{ - SampleRate: forma.SampleRate, - ChannelCount: forma.ChannelCount, - Fscod: 0, - Bsid: 8, - Bsmod: 0, - Acmod: 7, - LfeOn: true, - BitRateCode: 7, - } - track := addTrack(codec) - - parsed := false - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.AC3) - if tunit.Frames == nil { - return nil - } - - pts := tunit.PTS - - for _, frame := range tunit.Frames { - var syncInfo ac3.SyncInfo - err := syncInfo.Unmarshal(frame) - if err != nil { - return fmt.Errorf("invalid AC-3 frame: %s", err) - } - - var bsi ac3.BSI - err = bsi.Unmarshal(frame[5:]) - if err != nil { - return fmt.Errorf("invalid AC-3 frame: %s", err) - } - - if !parsed { - parsed = true - codec.SampleRate = syncInfo.SampleRate() - codec.ChannelCount = bsi.ChannelCount() - codec.Fscod = syncInfo.Fscod - codec.Bsid = bsi.Bsid - codec.Bsmod = bsi.Bsmod - codec.Acmod = bsi.Acmod - codec.LfeOn = bsi.LfeOn - codec.BitRateCode = syncInfo.Frmsizecod >> 1 - r.updateCodecs() - } - - err = 
track.record(&sample{ - PartSample: &fmp4.PartSample{ - Payload: frame, - }, - dts: pts, - }) - if err != nil { - return err - } - - pts += time.Duration(ac3.SamplesPerFrame) * - time.Second / time.Duration(codec.SampleRate) - } - - return nil - }) - - case *format.G722: - // TODO - - case *format.G711: - // TODO - - case *format.LPCM: - codec := &fmp4.CodecLPCM{ - LittleEndian: false, - BitDepth: forma.BitDepth, - SampleRate: forma.SampleRate, - ChannelCount: forma.ChannelCount, - } - track := addTrack(codec) - - stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.LPCM) - if tunit.Samples == nil { - return nil - } - - return track.record(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Samples, - }, - dts: tunit.PTS, - }) - }) - } - } + default: + a.format = newRecFormatFMP4(a) } - r.Log(logger.Info, "recording %d %s", - len(r.tracks), - func() string { - if len(r.tracks) == 1 { - return "track" - } - return "tracks" - }()) - - go r.run() + go a.run() - return r + return a } // Close closes the Agent. -func (r *Agent) Close() { - r.Log(logger.Info, "recording stopped") +func (a *Agent) Close() { + a.Log(logger.Info, "recording stopped") - r.ctxCancel() - <-r.done + a.ctxCancel() + <-a.done } // Log is the main logging function. -func (r *Agent) Log(level logger.Level, format string, args ...interface{}) { - r.parent.Log(level, "[record] "+format, args...) +func (a *Agent) Log(level logger.Level, format string, args ...interface{}) { + a.parent.Log(level, "[record] "+format, args...) 
} -func (r *Agent) run() { - defer close(r.done) +func (a *Agent) run() { + defer close(a.done) - r.writer.Start() + a.writer.Start() select { - case err := <-r.writer.Error(): - r.Log(logger.Error, err.Error()) - r.stream.RemoveReader(r.writer) + case err := <-a.writer.Error(): + a.Log(logger.Error, err.Error()) + a.stream.RemoveReader(a.writer) - case <-r.ctx.Done(): - r.stream.RemoveReader(r.writer) - r.writer.Stop() + case <-a.ctx.Done(): + a.stream.RemoveReader(a.writer) + a.writer.Stop() } - if r.currentSegment != nil { - r.currentSegment.close() //nolint:errcheck - } -} - -func (r *Agent) updateCodecs() { - // if codec parameters have been updated, - // and current segment has already written codec parameters on disk, - // close current segment. - if r.currentSegment != nil && r.currentSegment.f != nil { - r.currentSegment.close() //nolint:errcheck - r.currentSegment = nil - } + a.format.close() } diff --git a/internal/record/agent_test.go b/internal/record/agent_test.go index 92e29aec3d6..438b298d132 100644 --- a/internal/record/agent_test.go +++ b/internal/record/agent_test.go @@ -13,6 +13,7 @@ import ( "github.com/bluenviron/mediamtx/internal/logger" "github.com/stretchr/testify/require" + "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -23,142 +24,161 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { } func TestAgent(t *testing.T) { - n := 0 - timeNow = func() time.Time { - n++ - if n >= 2 { - return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC) - } - return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC) - } - - desc := &description.Session{Medias: []*description.Media{ - { - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ - PayloadTyp: 96, - }}, - }, - { - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - }}, - }, - { - 
Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - }}, - }, - }} - - stream, err := stream.New( - 1460, - desc, - true, - &nilLogger{}, - ) - require.NoError(t, err) - defer stream.Close() - - dir, err := os.MkdirTemp("", "mediamtx-agent") - require.NoError(t, err) - defer os.RemoveAll(dir) - - recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") - - segCreated := make(chan struct{}, 2) - segDone := make(chan struct{}, 2) - - a := NewAgent( - 1024, - recordPath, - 100*time.Millisecond, - 1*time.Second, - "mypath", - stream, - func(fpath string) { - segCreated <- struct{}{} - }, - func(fpath string) { - segDone <- struct{}{} - }, - &nilLogger{}, - ) - - for i := 0; i < 3; i++ { - stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{ - Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, - }, - AU: [][]byte{ - { // VPS - 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, - 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, + for _, ca := range []string{"fmp4", "mpegts"} { + t.Run(ca, func(t *testing.T) { + n := 0 + timeNow = func() time.Time { + n++ + if n >= 2 { + return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC) + } + return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC) + } + + desc := &description.Session{Medias: []*description.Media{ + { + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, }, - { // SPS - 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, - 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, - 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, - 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, - 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, - 0xa2, 0x23, 0xff, 0x00, 
0x01, 0x00, 0x01, 0x6a, - 0x02, 0x02, 0x02, 0x01, + { + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, }, - { // PPS - 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, + { + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }}, }, - {byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR - }, - }) - - stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{ - Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, - }, - AU: [][]byte{ - { // SPS - 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, - 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, - 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, + }} + + stream, err := stream.New( + 1460, + desc, + true, + &nilLogger{}, + ) + require.NoError(t, err) + defer stream.Close() + + dir, err := os.MkdirTemp("", "mediamtx-agent") + require.NoError(t, err) + defer os.RemoveAll(dir) + + recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") + + segCreated := make(chan struct{}, 2) + segDone := make(chan struct{}, 2) + + var f conf.RecordFormat + if ca == "fmp4" { + f = conf.RecordFormatFMP4 + } else { + f = conf.RecordFormatMPEGTS + } + + a := NewAgent( + 1024, + recordPath, + f, + 100*time.Millisecond, + 1*time.Second, + "mypath", + stream, + func(fpath string) { + segCreated <- struct{}{} }, - { // PPS - 0x08, 0x06, 0x07, 0x08, + func(fpath string) { + segDone <- struct{}{} }, - {5}, // IDR - }, - }) - - stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{ - Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, - }, - AUs: [][]byte{{1, 2, 3, 4}}, + &nilLogger{}, + ) + + for i := 0; i < 3; i++ { + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{ + Base: unit.Base{ + PTS: (50 + 
time.Duration(i)) * time.Second, + }, + AU: [][]byte{ + { // VPS + 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, + 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, + }, + { // SPS + 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, + 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, + 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, + 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, + 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, + 0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a, + 0x02, 0x02, 0x02, 0x01, + }, + { // PPS + 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, + }, + {byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR + }, + }) + + stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AU: [][]byte{ + { // SPS + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, + }, + { // PPS + 0x08, 0x06, 0x07, 0x08, + }, + {5}, // IDR + }, + }) + + stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AUs: [][]byte{{1, 2, 3, 4}}, + }) + } + + for i := 0; i < 2; i++ { + <-segCreated + <-segDone + } + + a.Close() + + var ext string + if ca == "fmp4" { + ext = "mp4" + } else { + ext = "ts" + } + + _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125."+ext)) + require.NoError(t, err) + + _, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427."+ext)) + require.NoError(t, err) }) } - - for i := 0; i < 2; i++ { - <-segCreated - <-segDone - } - - a.Close() - - _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4")) - require.NoError(t, err) - - _, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4")) - 
require.NoError(t, err) } diff --git a/internal/record/cleaner.go b/internal/record/cleaner.go index 8950b728cff..396623624fe 100644 --- a/internal/record/cleaner.go +++ b/internal/record/cleaner.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -41,6 +42,7 @@ func commonPath(v string) string { // CleanerEntry is a cleaner entry. type CleanerEntry struct { RecordPath string + RecordFormat conf.RecordFormat RecordDeleteAfter time.Duration } @@ -115,7 +117,16 @@ func (c *Cleaner) doRun() { } func (c *Cleaner) doRunEntry(e *CleanerEntry) error { - recordPath := e.RecordPath + ".mp4" + recordPath := e.RecordPath + + switch e.RecordFormat { + case conf.RecordFormatMPEGTS: + recordPath += ".ts" + + default: + recordPath += ".mp4" + } + commonPath := commonPath(recordPath) now := timeNow() diff --git a/internal/record/cleaner_test.go b/internal/record/cleaner_test.go index 0429657a230..8b4f50dd6b5 100644 --- a/internal/record/cleaner_test.go +++ b/internal/record/cleaner_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/conf" ) func TestCleaner(t *testing.T) { @@ -32,6 +34,7 @@ func TestCleaner(t *testing.T) { c := NewCleaner( []CleanerEntry{{ RecordPath: recordPath, + RecordFormat: conf.RecordFormatFMP4, RecordDeleteAfter: 10 * time.Second, }}, nilLogger{}, diff --git a/internal/record/rec_format.go b/internal/record/rec_format.go new file mode 100644 index 00000000000..8ec70d90db3 --- /dev/null +++ b/internal/record/rec_format.go @@ -0,0 +1,5 @@ +package record + +type recFormat interface { + close() +} diff --git a/internal/record/rec_format_fmp4.go b/internal/record/rec_format_fmp4.go new file mode 100644 index 00000000000..5e63a3dc3cc --- /dev/null +++ b/internal/record/rec_format_fmp4.go @@ -0,0 +1,821 @@ +package record + +import ( + "bytes" + "fmt" + "time" + + 
"github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/ac3" + "github.com/bluenviron/mediacommon/pkg/codecs/av1" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/pkg/codecs/jpeg" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" + "github.com/bluenviron/mediacommon/pkg/codecs/opus" + "github.com/bluenviron/mediacommon/pkg/codecs/vp9" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/unit" +) + +func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { + timeScale64 := uint64(timeScale) + secs := v / time.Second + dec := v % time.Second + return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second) +} + +func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int { + switch cm { + case mpeg1audio.ChannelModeStereo, + mpeg1audio.ChannelModeJointStereo, + mpeg1audio.ChannelModeDualChannel: + return 2 + + default: + return 1 + } +} + +func jpegExtractSize(image []byte) (int, int, error) { + l := len(image) + if l < 2 || image[0] != 0xFF || image[1] != jpeg.MarkerStartOfImage { + return 0, 0, fmt.Errorf("invalid header") + } + + image = image[2:] + + for { + if len(image) < 2 { + return 0, 0, fmt.Errorf("not enough bits") + } + + h0, h1 := image[0], image[1] + image = image[2:] + + if h0 != 0xFF { + return 0, 0, fmt.Errorf("invalid image") + } + + switch h1 { + case 0xE0, 0xE1, 0xE2, // JFIF + jpeg.MarkerDefineHuffmanTable, + jpeg.MarkerComment, + jpeg.MarkerDefineQuantizationTable, + jpeg.MarkerDefineRestartInterval: + mlen := int(image[0])<<8 | int(image[1]) + if len(image) < mlen { + return 0, 0, fmt.Errorf("not enough bits") + } + image = image[mlen:] + + case 
jpeg.MarkerStartOfFrame1: + mlen := int(image[0])<<8 | int(image[1]) + if len(image) < mlen { + return 0, 0, fmt.Errorf("not enough bits") + } + + var sof jpeg.StartOfFrame1 + err := sof.Unmarshal(image[2:mlen]) + if err != nil { + return 0, 0, err + } + + return sof.Width, sof.Height, nil + + case jpeg.MarkerStartOfScan: + return 0, 0, fmt.Errorf("SOF not found") + + default: + return 0, 0, fmt.Errorf("unknown marker: 0x%.2x", h1) + } + } +} + +type recFormatFMP4 struct { + a *Agent + tracks []*recFormatFMP4Track + hasVideo bool + currentSegment *recFormatFMP4Segment + nextSequenceNumber uint32 +} + +func newRecFormatFMP4(a *Agent) recFormat { + f := &recFormatFMP4{ + a: a, + } + + nextID := 1 + + addTrack := func(codec fmp4.Codec) *recFormatFMP4Track { + initTrack := &fmp4.InitTrack{ + TimeScale: 90000, + Codec: codec, + } + initTrack.ID = nextID + nextID++ + + track := newRecFormatFMP4Track(f, initTrack) + f.tracks = append(f.tracks, track) + + return track + } + + updateCodecs := func() { + // if codec parameters have been updated, + // and current segment has already written codec parameters on disk, + // close current segment. 
+ if f.currentSegment != nil && f.currentSegment.fi != nil { + f.currentSegment.close() //nolint:errcheck + f.currentSegment = nil + } + } + + for _, media := range a.stream.Desc().Medias { + for _, forma := range media.Formats { + switch forma := forma.(type) { + case *format.AV1: + codec := &fmp4.CodecAV1{ + SequenceHeader: []byte{ + 8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64, + }, + } + track := addTrack(codec) + + firstReceived := false + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.AV1) + if tunit.TU == nil { + return nil + } + + randomAccess := false + + for _, obu := range tunit.TU { + var h av1.OBUHeader + err := h.Unmarshal(obu) + if err != nil { + return err + } + + if h.Type == av1.OBUTypeSequenceHeader { + if !bytes.Equal(codec.SequenceHeader, obu) { + codec.SequenceHeader = obu + updateCodecs() + } + randomAccess = true + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } + + sampl, err := fmp4.NewPartSampleAV1( + randomAccess, + tunit.TU) + if err != nil { + return err + } + + return track.record(&sample{ + PartSample: sampl, + dts: tunit.PTS, + }) + }) + + case *format.VP9: + codec := &fmp4.CodecVP9{ + Width: 1280, + Height: 720, + Profile: 1, + BitDepth: 8, + ChromaSubsampling: 1, + ColorRange: false, + } + track := addTrack(codec) + + firstReceived := false + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.VP9) + if tunit.Frame == nil { + return nil + } + + var h vp9.Header + err := h.Unmarshal(tunit.Frame) + if err != nil { + return err + } + + randomAccess := false + + if h.FrameType == vp9.FrameTypeKeyFrame { + randomAccess = true + + if w := h.Width(); codec.Width != w { + codec.Width = w + updateCodecs() + } + if h := h.Width(); codec.Height != h { + codec.Height = h + updateCodecs() + } + if codec.Profile != h.Profile { + codec.Profile = h.Profile + updateCodecs() + } + if codec.BitDepth != 
h.ColorConfig.BitDepth { + codec.BitDepth = h.ColorConfig.BitDepth + updateCodecs() + } + if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c { + codec.ChromaSubsampling = c + updateCodecs() + } + if codec.ColorRange != h.ColorConfig.ColorRange { + codec.ColorRange = h.ColorConfig.ColorRange + updateCodecs() + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + IsNonSyncSample: !randomAccess, + Payload: tunit.Frame, + }, + dts: tunit.PTS, + }) + }) + + case *format.VP8: + // TODO + + case *format.H265: + vps, sps, pps := forma.SafeParams() + + if vps == nil || sps == nil || pps == nil { + vps = []byte{ + 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, + 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, + } + + sps = []byte{ + 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, + 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, + 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, + 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, + 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, + 0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a, + 0x02, 0x02, 0x02, 0x01, + } + + pps = []byte{ + 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, + } + } + + codec := &fmp4.CodecH265{ + VPS: vps, + SPS: sps, + PPS: pps, + } + track := addTrack(codec) + + var dtsExtractor *h265.DTSExtractor + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + } + + randomAccess := false + + for _, nalu := range tunit.AU { + typ := h265.NALUType((nalu[0] >> 1) & 0b111111) + + switch typ { + case h265.NALUType_VPS_NUT: + if !bytes.Equal(codec.VPS, nalu) { + codec.VPS = nalu + updateCodecs() + } + + case h265.NALUType_SPS_NUT: + if !bytes.Equal(codec.SPS, nalu) { + codec.SPS = nalu + updateCodecs() + } + + case 
h265.NALUType_PPS_NUT: + if !bytes.Equal(codec.PPS, nalu) { + codec.PPS = nalu + updateCodecs() + } + + case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT: + randomAccess = true + } + } + + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h265.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sampl, err := fmp4.NewPartSampleH26x( + int32(durationGoToMp4(tunit.PTS-dts, 90000)), + randomAccess, + tunit.AU) + if err != nil { + return err + } + + return track.record(&sample{ + PartSample: sampl, + dts: dts, + }) + }) + + case *format.H264: + sps, pps := forma.SafeParams() + + if sps == nil || pps == nil { + sps = []byte{ + 0x67, 0x42, 0xc0, 0x1f, 0xd9, 0x00, 0xf0, 0x11, + 0x7e, 0xf0, 0x11, 0x00, 0x00, 0x03, 0x00, 0x01, + 0x00, 0x00, 0x03, 0x00, 0x30, 0x8f, 0x18, 0x32, + 0x48, + } + + pps = []byte{ + 0x68, 0xcb, 0x8c, 0xb2, + } + } + + codec := &fmp4.CodecH264{ + SPS: sps, + PPS: pps, + } + track := addTrack(codec) + + var dtsExtractor *h264.DTSExtractor + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } + + randomAccess := false + + for _, nalu := range tunit.AU { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS: + if !bytes.Equal(codec.SPS, nalu) { + codec.SPS = nalu + updateCodecs() + } + + case h264.NALUTypePPS: + if !bytes.Equal(codec.PPS, nalu) { + codec.PPS = nalu + updateCodecs() + } + + case h264.NALUTypeIDR: + randomAccess = true + } + } + + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h264.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sampl, err := fmp4.NewPartSampleH26x( + int32(durationGoToMp4(tunit.PTS-dts, 90000)), + randomAccess, + tunit.AU) + if err != nil { + return err + } + + return track.record(&sample{ + 
PartSample: sampl, + dts: dts, + }) + }) + + case *format.MPEG4Video: + config := forma.SafeParams() + + if config == nil { + config = []byte{ + 0x00, 0x00, 0x01, 0xb0, 0x01, 0x00, 0x00, 0x01, + 0xb5, 0x89, 0x13, 0x00, 0x00, 0x01, 0x00, 0x00, + 0x00, 0x01, 0x20, 0x00, 0xc4, 0x8d, 0x88, 0x00, + 0xf5, 0x3c, 0x04, 0x87, 0x14, 0x63, 0x00, 0x00, + 0x01, 0xb2, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38, + 0x2e, 0x31, 0x33, 0x34, 0x2e, 0x31, 0x30, 0x30, + } + } + + codec := &fmp4.CodecMPEG4Video{ + Config: config, + } + track := addTrack(codec) + + firstReceived := false + var lastPTS time.Duration + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } + + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + + if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { + end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + if end >= 0 { + config := tunit.Frame[:end+4] + + if !bytes.Equal(codec.Config, config) { + codec.Config = config + updateCodecs() + } + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + IsNonSyncSample: !randomAccess, + }, + dts: tunit.PTS, + }) + }) + + case *format.MPEG1Video: + codec := &fmp4.CodecMPEG1Video{ + Config: []byte{ + 0x00, 0x00, 0x01, 0xb3, 0x78, 0x04, 0x38, 0x35, + 0xff, 0xff, 0xe0, 0x18, 0x00, 0x00, 0x01, 0xb5, + 0x14, 0x4a, 0x00, 0x01, 0x00, 0x00, + }, + } + track := addTrack(codec) + + firstReceived := false + var lastPTS time.Duration + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame 
== nil { + return nil + } + + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) + + if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) { + end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8}) + if end >= 0 { + config := tunit.Frame[:end+4] + + if !bytes.Equal(codec.Config, config) { + codec.Config = config + updateCodecs() + } + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + IsNonSyncSample: !randomAccess, + }, + dts: tunit.PTS, + }) + }) + + case *format.MJPEG: + codec := &fmp4.CodecMJPEG{ + Width: 800, + Height: 600, + } + track := addTrack(codec) + + parsed := false + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MJPEG) + if tunit.Frame == nil { + return nil + } + + if !parsed { + parsed = true + width, height, err := jpegExtractSize(tunit.Frame) + if err != nil { + return err + } + codec.Width = width + codec.Height = height + updateCodecs() + } + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + }, + dts: tunit.PTS, + }) + }) + + case *format.Opus: + codec := &fmp4.CodecOpus{ + ChannelCount: func() int { + if forma.IsStereo { + return 2 + } + return 1 + }(), + } + track := addTrack(codec) + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } + + pts := tunit.PTS + + for _, packet := range tunit.Packets { + err := track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: packet, + }, + dts: pts, + }) + if err != nil { + return err + } + + pts += opus.PacketDuration(packet) + } + + return nil + }) + + case *format.MPEG4Audio: + codec := &fmp4.CodecMPEG4Audio{ + Config: 
*forma.GetConfig(), + } + track := addTrack(codec) + + sampleRate := time.Duration(forma.ClockRate()) + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + for i, au := range tunit.AUs { + auPTS := tunit.PTS + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/sampleRate + + err := track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: au, + }, + dts: auPTS, + }) + if err != nil { + return err + } + } + + return nil + }) + + case *format.MPEG1Audio: + codec := &fmp4.CodecMPEG1Audio{ + SampleRate: 32000, + ChannelCount: 2, + } + track := addTrack(codec) + + parsed := false + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { + return nil + } + + pts := tunit.PTS + + for _, frame := range tunit.Frames { + var h mpeg1audio.FrameHeader + err := h.Unmarshal(frame) + if err != nil { + return err + } + + if !parsed { + parsed = true + codec.SampleRate = h.SampleRate + codec.ChannelCount = mpeg1audioChannelCount(h.ChannelMode) + updateCodecs() + } + + err = track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: frame, + }, + dts: pts, + }) + if err != nil { + return err + } + + pts += time.Duration(h.SampleCount()) * + time.Second / time.Duration(h.SampleRate) + } + + return nil + }) + + case *format.AC3: + codec := &fmp4.CodecAC3{ + SampleRate: forma.SampleRate, + ChannelCount: forma.ChannelCount, + Fscod: 0, + Bsid: 8, + Bsmod: 0, + Acmod: 7, + LfeOn: true, + BitRateCode: 7, + } + track := addTrack(codec) + + parsed := false + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil + } + + pts := tunit.PTS + + for _, frame := range tunit.Frames { + var syncInfo ac3.SyncInfo + err := syncInfo.Unmarshal(frame) + if err != nil { + return fmt.Errorf("invalid AC-3 frame: %s", err) + } + + var 
bsi ac3.BSI + err = bsi.Unmarshal(frame[5:]) + if err != nil { + return fmt.Errorf("invalid AC-3 frame: %s", err) + } + + if !parsed { + parsed = true + codec.SampleRate = syncInfo.SampleRate() + codec.ChannelCount = bsi.ChannelCount() + codec.Fscod = syncInfo.Fscod + codec.Bsid = bsi.Bsid + codec.Bsmod = bsi.Bsmod + codec.Acmod = bsi.Acmod + codec.LfeOn = bsi.LfeOn + codec.BitRateCode = syncInfo.Frmsizecod >> 1 + updateCodecs() + } + + err = track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: frame, + }, + dts: pts, + }) + if err != nil { + return err + } + + pts += time.Duration(ac3.SamplesPerFrame) * + time.Second / time.Duration(codec.SampleRate) + } + + return nil + }) + + case *format.G722: + // TODO + + case *format.G711: + // TODO + + case *format.LPCM: + codec := &fmp4.CodecLPCM{ + LittleEndian: false, + BitDepth: forma.BitDepth, + SampleRate: forma.SampleRate, + ChannelCount: forma.ChannelCount, + } + track := addTrack(codec) + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.LPCM) + if tunit.Samples == nil { + return nil + } + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Samples, + }, + dts: tunit.PTS, + }) + }) + } + } + } + + a.Log(logger.Info, "recording %d %s", + len(f.tracks), + func() string { + if len(f.tracks) == 1 { + return "track" + } + return "tracks" + }()) + + return f +} + +func (f *recFormatFMP4) close() { + if f.currentSegment != nil { + f.currentSegment.close() //nolint:errcheck + } +} diff --git a/internal/record/part.go b/internal/record/rec_format_fmp4_part.go similarity index 57% rename from internal/record/part.go rename to internal/record/rec_format_fmp4_part.go index 88131af1e1e..4d6d313b64c 100644 --- a/internal/record/part.go +++ b/internal/record/rec_format_fmp4_part.go @@ -12,7 +12,11 @@ import ( "github.com/bluenviron/mediamtx/internal/logger" ) -func writePart(f io.Writer, sequenceNumber uint32, partTracks map[*track]*fmp4.PartTrack) 
error { +func writePart( + f io.Writer, + sequenceNumber uint32, + partTracks map[*recFormatFMP4Track]*fmp4.PartTrack, +) error { fmp4PartTracks := make([]*fmp4.PartTrack, len(partTracks)) i := 0 for _, partTrack := range partTracks { @@ -35,58 +39,60 @@ func writePart(f io.Writer, sequenceNumber uint32, partTracks map[*track]*fmp4.P return err } -type part struct { - s *segment +type recFormatFMP4Part struct { + s *recFormatFMP4Segment sequenceNumber uint32 startDTS time.Duration - partTracks map[*track]*fmp4.PartTrack + created time.Time + partTracks map[*recFormatFMP4Track]*fmp4.PartTrack endDTS time.Duration } -func newPart( - s *segment, +func newRecFormatFMP4Part( + s *recFormatFMP4Segment, sequenceNumber uint32, startDTS time.Duration, -) *part { - return &part{ +) *recFormatFMP4Part { + return &recFormatFMP4Part{ s: s, startDTS: startDTS, sequenceNumber: sequenceNumber, - partTracks: make(map[*track]*fmp4.PartTrack), + created: timeNow(), + partTracks: make(map[*recFormatFMP4Track]*fmp4.PartTrack), } } -func (p *part) close() error { - if p.s.f == nil { - p.s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, p.s.r.path) - p.s.r.Log(logger.Debug, "creating segment %s", p.s.fpath) +func (p *recFormatFMP4Part) close() error { + if p.s.fi == nil { + p.s.fpath = encodeRecordPath(&recordPathParams{time: p.created}, p.s.f.a.path) + p.s.f.a.Log(logger.Debug, "creating segment %s", p.s.fpath) err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755) if err != nil { return err } - f, err := os.Create(p.s.fpath) + fi, err := os.Create(p.s.fpath) if err != nil { return err } - p.s.r.onSegmentCreate(p.s.fpath) + p.s.f.a.onSegmentCreate(p.s.fpath) - err = writeInit(f, p.s.r.tracks) + err = writeInit(fi, p.s.f.tracks) if err != nil { - f.Close() + fi.Close() return err } - p.s.f = f + p.s.fi = fi } - return writePart(p.s.f, p.sequenceNumber, p.partTracks) + return writePart(p.s.fi, p.sequenceNumber, p.partTracks) } -func (p *part) record(track *track, sample *sample) 
error { +func (p *recFormatFMP4Part) record(track *recFormatFMP4Track, sample *sample) error { partTrack, ok := p.partTracks[track] if !ok { partTrack = &fmp4.PartTrack{ @@ -102,6 +108,6 @@ func (p *part) record(track *track, sample *sample) error { return nil } -func (p *part) duration() time.Duration { +func (p *recFormatFMP4Part) duration() time.Duration { return p.endDTS - p.startDTS } diff --git a/internal/record/segment.go b/internal/record/rec_format_fmp4_segment.go similarity index 52% rename from internal/record/segment.go rename to internal/record/rec_format_fmp4_segment.go index 224a1471a68..a2c83a04616 100644 --- a/internal/record/segment.go +++ b/internal/record/rec_format_fmp4_segment.go @@ -13,7 +13,7 @@ import ( var timeNow = time.Now -func writeInit(f io.Writer, tracks []*track) error { +func writeInit(f io.Writer, tracks []*recFormatFMP4Track) error { fmp4Tracks := make([]*fmp4.InitTrack, len(tracks)) for i, track := range tracks { fmp4Tracks[i] = track.initTrack @@ -33,52 +33,52 @@ func writeInit(f io.Writer, tracks []*track) error { return err } -type segment struct { - r *Agent +type recFormatFMP4Segment struct { + f *recFormatFMP4 startDTS time.Duration fpath string - f *os.File - curPart *part + fi *os.File + curPart *recFormatFMP4Part } -func newSegment( - r *Agent, +func newRecFormatFMP4Segment( + f *recFormatFMP4, startDTS time.Duration, -) *segment { - return &segment{ - r: r, +) *recFormatFMP4Segment { + return &recFormatFMP4Segment{ + f: f, startDTS: startDTS, } } -func (s *segment) close() error { +func (s *recFormatFMP4Segment) close() error { var err error if s.curPart != nil { err = s.curPart.close() } - if s.f != nil { - s.r.Log(logger.Debug, "closing segment %s", s.fpath) - err2 := s.f.Close() + if s.fi != nil { + s.f.a.Log(logger.Debug, "closing segment %s", s.fpath) + err2 := s.fi.Close() if err == nil { err = err2 } if err2 == nil { - s.r.onSegmentComplete(s.fpath) + s.f.a.onSegmentComplete(s.fpath) } } return err } -func (s 
*segment) record(track *track, sample *sample) error { +func (s *recFormatFMP4Segment) record(track *recFormatFMP4Track, sample *sample) error { if s.curPart == nil { - s.curPart = newPart(s, s.r.nextSequenceNumber, sample.dts) - s.r.nextSequenceNumber++ - } else if s.curPart.duration() >= s.r.partDuration { + s.curPart = newRecFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts) + s.f.nextSequenceNumber++ + } else if s.curPart.duration() >= s.f.a.partDuration { err := s.curPart.close() s.curPart = nil @@ -86,8 +86,8 @@ func (s *segment) record(track *track, sample *sample) error { return err } - s.curPart = newPart(s, s.r.nextSequenceNumber, sample.dts) - s.r.nextSequenceNumber++ + s.curPart = newRecFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts) + s.f.nextSequenceNumber++ } return s.curPart.record(track, sample) diff --git a/internal/record/rec_format_fmp4_track.go b/internal/record/rec_format_fmp4_track.go new file mode 100644 index 00000000000..de0a4a5bc6d --- /dev/null +++ b/internal/record/rec_format_fmp4_track.go @@ -0,0 +1,57 @@ +package record + +import ( + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" +) + +type recFormatFMP4Track struct { + f *recFormatFMP4 + initTrack *fmp4.InitTrack + + nextSample *sample +} + +func newRecFormatFMP4Track( + f *recFormatFMP4, + initTrack *fmp4.InitTrack, +) *recFormatFMP4Track { + return &recFormatFMP4Track{ + f: f, + initTrack: initTrack, + } +} + +func (t *recFormatFMP4Track) record(sample *sample) error { + // wait the first video sample before setting hasVideo + if t.initTrack.Codec.IsVideo() { + t.f.hasVideo = true + } + + if t.f.currentSegment == nil { + t.f.currentSegment = newRecFormatFMP4Segment(t.f, sample.dts) + } + + sample, t.nextSample = t.nextSample, sample + if sample == nil { + return nil + } + sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale)) + + err := t.f.currentSegment.record(t, sample) + if err != nil { + return err + } + + if (!t.f.hasVideo 
|| t.initTrack.Codec.IsVideo()) && + !t.nextSample.IsNonSyncSample && + (t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.segmentDuration { + err := t.f.currentSegment.close() + if err != nil { + return err + } + + t.f.currentSegment = newRecFormatFMP4Segment(t.f, t.nextSample.dts) + } + + return nil +} diff --git a/internal/record/rec_format_mpegts.go b/internal/record/rec_format_mpegts.go new file mode 100644 index 00000000000..2683aa02440 --- /dev/null +++ b/internal/record/rec_format_mpegts.go @@ -0,0 +1,332 @@ +package record + +import ( + "bufio" + "bytes" + "fmt" + "io" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/ac3" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/unit" +) + +const ( + mpegtsMaxBufferSize = 64 * 1024 +) + +func durationGoToMPEGTS(v time.Duration) int64 { + return int64(v.Seconds() * 90000) +} + +type dynamicWriter struct { + w io.Writer +} + +func (d *dynamicWriter) Write(p []byte) (int, error) { + return d.w.Write(p) +} + +func (d *dynamicWriter) setTarget(w io.Writer) { + d.w = w +} + +type recFormatMPEGTS struct { + a *Agent + + dw *dynamicWriter + bw *bufio.Writer + mw *mpegts.Writer + hasVideo bool + currentSegment *recFormatMPEGTSSegment +} + +func newRecFormatMPEGTS(a *Agent) recFormat { + f := &recFormatMPEGTS{ + a: a, + } + + var tracks []*mpegts.Track + + addTrack := func(codec mpegts.Codec) *mpegts.Track { + track := &mpegts.Track{ + Codec: codec, + } + tracks = append(tracks, track) + return track + } + + for _, media := range a.stream.Desc().Medias { + for _, forma := range media.Formats { + switch forma := forma.(type) { + case *format.H265: + track := addTrack(&mpegts.CodecH265{}) + + var 
dtsExtractor *h265.DTSExtractor + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + } + + randomAccess := h265.IsRandomAccess(tunit.AU) + + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h265.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + return f.recordH26x(track, dts, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + }) + + case *format.H264: + track := addTrack(&mpegts.CodecH264{}) + + var dtsExtractor *h264.DTSExtractor + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } + + idrPresent := h264.IDRPresent(tunit.AU) + + if dtsExtractor == nil { + if !idrPresent { + return nil + } + dtsExtractor = h264.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + return f.recordH26x(track, dts, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) + }) + + case *format.MPEG4Video: + track := addTrack(&mpegts.CodecMPEG4Video{}) + + firstReceived := false + var lastPTS time.Duration + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + f.hasVideo = true + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + + err := f.setupSegment(tunit.PTS, true, randomAccess) + if err != nil { + return err + } + + return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + }) + + case *format.MPEG1Video: + track := 
addTrack(&mpegts.CodecMPEG1Video{}) + + firstReceived := false + var lastPTS time.Duration + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + f.hasVideo = true + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) + + err := f.setupSegment(tunit.PTS, true, randomAccess) + if err != nil { + return err + } + + return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + }) + + case *format.Opus: + track := addTrack(&mpegts.CodecOpus{ + ChannelCount: func() int { + if forma.IsStereo { + return 2 + } + return 1 + }(), + }) + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } + + err := f.setupSegment(tunit.PTS, false, true) + if err != nil { + return err + } + + return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + }) + + case *format.MPEG4Audio: + track := addTrack(&mpegts.CodecMPEG4Audio{ + Config: *forma.GetConfig(), + }) + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + err := f.setupSegment(tunit.PTS, false, true) + if err != nil { + return err + } + + return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + }) + + case *format.MPEG1Audio: + track := addTrack(&mpegts.CodecMPEG1Audio{}) + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { + return nil + } + + err := f.setupSegment(tunit.PTS, false, true) + if err != nil { + return err + } + + return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + }) + + case 
*format.AC3: + track := addTrack(&mpegts.CodecAC3{}) + + sampleRate := time.Duration(forma.SampleRate) + + a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil + } + + for i, frame := range tunit.Frames { + framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* + time.Second/sampleRate + + err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + if err != nil { + return err + } + } + + return nil + }) + } + } + } + + f.dw = &dynamicWriter{} + f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize) + f.mw = mpegts.NewWriter(f.bw, tracks) + + a.Log(logger.Info, "recording %d %s", + len(tracks), + func() string { + if len(tracks) == 1 { + return "track" + } + return "tracks" + }()) + + return f +} + +func (f *recFormatMPEGTS) close() { + if f.currentSegment != nil { + f.currentSegment.close() //nolint:errcheck + } +} + +func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAccess bool) error { + switch { + case f.currentSegment == nil: + f.currentSegment = newRecFormatMPEGTSSegment(f, dts) + + case (!f.hasVideo || isVideo) && + randomAccess && + (dts-f.currentSegment.startDTS) >= f.a.segmentDuration: + err := f.currentSegment.close() + if err != nil { + return err + } + + f.currentSegment = newRecFormatMPEGTSSegment(f, dts) + + case (dts - f.currentSegment.lastFlush) >= f.a.partDuration: + err := f.bw.Flush() + if err != nil { + return err + } + + f.currentSegment.lastFlush = dts + } + + return nil +} + +func (f *recFormatMPEGTS) recordH26x(track *mpegts.Track, goDTS time.Duration, + pts int64, dts int64, randomAccess bool, au [][]byte, +) error { + f.hasVideo = true + + err := f.setupSegment(goDTS, true, randomAccess) + if err != nil { + return err + } + + return f.mw.WriteH26x(track, pts, dts, randomAccess, au) +} diff --git a/internal/record/rec_format_mpegts_segment.go b/internal/record/rec_format_mpegts_segment.go new file mode 100644 index 
00000000000..89a1baa5ce8 --- /dev/null +++ b/internal/record/rec_format_mpegts_segment.go @@ -0,0 +1,73 @@ +package record + +import ( + "os" + "path/filepath" + "time" + + "github.com/bluenviron/mediamtx/internal/logger" +) + +type recFormatMPEGTSSegment struct { + f *recFormatMPEGTS + startDTS time.Duration + lastFlush time.Duration + + created time.Time + fpath string + fi *os.File +} + +func newRecFormatMPEGTSSegment(f *recFormatMPEGTS, startDTS time.Duration) *recFormatMPEGTSSegment { + s := &recFormatMPEGTSSegment{ + f: f, + startDTS: startDTS, + lastFlush: startDTS, + created: timeNow(), + } + + f.dw.setTarget(s) + + return s +} + +func (s *recFormatMPEGTSSegment) close() error { + err := s.f.bw.Flush() + + if s.fi != nil { + s.f.a.Log(logger.Debug, "closing segment %s", s.fpath) + err2 := s.fi.Close() + if err == nil { + err = err2 + } + + if err2 == nil { + s.f.a.onSegmentComplete(s.fpath) + } + } + + return err +} + +func (s *recFormatMPEGTSSegment) Write(p []byte) (int, error) { + if s.fi == nil { + s.fpath = encodeRecordPath(&recordPathParams{time: s.created}, s.f.a.path) + s.f.a.Log(logger.Debug, "creating segment %s", s.fpath) + + err := os.MkdirAll(filepath.Dir(s.fpath), 0o755) + if err != nil { + return 0, err + } + + fi, err := os.Create(s.fpath) + if err != nil { + return 0, err + } + + s.f.a.onSegmentCreate(s.fpath) + + s.fi = fi + } + + return s.fi.Write(p) +} diff --git a/internal/record/track.go b/internal/record/track.go deleted file mode 100644 index 22fce7510d6..00000000000 --- a/internal/record/track.go +++ /dev/null @@ -1,57 +0,0 @@ -package record - -import ( - "github.com/bluenviron/mediacommon/pkg/formats/fmp4" -) - -type track struct { - r *Agent - initTrack *fmp4.InitTrack - - nextSample *sample -} - -func newTrack( - r *Agent, - initTrack *fmp4.InitTrack, -) *track { - return &track{ - r: r, - initTrack: initTrack, - } -} - -func (t *track) record(sample *sample) error { - // wait the first video sample before setting hasVideo - if 
t.initTrack.Codec.IsVideo() { - t.r.hasVideo = true - } - - if t.r.currentSegment == nil { - t.r.currentSegment = newSegment(t.r, sample.dts) - } - - sample, t.nextSample = t.nextSample, sample - if sample == nil { - return nil - } - sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale)) - - err := t.r.currentSegment.record(t, sample) - if err != nil { - return err - } - - if (!t.r.hasVideo || t.initTrack.Codec.IsVideo()) && - !t.nextSample.IsNonSyncSample && - (t.nextSample.dts-t.r.currentSegment.startDTS) >= t.r.segmentDuration { - err := t.r.currentSegment.close() - if err != nil { - return err - } - - t.r.currentSegment = newSegment(t.r, t.nextSample.dts) - } - - return nil -} diff --git a/internal/stream/stream.go b/internal/stream/stream.go index ce01d3e4ae9..31aab9867f5 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -119,6 +119,25 @@ func (s *Stream) RemoveReader(r *asyncwriter.Writer) { } } +// MediasForReader returns all medias that a reader is reading. +func (s *Stream) MediasForReader(r *asyncwriter.Writer) []*description.Media { + s.mutex.Lock() + defer s.mutex.Unlock() + + var medias []*description.Media + + for media, sm := range s.smedias { + for _, sf := range sm.formats { + if _, ok := sf.readers[r]; ok { + medias = append(medias, media) + break + } + } + } + + return medias +} + // WriteUnit writes a Unit. func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) { sm := s.smedias[medi] diff --git a/mediamtx.yml b/mediamtx.yml index 55f043d2b5a..21514f0ac1e 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -300,9 +300,10 @@ pathDefaults: # Available variables are %path (path name), %Y %m %d %H %M %S %f (time in strftime format) recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f # Format of recorded segments. - # Currently the only available format is fmp4 (fragmented MP4). + # Available formats are "fmp4" (fragmented MP4) and "mpegts" (MPEG-TS). 
recordFormat: fmp4 # fMP4 segments are concatenation of small MP4 files (parts), each with this duration. + # MPEG-TS segments are a concatenation of 188-byte packets, flushed to disk with this period. # When a system failure occurs, the last part gets lost. # Therefore, the part duration is equal to the RPO (recovery point objective). recordPartDuration: 100ms