Skip to content

Commit

Permalink
client: support reading streams in Low-Latency mode (#72) (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Jan 1, 2024
1 parent 9c3bf60 commit 1c70dd1
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 79 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Features:

* Client

* Read streams in MPEG-TS or fMP4 format
* Read streams in MPEG-TS, fMP4 or Low-latency format
* Read tracks encoded with AV1, VP9, H265, H264, Opus, MPEG-4 Audio (AAC)
* Get absolute timestamp of incoming data

Expand Down
11 changes: 11 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type ClientOnDownloadStreamPlaylistFunc func(url string)
// ClientOnDownloadSegmentFunc is the prototype of Client.OnDownloadSegment.
type ClientOnDownloadSegmentFunc func(url string)

// ClientOnDownloadPartFunc is the prototype of Client.OnDownloadPart.
type ClientOnDownloadPartFunc func(url string)

// ClientOnDecodeErrorFunc is the prototype of Client.OnDecodeError.
type ClientOnDecodeErrorFunc func(err error)

Expand Down Expand Up @@ -88,6 +91,8 @@ type Client struct {
OnDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
// called before downloading a segment.
OnDownloadSegment ClientOnDownloadSegmentFunc
// called before downloading a part.
OnDownloadPart ClientOnDownloadPartFunc
// called when a non-fatal decode error occurs.
OnDecodeError ClientOnDecodeErrorFunc

Expand Down Expand Up @@ -130,6 +135,11 @@ func (c *Client) Start() error {
log.Printf("downloading segment %v", u)
}
}
if c.OnDownloadPart == nil {
c.OnDownloadPart = func(u string) {
log.Printf("downloading part %v", u)
}
}
if c.OnDecodeError == nil {
c.OnDecodeError = func(err error) {
log.Println(err.Error())
Expand Down Expand Up @@ -206,6 +216,7 @@ func (c *Client) runInner() error {
onDownloadPrimaryPlaylist: c.OnDownloadPrimaryPlaylist,
onDownloadStreamPlaylist: c.OnDownloadStreamPlaylist,
onDownloadSegment: c.OnDownloadSegment,
onDownloadPart: c.OnDownloadPart,
onDecodeError: c.OnDecodeError,
rp: rp,
onTracks: c.OnTracks,
Expand Down
32 changes: 28 additions & 4 deletions client_primary_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,27 @@ func checkSupport(codecs []string) bool {
return true
}

func clientDownloadPlaylist(ctx context.Context, httpClient *http.Client, ur *url.URL) (playlist.Playlist, error) {
func cloneURL(ur *url.URL) *url.URL {
return &url.URL{
Scheme: ur.Scheme,
Opaque: ur.Opaque,
User: ur.User,
Host: ur.Host,
Path: ur.Path,
RawPath: ur.RawPath,
OmitHost: ur.OmitHost,
ForceQuery: ur.ForceQuery,
RawQuery: ur.RawQuery,
Fragment: ur.Fragment,
RawFragment: ur.RawFragment,
}
}

func clientDownloadPlaylist(
ctx context.Context,
httpClient *http.Client,
ur *url.URL,
) (playlist.Playlist, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, ur.String(), nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -103,6 +123,7 @@ type clientPrimaryDownloader struct {
onDownloadPrimaryPlaylist ClientOnDownloadPrimaryPlaylistFunc
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
onDownloadSegment ClientOnDownloadSegmentFunc
onDownloadPart ClientOnDownloadPartFunc
onDecodeError ClientOnDecodeErrorFunc
rp *clientRoutinePool
onTracks ClientOnTracksFunc
Expand Down Expand Up @@ -145,9 +166,10 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDownloadPart: d.onDownloadPart,
onDecodeError: d.onDecodeError,
playlistURL: d.primaryPlaylistURL,
initialPlaylist: plt,
firstPlaylist: plt,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
Expand All @@ -174,9 +196,10 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDownloadPart: d.onDownloadPart,
onDecodeError: d.onDecodeError,
playlistURL: u,
initialPlaylist: nil,
firstPlaylist: nil,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
Expand Down Expand Up @@ -204,9 +227,10 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
httpClient: d.httpClient,
onDownloadStreamPlaylist: d.onDownloadStreamPlaylist,
onDownloadSegment: d.onDownloadSegment,
onDownloadPart: d.onDownloadPart,
onDecodeError: d.onDecodeError,
playlistURL: u,
initialPlaylist: nil,
firstPlaylist: nil,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
Expand Down
162 changes: 123 additions & 39 deletions client_stream_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,39 @@ type clientStreamDownloader struct {
httpClient *http.Client
onDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
onDownloadSegment ClientOnDownloadSegmentFunc
onDownloadPart ClientOnDownloadPartFunc
onDecodeError ClientOnDecodeErrorFunc
playlistURL *url.URL
initialPlaylist *playlist.Media
firstPlaylist *playlist.Media
rp *clientRoutinePool
onStreamTracks clientOnStreamTracksFunc
onStreamEnded func(context.Context)
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}

segmentQueue *clientSegmentQueue
curSegmentID *int
}

func (d *clientStreamDownloader) run(ctx context.Context) error {
initialPlaylist := d.initialPlaylist
d.initialPlaylist = nil
if initialPlaylist == nil {
if d.firstPlaylist == nil {
var err error
initialPlaylist, err = d.downloadPlaylist(ctx)
d.firstPlaylist, err = d.downloadPlaylist(ctx, false)
if err != nil {
return err
}
}

segmentQueue := &clientSegmentQueue{}
segmentQueue.initialize()
d.segmentQueue = &clientSegmentQueue{}
d.segmentQueue.initialize()

if initialPlaylist.Map != nil && initialPlaylist.Map.URI != "" {
if d.firstPlaylist.Map != nil && d.firstPlaylist.Map.URI != "" {
byts, err := d.downloadSegment(
ctx,
initialPlaylist.Map.URI,
initialPlaylist.Map.ByteRangeStart,
initialPlaylist.Map.ByteRangeLength)
d.firstPlaylist.Map.URI,
d.firstPlaylist.Map.ByteRangeStart,
d.firstPlaylist.Map.ByteRangeLength)
if err != nil {
return err
}
Expand All @@ -75,25 +75,21 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
ctx: ctx,
isLeading: d.isLeading,
initFile: byts,
segmentQueue: segmentQueue,
segmentQueue: d.segmentQueue,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
}
err = proc.initialize()
if err != nil {
return err
}

proc.initialize()
d.rp.add(proc)
} else {
proc := &clientStreamProcessorMPEGTS{
onDecodeError: d.onDecodeError,
isLeading: d.isLeading,
segmentQueue: segmentQueue,
segmentQueue: d.segmentQueue,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
Expand All @@ -105,33 +101,78 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
d.rp.add(proc)
}

err := d.fillSegmentQueue(ctx, initialPlaylist, segmentQueue)
if err != nil {
return err
if d.firstPlaylist.ServerControl != nil &&
d.firstPlaylist.ServerControl.CanBlockReload &&
d.firstPlaylist.PreloadHint != nil {
return d.runLowLatency(ctx)
}

return d.runTraditional(ctx)
}

func (d *clientStreamDownloader) runLowLatency(ctx context.Context) error {
pl := d.firstPlaylist

for {
ok := segmentQueue.waitUntilSizeIsBelow(ctx, 1)
if !ok {
return fmt.Errorf("terminated")
byts, err := d.downloadPreloadHint(ctx, pl.PreloadHint)
if err != nil {
return err
}

d.segmentQueue.push(&segmentData{
// dateTime: seg.DateTime,
payload: byts,
})

pl, err = d.downloadPlaylist(ctx, d.firstPlaylist.ServerControl.CanSkipUntil != nil)
if err != nil {
return err
}

if pl.PreloadHint == nil {
return fmt.Errorf("preload hint disappeared")
}
}
}

pl, err := d.downloadPlaylist(ctx)
func (d *clientStreamDownloader) runTraditional(ctx context.Context) error {
pl := d.firstPlaylist

for {
err := d.fillSegmentQueue(ctx, pl)
if err != nil {
return err
}

err = d.fillSegmentQueue(ctx, pl, segmentQueue)
ok := d.segmentQueue.waitUntilSizeIsBelow(ctx, 1)
if !ok {
return fmt.Errorf("terminated")
}

pl, err = d.downloadPlaylist(ctx, false)
if err != nil {
return err
}
}
}

func (d *clientStreamDownloader) downloadPlaylist(ctx context.Context) (*playlist.Media, error) {
d.onDownloadStreamPlaylist(d.playlistURL.String())
func (d *clientStreamDownloader) downloadPlaylist(
ctx context.Context,
skipUntil bool,
) (*playlist.Media, error) {
ur := d.playlistURL

if skipUntil {
newUR := cloneURL(ur)
q := newUR.Query()
q.Add("_HLS_skip", "YES")
newUR.RawQuery = q.Encode()
ur = newUR
}

d.onDownloadStreamPlaylist(ur.String())

pl, err := clientDownloadPlaylist(ctx, d.httpClient, d.playlistURL)
pl, err := clientDownloadPlaylist(ctx, d.httpClient, ur)
if err != nil {
return nil, err
}
Expand All @@ -144,6 +185,46 @@ func (d *clientStreamDownloader) downloadPlaylist(ctx context.Context) (*playlis
return plt, nil
}

func (d *clientStreamDownloader) downloadPreloadHint(
ctx context.Context,
preloadHint *playlist.MediaPreloadHint,
) ([]byte, error) {
u, err := clientAbsoluteURL(d.playlistURL, preloadHint.URI)
if err != nil {
return nil, err
}

d.onDownloadPart(u.String())

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}

if preloadHint.ByteRangeLength != nil {
req.Header.Add("Range", "bytes="+
strconv.FormatUint(preloadHint.ByteRangeStart, 10)+"-"+
strconv.FormatUint(preloadHint.ByteRangeStart+*preloadHint.ByteRangeLength-1, 10))
}

res, err := d.httpClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
return nil, fmt.Errorf("bad status code: %d", res.StatusCode)
}

byts, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}

return byts, nil
}

func (d *clientStreamDownloader) downloadSegment(
ctx context.Context,
uri string,
Expand All @@ -167,7 +248,8 @@ func (d *clientStreamDownloader) downloadSegment(
v := uint64(0)
start = &v
}
req.Header.Add("Range", "bytes="+strconv.FormatUint(*start, 10)+"-"+strconv.FormatUint(*start+*length-1, 10))
req.Header.Add("Range", "bytes="+strconv.FormatUint(*start, 10)+
"-"+strconv.FormatUint(*start+*length-1, 10))
}

res, err := d.httpClient.Do(req)
Expand All @@ -191,22 +273,24 @@ func (d *clientStreamDownloader) downloadSegment(
func (d *clientStreamDownloader) fillSegmentQueue(
ctx context.Context,
pl *playlist.Media,
segmentQueue *clientSegmentQueue,
) error {
var seg *playlist.MediaSegment
var segPos int

if d.curSegmentID == nil {
if !pl.Endlist { // live stream: start from clientLiveInitialDistance
seg, segPos = findSegmentWithInvPosition(pl.Segments, clientLiveInitialDistance)
if seg == nil {
return fmt.Errorf("there aren't enough segments to fill the buffer")
}
} else { // VOD stream: start from beginning
if d.firstPlaylist.PlaylistType != nil &&
*d.firstPlaylist.PlaylistType == playlist.MediaPlaylistTypeVOD {
// VOD stream: start from the beginning
if len(pl.Segments) == 0 {
return fmt.Errorf("no segments found")
}
seg = pl.Segments[0]
} else {
// live stream: start from clientLiveInitialDistance
seg, segPos = findSegmentWithInvPosition(pl.Segments, clientLiveInitialDistance)
if seg == nil {
return fmt.Errorf("there aren't enough segments to fill the buffer")
}
}
} else {
var invPos int
Expand All @@ -228,13 +312,13 @@ func (d *clientStreamDownloader) fillSegmentQueue(
return err
}

segmentQueue.push(&segmentData{
d.segmentQueue.push(&segmentData{
dateTime: seg.DateTime,
payload: byts,
})

if pl.Endlist && pl.Segments[len(pl.Segments)-1] == seg {
segmentQueue.push(nil)
d.segmentQueue.push(nil)
<-ctx.Done()
return fmt.Errorf("terminated")
}
Expand Down
Loading

0 comments on commit 1c70dd1

Please sign in to comment.