diff --git a/cmd/recordtester/recordtester.go b/cmd/recordtester/recordtester.go index b6e93027..b109cdde 100644 --- a/cmd/recordtester/recordtester.go +++ b/cmd/recordtester/recordtester.go @@ -17,10 +17,12 @@ import ( "github.com/golang/glog" serfClient "github.com/hashicorp/serf/client" - api "github.com/livepeer/go-api-client" + "github.com/livepeer/go-api-client" "github.com/livepeer/joy4/format" "github.com/livepeer/livepeer-data/pkg/client" + "github.com/livepeer/stream-tester/internal/app/common" "github.com/livepeer/stream-tester/internal/app/recordtester" + "github.com/livepeer/stream-tester/internal/app/transcodetester" "github.com/livepeer/stream-tester/internal/app/vodtester" "github.com/livepeer/stream-tester/internal/metrics" "github.com/livepeer/stream-tester/internal/server" @@ -79,6 +81,8 @@ func main() { testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test") testLive := fs.Bool("live", false, "Check Live workflow") testVod := fs.Bool("vod", false, "Check VOD workflow") + transcodeBucketUrl := fs.String("transcode-bucket-url", "", "Object Store URL to test Transcode API in the format 's3+http(s)://:@/'") + testTranscode := fs.Bool("transcode", false, "Check Transcode API workflow") catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.") recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API") discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel") @@ -256,10 +260,6 @@ func main() { TestMP4: *testMP4, TestStreamHealth: *testStreamHealth, } - vtOpts := vodtester.VodTesterOptions{ - API: lapi, - CatalystPipelineStrategy: *catalystPipelineStrategy, - } if *sim > 1 { var testers []recordtester.IRecordTester var eses []int @@ -317,17 +317,37 @@ func main() { }) } if *testVod { + vtOpts := common.TesterOptions{ + API: lapi, + CatalystPipelineStrategy: *catalystPipelineStrategy, + } eg.Go(func() error { - cvtOpts := vodtester.ContinuousVodTesterOptions{ + cvtOpts := common.ContinuousTesterOptions{ PagerDutyIntegrationKey: *pagerDutyIntegrationKey, PagerDutyComponent: *pagerDutyComponent, PagerDutyLowUrgency: *pagerDutyLowUrgency, - VodTesterOptions: vtOpts, + TesterOptions: vtOpts, } cvt := vodtester.NewContinuousVodTester(egCtx, cvtOpts) return cvt.Start(fileName, *vodImportUrl, *testDuration, *taskPollDuration, *continuousTest) }) } + if *testTranscode { + ttOpts := common.TesterOptions{ + API: lapi, + CatalystPipelineStrategy: *catalystPipelineStrategy, + } + eg.Go(func() error { + cttOpts := common.ContinuousTesterOptions{ + PagerDutyIntegrationKey: *pagerDutyIntegrationKey, + PagerDutyComponent: *pagerDutyComponent, + PagerDutyLowUrgency: *pagerDutyLowUrgency, + TesterOptions: ttOpts, + } + ctt := transcodetester.NewContinuousTranscodeTester(egCtx, cttOpts) + return ctt.Start(*fileArg, *transcodeBucketUrl, *testDuration, *taskPollDuration, *continuousTest) + }) + } if err := eg.Wait(); err != nil { glog.Warningf("Continuous test ended with err=%v", err) } diff --git a/go.mod b/go.mod index 6c4d9d12..a293b093 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/golang/glog v1.0.0 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5 + github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e github.com/livepeer/go-livepeer v0.5.31 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 @@ -39,7 +39,7 @@ require ( cloud.google.com/go v0.81.0 // indirect github.com/Azure/azure-pipeline-go v0.2.2 // indirect github.com/StackExchange/wmi v1.2.1 // indirect - github.com/aws/aws-sdk-go v1.34.28 // indirect + github.com/aws/aws-sdk-go v1.34.28 github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect diff --git a/go.sum b/go.sum index 63480b6c..4411852a 100644 --- a/go.sum +++ b/go.sum @@ -602,6 +602,7 @@ github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sL github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -726,8 +727,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5 h1:sxyLVN5lD4JB5THu2+48BbhNTifJK67YvW8DyNuPBJI= -github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e h1:R9agI6FLJVOmX7YPR7vA0Rp9ONpDzJbDYSgpNwRHJiQ= +github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw= github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU= github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw= @@ -885,6 +886,7 @@ github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xA github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= @@ -1403,7 +1405,6 @@ golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs= golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/app/common/continuous_tester.go b/internal/app/common/continuous_tester.go new file mode 100644 index 00000000..08b162d0 --- /dev/null +++ b/internal/app/common/continuous_tester.go @@ -0,0 +1,144 @@ +package common + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/PagerDuty/go-pagerduty" + + "github.com/golang/glog" + "github.com/livepeer/stream-tester/messenger" +) + +type ( + IContinuousTester interface { + // Start test. Blocks until error. + Start(start func(ctx context.Context) error, testDuration, pauseBetweenTests time.Duration) error + Cancel() + Done() <-chan struct{} + } + + ContinuousTesterOptions struct { + PagerDutyIntegrationKey string + PagerDutyComponent string + PagerDutyLowUrgency bool + TesterOptions + } + + continuousTester struct { + ctx context.Context + cancel context.CancelFunc + host string // API host being tested + pagerDutyIntegrationKey string + pagerDutyComponent string + pagerDutyLowUrgency bool + name string + } +) + +func NewContinuousTester(gctx context.Context, opts ContinuousTesterOptions, testerName string) IContinuousTester { + ctx, cancel := context.WithCancel(gctx) + server := opts.API.GetServer() + u, _ := url.Parse(server) + ct := &continuousTester{ + ctx: ctx, + cancel: cancel, + host: u.Host, + pagerDutyIntegrationKey: opts.PagerDutyIntegrationKey, + pagerDutyComponent: opts.PagerDutyComponent, + pagerDutyLowUrgency: opts.PagerDutyLowUrgency, + name: testerName, + } + return ct +} + +func (ct *continuousTester) Start(start func(ctx context.Context) error, testDuration, pauseBetweenTests time.Duration) error { + messenger.SendMessage(fmt.Sprintf("Starting continuous %s test of %s", ct.name, ct.host)) + for { + msg := fmt.Sprintf(":arrow_right: Starting %s %s test to %s", testDuration, ct.name, ct.host) + messenger.SendMessage(msg) + + ctx, cancel := context.WithTimeout(ct.ctx, testDuration) + err := start(ctx) + ctxErr := ctx.Err() + cancel() + + if ct.ctx.Err() != nil { + messenger.SendMessage(fmt.Sprintf("Continuous test of %s on %s cancelled", ct.name, ct.host)) + return ct.ctx.Err() + } else if ctxErr != nil { + msg := fmt.Sprintf("Test of %s on %s timed out, potential deadlock! ctxErr=%q err=%q", ct.name, ct.host, ctxErr, err) + messenger.SendFatalMessage(msg) + } else if err != nil { + msg := fmt.Sprintf(":rotating_light: Test of %s on %s ended with err=%v", ct.name, ct.host, err) + messenger.SendFatalMessage(msg) + glog.Warning(msg) + ct.sendPagerdutyEvent(err) + } else { + msg := fmt.Sprintf(":white_check_mark: Test of %s on %s succeeded", ct.name, ct.host) + messenger.SendMessage(msg) + glog.Info(msg) + ct.sendPagerdutyEvent(nil) + } + glog.Infof("Waiting %s before next test of %s", pauseBetweenTests, ct.name) + select { + case <-ct.ctx.Done(): + messenger.SendMessage(fmt.Sprintf("Continuous test of %s on %s cancelled", ct.name, ct.host)) + return err + case <-time.After(pauseBetweenTests): + } + } + return nil +} + +func (ct *continuousTester) sendPagerdutyEvent(err error) { + if ct.pagerDutyIntegrationKey == "" { + return + } + severity, lopriPrefix, dedupKey := "error", "", fmt.Sprintf("cont-%s-tester:%s", ct.name, ct.host) + if ct.pagerDutyLowUrgency { + severity, lopriPrefix = "warning", "[LOPRI] " + dedupKey = "lopri-" + dedupKey + } + event := pagerduty.V2Event{ + RoutingKey: ct.pagerDutyIntegrationKey, + Action: "trigger", + DedupKey: dedupKey, + } + if err == nil { + event.Action = "resolve" + _, err := pagerduty.ManageEvent(event) + if err != nil { + messenger.SendMessage(fmt.Sprintf("Error resolving PagerDuty event: %v", err)) + } + return + } + summary := fmt.Sprintf("%s:vhs: %s %s for `%s` error: %v", lopriPrefix, ct.name, ct.pagerDutyComponent, ct.host, err) + if len(summary) > 1024 { + summary = summary[:1021] + "..." + } + event.Payload = &pagerduty.V2Payload{ + Source: ct.host, + Component: ct.pagerDutyComponent, + Severity: severity, + Summary: summary, + Timestamp: time.Now().UTC().Format(time.RFC3339), + } + resp, err := pagerduty.ManageEvent(event) + if err != nil { + glog.Error(fmt.Errorf("PAGERDUTY Error: %w", err)) + messenger.SendFatalMessage(fmt.Sprintf("Error creating PagerDuty event: %v", err)) + } else { + glog.Infof("Incident status: %s message: %s", resp.Status, resp.Message) + } +} + +func (ct *continuousTester) Cancel() { + ct.cancel() +} + +func (ct *continuousTester) Done() <-chan struct{} { + return ct.ctx.Done() +} diff --git a/internal/app/common/tester_app.go b/internal/app/common/tester_app.go new file mode 100644 index 00000000..7ad9c0db --- /dev/null +++ b/internal/app/common/tester_app.go @@ -0,0 +1,68 @@ +package common + +import ( + "context" + "fmt" + "github.com/golang/glog" + "github.com/livepeer/go-api-client" + "time" +) + +type ( + TesterApp struct { + Ctx context.Context + CancelFunc context.CancelFunc + CatalystPipelineStrategy string + Lapi *api.Client + } + + TesterOptions struct { + API *api.Client + CatalystPipelineStrategy string + } +) + +func (ta *TesterApp) CheckTaskProcessing(taskPollDuration time.Duration, processingTask api.Task) error { + startTime := time.Now() + for { + time.Sleep(taskPollDuration) + + if err := ta.isCancelled(); err != nil { + return err + } + + // we already sleep before the first check, so no need for strong consistency + task, err := ta.Lapi.GetTask(processingTask.ID, false) + if err != nil { + glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) + return fmt.Errorf("error retrieving task id=%s: %w", processingTask.ID, err) + } + if task.Status.Phase == "completed" { + glog.Infof("Task success, taskId=%s", task.ID) + return nil + } + if task.Status.Phase != "pending" && task.Status.Phase != "running" && task.Status.Phase != "waiting" { + glog.Errorf("Error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage) + return fmt.Errorf("error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage) + } + + glog.Infof("Waiting for task to be processed id=%s pollWait=%s elapsed=%s progressPct=%.1f%%", task.ID, taskPollDuration, time.Since(startTime), 100*task.Status.Progress) + } +} + +func (ta *TesterApp) isCancelled() error { + select { + case <-ta.Ctx.Done(): + return context.Canceled + default: + } + return nil +} + +func (ta *TesterApp) Cancel() { + ta.CancelFunc() +} + +func (ta *TesterApp) Done() <-chan struct{} { + return ta.Ctx.Done() +} diff --git a/internal/app/recordtester/continuous_record_tester.go b/internal/app/recordtester/continuous_record_tester.go index bfe71227..2e1fe868 100644 --- a/internal/app/recordtester/continuous_record_tester.go +++ b/internal/app/recordtester/continuous_record_tester.go @@ -18,7 +18,7 @@ import ( type ( // IContinuousRecordTester ... IContinuousRecordTester interface { - // Start start test. Blocks until error. + // Start test. Blocks until error. Start(fileName string, testDuration, pauseDuration, pauseBetweenTests time.Duration) error Cancel() Done() <-chan struct{} diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 0129f093..af92d0f2 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -24,7 +24,7 @@ import ( type ( // IRecordTester ... IRecordTester interface { - // Start start test. Blocks until finished. + // Start test. Blocks until finished. Start(fileName string, testDuration, pauseDuration time.Duration) (int, error) Cancel() Done() <-chan struct{} diff --git a/internal/app/transcodetester/continuous_transcode_tester.go b/internal/app/transcodetester/continuous_transcode_tester.go new file mode 100644 index 00000000..5ef97b0f --- /dev/null +++ b/internal/app/transcodetester/continuous_transcode_tester.go @@ -0,0 +1,35 @@ +package transcodetester + +import ( + "context" + "time" + + "github.com/livepeer/stream-tester/internal/app/common" +) + +type ( + IContinuousTranscodeTester interface { + // Start test. Blocks until error. + Start(fileName string, transcodeBucketUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error + } + + continuousTranscodeTester struct { + ct common.IContinuousTester + opts common.TesterOptions + } +) + +func NewContinuousTranscodeTester(gctx context.Context, opts common.ContinuousTesterOptions) IContinuousTranscodeTester { + return &continuousTranscodeTester{ + ct: common.NewContinuousTester(gctx, opts, "transcode"), + opts: opts.TesterOptions, + } +} + +func (ctt *continuousTranscodeTester) Start(fileName string, transcodeBucketUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error { + start := func(ctx context.Context) error { + tt := NewTranscodeTester(ctx, ctt.opts) + return tt.Start(fileName, transcodeBucketUrl, taskPollDuration) + } + return ctt.ct.Start(start, testDuration, pauseBetweenTests) +} diff --git a/internal/app/transcodetester/transcodetester_app.go b/internal/app/transcodetester/transcodetester_app.go new file mode 100644 index 00000000..4498d15f --- /dev/null +++ b/internal/app/transcodetester/transcodetester_app.go @@ -0,0 +1,256 @@ +package transcodetester + +import ( + "context" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/livepeer/stream-tester/internal/app/common" + "math/rand" + "net/http" + "net/url" + path2 "path" + "strings" + "time" + + "github.com/golang/glog" + "github.com/livepeer/go-api-client" + "golang.org/x/sync/errgroup" +) + +type ( + ITranscodeTester interface { + // Start test. Blocks until finished. + Start(fileName string, transcodeBucketUrl string, taskPollDuration time.Duration) error + Cancel() + Done() <-chan struct{} + } + + transcodeTester struct { + common.TesterApp + } + + objectStore struct { + accessKeyId string + secretAccessKey string + endpoint string + bucket string + } +) + +func NewTranscodeTester(gctx context.Context, opts common.TesterOptions) ITranscodeTester { + ctx, cancel := context.WithCancel(gctx) + vt := &transcodeTester{ + TesterApp: common.TesterApp{ + Lapi: opts.API, + Ctx: ctx, + CancelFunc: cancel, + }, + } + return vt +} + +func (tt *transcodeTester) Start(fileName string, transcodeBucketUrl string, taskPollDuration time.Duration) error { + defer tt.Cancel() + + eg, egCtx := errgroup.WithContext(tt.Ctx) + + eg.Go(func() error { + if err := tt.transcodeFromUrlTester(fileName, transcodeBucketUrl, taskPollDuration); err != nil { + glog.Errorf("Error in transcode from url err=%v", err) + return fmt.Errorf("error in transcode from url: %w", err) + } + return nil + }) + + eg.Go(func() error { + if err := tt.transcodeFromPrivateBucketTester(fileName, transcodeBucketUrl, taskPollDuration); err != nil { + glog.Errorf("Error in transcode from private bucket err=%v", err) + return fmt.Errorf("error in transcode from private bucket: %w", err) + } + return nil + }) + + go func() { + <-egCtx.Done() + tt.Cancel() + }() + if err := eg.Wait(); err != nil { + return err + } + + glog.Info("Done Transcode API Test") + return nil +} + +func (tt *transcodeTester) transcodeFromUrlTester(inUrl string, bucketUrl string, taskPollDuration time.Duration) error { + os, err := parseObjectStore(bucketUrl) + if err != nil { + glog.Errorf("Error parsing bucket url=%s: err=%v", bucketUrl, err) + return fmt.Errorf("error parsing bucket url=%s: %w", bucketUrl, err) + } + path := path2.Join("/output", randomPath()) + + task, err := tt.transcodeFromUrl(inUrl, os, path) + if err != nil { + glog.Errorf("Error transcoding a file from url=%s: err=%v", inUrl, err) + return fmt.Errorf("error transcoding a file from url=%s: %w", inUrl, err) + } + return tt.checkTaskProcessingAndRenditionFiles(taskPollDuration, *task, os, path) +} + +func (tt *transcodeTester) transcodeFromUrl(inUrl string, os objectStore, path string) (*api.Task, error) { + return tt.Lapi.TranscodeFile(api.TranscodeFileReq{ + Input: api.TranscodeFileReqInput{ + Url: inUrl, + }, + Storage: api.TranscodeFileReqStorage{ + Type: "s3", + Endpoint: os.endpoint, + Credentials: &api.TranscodeFileReqCredentials{ + AccessKeyId: os.accessKeyId, + SecretAccessKey: os.secretAccessKey, + }, + Bucket: os.bucket, + }, + Outputs: api.TranscodeFileReqOutputs{ + Hls: api.TranscodeFileReqOutputsHls{ + Path: path, + }, + }, + }) +} + +func (tt *transcodeTester) transcodeFromPrivateBucketTester(inUrl string, bucketUrl string, taskPollDuration time.Duration) error { + url, err := url.Parse(inUrl) + if err != nil { + glog.Errorf("Error parsing input file url=%s: err=%v", inUrl, err) + return fmt.Errorf("error parsing input url=%s: %w", inUrl, err) + } + os, err := parseObjectStore(bucketUrl) + if err != nil { + glog.Errorf("Error parsing bucket url=%s: err=%v", bucketUrl, err) + return fmt.Errorf("error parsing bucket url=%s: %w", bucketUrl, err) + } + + randPath := randomPath() + inPath := path2.Join("/input", randPath, "source"+path2.Ext(url.Path)) + outPath := path2.Join("/output", randPath) + + if err := tt.copyFileIntoInputBucket(inUrl, os, inPath); err != nil { + glog.Errorf("Error copying file into input bucket=%s: err=%v", os.bucket, err) + return fmt.Errorf("error copying file into input bucket=%s: %w", os.bucket, err) + } + + task, err := tt.transcodeFromPrivateBucket(os, inPath, outPath) + if err != nil { + glog.Errorf("Error transcoding a file from private bucket=%s, path=%s: err=%v", os.bucket, inPath, err) + return fmt.Errorf("error transcoding a file from private bucket=%s, path=%s: %w", os.bucket, inPath, err) + } + + return tt.checkTaskProcessingAndRenditionFiles(taskPollDuration, *task, os, outPath) +} + +func (tt *transcodeTester) transcodeFromPrivateBucket(os objectStore, inPath, outPath string) (*api.Task, error) { + return tt.Lapi.TranscodeFile(api.TranscodeFileReq{ + Input: api.TranscodeFileReqInput{ + Type: "s3", + Endpoint: os.endpoint, + Credentials: &api.TranscodeFileReqCredentials{ + AccessKeyId: os.accessKeyId, + SecretAccessKey: os.secretAccessKey, + }, + Bucket: os.bucket, + Path: inPath, + }, + Storage: api.TranscodeFileReqStorage{ + Type: "s3", + Endpoint: os.endpoint, + Credentials: &api.TranscodeFileReqCredentials{ + AccessKeyId: os.accessKeyId, + SecretAccessKey: os.secretAccessKey, + }, + Bucket: os.bucket, + }, + Outputs: api.TranscodeFileReqOutputs{ + Hls: api.TranscodeFileReqOutputsHls{ + Path: outPath, + }, + }, + }) +} + +func (tt *transcodeTester) copyFileIntoInputBucket(inUrl string, os objectStore, path string) error { + uploader := s3manager.NewUploader(newAwsSession(os)) + resp, err := http.Get(inUrl) + if err != nil { + return err + } + defer resp.Body.Close() + + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(os.bucket), + Key: aws.String(path), + Body: resp.Body, + }) + return err +} + +func (tt *transcodeTester) checkTaskProcessingAndRenditionFiles(taskPollDuration time.Duration, task api.Task, os objectStore, path string) error { + if err := tt.CheckTaskProcessing(taskPollDuration, task); err != nil { + glog.Errorf("Error in transcoding task taskId=%s: err=%v", task.ID, err) + return fmt.Errorf("error in transcoding task taskId=%s: %w", task.ID, err) + } + if err := tt.checkRenditionFiles(os, path); err != nil { + glog.Errorf("Error in checking rendition segments in the bucket=%s, path=%s, err=%v", os.bucket, path, err) + return fmt.Errorf("error in checking rendition segments in the bucket=%s, path=%s: %w", os.bucket, path, err) + } + return nil +} + +func (tt *transcodeTester) checkRenditionFiles(os objectStore, path string) error { + svc := s3.New(newAwsSession(os)) + _, err := svc.GetObjectWithContext(tt.Ctx, &s3.GetObjectInput{ + Bucket: aws.String(os.bucket), + Key: aws.String(path2.Join(path, "index.m3u8")), + }) + return err +} + +func parseObjectStore(bucketUrl string) (objectStore, error) { + url, err := url.Parse(bucketUrl) + if err != nil { + return objectStore{}, err + } + + os := objectStore{} + os.accessKeyId = url.User.Username() + os.secretAccessKey, _ = url.User.Password() + os.bucket = strings.TrimPrefix(url.Path, "/") + os.endpoint = fmt.Sprintf("%s://%s", strings.TrimPrefix(url.Scheme, "s3+"), url.Host) + return os, nil +} + +func newAwsSession(os objectStore) *session.Session { + region := "unused" + return session.Must(session.NewSession(&aws.Config{ + Endpoint: &os.endpoint, + Credentials: credentials.NewStaticCredentials(os.accessKeyId, os.secretAccessKey, ""), + Region: ®ion, + })) +} + +func randomPath() string { + const length = 10 + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + res := make([]byte, length) + for i := 0; i < length; i++ { + res[i] = charset[r.Intn(length)] + } + return "/" + string(res) +} diff --git a/internal/app/vodtester/continuous_vod_tester.go b/internal/app/vodtester/continuous_vod_tester.go index 987d57b6..4a02476d 100644 --- a/internal/app/vodtester/continuous_vod_tester.go +++ b/internal/app/vodtester/continuous_vod_tester.go @@ -2,146 +2,36 @@ package vodtester import ( "context" - "fmt" - "net/url" "time" - "github.com/PagerDuty/go-pagerduty" - - "github.com/golang/glog" - "github.com/livepeer/stream-tester/messenger" + "github.com/livepeer/stream-tester/internal/app/common" ) type ( // IContinuousVodTester ... IContinuousVodTester interface { - // Start start test. Blocks until error. + // Start test. Blocks until error. Start(fileName string, vodImportUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error - Cancel() - Done() <-chan struct{} - } - - ContinuousVodTesterOptions struct { - PagerDutyIntegrationKey string - PagerDutyComponent string - PagerDutyLowUrgency bool - VodTesterOptions } continuousVodTester struct { - ctx context.Context - cancel context.CancelFunc - host string // API host being tested - pagerDutyIntegrationKey string - pagerDutyComponent string - pagerDutyLowUrgency bool - vtOpts VodTesterOptions + ct common.IContinuousTester + opts common.TesterOptions } ) // NewContinuousVodTester returns new object -func NewContinuousVodTester(gctx context.Context, opts ContinuousVodTesterOptions) IContinuousVodTester { - ctx, cancel := context.WithCancel(gctx) - server := opts.API.GetServer() - u, _ := url.Parse(server) - cvt := &continuousVodTester{ - ctx: ctx, - cancel: cancel, - host: u.Host, - pagerDutyIntegrationKey: opts.PagerDutyIntegrationKey, - pagerDutyComponent: opts.PagerDutyComponent, - pagerDutyLowUrgency: opts.PagerDutyLowUrgency, - vtOpts: opts.VodTesterOptions, +func NewContinuousVodTester(gctx context.Context, opts common.ContinuousTesterOptions) IContinuousVodTester { + return &continuousVodTester{ + ct: common.NewContinuousTester(gctx, opts, "vod"), + opts: opts.TesterOptions, } - return cvt } func (cvt *continuousVodTester) Start(fileName string, vodImportUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error { - messenger.SendMessage(fmt.Sprintf("Starting continuous vod test of %s", cvt.host)) - for { - msg := fmt.Sprintf(":arrow_right: Starting %s vod test to %s", testDuration, cvt.host) - messenger.SendMessage(msg) - - ctx, cancel := context.WithTimeout(cvt.ctx, testDuration) - vt := NewVodTester(ctx, cvt.vtOpts) - es, err := vt.Start(fileName, vodImportUrl, taskPollDuration) - ctxErr := ctx.Err() - cancel() - - if cvt.ctx.Err() != nil { - messenger.SendMessage(fmt.Sprintf("Continuous test of VOD on %s cancelled", cvt.host)) - return cvt.ctx.Err() - } else if ctxErr != nil { - msg := fmt.Sprintf("Test of VOD on %s timed out, potential deadlock! ctxErr=%q err=%q", cvt.host, ctxErr, err) - messenger.SendFatalMessage(msg) - } else if err != nil || es != 0 { - msg := fmt.Sprintf(":rotating_light: Test of VOD on %s ended with err=%v errCode=%v", cvt.host, err, es) - messenger.SendFatalMessage(msg) - glog.Warning(msg) - cvt.sendPagerdutyEvent(vt, err) - } else { - msg := fmt.Sprintf(":white_check_mark: Test of VOD on %s succeeded", cvt.host) - messenger.SendMessage(msg) - glog.Info(msg) - cvt.sendPagerdutyEvent(vt, nil) - } - glog.Infof("Waiting %s before next test of VOD", pauseBetweenTests) - select { - case <-cvt.ctx.Done(): - messenger.SendMessage(fmt.Sprintf("Continuous test of VOD on %s cancelled", cvt.host)) - return err - case <-time.After(pauseBetweenTests): - } - } - return nil -} - -func (cvt *continuousVodTester) sendPagerdutyEvent(vt IVodTester, err error) { - if cvt.pagerDutyIntegrationKey == "" { - return - } - severity, lopriPrefix, dedupKey := "error", "", fmt.Sprintf("cont-vod-tester:%s", cvt.host) - if cvt.pagerDutyLowUrgency { - severity, lopriPrefix = "warning", "[LOPRI] " - dedupKey = "lopri-" + dedupKey + start := func(ctx context.Context) error { + vt := NewVodTester(ctx, cvt.opts) + return vt.Start(fileName, vodImportUrl, taskPollDuration) } - event := pagerduty.V2Event{ - RoutingKey: cvt.pagerDutyIntegrationKey, - Action: "trigger", - DedupKey: dedupKey, - } - if err == nil { - event.Action = "resolve" - _, err := pagerduty.ManageEvent(event) - if err != nil { - messenger.SendMessage(fmt.Sprintf("Error resolving PagerDuty event: %v", err)) - } - return - } - summary := fmt.Sprintf("%s:vhs: VOD %s for `%s` error: %v", lopriPrefix, cvt.pagerDutyComponent, cvt.host, err) - if len(summary) > 1024 { - summary = summary[:1021] + "..." - } - event.Payload = &pagerduty.V2Payload{ - Source: cvt.host, - Component: cvt.pagerDutyComponent, - Severity: severity, - Summary: summary, - Timestamp: time.Now().UTC().Format(time.RFC3339), - } - resp, err := pagerduty.ManageEvent(event) - if err != nil { - glog.Error(fmt.Errorf("PAGERDUTY Error: %w", err)) - messenger.SendFatalMessage(fmt.Sprintf("Error creating PagerDuty event: %v", err)) - } else { - glog.Infof("Incident status: %s message: %s", resp.Status, resp.Message) - } -} - -func (cvt *continuousVodTester) Cancel() { - cvt.cancel() -} - -func (cvt *continuousVodTester) Done() <-chan struct{} { - return cvt.ctx.Done() + return cvt.ct.Start(start, testDuration, pauseBetweenTests) } diff --git a/internal/app/vodtester/vodtester_app.go b/internal/app/vodtester/vodtester_app.go index ceef3383..1bd4bba9 100644 --- a/internal/app/vodtester/vodtester_app.go +++ b/internal/app/vodtester/vodtester_app.go @@ -9,47 +9,42 @@ import ( "time" "github.com/golang/glog" - api "github.com/livepeer/go-api-client" + "github.com/livepeer/go-api-client" + "github.com/livepeer/stream-tester/internal/app/common" "golang.org/x/sync/errgroup" ) type ( // IVodTester ... IVodTester interface { - // Start start test. Blocks until finished. - Start(fileName string, vodImportUrl string, taskPollDuration time.Duration) (int, error) + // Start test. Blocks until finished. + Start(fileName string, vodImportUrl string, taskPollDuration time.Duration) error Cancel() Done() <-chan struct{} } - VodTesterOptions struct { - API *api.Client - CatalystPipelineStrategy string - } - vodTester struct { - ctx context.Context - cancel context.CancelFunc - lapi *api.Client - catalystPipelineStrategy string + common.TesterApp } ) // NewVodTester ... -func NewVodTester(gctx context.Context, opts VodTesterOptions) IVodTester { +func NewVodTester(gctx context.Context, opts common.TesterOptions) IVodTester { ctx, cancel := context.WithCancel(gctx) vt := &vodTester{ - lapi: opts.API, - ctx: ctx, - cancel: cancel, + TesterApp: common.TesterApp{ + Lapi: opts.API, + Ctx: ctx, + CancelFunc: cancel, + }, } return vt } -func (vt *vodTester) Start(fileName string, vodImportUrl string, taskPollDuration time.Duration) (int, error) { - defer vt.cancel() +func (vt *vodTester) Start(fileName string, vodImportUrl string, taskPollDuration time.Duration) error { + defer vt.Cancel() - eg, egCtx := errgroup.WithContext(vt.ctx) + eg, egCtx := errgroup.WithContext(vt.Ctx) eg.Go(func() error { @@ -63,28 +58,28 @@ func (vt *vodTester) Start(fileName string, vodImportUrl string, taskPollDuratio return fmt.Errorf("error importing asset from url=%s: %w", vodImportUrl, err) } - _, transcodeTask, err := vt.lapi.TranscodeAsset(importAsset.ID, assetName, api.StandardProfiles[0]) + _, transcodeTask, err := vt.Lapi.TranscodeAsset(importAsset.ID, assetName, api.StandardProfiles[0]) if err != nil { glog.Errorf("Error transcoding asset assetId=%s err=%v", importAsset.ID, err) return fmt.Errorf("error transcoding asset assetId=%s: %w", importAsset.ID, err) } - err = vt.checkTaskProcessing(taskPollDuration, *transcodeTask) + err = vt.CheckTaskProcessing(taskPollDuration, *transcodeTask) if err != nil { - glog.Errorf("Error in trasncoding task taskId=%s", transcodeTask.ID) + glog.Errorf("Error in transcoding task taskId=%s", transcodeTask.ID) return fmt.Errorf("error in transcoding task taskId=%s: %w", transcodeTask.ID, err) } - exportTask, err := vt.lapi.ExportAsset(importAsset.ID) + exportTask, err := vt.Lapi.ExportAsset(importAsset.ID) if err != nil { glog.Errorf("Error exporting asset assetId=%s err=%v", importAsset.ID, err) return fmt.Errorf("error exporting asset assetId=%s: %w", importAsset.ID, err) } - err = vt.checkTaskProcessing(taskPollDuration, *exportTask) + err = vt.CheckTaskProcessing(taskPollDuration, *exportTask) if err != nil { glog.Errorf("Error in export task taskId=%s", exportTask.ID) @@ -117,26 +112,26 @@ func (vt *vodTester) Start(fileName string, vodImportUrl string, taskPollDuratio }) go func() { <-egCtx.Done() - vt.cancel() + vt.Cancel() }() if err := eg.Wait(); err != nil { - return 1, err + return err } glog.Info("Done VOD Test") - return 0, nil + return nil } func (vt *vodTester) uploadViaUrlTester(vodImportUrl string, taskPollDuration time.Duration, assetName string) (*api.Asset, error) { - importAsset, importTask, err := vt.lapi.UploadViaURL(vodImportUrl, assetName, vt.catalystPipelineStrategy) + importAsset, importTask, err := vt.Lapi.UploadViaURL(vodImportUrl, assetName, vt.CatalystPipelineStrategy) if err != nil { glog.Errorf("Error importing asset err=%v", err) return nil, fmt.Errorf("error importing asset: %w", err) } glog.Infof("Importing asset taskId=%s outputAssetId=%s", importTask.ID, importAsset.ID) - err = vt.checkTaskProcessing(taskPollDuration, *importTask) + err = vt.CheckTaskProcessing(taskPollDuration, *importTask) if err != nil { glog.Errorf("Error processing asset assetId=%s taskId=%s", importAsset.ID, importTask.ID) @@ -147,7 +142,7 @@ func (vt *vodTester) uploadViaUrlTester(vodImportUrl string, taskPollDuration ti func (vt *vodTester) directUploadTester(fileName string, taskPollDuration time.Duration) error { hostName, _ := os.Hostname() assetName := fmt.Sprintf("vod_test_upload_direct_%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00")) - requestUpload, err := vt.lapi.RequestUpload(assetName, vt.catalystPipelineStrategy) + requestUpload, err := vt.Lapi.RequestUpload(assetName, vt.CatalystPipelineStrategy) if err != nil { glog.Errorf("Error requesting upload for assetName=%s err=%v", assetName, err) @@ -169,13 +164,13 @@ func (vt *vodTester) directUploadTester(fileName string, taskPollDuration time.D return fmt.Errorf("error opening file=%s: %w", fileName, err) } - err = vt.lapi.UploadAsset(vt.ctx, uploadEndpoint, file) + err = vt.Lapi.UploadAsset(vt.Ctx, uploadEndpoint, file) if err != nil { glog.Errorf("Error uploading file filePath=%s err=%v", fileName, err) return fmt.Errorf("error uploading for assetId=%s taskId=%s: %w", uploadAsset.ID, uploadTask.ID, err) } - err = vt.checkTaskProcessing(taskPollDuration, uploadTask) + err = vt.CheckTaskProcessing(taskPollDuration, uploadTask) if err != nil { glog.Errorf("Error processing asset assetId=%s taskId=%s", uploadAsset.ID, uploadTask.ID) } @@ -186,14 +181,14 @@ func (vt *vodTester) resumableUploadTester(fileName string, taskPollDuration tim hostName, _ := os.Hostname() assetName := fmt.Sprintf("vod_test_upload_resumable_%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00")) - requestUpload, err := vt.lapi.RequestUpload(assetName, vt.catalystPipelineStrategy) + requestUpload, err := vt.Lapi.RequestUpload(assetName, vt.CatalystPipelineStrategy) if err != nil { glog.Errorf("Error requesting upload for assetName=%s err=%v", assetName, err) return fmt.Errorf("error requesting upload for assetName=%s: %w", assetName, err) } - tusUploadEndpoint := patchURLHost(requestUpload.TusEndpoint, vt.lapi.GetServer()) + tusUploadEndpoint := patchURLHost(requestUpload.TusEndpoint, vt.Lapi.GetServer()) uploadAsset := requestUpload.Asset uploadTask := api.Task{ ID: requestUpload.Task.ID, @@ -206,14 +201,14 @@ func (vt *vodTester) resumableUploadTester(fileName string, taskPollDuration tim return fmt.Errorf("error opening file=%s: %w", fileName, err) } - err = vt.lapi.ResumableUpload(tusUploadEndpoint, file) + err = vt.Lapi.ResumableUpload(tusUploadEndpoint, file) if err != nil { glog.Errorf("Error resumable uploading file filePath=%s err=%v", fileName, err) return fmt.Errorf("error resumable uploading for assetId=%s taskId=%s: %w", uploadAsset.ID, uploadTask.ID, err) } - err = vt.checkTaskProcessing(taskPollDuration, uploadTask) + err = vt.CheckTaskProcessing(taskPollDuration, uploadTask) if err != nil { glog.Errorf("Error processing asset assetId=%s taskId=%s", uploadAsset.ID, uploadTask.ID) @@ -222,34 +217,6 @@ func (vt *vodTester) resumableUploadTester(fileName string, taskPollDuration tim return err } -func (vt *vodTester) checkTaskProcessing(taskPollDuration time.Duration, processingTask api.Task) error { - startTime := time.Now() - for { - time.Sleep(taskPollDuration) - - if err := vt.isCancelled(); err != nil { - return err - } - - // we already sleep before the first check, so no need for strong consistency - task, err := vt.lapi.GetTask(processingTask.ID, false) - if err != nil { - glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err) - return fmt.Errorf("error retrieving task id=%s: %w", processingTask.ID, err) - } - if task.Status.Phase == "completed" { - glog.Infof("Task success, taskId=%s", task.ID) - return nil - } - if task.Status.Phase != "pending" && task.Status.Phase != "running" && task.Status.Phase != "waiting" { - glog.Errorf("Error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage) - return fmt.Errorf("error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage) - } - - glog.Infof("Waiting for task to be processed id=%s pollWait=%s elapsed=%s progressPct=%.1f%%", task.ID, taskPollDuration, time.Since(startTime), 100*task.Status.Progress) - } -} - // Patches the target URL with the source URL host, only if the latter is not // contained in the first. Used for doing resumable uploads to the same region // under test. @@ -272,20 +239,3 @@ func patchURLHost(target, src string) string { } return targetURL.String() } - -func (vt *vodTester) isCancelled() error { - select { - case <-vt.ctx.Done(): - return context.Canceled - default: - } - return nil -} - -func (vt *vodTester) Cancel() { - vt.cancel() -} - -func (vt *vodTester) Done() <-chan struct{} { - return vt.ctx.Done() -}