Skip to content

Commit

Permalink
use native timestamps instead of time.Duration (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Oct 7, 2024
1 parent 076f27f commit 8a07427
Show file tree
Hide file tree
Showing 28 changed files with 492 additions and 379 deletions.
24 changes: 12 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ type ClientOnRequestFunc func(*http.Request)
type ClientOnTracksFunc func([]*Track) error

// ClientOnDataAV1Func is the prototype of the function passed to OnDataAV1().
type ClientOnDataAV1Func func(pts time.Duration, tu [][]byte)
type ClientOnDataAV1Func func(pts int64, tu [][]byte)

// ClientOnDataVP9Func is the prototype of the function passed to OnDataVP9().
type ClientOnDataVP9Func func(pts time.Duration, frame []byte)
type ClientOnDataVP9Func func(pts int64, frame []byte)

// ClientOnDataH26xFunc is the prototype of the function passed to OnDataH26x().
type ClientOnDataH26xFunc func(pts time.Duration, dts time.Duration, au [][]byte)
type ClientOnDataH26xFunc func(pts int64, dts int64, au [][]byte)

// ClientOnDataMPEG4AudioFunc is the prototype of the function passed to OnDataMPEG4Audio().
type ClientOnDataMPEG4AudioFunc func(pts time.Duration, aus [][]byte)
type ClientOnDataMPEG4AudioFunc func(pts int64, aus [][]byte)

// ClientOnDataOpusFunc is the prototype of the function passed to OnDataOpus().
type ClientOnDataOpusFunc func(pts time.Duration, packets [][]byte)
type ClientOnDataOpusFunc func(pts int64, packets [][]byte)

type clientOnStreamTracksFunc func(ctx context.Context, isLeading bool, tracks []*Track) ([]*clientTrack, bool)

type clientOnDataFunc func(pts time.Duration, dts time.Duration, data [][]byte)
type clientOnDataFunc func(pts int64, dts int64, data [][]byte)

func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
u, err := url.Parse(relative)
Expand Down Expand Up @@ -186,35 +186,35 @@ func (c *Client) Wait() chan error {

// OnDataAV1 sets a callback that is called when data from an AV1 track is received.
func (c *Client) OnDataAV1(track *Track, cb ClientOnDataAV1Func) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data)
}
}

// OnDataVP9 sets a callback that is called when data from a VP9 track is received.
func (c *Client) OnDataVP9(track *Track, cb ClientOnDataVP9Func) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data[0])
}
}

// OnDataH26x sets a callback that is called when data from an H26x track is received.
func (c *Client) OnDataH26x(track *Track, cb ClientOnDataH26xFunc) {
c.tracks[track].onData = func(pts time.Duration, dts time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, dts int64, data [][]byte) {
cb(pts, dts, data)
}
}

// OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received.
func (c *Client) OnDataMPEG4Audio(track *Track, cb ClientOnDataMPEG4AudioFunc) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data)
}
}

