From 7febd3ef92f42b422c86561913c160a3b3953ad9 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Mon, 20 Dec 2021 15:00:28 -0300 Subject: [PATCH] record-tester: Remove all fatal logs from m3utester2 --- internal/testers/m3utester2.go | 113 ++++++++++++++++------------ internal/testers/mediadownloader.go | 26 +++---- 2 files changed, 78 insertions(+), 61 deletions(-) diff --git a/internal/testers/m3utester2.go b/internal/testers/m3utester2.go index 6b5258b6..97729a75 100644 --- a/internal/testers/m3utester2.go +++ b/internal/testers/m3utester2.go @@ -190,13 +190,11 @@ func (mut *m3utester2) VODStats() model.VODStats { return vs } -func (mut *m3utester2) doSavePlaylist() { +func (mut *m3utester2) doSavePlaylist() error { if mut.savePlayList != nil { - err := ioutil.WriteFile(mut.savePlayListName, mut.savePlayList.Encode().Bytes(), 0644) - if err != nil { - glog.Fatal(err) - } + return ioutil.WriteFile(mut.savePlayListName, mut.savePlayList.Encode().Bytes(), 0644) } + return nil } func (mut *m3utester2) initSave(streamURL, mediaURL string) { @@ -221,7 +219,7 @@ func (mut *m3utester2) initSave(streamURL, mediaURL string) { } func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, resolution string, u *url.URL, wowzaMode bool, masterDR chan *downloadResult, - sm *segmentsMatcher, latencyResults chan *latencyResult, save, failIfTranscodingStops, statsOnly bool) *m3uMediaStream { + sm *segmentsMatcher, latencyResults chan *latencyResult, save, failIfTranscodingStops, statsOnly bool) (*m3uMediaStream, error) { ms := &m3uMediaStream{ finite: finite{ @@ -247,7 +245,7 @@ func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, res if ms.save { streamName, mediaStreamName, err := parseMediaURL(u.String()) if err != nil { - glog.Fatal(err) + return nil, err } ms.saveDirName = filepath.Join(streamName, mediaStreamName) ms.savePlayListName = filepath.Join(ms.saveDirName, mediaStreamName+".m3u8") @@ -256,14 +254,14 @@ func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, res } glog.Infof("Save dir name: '%s', main playlist save name %s", ms.saveDirName, ms.savePlayListName) mpl, err := m3u8.NewMediaPlaylist(0, 1024) - mpl.MediaType = m3u8.VOD - mpl.Live = false if err != nil { - panic(err) + return nil, err } + mpl.MediaType = m3u8.VOD + mpl.Live = false ms.savePlayList = mpl } - return ms + return ms, nil } func (mut *m3utester2) Stats() model.Stats1 { @@ -636,10 +634,17 @@ func (mut *m3utester2) manifestPullerLoop(waitForTarget time.Duration) { if mut.save { mut.initSave("", surl) mut.savePlayList.Append(mediaName+"/"+mediaName+".m3u8", nil, m3u8.VariantParams{Name: mediaName}) - mut.doSavePlaylist() + if err := mut.doSavePlaylist(); err != nil { + mut.fatalEnd(err) + return + } } - stream := newM3uMediaStream(mut.ctx, mut.cancel, mediaName, mres, mut.initialURL, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults, + stream, err := newM3uMediaStream(mut.ctx, mut.cancel, mediaName, mres, mut.initialURL, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults, mut.save, mut.failIfTranscodingStops, mut.statsOnly) + if err != nil { + mut.fatalEnd(err) + return + } mut.streams[resolution(mres)] = stream return } @@ -717,20 +722,28 @@ func (mut *m3utester2) manifestPullerLoop(waitForTarget time.Duration) { if mut.sourceRes == "" { mut.sourceRes = ress } - stream := newM3uMediaStream(mut.ctx, mut.cancel, variant.URI, ress, pvrui, mut.wowzaMode, mut.driftCheckResults, + stream, err := newM3uMediaStream(mut.ctx, mut.cancel, variant.URI, ress, pvrui, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults, mut.save, mut.failIfTranscodingStops, mut.statsOnly) + if err != nil { + mut.fatalEnd(err) + return + } mut.streams[res] = stream if mut.save { needSavePlaylist = true _, mediaName, err := parseMediaURL(pvrui.String()) if err != nil { - glog.Fatal(err) + mut.fatalEnd(err) + return } mut.savePlayList.Append(mediaName+"/"+mediaName+".m3u8", nil, variant.VariantParams) } } if needSavePlaylist { - mut.doSavePlaylist() + if err := mut.doSavePlaylist(); err != nil { + mut.fatalEnd(err) + return + } } time.Sleep(2 * time.Second) } @@ -827,10 +840,12 @@ func (ms *m3uMediaStream) workerLoop(masterDR chan *downloadResult, latencyResul seg.Duration = dres.task.duration seg.Title = dres.task.title if err := ms.insertSegmentToSavePlaylist(dres.task.seqNo, seg); err != nil { - glog.Fatal(err) + ms.fatalEnd(err) + return } if err := ioutil.WriteFile(ms.savePlayListName, ms.savePlayList.Encode().Bytes(), 0644); err != nil { - glog.Fatal(err) + ms.fatalEnd(err) + return } go func(segFileName, fullpath string, b []byte) { if err := ioutil.WriteFile(fullpath, b, 0644); err != nil { @@ -1002,10 +1017,12 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) { gpl, plt, err := m3u8.Decode(*bytes.NewBuffer(b), true) if err != nil { - glog.Fatal(err) + ms.fatalEnd(err) + return } if plt != m3u8.MEDIA { - glog.Fatalf("Expecting media playlist, got %d (url=%s)", plt, surl) + ms.fatalEnd(fmt.Errorf("Expecting media playlist, got %d (url=%s)", plt, surl)) + return } pl := gpl.(*m3u8.MediaPlaylist) // pl, err := m3u8.NewMediaPlaylist(100, 100) @@ -1055,7 +1072,12 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) { ms.downloadResults <- &downloadResult{name: segment.URI, seqNo: segSeqNo, status: "200 OK", duration: time.Duration(segment.Duration * float64(time.Second))} } else { - ms.downTasks <- downloadTask{baseURL: ms.u, url: segment.URI, seqNo: segSeqNo, title: segment.Title, duration: segment.Duration, appTime: now} + segUrl, err := url.Parse(segment.URI) + if err != nil { + ms.fatalEnd(err) + return + } + ms.downTasks <- downloadTask{baseURL: ms.u, url: segUrl, seqNo: segSeqNo, title: segment.Title, duration: segment.Duration, appTime: now} ms.segmentsToDownload++ metrics.Census.IncSegmentsToDownload() } @@ -1085,37 +1107,32 @@ func (ms *m3uMediaStream) isFiniteDownloadsFinished() bool { } func (ms *m3uMediaStream) insertSegmentToSavePlaylist(seqNo uint64, seg *m3u8.MediaSegment) error { - var err error - err = ms.savePlayList.InsertSegment(seqNo, seg) - if err == m3u8.ErrPlaylistFull { - mpl, err := m3u8.NewMediaPlaylist(0, uint(len(ms.savePlayList.Segments)*2)) - if err != nil { - glog.Fatal(err) - } - mpl.TargetDuration = ms.savePlayList.TargetDuration - mpl.SeqNo = ms.savePlayList.SeqNo - mpl.MediaType = m3u8.VOD - mpl.Live = false - for _, oseg := range ms.savePlayList.Segments { - if oseg != nil { - if err = mpl.InsertSegment(oseg.SeqId, oseg); err != nil { - glog.Fatal(err) - } + err := ms.savePlayList.InsertSegment(seqNo, seg) + if err != m3u8.ErrPlaylistFull { + return err + } + mpl, err := m3u8.NewMediaPlaylist(0, uint(len(ms.savePlayList.Segments)*2)) + if err != nil { + return err + } + mpl.TargetDuration = ms.savePlayList.TargetDuration + mpl.SeqNo = ms.savePlayList.SeqNo + mpl.MediaType = m3u8.VOD + mpl.Live = false + for _, oseg := range ms.savePlayList.Segments { + if oseg != nil { + if err = mpl.InsertSegment(oseg.SeqId, oseg); err != nil { + return err } } - err = ms.savePlayList.InsertSegment(seqNo, seg) } - return err + return ms.savePlayList.InsertSegment(seqNo, seg) } func downloadSegment(task *downloadTask, res chan *downloadResult) { - purl, err := url.Parse(task.url) - if err != nil { - glog.Fatal(err) - } - fsurl := task.url - if !purl.IsAbs() { - fsurl = task.baseURL.ResolveReference(purl).String() + fsurl := task.url.String() + if !task.url.IsAbs() { + fsurl = task.baseURL.ResolveReference(task.url).String() } try := 0 for { @@ -1166,7 +1183,7 @@ func downloadSegment(task *downloadTask, res chan *downloadResult) { glog.V(model.DEBUG).Infof("==============>>>>>>>>>>>>> Saving segment %s", sn) ioutil.WriteFile(sn, b, 0644) sid := strconv.FormatInt(time.Now().Unix(), 10) - if savedName, service, serr := SaveToExternalStorage(sid+"_"+task.url, b); serr != nil { + if savedName, service, serr := SaveToExternalStorage(sid+"_"+task.url.String(), b); serr != nil { messenger.SendFatalMessage(fmt.Sprintf("Failure to save segment to %s %v", service, serr)) } else { messenger.SendMessage(fmt.Sprintf("Segment %s (which can't be parsed) saved to %s %s", task.url, service, savedName)) @@ -1178,7 +1195,7 @@ func downloadSegment(task *downloadTask, res chan *downloadResult) { // ioutil.WriteFile(sn, b, 0644) // glog.V(model.DEBUG).Infof("Download %s result: %s len %d timeStart %s segment duration %s", fsurl, resp.Status, len(b), fsttim, dur) glog.V(model.DEBUG).Infof("Download %s result: %s len %d timeStart %s segment duration %s took=%s", fsurl, resp.Status, len(b), fsttim, dur, time.Since(start)) - res <- &downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url, seqNo: task.seqNo, + res <- &downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url.String(), seqNo: task.seqNo, videoParseError: verr, startTime: fsttim, duration: dur, mySeqNo: task.mySeqNo, appTime: task.appTime, downloadCompetedAt: completedAt, downloadStartedAt: start, data: b, task: task, } diff --git a/internal/testers/mediadownloader.go b/internal/testers/mediadownloader.go index 7eaac4e7..7090a35c 100644 --- a/internal/testers/mediadownloader.go +++ b/internal/testers/mediadownloader.go @@ -41,7 +41,7 @@ type downloadStats struct { type downloadTask struct { baseURL *url.URL - url string + url *url.URL seqNo uint64 title string duration float64 @@ -186,13 +186,9 @@ func (md *mediaDownloader) statsFormatted() string { } func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan downloadResult) { - purl, err := url.Parse(task.url) - if err != nil { - glog.Fatal(err) - } - fsurl := task.url - if !purl.IsAbs() { - fsurl = md.u.ResolveReference(purl).String() + fsurl := task.url.String() + if !task.url.IsAbs() { + fsurl = md.u.ResolveReference(task.url).String() } try := 0 for { @@ -250,7 +246,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download } } if md.picartoMode { - fsttim = time.Duration(mistGetTimeFromSegURI(task.url)) * time.Millisecond + fsttim = time.Duration(mistGetTimeFromSegURI(task.url.String())) * time.Millisecond } } else { // add keys @@ -319,7 +315,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download if md.saveSegmentsToDisk { seg := new(m3u8.MediaSegment) - seg.URI = task.url + seg.URI = task.url.String() seg.SeqId = task.seqNo seg.Duration = task.duration seg.Title = task.title @@ -331,7 +327,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download upts := strings.Split(fsurl, "/") // fn := upts[len(upts)-2] + "-" + path.Base(task.url) ind := len(upts) - 2 - fn := path.Base(task.url) + fn := path.Base(task.url.String()) if !md.livepeerNameSchema { // ind = 0 // fn = upts[0] @@ -378,7 +374,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download glog.V(model.DEBUG).Infof("Segment %s saved to %s", seg.URI, filepath.Join(md.saveDir, fn)) }(fn, fullpath, b) } - res <- downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url, seqNo: task.seqNo, downloadCompetedAt: now, + res <- downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url.String(), seqNo: task.seqNo, downloadCompetedAt: now, videoParseError: verr, startTime: fsttim, duration: dur, mySeqNo: task.mySeqNo, appTime: task.appTime, keyFrames: keyFrames} return } @@ -560,7 +556,11 @@ func (md *mediaDownloader) manifestDownloadLoop() { if err == nil { seqNo = parsedSeq } - md.downTasks <- downloadTask{url: segment.URI, seqNo: seqNo, title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now} + segUrl, err := url.Parse(segment.URI) + if err != nil { + glog.Fatal(err) + } + md.downTasks <- downloadTask{url: segUrl, seqNo: seqNo, title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now} md.segmentsToDownload++ now = now.Add(time.Millisecond) // glog.V(model.VERBOSE).Infof("segment %s is of length %f seqId=%d", segment.URI, segment.Duration, segment.SeqId)