Skip to content

Commit

Permalink
Embed broadcaster in orchtester job (#162)
Browse files Browse the repository at this point in the history
Embed broadcaster into orchtester and create orch-tester Docker image
  • Loading branch information
leszko authored Jun 8, 2022
1 parent f40ba6f commit d426981
Show file tree
Hide file tree
Showing 8 changed files with 1,001 additions and 59 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/orch-tester-dockerimage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Orch Tester Docker Image CI
on: push

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1
- name: Docker Registry
env:
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
run: docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD

- name: Tags
id: tags
uses: livepeer/action-gh-release-tags@v0
with:
always-latest-on-branch: master

- name: Build the Docker image
run: |
TAGS='${{ steps.tags.outputs.tags }}'
docker build . --file Dockerfile.orch-tester $(printf ' -t livepeer/orch-tester:%s' $TAGS) --build-arg "version=${{ steps.tags.outputs.version }}"
- name: Push Docker Container to Registry
run: |
for TAG in ${{ steps.tags.outputs.tags }}; do
docker push livepeer/orch-tester:$TAG
done
4 changes: 3 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ jobs:
run: go get -v -t -d ./...

- name: Trigger test suite
run: go test ./... -v
run: |
# Skip orch-tester, because we not released as a binary
go test $(go list ./... | grep -v 'orch-tester') -v
15 changes: 0 additions & 15 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ RUN wget https://storage.googleapis.com/lp_testharness_assets/bbb_sunflower_1080
RUN wget https://storage.googleapis.com/lp_testharness_assets/official_test_source_2s_keys_24pfs_30s.mp4
RUN wget -qO- https://storage.googleapis.com/lp_testharness_assets/official_test_source_2s_keys_24pfs_30s_hls.tar.gz | tar xvz -C .

# ENV GOFLAGS "-mod=readonly"
ARG version

COPY go.mod go.mod
Expand All @@ -33,11 +32,7 @@ RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$vers
RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" cmd/mist-api-connector/mist-api-connector.go
RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" cmd/loadtester/loadtester.go
RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" cmd/stream-monitor/stream-monitor.go
RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" cmd/orch-tester/orch_tester.go
RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" cmd/recordtester/recordtester.go
# RUN ls -a /usr
# RUN find / -name libavformat.so.58


FROM alpine:3.15.4
RUN apk add --no-cache ca-certificates ffmpeg
Expand All @@ -55,14 +50,4 @@ COPY --from=builder /root/testdriver testdriver
COPY --from=builder /root/mist-api-connector mist-api-connector
COPY --from=builder /root/loadtester loadtester
COPY --from=builder /root/stream-monitor stream-monitor
COPY --from=builder /root/orch_tester orch_tester
COPY --from=builder /root/recordtester recordtester
# COPY --from=builder /usr/lib/libavformat.so.58 /usr/lib/libavformat.so.58
# COPY --from=builder /usr/lib/libavutil.so.56 /usr/lib/libavutil.so.56
# COPY --from=builder /usr/lib/libavcodec.so.58 /usr/lib/libavcodec.so.58

# docker build -t livepeer/streamtester:latest .
# docker build -t livepeer/streamtester:latest --build-arg version=$(git describe --dirty) .
# docker push livepeer/streamtester:latest
# docker build -t livepeer/streamtester:test .
# docker push livepeer/streamtester:test
32 changes: 32 additions & 0 deletions Dockerfile.orch-tester
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
FROM livepeerci/build AS builder

RUN apt install -y wget tar

WORKDIR /root
RUN wget -qO- https://storage.googleapis.com/lp_testharness_assets/official_test_source_2s_keys_24pfs_30s_hls.tar.gz | tar xvz -C .

COPY go.mod go.mod
COPY go.sum go.sum

RUN go mod download

ARG version
RUN echo $version

COPY . .

RUN go build -ldflags="-X 'github.com/livepeer/stream-tester/model.Version=$version' -X 'github.com/livepeer/stream-tester/model.IProduction=true'" -tags mainnet cmd/orch-tester/orch_tester.go

FROM debian:stretch-slim

RUN apt update \
&& apt install -y ca-certificates \
&& apt clean && apt autoclean
RUN update-ca-certificates

WORKDIR /root

COPY --from=builder /root/official_test_source_2s_keys_24pfs_30s_hls official_test_source_2s_keys_24pfs_30s_hls
COPY --from=builder /root/orch_tester orch_tester

ENTRYPOINT ["/root/orch_tester"]
158 changes: 136 additions & 22 deletions cmd/orch-tester/orch_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import (
"errors"
"flag"
"fmt"
"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/joy4/format"
streamtesterMetrics "github.com/livepeer/stream-tester/internal/metrics"
"github.com/livepeer/stream-tester/internal/server"
"github.com/peterbourgon/ff"
"io/ioutil"
"log"
"math"
Expand All @@ -25,7 +30,6 @@ import (
"github.com/golang/glog"
apiModels "github.com/livepeer/leaderboard-serverless/models"
"github.com/livepeer/m3u8"
streamtesterMetrics "github.com/livepeer/stream-tester/internal/metrics"
"github.com/livepeer/stream-tester/internal/testers"
"github.com/livepeer/stream-tester/model"
streamerModel "github.com/livepeer/stream-tester/model"
Expand All @@ -41,23 +45,47 @@ const streamTesterMistCreds = ""
const prometheusPort = "9090"
const bcastMediaPort = "8935"
const bcastRTMPPort = "1935"
const bcastCliPort = "7935"

const refreshWait = 70 * time.Second
const httpTimeout = 8 * time.Second
const bcastReadyTimeout = 10 * time.Minute

const numSegments = 15

var start time.Time

func init() {
format.RegisterAll()
}

type broadcasterConfig struct {
network *string
ethUrl *string
datadir *string
ethPassword *string
maxTicketEV *string
maxPricePerUnit *int
}

func main() {
flag.Set("logtostderr", "true")

vFlag := flag.Lookup("v")
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
verbosity := flag.String("v", "6", "Log verbosity. {4|5|6}")

region := flag.String("region", "", "Region this service is operating in")
streamTester := flag.String("streamtester", "", "Address for stream-tester server instance")
broadcaster := flag.String("broadcaster", "127.0.0.1", "Broadcaster address")
broadcaster := flag.String("broadcaster", "", "Broadcaster address")
metrics := flag.String("metrics", "127.0.0.1"+":"+prometheusPort, "Broadcaster metrics port")
media := flag.String("media", bcastMediaPort, "Broadcaster HTTP port")
rtmp := flag.String("rtmp", bcastRTMPPort, "broadcaster RTMP port")
leaderboard := flag.String("leaderboard", "127.0.0.1:3001", "HTTP Address of the serverless leadearboard API")
leaderboardSecret := flag.String("leaderboard-secret", "", "Secret for the Leaderboard API")

subgraph := flag.String("subgraph", "https://api.thegraph.com/subgraphs/name/livepeer/livepeer-canary", "Livepeer subgraph URL")

// Video config
videoFile := flag.String("video", "official_test_source_2s_keys_24pfs_30s.mp4", "video file to use, has to be present in stream-tester root")
numProfiles := flag.Int("profiles", 3, "number of video profiles to use on the broadcaster")
Expand All @@ -70,7 +98,23 @@ func main() {
gsBucket := flag.String("gsbucket", "", "Google storage bucket to store segments")
gsKey := flag.String("gskey", "", "Google Storage private key (in json format)")

flag.Parse()
// Embedded Broadcaster config
var bCfg broadcasterConfig
bCfg.network = flag.String("network", "arbitrum-one-rinkeby", "Network to connect to")
bCfg.ethUrl = flag.String("ethUrl", "https://rinkeby.arbitrum.io/rpc", "Ethereum node JSON-RPC URL")
bCfg.datadir = flag.String("datadir", "", "Directory that data is stored in")
bCfg.ethPassword = flag.String("ethPassword", "", "Password for existing Eth account address")
bCfg.maxTicketEV = flag.String("maxTicketEV", "3000000000000", "The maximum acceptable expected value for PM tickets")
bCfg.maxPricePerUnit = flag.Int("maxPricePerUnit", 0, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price")

// Config file
_ = flag.String("config", "", "Config file in the format 'key value', flags and env vars take precedence over the config file")
err := ff.Parse(flag.CommandLine, os.Args[1:],
ff.WithConfigFileFlag("config"),
ff.WithEnvVarPrefix("OT"),
ff.WithConfigFileParser(ff.PlainParser),
)
vFlag.Value.Set(*verbosity)

if *region == "" {
log.Fatal("region is required")
Expand All @@ -80,14 +124,15 @@ func main() {
defer cancel()

if *streamTester == "" {
glog.Info("Starting embedded streamtester service")
hostName, _ := os.Hostname()
streamtesterMetrics.InitCensus(hostName, model.Version, "streamtester")
s := server.NewStreamerServer(false, streamTesterLapiToken, streamTesterMistCreds, 4242)
go func() {
addr := fmt.Sprintf("%s:%s", "0.0.0.0", streamTesterPort)
s.StartWebServer(ctx, addr)
}()
startEmbeddedStreamTester(ctx)
}

var bcastHost string
if *streamTester == "" && *broadcaster == "" {
bcastHost = defaultHost
startEmbeddedBroadcaster(ctx, bCfg, *presets)
} else {
bcastHost = *broadcaster
}

metricsURL := defaultAddr(*metrics, defaultHost, prometheusPort)
Expand Down Expand Up @@ -120,16 +165,18 @@ func main() {
testers.Bucket = *gsBucket
testers.CredsJSON = *gsKey

refreshWait := 70 * time.Second

var summary statsSummary
start = time.Now()

glog.Infof("Waiting for broadcaster to be ready")
waitUntilBroadcasterIsReady(ctx, bcastHost)

glog.Infof("Starting to test orchestrators")
for _, o := range orchestrators {
time.Sleep(refreshWait)

req := &streamerModel.StartStreamsReq{
Host: *broadcaster,
Host: bcastHost,
RTMP: uint16(rtmpUint),
Media: uint16(mediaUint),
Repeat: uint(*repeat),
Expand Down Expand Up @@ -240,14 +287,6 @@ func main() {
summary.log()
}

func validateURL(addr string) (string, error) {
url, err := url.ParseRequestURI(addr)
if err != nil {
return "", err
}
return url.String(), nil
}

func defaultAddr(addr, defaultHost, defaultPort string) string {
if addr == "" {
addr = defaultHost + ":" + defaultPort
Expand All @@ -266,6 +305,69 @@ func defaultAddr(addr, defaultHost, defaultPort string) string {
return addr
}

func startEmbeddedStreamTester(ctx context.Context) {
glog.Info("Starting embedded streamtester service")
hostName, _ := os.Hostname()
streamtesterMetrics.InitCensus(hostName, model.Version, "streamtester")
s := server.NewStreamerServer(false, streamTesterLapiToken, streamTesterMistCreds, 4242)
go func() {
addr := fmt.Sprintf("%s:%s", "0.0.0.0", streamTesterPort)
s.StartWebServer(ctx, addr)
}()
}

func startEmbeddedBroadcaster(ctx context.Context, bCfg broadcasterConfig, presets string) {
glog.Info("Starting embedded broadcaster service")

// Increase Broadcaster timeouts
common.SegUploadTimeoutMultiplier = 4.0
common.SegmentUploadTimeout = 8 * time.Second
common.HTTPDialTimeout = 8 * time.Second
common.SegHttpPushTimeoutMultiplier = 4.0

// Disable caching for Orchestrator Discovery Webhook
common.WebhookDiscoveryRefreshInterval = 0

// Start broadcaster
cfg := starter.DefaultLivepeerConfig()
cfg.Network = bCfg.network
cfg.MaxSessions = intPointer(200)
cfg.OrchWebhookURL = stringPointer(fmt.Sprintf("http://%s:%s/orchestrators", defaultHost, streamTesterPort))
cfg.EthUrl = bCfg.ethUrl
cfg.Datadir = bCfg.datadir
cfg.Monitor = boolPointer(true)
cfg.EthPassword = bCfg.ethPassword
cfg.LocalVerify = boolPointer(false)
cfg.HttpIngest = boolPointer(true)
cfg.TranscodingOptions = &presets
cfg.MaxTicketEV = bCfg.maxTicketEV
cfg.MaxPricePerUnit = bCfg.maxPricePerUnit
cfg.CliAddr = stringPointer(fmt.Sprintf("0.0.0.0:%s", bcastCliPort))
cfg.Broadcaster = boolPointer(true)
go starter.StartLivepeer(ctx, cfg)
}

func waitUntilBroadcasterIsReady(ctx context.Context, bcastHost string) {
rCtx, _ := context.WithTimeout(ctx, bcastReadyTimeout)
statusEndpoint := fmt.Sprintf("http://%s:%s/status", bcastHost, bcastCliPort)
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-rCtx.Done():
glog.Error("Waiting for broadcaster timed out")
return
case <-ticker.C:
resp, err := http.Get(statusEndpoint)
if err == nil {
resp.Body.Close()
if resp.StatusCode == 200 {
return
}
}
}
}
}

type streamerClient struct {
streamer string
leaderboardAddr string
Expand Down Expand Up @@ -850,3 +952,15 @@ func (s *statsSummary) log() {
glog.Warning("Low number of orchestrators which passed the sanity check, please make sure that the orch-tester job is configured correctly")
}
}

func boolPointer(b bool) *bool {
return &b
}

func intPointer(i int) *int {
return &i
}

func stringPointer(s string) *string {
return &s
}
10 changes: 3 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@ require (
cloud.google.com/go/storage v1.15.0
contrib.go.opencensus.io/exporter/prometheus v0.1.0
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect
github.com/Necroforger/dgrouter v0.0.0-20190528143456-040421b5a83e
github.com/PagerDuty/go-pagerduty v1.3.0
github.com/bwmarrin/discordgo v0.20.2
github.com/golang/glog v0.0.0-20210429001901-424d2337a529
github.com/gorilla/websocket v1.4.2 // indirect
github.com/gosuri/uilive v0.0.3 // indirect
github.com/gosuri/uiprogress v0.0.1
github.com/livepeer/go-livepeer v0.5.31
github.com/livepeer/joy4 v0.1.2-0.20210601043311-c1b885884cc7
github.com/livepeer/leaderboard-serverless v1.0.0
github.com/livepeer/livepeer-data v0.4.11
github.com/livepeer/m3u8 v0.11.1
github.com/mattn/go-isatty v0.0.8 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/peterbourgon/ff v1.7.0
github.com/peterbourgon/ff/v2 v2.0.0
Expand All @@ -29,14 +27,12 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0-rc.0
go.opencensus.io v0.23.0
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420
golang.org/x/text v0.3.6
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/text v0.3.7
google.golang.org/api v0.46.0
google.golang.org/grpc v1.38.0
)

exclude github.com/gosuri/uilive v0.0.4 // cause memory corruption

// replace github.com/livepeer/joy4 => /Users/dark/projects/livepeer/joy4

replace github.com/ethereum/go-ethereum => github.com/ethereum/go-ethereum v1.9.3
Loading

0 comments on commit d426981

Please sign in to comment.