Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recordtester: Fix session checks for catalyst recording tests #293

Merged
merged 7 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ func main() {
vodImportUrl := fs.String("vod-import-url", "https://storage.googleapis.com/lp_testharness_assets/bbb_sunflower_1080p_30fps_normal_2min.mp4", "URL for VOD import")
continuousTest := fs.Duration("continuous-test", 0, "Do continuous testing")
useHttp := fs.Bool("http", false, "Do HTTP tests instead of RTMP")
forceRecordingUrl := fs.Bool("force-recording-url", false, "Whether to force the API to return a recording URL (skip the user session timeout)")
recordingWaitTime := fs.Duration("recording-wait-time", 6*time.Minute+20*time.Second, "How long to wait after the stream ends before checking for recording")
testMP4 := fs.Bool("mp4", false, "Download MP4 of recording")
testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test")
testLive := fs.Bool("live", false, "Check Live workflow")
Expand Down Expand Up @@ -330,7 +332,8 @@ func main() {
Analyzers: lanalyzers,
Ingest: ingest,
RecordObjectStoreId: *recordObjectStoreId,
UseForceURL: true,
UseForceURL: *forceRecordingUrl,
RecordingWaitTime: *recordingWaitTime,
UseHTTP: *useHttp,
TestMP4: *testMP4,
TestStreamHealth: *testStreamHealth,
Expand Down
99 changes: 37 additions & 62 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (
Ingest *api.Ingest
RecordObjectStoreId string
UseForceURL bool
RecordingWaitTime time.Duration
UseHTTP bool
TestMP4 bool
TestStreamHealth bool
Expand All @@ -63,6 +64,7 @@ type (
ingest *api.Ingest
recordObjectStoreId string
useForceURL bool
recordingWaitTime time.Duration
useHTTP bool
mp4 bool
streamHealth bool
Expand All @@ -86,6 +88,7 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts Se
cancel: cancel,
recordObjectStoreId: opts.RecordObjectStoreId,
useForceURL: opts.UseForceURL,
recordingWaitTime: opts.RecordingWaitTime,
useHTTP: opts.UseHTTP,
mp4: opts.TestMP4,
streamHealth: opts.TestStreamHealth,
Expand All @@ -110,28 +113,20 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
apiTry++
continue
}
// exit(255, fileName, *fileArg, err)
return 255, err
}
break
}
apiTry = 0
glog.Infof("Got broadcasters: %+v", broadcasters)
fmt.Printf("Streaming video file '%s'\n", fileName)
/*
httpIngestURLTemplates := make([]string, 0, len(broadcasters))
for _, b := range broadcasters {
httpIngestURLTemplates = append(httpIngestURLTemplates, fmt.Sprintf("%s/live/%%s", b))
}
*/

if rt.useHTTP && len(broadcasters) == 0 {
// exit(254, fileName, *fileArg, errors.New("Empty list of broadcasters"))
return 254, errors.New("empty list of broadcasters")
} else if (!rt.useHTTP && ingest.Ingest == "") || ingest.Playback == "" {
return 254, errors.New("empty ingest URLs")
// exit(254, fileName, *fileArg, errors.New("Empty list of ingests"))
}
// glog.Infof("All cool!")

hostName, _ := os.Hostname()
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *api.Stream
Expand All @@ -143,7 +138,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
continue
}
glog.Errorf("Error creating stream using Livepeer API: %v", err)
// exit(253, fileName, *fileArg, err)
return 253, err
}
break
Expand All @@ -152,12 +146,8 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
rt.streamID = stream.ID
rt.stream = stream
messenger.SendMessage(fmt.Sprintf(":information_source: Created stream id=%s", stream.ID))
// createdAPIStreams = append(createdAPIStreams, stream.ID)
glog.V(model.VERBOSE).Infof("Created Livepeer stream id=%s streamKey=%s playbackId=%s name=%s", stream.ID, stream.StreamKey, stream.PlaybackID, streamName)
// glog.Infof("Waiting 5 second for stream info to propagate to the Postgres replica")
// time.Sleep(5 * time.Second)
rtmpURL := fmt.Sprintf("%s/%s", ingest.Ingest, stream.StreamKey)
// rtmpURL = fmt.Sprintf("%s/%s", ingests[0].Ingest, stream.ID)

