Skip to content

Commit

Permalink
Add Transcode API (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Jan 11, 2023
1 parent 145c23f commit 294ba0e
Show file tree
Hide file tree
Showing 11 changed files with 581 additions and 217 deletions.
34 changes: 27 additions & 7 deletions cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (

"github.com/golang/glog"
serfClient "github.com/hashicorp/serf/client"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/go-api-client"
"github.com/livepeer/joy4/format"
"github.com/livepeer/livepeer-data/pkg/client"
"github.com/livepeer/stream-tester/internal/app/common"
"github.com/livepeer/stream-tester/internal/app/recordtester"
"github.com/livepeer/stream-tester/internal/app/transcodetester"
"github.com/livepeer/stream-tester/internal/app/vodtester"
"github.com/livepeer/stream-tester/internal/metrics"
"github.com/livepeer/stream-tester/internal/server"
Expand Down Expand Up @@ -79,6 +81,8 @@ func main() {
testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test")
testLive := fs.Bool("live", false, "Check Live workflow")
testVod := fs.Bool("vod", false, "Check VOD workflow")
transcodeBucketUrl := fs.String("transcode-bucket-url", "", "Object Store URL to test Transcode API in the format 's3+http(s)://<access-key-id>:<secret-access-key>@<endpoint>/<bucket>'")
testTranscode := fs.Bool("transcode", false, "Check Transcode API workflow")
catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.")
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
Expand Down Expand Up @@ -256,10 +260,6 @@ func main() {
TestMP4: *testMP4,
TestStreamHealth: *testStreamHealth,
}
vtOpts := vodtester.VodTesterOptions{
API: lapi,
CatalystPipelineStrategy: *catalystPipelineStrategy,
}
if *sim > 1 {
var testers []recordtester.IRecordTester
var eses []int
Expand Down Expand Up @@ -317,17 +317,37 @@ func main() {
})
}
if *testVod {
vtOpts := common.TesterOptions{
API: lapi,
CatalystPipelineStrategy: *catalystPipelineStrategy,
}
eg.Go(func() error {
cvtOpts := vodtester.ContinuousVodTesterOptions{
cvtOpts := common.ContinuousTesterOptions{
PagerDutyIntegrationKey: *pagerDutyIntegrationKey,
PagerDutyComponent: *pagerDutyComponent,
PagerDutyLowUrgency: *pagerDutyLowUrgency,
VodTesterOptions: vtOpts,
TesterOptions: vtOpts,
}
cvt := vodtester.NewContinuousVodTester(egCtx, cvtOpts)
return cvt.Start(fileName, *vodImportUrl, *testDuration, *taskPollDuration, *continuousTest)
})
}
if *testTranscode {
ttOpts := common.TesterOptions{
API: lapi,
CatalystPipelineStrategy: *catalystPipelineStrategy,
}
eg.Go(func() error {
cttOpts := common.ContinuousTesterOptions{
PagerDutyIntegrationKey: *pagerDutyIntegrationKey,
PagerDutyComponent: *pagerDutyComponent,
PagerDutyLowUrgency: *pagerDutyLowUrgency,
TesterOptions: ttOpts,
}
ctt := transcodetester.NewContinuousTranscodeTester(egCtx, cttOpts)
return ctt.Start(*fileArg, *transcodeBucketUrl, *testDuration, *taskPollDuration, *continuousTest)
})
}
if err := eg.Wait(); err != nil {
glog.Warningf("Continuous test ended with err=%v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/golang/glog v1.0.0
github.com/gosuri/uilive v0.0.3 // indirect
github.com/gosuri/uiprogress v0.0.1
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5
github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e
github.com/livepeer/go-livepeer v0.5.31
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
github.com/livepeer/leaderboard-serverless v1.0.0
Expand All @@ -39,7 +39,7 @@ require (
cloud.google.com/go v0.81.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.2 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/aws/aws-sdk-go v1.34.28 // indirect
github.com/aws/aws-sdk-go v1.34.28
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sL
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -726,8 +727,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5 h1:sxyLVN5lD4JB5THu2+48BbhNTifJK67YvW8DyNuPBJI=
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e h1:R9agI6FLJVOmX7YPR7vA0Rp9ONpDzJbDYSgpNwRHJiQ=
github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw=
github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
Expand Down Expand Up @@ -885,6 +886,7 @@ github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xA
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
Expand Down Expand Up @@ -1403,7 +1405,6 @@ golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 h1:OH54vjqzRWmbJ62fjuhxy7AxFFgoHN0/DPc/UrL8cAs=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
144 changes: 144 additions & 0 deletions internal/app/common/continuous_tester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package common

import (
"context"
"fmt"
"net/url"
"time"

"github.com/PagerDuty/go-pagerduty"

"github.com/golang/glog"
"github.com/livepeer/stream-tester/messenger"
)

type (
IContinuousTester interface {
// Start test. Blocks until error.
Start(start func(ctx context.Context) error, testDuration, pauseBetweenTests time.Duration) error
Cancel()
Done() <-chan struct{}
}

ContinuousTesterOptions struct {
PagerDutyIntegrationKey string
PagerDutyComponent string
PagerDutyLowUrgency bool
TesterOptions
}

continuousTester struct {
ctx context.Context
cancel context.CancelFunc
host string // API host being tested
pagerDutyIntegrationKey string
pagerDutyComponent string
pagerDutyLowUrgency bool
name string
}
)

func NewContinuousTester(gctx context.Context, opts ContinuousTesterOptions, testerName string) IContinuousTester {
ctx, cancel := context.WithCancel(gctx)
server := opts.API.GetServer()
u, _ := url.Parse(server)
ct := &continuousTester{
ctx: ctx,
cancel: cancel,
host: u.Host,
pagerDutyIntegrationKey: opts.PagerDutyIntegrationKey,
pagerDutyComponent: opts.PagerDutyComponent,
pagerDutyLowUrgency: opts.PagerDutyLowUrgency,
name: testerName,
}
return ct
}

func (ct *continuousTester) Start(start func(ctx context.Context) error, testDuration, pauseBetweenTests time.Duration) error {
messenger.SendMessage(fmt.Sprintf("Starting continuous %s test of %s", ct.name, ct.host))
for {
msg := fmt.Sprintf(":arrow_right: Starting %s %s test to %s", testDuration, ct.name, ct.host)
messenger.SendMessage(msg)

ctx, cancel := context.WithTimeout(ct.ctx, testDuration)
err := start(ctx)
ctxErr := ctx.Err()
cancel()

if ct.ctx.Err() != nil {
messenger.SendMessage(fmt.Sprintf("Continuous test of %s on %s cancelled", ct.name, ct.host))
return ct.ctx.Err()
} else if ctxErr != nil {
msg := fmt.Sprintf("Test of %s on %s timed out, potential deadlock! ctxErr=%q err=%q", ct.name, ct.host, ctxErr, err)
messenger.SendFatalMessage(msg)
} else if err != nil {
msg := fmt.Sprintf(":rotating_light: Test of %s on %s ended with err=%v", ct.name, ct.host, err)
messenger.SendFatalMessage(msg)
glog.Warning(msg)
ct.sendPagerdutyEvent(err)
} else {
msg := fmt.Sprintf(":white_check_mark: Test of %s on %s succeeded", ct.name, ct.host)
messenger.SendMessage(msg)
glog.Info(msg)
ct.sendPagerdutyEvent(nil)
}
glog.Infof("Waiting %s before next test of %s", pauseBetweenTests, ct.name)
select {
case <-ct.ctx.Done():
messenger.SendMessage(fmt.Sprintf("Continuous test of %s on %s cancelled", ct.name, ct.host))
return err
case <-time.After(pauseBetweenTests):
}
}
return nil
}

func (ct *continuousTester) sendPagerdutyEvent(err error) {
if ct.pagerDutyIntegrationKey == "" {
return
}
severity, lopriPrefix, dedupKey := "error", "", fmt.Sprintf("cont-%s-tester:%s", ct.name, ct.host)
if ct.pagerDutyLowUrgency {
severity, lopriPrefix = "warning", "[LOPRI] "
dedupKey = "lopri-" + dedupKey
}
event := pagerduty.V2Event{
RoutingKey: ct.pagerDutyIntegrationKey,
Action: "trigger",
DedupKey: dedupKey,
}
if err == nil {
event.Action = "resolve"
_, err := pagerduty.ManageEvent(event)
if err != nil {
messenger.SendMessage(fmt.Sprintf("Error resolving PagerDuty event: %v", err))
}
return
}
summary := fmt.Sprintf("%s:vhs: %s %s for `%s` error: %v", lopriPrefix, ct.name, ct.pagerDutyComponent, ct.host, err)
if len(summary) > 1024 {
summary = summary[:1021] + "..."
}
event.Payload = &pagerduty.V2Payload{
Source: ct.host,
Component: ct.pagerDutyComponent,
Severity: severity,
Summary: summary,
Timestamp: time.Now().UTC().Format(time.RFC3339),
}
resp, err := pagerduty.ManageEvent(event)
if err != nil {
glog.Error(fmt.Errorf("PAGERDUTY Error: %w", err))
messenger.SendFatalMessage(fmt.Sprintf("Error creating PagerDuty event: %v", err))
} else {
glog.Infof("Incident status: %s message: %s", resp.Status, resp.Message)
}
}

func (ct *continuousTester) Cancel() {
ct.cancel()
}

func (ct *continuousTester) Done() <-chan struct{} {
return ct.ctx.Done()
}
68 changes: 68 additions & 0 deletions internal/app/common/tester_app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package common

import (
"context"
"fmt"
"github.com/golang/glog"
"github.com/livepeer/go-api-client"
"time"
)

type (
TesterApp struct {
Ctx context.Context
CancelFunc context.CancelFunc
CatalystPipelineStrategy string
Lapi *api.Client
}

TesterOptions struct {
API *api.Client
CatalystPipelineStrategy string
}
)

func (ta *TesterApp) CheckTaskProcessing(taskPollDuration time.Duration, processingTask api.Task) error {
startTime := time.Now()
for {
time.Sleep(taskPollDuration)

if err := ta.isCancelled(); err != nil {
return err
}

// we already sleep before the first check, so no need for strong consistency
task, err := ta.Lapi.GetTask(processingTask.ID, false)
if err != nil {
glog.Errorf("Error retrieving task id=%s err=%v", processingTask.ID, err)
return fmt.Errorf("error retrieving task id=%s: %w", processingTask.ID, err)
}
if task.Status.Phase == "completed" {
glog.Infof("Task success, taskId=%s", task.ID)
return nil
}
if task.Status.Phase != "pending" && task.Status.Phase != "running" && task.Status.Phase != "waiting" {
glog.Errorf("Error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage)
return fmt.Errorf("error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage)
}

glog.Infof("Waiting for task to be processed id=%s pollWait=%s elapsed=%s progressPct=%.1f%%", task.ID, taskPollDuration, time.Since(startTime), 100*task.Status.Progress)
}
}

func (ta *TesterApp) isCancelled() error {
select {
case <-ta.Ctx.Done():
return context.Canceled
default:
}
return nil
}

func (ta *TesterApp) Cancel() {
ta.CancelFunc()
}

func (ta *TesterApp) Done() <-chan struct{} {
return ta.Ctx.Done()
}
2 changes: 1 addition & 1 deletion internal/app/recordtester/continuous_record_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type (
// IContinuousRecordTester ...
IContinuousRecordTester interface {
// Start start test. Blocks until error.
// Start test. Blocks until error.
Start(fileName string, testDuration, pauseDuration, pauseBetweenTests time.Duration) error
Cancel()
Done() <-chan struct{}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type (
// IRecordTester ...
IRecordTester interface {
// Start start test. Blocks until finished.
// Start test. Blocks until finished.
Start(fileName string, testDuration, pauseDuration time.Duration) (int, error)
Cancel()
Done() <-chan struct{}
Expand Down
35 changes: 35 additions & 0 deletions internal/app/transcodetester/continuous_transcode_tester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package transcodetester

import (
"context"
"time"

"github.com/livepeer/stream-tester/internal/app/common"
)

type (
IContinuousTranscodeTester interface {
// Start test. Blocks until error.
Start(fileName string, transcodeBucketUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error
}

continuousTranscodeTester struct {
ct common.IContinuousTester
opts common.TesterOptions
}
)

func NewContinuousTranscodeTester(gctx context.Context, opts common.ContinuousTesterOptions) IContinuousTranscodeTester {
return &continuousTranscodeTester{
ct: common.NewContinuousTester(gctx, opts, "transcode"),
opts: opts.TesterOptions,
}
}

func (ctt *continuousTranscodeTester) Start(fileName string, transcodeBucketUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error {
start := func(ctx context.Context) error {
tt := NewTranscodeTester(ctx, ctt.opts)
return tt.Start(fileName, transcodeBucketUrl, taskPollDuration)
}
return ctt.ct.Start(start, testDuration, pauseBetweenTests)
}
Loading

0 comments on commit 294ba0e

Please sign in to comment.