Skip to content

Commit

Permalink
playaback: use a fixed fMP4 part duration
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Apr 7, 2024
1 parent 3144b31 commit 58c91fc
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 101 deletions.
3 changes: 1 addition & 2 deletions internal/playback/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package playback
type muxer interface {
writeInit(init []byte)
setTrack(trackID int)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error
writeFinalDTS(dts int64)
flush() error
finalFlush() error
}
96 changes: 42 additions & 54 deletions internal/playback/muxer_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package playback

import (
"io"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
)

var partSize = durationGoToMp4(1*time.Second, fmp4Timescale)

type muxerFMP4Track struct {
started bool
id int
firstDTS uint64
firstDTS int64
lastDTS int64
samples []*fmp4.PartSample
}
Expand Down Expand Up @@ -42,58 +44,27 @@ func (w *muxerFMP4) setTrack(trackID int) {
w.curTrack = findTrack(w.tracks, trackID)
if w.curTrack == nil {
w.curTrack = &muxerFMP4Track{
id: trackID,
id: trackID,
firstDTS: -1,
}
w.tracks = append(w.tracks, w.curTrack)
}
}

func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) {
if !w.curTrack.started {
if dts >= 0 {
w.curTrack.started = true
w.curTrack.firstDTS = uint64(dts)
func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error {
if dts >= 0 {
if w.curTrack.firstDTS < 0 {
w.curTrack.firstDTS = dts

// reset GOP preceding the first frame
if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
w.curTrack.samples = nil

Check warning on line 61 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L61

Added line #L61 was not covered by tests
}
w.curTrack.lastDTS = dts
} else {
ptsOffset = 0

if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
}
}
} else {
if w.curTrack.samples == nil {
w.curTrack.firstDTS = uint64(dts)
} else {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}

w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}

Expand All @@ -103,25 +74,48 @@ func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool
Payload: payload,
})
w.curTrack.lastDTS = dts

if (w.curTrack.lastDTS - w.curTrack.firstDTS) > int64(partSize) {
err := w.innerFlush(false)
if err != nil {
return err
}

Check warning on line 82 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}
} else {
// store GOP preceding the first frame, with PTSOffset = 0 and Duration = 0
if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
}

Check warning on line 96 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L92-L96

Added lines #L92 - L96 were not covered by tests
}

return nil
}

func (w *muxerFMP4) writeFinalDTS(dts int64) {
if w.curTrack.started && w.curTrack.samples != nil {
if w.curTrack.firstDTS >= 0 {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}

w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}
}

func (w *muxerFMP4) flush2(final bool) error {
func (w *muxerFMP4) innerFlush(final bool) error {
var part fmp4.Part

for _, track := range w.tracks {
if track.started && (len(track.samples) > 1 || (final && len(track.samples) != 0)) {
if track.firstDTS >= 0 && (len(track.samples) > 1 || (final && len(track.samples) != 0)) {
// do not write the final sample
// in order to allow changing its duration to compensate NTP-DTS differences
var samples []*fmp4.PartSample
if !final {
samples = track.samples[:len(track.samples)-1]
Expand All @@ -131,15 +125,13 @@ func (w *muxerFMP4) flush2(final bool) error {

part.Tracks = append(part.Tracks, &fmp4.PartTrack{
ID: track.id,
BaseTime: track.firstDTS,
BaseTime: uint64(track.firstDTS),
Samples: samples,
})

if !final {
track.samples = track.samples[len(track.samples)-1:]
track.firstDTS = uint64(track.lastDTS)
} else {
track.samples = nil
track.firstDTS = track.lastDTS
}
}
}
Expand Down Expand Up @@ -173,9 +165,5 @@ func (w *muxerFMP4) flush2(final bool) error {
}

func (w *muxerFMP4) flush() error {
return w.flush2(false)
}

func (w *muxerFMP4) finalFlush() error {
return w.flush2(true)
return w.innerFlush(true)
}
5 changes: 2 additions & 3 deletions internal/playback/on_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,13 @@ func seekAndMux(
}()
if err != nil {
if errors.Is(err, errStopIteration) {
return nil
break
}

return err
}
}

err = m.finalFlush()
err = m.flush()
if err != nil {
return err
}
Expand Down
44 changes: 16 additions & 28 deletions internal/playback/on_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func TestOnGet(t *testing.T) {

writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-04-500000.mp4"))

s := &Server{
Address: "127.0.0.1:9996",
Expand All @@ -252,7 +253,7 @@ func TestOnGet(t *testing.T) {
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "2")
v.Set("duration", "3")
v.Set("format", "fmp4")
u.RawQuery = v.Encode()

Expand Down Expand Up @@ -283,36 +284,29 @@ func TestOnGet(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
},
},
},
},
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
{
Duration: 90000,
Payload: []byte{7, 8},
},
},
},
},
},
{
SequenceNumber: 2,
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 90000,
BaseTime: 180000,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
Payload: []byte{7, 8},
Payload: []byte{9, 10},
},
},
},
Expand Down Expand Up @@ -385,6 +379,11 @@ func TestOnGetDifferentInit(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
},
},
Expand Down Expand Up @@ -456,17 +455,6 @@ func TestOnGetNTPCompensation(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
},
},
},
},
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 45000, // 90 - 45
IsNonSyncSample: true,
Expand All @@ -481,11 +469,11 @@ func TestOnGetNTPCompensation(t *testing.T) {
},
},
{
SequenceNumber: 2,
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 135000, // 180 - 45
BaseTime: 135000,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
Expand Down
22 changes: 8 additions & 14 deletions internal/playback/segment_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,15 @@ func segmentFMP4SeekAndMuxParts(
return nil, err
}

m.writeSample(
err = m.writeSample(
muxerDTS,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
payload,
)
if err != nil {
return nil, err
}

Check warning on line 385 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L384-L385

Added lines #L384 - L385 were not covered by tests

muxerDTS += int64(e.SampleDuration)
}
Expand All @@ -389,12 +392,6 @@ func segmentFMP4SeekAndMuxParts(
if muxerDTS > maxMuxerDTS {
maxMuxerDTS = muxerDTS
}

case "mdat":
err := m.flush()
if err != nil {
return nil, err
}
}
return nil, nil
})
Expand Down Expand Up @@ -474,12 +471,15 @@ func segmentFMP4WriteParts(
return nil, err
}

m.writeSample(
err = m.writeSample(
muxerDTS,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
payload,
)
if err != nil {
return nil, err
}

Check warning on line 482 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L481-L482

Added lines #L481 - L482 were not covered by tests

muxerDTS += int64(e.SampleDuration)
}
Expand All @@ -489,12 +489,6 @@ func segmentFMP4WriteParts(
if muxerDTS > maxMuxerDTS {
maxMuxerDTS = muxerDTS
}

case "mdat":
err := m.flush()
if err != nil {
return nil, err
}
}
return nil, nil
})
Expand Down

0 comments on commit 58c91fc

Please sign in to comment.