// OnDataOpus sets a callback that is called when data from an Opus track is received.
func (c *Client) OnDataOpus(track *Track, cb ClientOnDataOpusFunc) {
c.tracks[track].onData = func(pts time.Duration, _ time.Duration, data [][]byte) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data)
}
}
Expand Down Expand Up @@ -267,7 +267,7 @@ func (c *Client) setTracks(tracks []*Track) (map[*Track]*clientTrack, error) {
for _, track := range tracks {
c.tracks[track] = &clientTrack{
track: track,
onData: func(_, _ time.Duration, _ [][]byte) {},
onData: func(_, _ int64, _ [][]byte) {},
}
}

Expand Down
45 changes: 24 additions & 21 deletions client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ func (p *clientStreamProcessorFMP4) run(ctx context.Context) error {
p.leadingTrackID = fmp4PickLeadingTrack(&p.init)

tracks := make([]*Track, len(p.init.Tracks))

for i, track := range p.init.Tracks {
tracks[i] = &Track{
Codec: codecs.FromFMP4(track.Codec),
Codec: codecs.FromFMP4(track.Codec),
ClockRate: int(track.TimeScale),
}
}

Expand Down Expand Up @@ -121,26 +123,28 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative time.Duration
var ntpRelative int64
var leadingClockRate int

if p.trackProcessors == nil || seg.dateTime != nil {
partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if partTrack == nil {
return fmt.Errorf("could not find data of leading track")
}
partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if partTrack == nil {
return fmt.Errorf("could not find data of leading track")
}

if p.trackProcessors == nil {
err := p.initializeTrackProcessors(ctx, partTrack)
if err != nil {
return err
}
if p.trackProcessors == nil {
err := p.initializeTrackProcessors(ctx, partTrack)
if err != nil {
return err
}
}

if seg.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *seg.dateTime
ntpRelative = p.timeConv.convert(partTrack.BaseTime, p.timeConv.leadingTimeScale)
}
leadingTrackProc := p.trackProcessors[partTrack.ID]
leadingClockRate = leadingTrackProc.track.track.ClockRate

if seg.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *seg.dateTime
ntpRelative = p.timeConv.convert(int64(partTrack.BaseTime), leadingClockRate)
}

partTrackCount := 0
Expand All @@ -155,7 +159,7 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg
err := trackProc.push(ctx, &procEntryFMP4{
ntpAvailable: ntpAvailable,
ntpAbsolute: ntpAbsolute,
ntpRelative: ntpRelative,
ntpRelative: multiplyAndDivide(ntpRelative, int64(trackProc.track.track.ClockRate), int64(leadingClockRate)),
partTrack: partTrack,
})
if err != nil {
Expand Down Expand Up @@ -196,8 +200,8 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors(
timeScale := findTimeScaleOfLeadingTrack(p.init.Tracks, p.leadingTrackID)

p.timeConv = &clientTimeConvFMP4{
leadingTimeScale: timeScale,
initialBaseTime: partTrack.BaseTime,
leadingTimeScale: int64(timeScale),
leadingBaseTime: int64(partTrack.BaseTime),
}
p.timeConv.initialize()

Expand All @@ -219,7 +223,6 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors(
for i, track := range p.clientStreamTracks {
trackProc := &clientTrackProcessorFMP4{
track: track,
timeScale: p.init.Tracks[i].TimeScale,
timeConv: p.timeConv,
onPartTrackProcessed: p.onPartTrackProcessed,
}
Expand Down
10 changes: 7 additions & 3 deletions client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,11 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
leadingTrackID := mpegtsPickLeadingTrack(p.reader.Tracks())

tracks := make([]*Track, len(p.reader.Tracks()))

for i, mpegtsTrack := range p.reader.Tracks() {
tracks[i] = &Track{
Codec: codecs.FromMPEGTS(mpegtsTrack.Codec),
Codec: codecs.FromMPEGTS(mpegtsTrack.Codec),
ClockRate: 90000,
}
}

Expand All @@ -180,7 +182,7 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative time.Duration
var ntpRelative int64

for i, mpegtsTrack := range p.reader.Tracks() {
track := p.clientStreamTracks[i]
Expand Down Expand Up @@ -225,7 +227,9 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs

ntp := time.Time{}
if ntpAvailable {
ntp = ntpAbsolute.Add(dts - ntpRelative)
diff := dts - ntpRelative
diffDur := timestampToDuration(diff, 90000)
ntp = ntpAbsolute.Add(diffDur)
}

return trackProc.push(ctx, &procEntryMPEGTS{
Expand Down
67 changes: 44 additions & 23 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,20 @@ func TestClient(t *testing.T) {
pps = testPPS
}

var audioClockRate int
if format == "fmp4" {
audioClockRate = 44100
} else {
audioClockRate = 90000
}

require.Equal(t, []*Track{
{
Codec: &codecs.H264{
SPS: sps,
PPS: pps,
},
ClockRate: 90000,
},
{
Codec: &codecs.MPEG4Audio{
Expand All @@ -423,14 +431,15 @@ func TestClient(t *testing.T) {
ChannelCount: 2,
},
},
ClockRate: audioClockRate,
},
}, tracks)

c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) {
c.OnDataH26x(tracks[0], func(pts int64, dts int64, au [][]byte) {
switch videoCount {
case 0:
require.Equal(t, time.Duration(0), dts)
require.Equal(t, 2*time.Second, pts)
require.Equal(t, int64(0), dts)
require.Equal(t, int64(2*90000), pts)
require.Equal(t, [][]byte{
{7, 1, 2, 3},
{8},
Expand All @@ -441,16 +450,16 @@ func TestClient(t *testing.T) {
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp)

case 1:
require.Equal(t, 33333333*time.Nanosecond, dts)
require.Equal(t, 2*time.Second+33333333*time.Nanosecond, pts)
require.Equal(t, int64(3000), dts)
require.Equal(t, int64(2*90000+3000), pts)
require.Equal(t, [][]byte{{1, 4, 5, 6}}, au)
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 33333333, time.UTC), ntp)

case 2:
require.Equal(t, 66666666*time.Nanosecond, dts)
require.Equal(t, 66666666*time.Nanosecond, pts)
require.Equal(t, int64(6000), dts)
require.Equal(t, int64(6000), pts)
require.Equal(t, [][]byte{{4}}, au)
_, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, false, ok)
Expand All @@ -459,17 +468,17 @@ func TestClient(t *testing.T) {
videoCount++
})

c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) {
c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
switch audioCount {
case 0:
require.Equal(t, 0*time.Second, pts)
require.Equal(t, int64(0), pts)
require.Equal(t, [][]byte{{1, 2, 3, 4}}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp)

case 1:
require.Equal(t, 33333333*time.Nanosecond, pts)
require.Equal(t, int64(0.0333336*float64(tracks[1].ClockRate)), pts)
require.Equal(t, [][]byte{{5, 6, 7, 8}}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
Expand Down Expand Up @@ -516,6 +525,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
"#EXT-X-INDEPENDENT-SEGMENTS\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MAP:URI=\"init_video.mp4\"\n" +
"#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" +
"#EXTINF:2,\n" +
"segment_video.mp4\n" +
"#EXT-X-ENDLIST\n"))
Expand All @@ -529,6 +539,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
"#EXT-X-INDEPENDENT-SEGMENTS\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MAP:URI=\"init_audio.mp4\"\n" +
"#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" +
"#EXTINF:2,\n" +
"segment_audio.mp4\n" +
"#EXT-X-ENDLIST"))
Expand Down Expand Up @@ -589,7 +600,8 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
err := mp4ToWriter(&fmp4.Part{
Tracks: []*fmp4.PartTrack{
{
ID: 1,
ID: 1,
BaseTime: 3000,
Samples: []*fmp4.PartSample{{
Duration: 44100,
Payload: []byte{1, 2, 3, 4},
Expand Down Expand Up @@ -627,30 +639,38 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
SPS: testSPS,
PPS: testPPS,
},
ClockRate: 90000,
},
{
Codec: &codecs.MPEG4Audio{
Config: testConfig,
},
ClockRate: 44100,
},
}, tracks)

c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) {
require.Equal(t, 3*time.Second, pts)
require.Equal(t, time.Duration(0), dts)
c.OnDataH26x(tracks[0], func(pts int64, dts int64, au [][]byte) {
require.Equal(t, int64(3*90000), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
{7, 1, 2, 3},
{8},
{5},
}, au)
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 0, time.UTC), ntp)
packetRecv <- struct{}{}
})

c.OnDataMPEG4Audio(tracks[1], func(pts time.Duration, aus [][]byte) {
require.Equal(t, 0*time.Second, pts)
c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
require.Equal(t, int64(3000), pts)
require.Equal(t, [][]byte{
{1, 2, 3, 4},
}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 34693877, time.UTC), ntp)
packetRecv <- struct{}{}
})

Expand Down Expand Up @@ -820,17 +840,18 @@ func TestClientFMP4LowLatency(t *testing.T) {
SPS: testSPS,
PPS: testPPS,
},
ClockRate: 90000,
},
}, tracks)

c.OnDataH26x(tracks[0], func(pts time.Duration, dts time.Duration, au [][]byte) {
c.OnDataH26x(tracks[0], func(pts int64, dts int64, au [][]byte) {
switch recvCount {
case 0:
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 4, 0, time.UTC), ntp)
require.Equal(t, 0*time.Second, pts)
require.Equal(t, time.Duration(0), dts)
require.Equal(t, int64(0), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
{7, 1, 2, 3},
{8},
Expand All @@ -841,16 +862,16 @@ func TestClientFMP4LowLatency(t *testing.T) {
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 4, 33333333, time.UTC), ntp)
require.Equal(t, 33333333*time.Nanosecond, pts)
require.Equal(t, 33333333*time.Nanosecond, dts)
require.Equal(t, int64(3000), pts)
require.Equal(t, int64(3000), dts)
require.Equal(t, [][]byte{{1, 4, 5, 6}}, au)

case 2:
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 4, 66666666, time.UTC), ntp)
require.Equal(t, 66666666*time.Nanosecond, pts)
require.Equal(t, 66666666*time.Nanosecond, dts)
require.Equal(t, int64(6000), pts)
require.Equal(t, int64(6000), dts)
require.Equal(t, [][]byte{{1, 7, 8, 9}}, au)

default:
Expand Down
Loading

0 comments on commit 8a07427

Please sign in to comment.