testerFuncs := []testers.StartTestFunc{}
if rt.streamHealth {
Expand All @@ -166,6 +156,10 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
})
}

// when pauseDuration is set, we will stream the same file twice sleeping for
// the specified duration in between each.
streamTwice := pauseDuration > 0

mediaURL := fmt.Sprintf("%s/%s/index.m3u8", ingest.Playback, stream.PlaybackID)
if rt.serfOpts.UseSerf {
index := 0
Expand All @@ -183,22 +177,21 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
glog.Warningf("Streaming returned error err=%v", sterr)
return 3, err
}
if pauseDuration > 0 {
if streamTwice {
glog.Infof("Pause specified, waiting %s before streaming second time", pauseDuration)
time.Sleep(pauseDuration)
sterr = rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream)
if sterr != nil {
glog.Warningf("Second time streaming returned error err=%v", sterr)
return 3, err
}
testDuration *= 2
}
} else {

sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{MistMode: true}, testerFuncs...)
sr2.StartStreaming(fileName, rtmpURL, mediaURL, 2*time.Minute, testDuration)
// <-sr2.Done()
srerr := sr2.Err()

glog.Infof("Streaming stream id=%s done err=%v", stream.ID, srerr)
var re *testers.RTMPError
if errors.As(srerr, &re) {
Expand All @@ -217,7 +210,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err = rt.isCancelled(); err != nil {
return 0, err
}
if pauseDuration > 0 {
if streamTwice {
glog.Infof("Pause specified, waiting %s before streaming second time", pauseDuration)
time.Sleep(pauseDuration)
sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{MistMode: true}, testerFuncs...)
Expand All @@ -242,7 +235,6 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err = rt.isCancelled(); err != nil {
return 0, err
}
testDuration *= 2
}
}
if err := rt.isCancelled(); err != nil {
Expand All @@ -254,32 +246,31 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
sessions, err := rt.lapi.GetSessionsNew(stream.ID, false)
if err != nil {
glog.Errorf("Error getting sessions for stream id=%s err=%v", stream.ID, err)
// exit(252, fileName, *fileArg, err)
return 252, err
}
glog.Infof("Sessions: %+v", sessions)
sessionsLength := len(sessions)
if sessionsLength == 2 {
// We often see failures for 2 sessions, this should be fixed once we move to catalyst recording but for now
// we want to ignore these to reduce the alert noise
return 0, nil
} else if sessionsLength != 1 {
err := fmt.Errorf("invalid session count, got %d", sessionsLength)

expectedSessions := 1
if streamTwice {
expectedSessions = 2
}

if len(sessions) != expectedSessions {
err := fmt.Errorf("invalid session count, expected %d but got %d",
expectedSessions, len(sessions))
glog.Error(err)
// exit(251, fileName, *fileArg, err)
return 251, err
}

sess := sessions[0]
if len(sess.Profiles) != len(stream.Profiles) {
glog.Infof("session: %+v", sess)
err := fmt.Errorf("got %d profiles but should have %d", len(sess.Profiles), len(stream.Profiles))
return 251, err
// exit(251, fileName, *fileArg, err)
}
if sess.RecordingStatus != api.RecordingStatusWaiting {
err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, api.RecordingStatusWaiting)
return 250, err
// exit(250, fileName, *fileArg, err)
}
if err = rt.isCancelled(); err != nil {
return 0, err
Expand All @@ -289,7 +280,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if rt.useForceURL {
time.Sleep(5 * time.Second)
} else {
time.Sleep(6*time.Minute + 20*time.Second)
time.Sleep(rt.recordingWaitTime)
}
if err = rt.isCancelled(); err != nil {
return 0, err
Expand All @@ -299,13 +290,13 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err != nil {
err := fmt.Errorf("error getting sessions for stream id=%s err=%v", stream.ID, err)
return 252, err
// exit(252, fileName, *fileArg, err)
}
glog.Infof("Sessions: %+v", sessions)
if err = rt.isCancelled(); err != nil {
return 0, err
}

for _, sess := range sessions {
sess = sessions[0]
statusShould := api.RecordingStatusReady
if rt.useForceURL {
Expand All @@ -314,43 +305,32 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if sess.RecordingStatus != statusShould {
err := fmt.Errorf("recording status is %s but should be %s", sess.RecordingStatus, statusShould)
return 240, err
// exit(250, fileName, *fileArg, err)
}
if sess.RecordingURL == "" {
err := fmt.Errorf("recording URL should appear by now")
return 249, err
// exit(249, fileName, *fileArg, err)
}
glog.Infof("recordingURL=%s downloading now", sess.RecordingURL)

// started := time.Now()
// downloader := testers.NewM3utester2(gctx, sess.RecordingURL, false, false, false, false, 5*time.Second, nil)
// <-downloader.Done()
// glog.Infof(`Pulling stopped after %s`, time.Since(started))
// exit(55, fileName, *fileArg, err)
glog.Info("Done Record Test")

// lapi.DeleteStream(stream.ID)
// exit(0, fileName, *fileArg, err)
if err = rt.isCancelled(); err != nil {
return 0, err
}
if rt.mp4 {
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration, pauseDuration > 0)
es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration)
if err != nil {
return es, err
}
}

es, err := rt.checkDown(stream, sess.RecordingURL, testDuration, pauseDuration > 0)
if es == 0 {
rt.lapi.DeleteStream(stream.ID)
// exit(0, fileName, *fileArg, err)
es, err := rt.checkDown(stream, sess.RecordingURL, testDuration)
if err != nil {
return es, err
}
}
glog.Info("Done Record Test")

// uploader := testers.NewRtmpStreamer(gctx, rtmpURL)
// uploader.StartUpload(fileName, rtmpURL, -1, 30*time.Second)
return es, err
rt.lapi.DeleteStream(stream.ID)
return 0, nil
}

func (rt *recordTester) getIngestInfo() (*api.Ingest, error) {
Expand Down Expand Up @@ -390,7 +370,6 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
continue
}
glog.Errorf("Error creating stream session using Livepeer API: %v", err)
// exit(253, fileName, *fileArg, err)
return err
}
break
Expand All @@ -399,7 +378,7 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
httpIngestBaseURL := fmt.Sprintf("%s/live/%s", broadcasterURL, session.ID)
glog.Infof("httpIngestBaseURL=%s", httpIngestBaseURL)
hs.StartUpload(fileName, httpIngestBaseURL, stream.ID, -1, -1, testDuration, 0)
// <-hs.Done()

stats, err := hs.Stats()
glog.Infof("Streaming stream id=%s done err=%v", stream.ID, err)
glog.Infof("Stats: %+v", stats)
Expand All @@ -415,7 +394,7 @@ func (rt *recordTester) isCancelled() error {
return nil
}

func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) {
func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
glog.V(model.VERBOSE).Infof("Downloading mp4 url=%s stream id=%s", url, stream.ID)
Expand Down Expand Up @@ -449,23 +428,19 @@ func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDurat
glog.Warningf("Error parsing mp4 for manifestID=%s url=%s err=%v", stream.ID, url, err)
return 203, err
}
durDiffShould := 2 * time.Second
if doubled {
durDiffShould *= durDiffShould
}
durDiff := streamDuration - dur
if durDiff < 0 {
durDiff = -durDiff
}
if durDiff > durDiffShould {
if durDiff > 2*time.Second {
ers := fmt.Errorf("duration of mp4 differ by %s (got %s, should %s)", durDiff, dur, streamDuration)
glog.Error(ers)
return 300, err
}
return es, nil
}

func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration, doubled bool) (int, error) {
func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false)
Expand All @@ -482,7 +457,7 @@ func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration
}
glog.Infof("Stats for %s: %s", stream.ID, vs.String())
glog.Infof("Stats for %s raw: %+v", stream.ID, vs)
if ok, ers := vs.IsOk(streamDuration, doubled); !ok {
if ok, ers := vs.IsOk(streamDuration, false); !ok {
glog.Warningf("NOT OK! (%s)", ers)
es = 36
return es, errors.New(ers)
Expand Down