Skip to content

Commit

Permalink
record-tester: Allow recording streams to custom object store (#129)
Browse files Browse the repository at this point in the history
* apis/livepeer: Allow specifying record object store ID

* cmd/record-tester: Record object store ID cli flag

* Fix logs

* test-streamer2: Wait a little longer before ending the test

Will probably show other errors apart from the stream health one.

* test-streamer: Sort errors for the alerts
  • Loading branch information
victorges authored Apr 8, 2022
1 parent c6ad7ba commit 8369182
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 82 deletions.
54 changes: 19 additions & 35 deletions apis/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ type (
} `json:"servers,omitempty"`
}

createStreamReq struct {
Name string `json:"name,omitempty"`
Presets []string `json:"presets,omitempty"`
CreateStreamReq struct {
Name string `json:"name,omitempty"`
ParentID string `json:"parentId,omitempty"`
Presets []string `json:"presets,omitempty"`
// one of
// - P720p60fps16x9
// - P720p30fps16x9
Expand All @@ -77,8 +78,9 @@ type (
// - P240p30fps16x9
// - P240p30fps4x3
// - P144p30fps16x9
Profiles []Profile `json:"profiles,omitempty"`
Record bool `json:"record,omitempty"`
Profiles []Profile `json:"profiles,omitempty"`
Record bool `json:"record,omitempty"`
RecordObjectStoreId string `json:"recordObjectStoreId,omitempty"`
}

// Profile transcoding profile
Expand Down Expand Up @@ -346,15 +348,6 @@ var StandardProfiles = []Profile{
},
}

// CreateStream creates stream with specified name and profiles
func (lapi *API) CreateStream(name string, presets ...string) (string, error) {
csr, err := lapi.CreateStreamEx(name, false, presets)
if err != nil {
return "", err
}
return csr.ID, err
}

// DeleteStream deletes stream
func (lapi *API) DeleteStream(id string) error {
glog.V(model.DEBUG).Infof("Deleting Livepeer stream '%s' ", id)
Expand Down Expand Up @@ -383,36 +376,27 @@ func (lapi *API) DeleteStream(id string) error {

// CreateStreamEx creates stream with specified name and profiles
func (lapi *API) CreateStreamEx(name string, record bool, presets []string, profiles ...Profile) (*CreateStreamResp, error) {
return lapi.CreateStreamEx2(name, record, "", presets, profiles...)
return lapi.CreateStream(CreateStreamReq{Name: name, Record: record, Presets: presets, Profiles: profiles})
}

// CreateStreamEx creates stream with specified name and profiles
func (lapi *API) CreateStreamEx2(name string, record bool, parentID string, presets []string, profiles ...Profile) (*CreateStreamResp, error) {
// presets := profiles
// if len(presets) == 0 {
// presets = lapi.presets
// }
glog.Infof("Creating Livepeer stream '%s' with presets '%v' and profiles %+v", name, presets, profiles)
reqs := &createStreamReq{
Name: name,
Presets: presets,
Record: record,
}
if len(presets) == 0 {
reqs.Profiles = StandardProfiles
// CreateStream creates stream with specified name and profiles
func (lapi *API) CreateStream(csr CreateStreamReq) (*CreateStreamResp, error) {
if csr.Name == "" {
return nil, errors.New("stream must have a name")
}
if len(profiles) > 0 {
reqs.Profiles = profiles
if len(csr.Presets) == 0 && len(csr.Profiles) == 0 {
csr.Profiles = StandardProfiles
}
b, err := json.Marshal(reqs)
glog.Infof("Creating Livepeer stream '%s' with presets '%v' and profiles %+v", csr.Name, csr.Presets, csr.Profiles)
b, err := json.Marshal(csr)
if err != nil {
glog.V(model.SHORT).Infof("Error marshalling create stream request %v", err)
return nil, err
}
glog.Infof("Sending: %s", b)
u := fmt.Sprintf("%s/api/stream", lapi.choosenServer)
if parentID != "" {
u = fmt.Sprintf("%s/api/stream/%s/stream", lapi.choosenServer, parentID)
if csr.ParentID != "" {
u = fmt.Sprintf("%s/api/stream/%s/stream", lapi.choosenServer, csr.ParentID)
}
req, err := uhttp.NewRequest("POST", u, bytes.NewBuffer(b))
if err != nil {
Expand Down Expand Up @@ -440,7 +424,7 @@ func (lapi *API) CreateStreamEx2(name string, record bool, parentID string, pres
if len(r.Errors) > 0 {
return nil, fmt.Errorf("Error creating stream: %+v", r.Errors)
}
glog.Infof("Stream %s created with id %s", name, r.ID)
glog.Infof("Stream %s created with id %s", csr.Name, r.ID)
return r, nil
}

Expand Down
9 changes: 5 additions & 4 deletions cmd/lapi/lapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,18 @@ func main() {
}
}

func createStream(token, presets, name string) (string, *livepeer.API, error) {
fmt.Printf("Creating new stream with name %s profiles %s\n", name, presets)
profiles := strings.Split(presets, ",")
func createStream(token, presetsStr, name string) (string, *livepeer.API, error) {
fmt.Printf("Creating new stream with name %s presets %s\n", name, presetsStr)
presets := strings.Split(presetsStr, ",")

lapi := livepeer.NewLivepeer(token, server, nil) // hardcode AC server for now
lapi.Init()
sid, err := lapi.CreateStream(name, profiles...)
stream, err := lapi.CreateStream(livepeer.CreateStreamReq{Name: name, Presets: presets})
if err != nil {
fmt.Println("Error creating stream:")
return "", nil, err
}
sid := stream.ID
fmt.Printf("Stream created: %+v", sid)
return sid, lapi, nil
}
Expand Down
16 changes: 9 additions & 7 deletions cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func main() {
useHttp := fs.Bool("http", false, "Do HTTP tests instead of RTMP")
testMP4 := fs.Bool("mp4", false, "Download MP4 of recording")
testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test")
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
discordUserName := fs.String("discord-user-name", "", "User name to use when sending messages to Discord")
discordUsersToNotify := fs.String("discord-users", "", "Id's of users to notify in case of failure")
Expand Down Expand Up @@ -198,13 +199,14 @@ func main() {
messenger.Init(gctx, *discordURL, *discordUserName, *discordUsersToNotify, "", "", "")

rtOpts := recordtester.RecordTesterOptions{
API: lapi,
Analyzers: lanalyzers,
Ingest: ingest,
UseForceURL: true,
UseHTTP: *useHttp,
TestMP4: *testMP4,
TestStreamHealth: *testStreamHealth,
API: lapi,
Analyzers: lanalyzers,
Ingest: ingest,
RecordObjectStoreId: *recordObjectStoreId,
UseForceURL: true,
UseHTTP: *useHttp,
TestMP4: *testMP4,
TestStreamHealth: *testStreamHealth,
}
if *sim > 1 {
var testers []recordtester.IRecordTester
Expand Down
81 changes: 48 additions & 33 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,31 @@ type (

RecordTesterOptions struct {
*livepeer.API
Analyzers testers.AnalyzerByRegion
Ingest *livepeer.Ingest
VODStats model.VODStats
UseForceURL bool
UseHTTP bool
TestMP4 bool
TestStreamHealth bool
Analyzers testers.AnalyzerByRegion
Ingest *livepeer.Ingest
RecordObjectStoreId string
UseForceURL bool
UseHTTP bool
TestMP4 bool
TestStreamHealth bool
}

recordTester struct {
lapi *livepeer.API
lanalyzers testers.AnalyzerByRegion
ingest *livepeer.Ingest
useForceURL bool
ctx context.Context
cancel context.CancelFunc
vodeStats model.VODStats
streamID string
stream *livepeer.CreateStreamResp
useHTTP bool
mp4 bool
streamHealth bool
ctx context.Context
cancel context.CancelFunc
lapi *livepeer.API
lanalyzers testers.AnalyzerByRegion
ingest *livepeer.Ingest
recordObjectStoreId string
useForceURL bool
useHTTP bool
mp4 bool
streamHealth bool

// mutable fields
streamID string
stream *livepeer.CreateStreamResp
vodStats model.VODStats
}
)

Expand Down Expand Up @@ -98,15 +101,16 @@ var standardProfiles = []livepeer.Profile{
func NewRecordTester(gctx context.Context, opts RecordTesterOptions) IRecordTester {
ctx, cancel := context.WithCancel(gctx)
rt := &recordTester{
lapi: opts.API,
lanalyzers: opts.Analyzers,
ingest: opts.Ingest,
useForceURL: opts.UseForceURL,
ctx: ctx,
cancel: cancel,
useHTTP: opts.UseHTTP,
mp4: opts.TestMP4,
streamHealth: opts.TestStreamHealth,
lapi: opts.API,
lanalyzers: opts.Analyzers,
ingest: opts.Ingest,
ctx: ctx,
cancel: cancel,
recordObjectStoreId: opts.RecordObjectStoreId,
useForceURL: opts.UseForceURL,
useHTTP: opts.UseHTTP,
mp4: opts.TestMP4,
streamHealth: opts.TestStreamHealth,
}
return rt
}
Expand Down Expand Up @@ -153,7 +157,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *livepeer.CreateStreamResp
for {
stream, err = rt.lapi.CreateStreamEx(streamName, true, nil, standardProfiles...)
stream, err = rt.lapi.CreateStream(livepeer.CreateStreamReq{
Name: streamName,
Profiles: standardProfiles,
Record: true,
RecordObjectStoreId: rt.recordObjectStoreId,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand Down Expand Up @@ -276,7 +285,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
sess := sessions[0]
if len(sess.Profiles) != len(stream.Profiles) {
glog.Infof("session: %+v", sess)
err := fmt.Errorf("got %d, but should have %d", len(sess.Profiles), len(stream.Profiles))
err := fmt.Errorf("got %d profiles but should have %d", len(sess.Profiles), len(stream.Profiles))
return 251, err
// exit(251, fileName, *fileArg, err)
}
Expand Down Expand Up @@ -386,7 +395,13 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
var err error
apiTry := 0
for {
session, err = rt.lapi.CreateStreamEx2(streamName, true, stream.ID, nil, standardProfiles...)
session, err = rt.lapi.CreateStream(livepeer.CreateStreamReq{
Name: streamName,
ParentID: stream.ID,
Profiles: standardProfiles,
Record: true,
RecordObjectStoreId: rt.recordObjectStoreId,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand Down Expand Up @@ -478,7 +493,7 @@ func (rt *recordTester) checkDown(stream *livepeer.CreateStreamResp, url string,
return 0, err
}
vs := downloader.VODStats()
rt.vodeStats = vs
rt.vodStats = vs
if len(vs.SegmentsNum) != len(standardProfiles)+1 {
glog.Warningf("Number of renditions doesn't match! Has %d should %d", len(vs.SegmentsNum), len(standardProfiles)+1)
es = 35
Expand All @@ -504,7 +519,7 @@ func (rt *recordTester) Done() <-chan struct{} {
}

func (rt *recordTester) VODStats() model.VODStats {
return rt.vodeStats
return rt.vodStats
}

func (rt *recordTester) Clean() {
Expand Down
4 changes: 2 additions & 2 deletions internal/testers/http_load_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ func (hlt *HTTPLoadTester) startStreams(baseManifestID, sourceFileName string, r
}
manifestID := fmt.Sprintf("%s_%d_%d", baseManifestID, repeatNum, i)
if hlt.lapi != nil {
sid, err := hlt.lapi.CreateStream(manifestID)
stream, err := hlt.lapi.CreateStream(livepeer.CreateStreamReq{Name: manifestID})
if err != nil {
glog.Errorf("Error creating stream using Livepeer API: %v", err)
return err
}
manifestID = sid
manifestID = stream.ID
}
httpIngestURLTemplate := httpIngestURLTemplates[i%len(httpIngestURLTemplates)]
httpIngestURL := fmt.Sprintf(httpIngestURLTemplate, manifestID)
Expand Down
22 changes: 21 additions & 1 deletion internal/testers/streamer2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -108,14 +109,15 @@ func (sr *streamer2) StartStreaming(sourceFileName string, rtmpIngestURL, mediaU
if err := test.GlobalErr(); err != nil {
if sr.globalError == nil {
sr.globalError = err
time.AfterFunc(10*time.Second, cancel)
time.AfterFunc(waitForTarget, cancel)
}
errs = append(errs, err.Error())
}
case <-ctx.Done():
if len(errs) > 0 {
msg := errs[0]
if len(errs) > 1 {
sortErrs(errs)
msg = "Multiple errors: " + strings.Join(errs, "; ")
}
sr.fatalEnd(errors.New(msg))
Expand Down Expand Up @@ -150,6 +152,24 @@ func onAnyDone(ctx context.Context, finites []Finite) <-chan Finite {
return finished
}

func sortErrs(errs []string) {
sortIdx := func(idx int) int {
err := strings.ToLower(errs[idx])
if strings.Contains(err, "health") {
// stream health errs should go last. they're never the root cause when
// there are multiple errors.
return 1
}
return 0
}
sort.Slice(errs, func(idx1, idx2 int) bool {
if si1, si2 := sortIdx(idx1), sortIdx(idx2); si1 != si2 {
return si1 < si2
}
return errs[idx1] < errs[idx2]
})
}

// StartPulling pull arbitrary HLS stream and report found errors
/*
func (sr *streamer2) StartPulling(mediaURL string) {
Expand Down

0 comments on commit 8369182

Please sign in to comment.