From 18ce7a7cbff87ee3b466bf5d62aebf52fe687374 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 16 May 2023 15:35:37 -0300 Subject: [PATCH 1/7] recordtester: Check the right number of sessions --- internal/app/recordtester/recordtester_app.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 7d51baf3..c444f7aa 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -258,17 +258,20 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return 252, err } glog.Infof("Sessions: %+v", sessions) - sessionsLength := len(sessions) - if sessionsLength == 2 { - // We often see failures for 2 sessions, this should be fixed once we move to catalyst recording but for now - // we want to ignore these to reduce the alert noise - return 0, nil - } else if sessionsLength != 1 { - err := fmt.Errorf("invalid session count, got %d", sessionsLength) + + expectedSessions := 1 + if pauseDuration > 0 { + expectedSessions = 2 + } + + if len(sessions) != expectedSessions { + err := fmt.Errorf("invalid session count, expected %d but got %d", + expectedSessions, len(sessions)) glog.Error(err) // exit(251, fileName, *fileArg, err) return 251, err } + sess := sessions[0] if len(sess.Profiles) != len(stream.Profiles) { glog.Infof("session: %+v", sess) From bc00f719110c2fb9f0d4be179c61d1a0ef0881f6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 16 May 2023 15:40:14 -0300 Subject: [PATCH 2/7] recordtester: Allow configuring recording wait time --- cmd/recordtester/recordtester.go | 5 ++++- internal/app/recordtester/recordtester_app.go | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/recordtester/recordtester.go b/cmd/recordtester/recordtester.go index 94814b51..1678c4d9 100644 --- a/cmd/recordtester/recordtester.go +++ b/cmd/recordtester/recordtester.go @@ -165,6 +165,8 @@ func main() { vodImportUrl := fs.String("vod-import-url", "https://storage.googleapis.com/lp_testharness_assets/bbb_sunflower_1080p_30fps_normal_2min.mp4", "URL for VOD import") continuousTest := fs.Duration("continuous-test", 0, "Do continuous testing") useHttp := fs.Bool("http", false, "Do HTTP tests instead of RTMP") + forceRecordingUrl := fs.Bool("force-recording-url", false, "Whether to force the API to return a recording URL (skip the user session timeout)") + recordingWaitTime := fs.Duration("recording-wait-time", 6*time.Minute+20*time.Second, "How long to wait after the stream ends before checking for recording") testMP4 := fs.Bool("mp4", false, "Download MP4 of recording") testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test") testLive := fs.Bool("live", false, "Check Live workflow") @@ -330,7 +332,8 @@ func main() { Analyzers: lanalyzers, Ingest: ingest, RecordObjectStoreId: *recordObjectStoreId, - UseForceURL: true, + UseForceURL: *forceRecordingUrl, + RecordingWaitTime: *recordingWaitTime, UseHTTP: *useHttp, TestMP4: *testMP4, TestStreamHealth: *testStreamHealth, diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index c444f7aa..46bab310 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -50,6 +50,7 @@ type ( Ingest *api.Ingest RecordObjectStoreId string UseForceURL bool + RecordingWaitTime time.Duration UseHTTP bool TestMP4 bool TestStreamHealth bool @@ -63,6 +64,7 @@ type ( ingest *api.Ingest recordObjectStoreId string useForceURL bool + recordingWaitTime time.Duration useHTTP bool mp4 bool streamHealth bool @@ -86,6 +88,7 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts Se cancel: cancel, recordObjectStoreId: opts.RecordObjectStoreId, useForceURL: opts.UseForceURL, + recordingWaitTime: opts.RecordingWaitTime, useHTTP: opts.UseHTTP, mp4: opts.TestMP4, streamHealth: opts.TestStreamHealth, @@ -292,7 +295,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if rt.useForceURL { time.Sleep(5 * time.Second) } else { - time.Sleep(6*time.Minute + 20*time.Second) + time.Sleep(rt.recordingWaitTime) } if err = rt.isCancelled(); err != nil { return 0, err From 1d0bbe7c7844322d4335feeaed29c9ae917f78db Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 16 May 2023 15:48:20 -0300 Subject: [PATCH 3/7] recordtester: Test both streamed sessions --- internal/app/recordtester/recordtester_app.go | 76 +++++++++---------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 46bab310..5a6eea7b 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -312,51 +312,51 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return 0, err } - sess = sessions[0] - statusShould := api.RecordingStatusReady - if rt.useForceURL { - statusShould = api.RecordingStatusWaiting - } - if sess.RecordingStatus != statusShould { - err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould) - return 240, err - // exit(250, fileName, *fileArg, err) - } - if sess.RecordingURL == "" { - err := fmt.Errorf("recording URL should appear by now") - return 249, err - // exit(249, fileName, *fileArg, err) - } - glog.Infof("recordingURL=%s downloading now", sess.RecordingURL) + for _, sess := range sessions { + sess = sessions[0] + statusShould := api.RecordingStatusReady + if rt.useForceURL { + statusShould = api.RecordingStatusWaiting + } + if sess.RecordingStatus != statusShould { + err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould) + return 240, err + // exit(250, fileName, *fileArg, err) + } + if sess.RecordingURL == "" { + err := fmt.Errorf("recording URL should appear by now") + return 249, err + // exit(249, fileName, *fileArg, err) + } + glog.Infof("recordingURL=%s downloading now", sess.RecordingURL) - // started := time.Now() - // downloader := testers.NewM3utester2(gctx, sess.RecordingURL, false, false, false, false, 5*time.Second, nil) - // <-downloader.Done() - // glog.Infof(`Pulling stopped after %s`, time.Since(started)) - // exit(55, fileName, *fileArg, err) - glog.Info("Done Record Test") + // started := time.Now() + // downloader := testers.NewM3utester2(gctx, sess.RecordingURL, false, false, false, false, 5*time.Second, nil) + // <-downloader.Done() + // glog.Infof(`Pulling stopped after %s`, time.Since(started)) + // exit(55, fileName, *fileArg, err) + glog.Info("Done Record Test") - // lapi.DeleteStream(stream.ID) - // exit(0, fileName, *fileArg, err) - if err = rt.isCancelled(); err != nil { - return 0, err - } - if rt.mp4 { - es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0) + // lapi.DeleteStream(stream.ID) + // exit(0, fileName, *fileArg, err) + if err = rt.isCancelled(); err != nil { + return 0, err + } + if rt.mp4 { + es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0) + if err != nil { + return es, err + } + } + + es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0) if err != nil { return es, err } } - es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0) - if es == 0 { - rt.lapi.DeleteStream(stream.ID) - // exit(0, fileName, *fileArg, err) - } - - // uploader := testers.NewRtmpStreamer(gctx, rtmpURL) - // uploader.StartUpload(fileName, rtmpURL, -1, 30*time.Second) - return es, err + rt.lapi.DeleteStream(stream.ID) + return 0, nil } func (rt *recordTester) getIngestInfo() (*api.Ingest, error) { From 72c03d079e933e1433fc9984c342363c31f646bd Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 18 May 2023 19:33:59 -0300 Subject: [PATCH 4/7] recordtester: Fix duration checks (no double anymore) --- internal/app/recordtester/recordtester_app.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 5a6eea7b..6b8f7f4c 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -343,13 +343,13 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return 0, err } if rt.mp4 { - es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0) + es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration) if err != nil { return es, err } } - es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0) + es, err := rt.checkDown(stream, sess.RecordingURL, testDuration) if err != nil { return es, err } @@ -421,7 +421,7 @@ func (rt *recordTester) isCancelled() error { return nil } -func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) { +func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) { es := 0 started := time.Now() glog.V(model.VERBOSE).Infof("Downloading mp4 url=%s stream id=%s", url, stream.ID) @@ -455,15 +455,11 @@ func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDurat glog.Warningf("Error parsing mp4 for manifestID=%s url=%s err=%v", stream.ID, url, err) return 203, err } - durDiffShould := 2 * time.Second - if doubled { - durDiffShould *= durDiffShould - } durDiff := streamDuration - dur if durDiff < 0 { durDiff = -durDiff } - if durDiff > durDiffShould { + if durDiff > 2*time.Second { ers := fmt.Errorf("duration of mp4 differ by %s (got %s, should %s)", durDiff, dur, streamDuration) glog.Error(ers) return 300, err @@ -471,7 +467,7 @@ func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDurat return es, nil } -func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) { +func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration) (int, error) { es := 0 started := time.Now() downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false) @@ -488,7 +484,7 @@ func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration } glog.Infof("Stats for %s: %s", stream.ID, vs.String()) glog.Infof("Stats for %s raw: %+v", stream.ID, vs) - if ok, ers := vs.IsOk(streamDuration, doubled); !ok { + if ok, ers := vs.IsOk(streamDuration, false); !ok { glog.Warningf("NOT OK! (%s)", ers) es = 36 return es, errors.New(ers) From 14501959357a15efa1df6d4597db79ed5abbb455 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 18 May 2023 19:39:39 -0300 Subject: [PATCH 5/7] recordtester: Stop doubling testDuration var That was making the checks for the double size of the recording --- internal/app/recordtester/recordtester_app.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 6b8f7f4c..57beeb13 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -194,7 +194,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. glog.Warningf("Second time streaming returned error err=%v", sterr) return 3, err } - testDuration *= 2 } } else { @@ -245,7 +244,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if err = rt.isCancelled(); err != nil { return 0, err } - testDuration *= 2 } } if err := rt.isCancelled(); err != nil { From 570fc5b12ec89a04a71ff5e0a5f8fea47dc24158 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 18 May 2023 19:48:33 -0300 Subject: [PATCH 6/7] rt: Create helper var and comment about pauseDuration --- internal/app/recordtester/recordtester_app.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 57beeb13..b24c7e4b 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -169,6 +169,10 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. }) } + // when pauseDuration is set, we will stream the same file twice sleeping for + // the specified duration in between each. + streamTwice := pauseDuration > 0 + mediaURL := fmt.Sprintf("%s/%s/index.m3u8", ingest.Playback, stream.PlaybackID) if rt.serfOpts.UseSerf { index := 0 @@ -186,7 +190,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. glog.Warningf("Streaming returned error err=%v", sterr) return 3, err } - if pauseDuration > 0 { + if streamTwice { glog.Infof("Pause specified, waiting %s before streaming second time", pauseDuration) time.Sleep(pauseDuration) sterr = rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream) @@ -219,7 +223,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if err = rt.isCancelled(); err != nil { return 0, err } - if pauseDuration > 0 { + if streamTwice { glog.Infof("Pause specified, waiting %s before streaming second time", pauseDuration) time.Sleep(pauseDuration) sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{MistMode: true}, testerFuncs...) @@ -261,7 +265,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. glog.Infof("Sessions: %+v", sessions) expectedSessions := 1 - if pauseDuration > 0 { + if streamTwice { expectedSessions = 2 } From 63daf7d280c78b3fa90fd8cee4a286784e6a7044 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 18 May 2023 19:50:59 -0300 Subject: [PATCH 7/7] rt: Remove all commented out code --- internal/app/recordtester/recordtester_app.go | 39 +++---------------- 1 file changed, 5 insertions(+), 34 deletions(-) diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index b24c7e4b..ee8f2d2a 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -113,7 +113,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. apiTry++ continue } - // exit(255, fileName, *fileArg, err) return 255, err } break @@ -121,20 +120,13 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. apiTry = 0 glog.Infof("Got broadcasters: %+v", broadcasters) fmt.Printf("Streaming video file '%s'\n", fileName) - /* - httpIngestURLTemplates := make([]string, 0, len(broadcasters)) - for _, b := range broadcasters { - httpIngestURLTemplates = append(httpIngestURLTemplates, fmt.Sprintf("%s/live/%%s", b)) - } - */ + if rt.useHTTP && len(broadcasters) == 0 { - // exit(254, fileName, *fileArg, errors.New("Empty list of broadcasters")) return 254, errors.New("empty list of broadcasters") } else if (!rt.useHTTP && ingest.Ingest == "") || ingest.Playback == "" { return 254, errors.New("empty ingest URLs") - // exit(254, fileName, *fileArg, errors.New("Empty list of ingests")) } - // glog.Infof("All cool!") + hostName, _ := os.Hostname() streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00")) var stream *api.Stream @@ -146,7 +138,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. continue } glog.Errorf("Error creating stream using Livepeer API: %v", err) - // exit(253, fileName, *fileArg, err) return 253, err } break @@ -155,12 +146,8 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. rt.streamID = stream.ID rt.stream = stream messenger.SendMessage(fmt.Sprintf(":information_source: Created stream id=%s", stream.ID)) - // createdAPIStreams = append(createdAPIStreams, stream.ID) glog.V(model.VERBOSE).Infof("Created Livepeer stream id=%s streamKey=%s playbackId=%s name=%s", stream.ID, stream.StreamKey, stream.PlaybackID, streamName) - // glog.Infof("Waiting 5 second for stream info to propagate to the Postgres replica") - // time.Sleep(5 * time.Second) rtmpURL := fmt.Sprintf("%s/%s", ingest.Ingest, stream.StreamKey) - // rtmpURL = fmt.Sprintf("%s/%s", ingests[0].Ingest, stream.ID) testerFuncs := []testers.StartTestFunc{} if rt.streamHealth { @@ -203,8 +190,8 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{MistMode: true}, testerFuncs...) sr2.StartStreaming(fileName, rtmpURL, mediaURL, 2*time.Minute, testDuration) - // <-sr2.Done() srerr := sr2.Err() + glog.Infof("Streaming stream id=%s done err=%v", stream.ID, srerr) var re *testers.RTMPError if errors.As(srerr, &re) { @@ -259,7 +246,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. sessions, err := rt.lapi.GetSessionsNew(stream.ID, false) if err != nil { glog.Errorf("Error getting sessions for stream id=%s err=%v", stream.ID, err) - // exit(252, fileName, *fileArg, err) return 252, err } glog.Infof("Sessions: %+v", sessions) @@ -273,7 +259,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. err := fmt.Errorf("invalid session count, expected %d but got %d", expectedSessions, len(sessions)) glog.Error(err) - // exit(251, fileName, *fileArg, err) return 251, err } @@ -282,12 +267,10 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. glog.Infof("session: %+v", sess) err := fmt.Errorf("got %d profiles but should have %d", len(sess.Profiles), len(stream.Profiles)) return 251, err - // exit(251, fileName, *fileArg, err) } if sess.RecordingStatus != api.RecordingStatusWaiting { err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, api.RecordingStatusWaiting) return 250, err - // exit(250, fileName, *fileArg, err) } if err = rt.isCancelled(); err != nil { return 0, err @@ -307,7 +290,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if err != nil { err := fmt.Errorf("error getting sessions for stream id=%s err=%v", stream.ID, err) return 252, err - // exit(252, fileName, *fileArg, err) } glog.Infof("Sessions: %+v", sessions) if err = rt.isCancelled(); err != nil { @@ -323,24 +305,13 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if sess.RecordingStatus != statusShould { err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould) return 240, err - // exit(250, fileName, *fileArg, err) } if sess.RecordingURL == "" { err := fmt.Errorf("recording URL should appear by now") return 249, err - // exit(249, fileName, *fileArg, err) } glog.Infof("recordingURL=%s downloading now", sess.RecordingURL) - // started := time.Now() - // downloader := testers.NewM3utester2(gctx, sess.RecordingURL, false, false, false, false, 5*time.Second, nil) - // <-downloader.Done() - // glog.Infof(`Pulling stopped after %s`, time.Since(started)) - // exit(55, fileName, *fileArg, err) - glog.Info("Done Record Test") - - // lapi.DeleteStream(stream.ID) - // exit(0, fileName, *fileArg, err) if err = rt.isCancelled(); err != nil { return 0, err } @@ -356,6 +327,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return es, err } } + glog.Info("Done Record Test") rt.lapi.DeleteStream(stream.ID) return 0, nil @@ -398,7 +370,6 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str continue } glog.Errorf("Error creating stream session using Livepeer API: %v", err) - // exit(253, fileName, *fileArg, err) return err } break @@ -407,7 +378,7 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str httpIngestBaseURL := fmt.Sprintf("%s/live/%s", broadcasterURL, session.ID) glog.Infof("httpIngestBaseURL=%s", httpIngestBaseURL) hs.StartUpload(fileName, httpIngestBaseURL, stream.ID, -1, -1, testDuration, 0) - // <-hs.Done() + stats, err := hs.Stats() glog.Infof("Streaming stream id=%s done err=%v", stream.ID, err) glog.Infof("Stats: %+v", stats)