diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 8197a4f4..d55eddd8 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -149,7 +149,7 @@ jobs: uses: livepeer/action-gh-checksum-and-gpg-sign@latest with: artifacts-dir: releases - release-name: ${{ github.ref_type == 'tag' && github.ref_name || github.sha }} + release-name: ${{ (github.ref_type == 'tag' && github.ref_name) || github.event.pull_request.head.sha || github.sha }} gpg-key: ${{ secrets.CI_GPG_SIGNING_KEY }} gpg-key-passphrase: ${{ secrets.CI_GPG_SIGNING_PASSPHRASE }} @@ -171,7 +171,7 @@ jobs: uses: google-github-actions/upload-cloud-storage@v1 with: path: releases - destination: "build.livepeer.live/${{ matrix.project }}/${{ (github.ref_type == 'tag' && github.ref_name) || github.sha }}" + destination: "build.livepeer.live/${{ matrix.project }}/${{ (github.ref_type == 'tag' && github.ref_name) || github.event.pull_request.head.sha || github.sha }}" parent: false - name: Upload branch manifest file diff --git a/cmd/mist-api-connector/mist-api-connector.go b/cmd/mist-api-connector/mist-api-connector.go index ac076f8c..52632bac 100644 --- a/cmd/mist-api-connector/mist-api-connector.go +++ b/cmd/mist-api-connector/mist-api-connector.go @@ -11,8 +11,8 @@ import ( "time" "github.com/golang/glog" + "github.com/livepeer/go-api-client" "github.com/livepeer/livepeer-data/pkg/mistconnector" - "github.com/livepeer/stream-tester/apis/livepeer" mistapi "github.com/livepeer/stream-tester/apis/mist" "github.com/livepeer/stream-tester/internal/app/mistapiconnector" "github.com/livepeer/stream-tester/internal/metrics" @@ -44,18 +44,20 @@ func main() { noMistScrapeMetrics := fs.Bool("no-mist-scrape-metrics", false, "Scrape statistics from MistServer and publish to RabbitMQ") sendAudio := fs.String("send-audio", "record", "when should we send audio? {always|never|record}") apiToken := fs.String("api-token", "", "Token of the Livepeer API to be used by the Mist server") - apiServer := fs.String("api-server", livepeer.ACServer, "Livepeer API server to use") + apiServer := fs.String("api-server", api.ProdServer, "Livepeer API server to use") routePrefix := fs.String("route-prefix", "", "Prefix to be prepended to all created routes e.g. 'nyc-'") playbackDomain := fs.String("playback-domain", "", "regex of domain to create routes for (ex: playback.livepeer.live)") mistURL := fs.String("route-mist-url", "", "external URL of this Mist instance (used for routing) (ex: https://mist-server-0.livepeer.live)") baseStreamName := fs.String("base-stream-name", "", "Base stream name to be used in wildcard-based routing scheme") - fEtcdEndpoints := fs.String("etcd-endpoints", "", "Comma-separated list of ETCD endpoints") - etcdCaCert := fs.String("etcd-cacert", "", "ETCD CA file name") - etcdCert := fs.String("etcd-cert", "", "ETCD client certificate file name") - etcdKey := fs.String("etcd-key", "", "ETCD client certificate key file name") amqpUrl := fs.String("amqp-url", "", "RabbitMQ url") ownRegion := fs.String("own-region", "", "Identifier of the region where the service is running, used for mapping external data back to current region") _ = fs.String("config", "", "config file (optional)") + // Below are some deprecated flags. + // Keep them around for backward compatibility on deploys. + _ = fs.String("etcd-endpoints", "", "DEPRECATED") + _ = fs.String("etcd-cacert", "", "DEPRECATED") + _ = fs.String("etcd-cert", "", "DEPRECATED") + _ = fs.String("etcd-key", "", "DEPRECATED") consulPrefix := fs.String("consul-prefix", "", "DEPRECATED - use --route-prefix") consulMistURL := fs.String("consul-mist-url", "", "DEPRECATED - use --route-mist-url") @@ -92,16 +94,14 @@ func main() { if len(mcreds) != 2 { glog.Fatal("Mist server's credentials should be in form 'login:password'") } - lapi := livepeer.NewLivepeer(*apiToken, *apiServer, nil) - lapi.Init() + lapi, _ := api.NewAPIClientGeolocated(api.ClientOptions{ + Server: *apiServer, + AccessToken: *apiToken, + }) mapi = mistapi.NewMist(*mistHost, mcreds[0], mcreds[1], *apiToken, *mistPort) ensureLoggedIn(mapi, *mistConnectTimeout) metrics.InitCensus(hostName, model.Version, "mistconnector") - var etcdEndpoints []string - if len(*fEtcdEndpoints) > 0 { - etcdEndpoints = strings.Split(*fEtcdEndpoints, ",") - } opts := mistapiconnector.MacOptions{ NodeID: hostName, @@ -115,10 +115,6 @@ func main() { BaseStreamName: *baseStreamName, CheckBandwidth: false, SendAudio: *sendAudio, - EtcdEndpoints: etcdEndpoints, - EtcdCaCert: *etcdCaCert, - EtcdCert: *etcdCert, - EtcdKey: *etcdKey, AMQPUrl: *amqpUrl, OwnRegion: *ownRegion, MistStreamSource: *mistStreamSource, diff --git a/go.mod b/go.mod index 405f6a14..0c9eed77 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/golang/glog v1.0.0 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.4.2-0.20230123220918-f572338de3f0 + github.com/livepeer/go-api-client v0.4.2 github.com/livepeer/go-livepeer v0.5.31 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 @@ -29,10 +29,10 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.7 go.etcd.io/etcd/client/v3 v3.5.0-rc.0 go.opencensus.io v0.23.0 - golang.org/x/net v0.0.0-20220607020251-c690dde0001d + golang.org/x/net v0.0.0-20220225172249-27dd8689420f golang.org/x/text v0.3.7 google.golang.org/api v0.46.0 - google.golang.org/grpc v1.38.0 + google.golang.org/grpc v1.38.0 // indirect ) require ( @@ -46,8 +46,8 @@ require ( github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/deckarep/golang-set v1.8.0 // indirect - github.com/ethereum/go-ethereum v1.10.22 // indirect + github.com/deckarep/golang-set v1.7.1 // indirect + github.com/ethereum/go-ethereum v1.10.14 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.0 // indirect @@ -83,7 +83,7 @@ require ( github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.2.0 // indirect - github.com/stretchr/testify v1.7.2 // indirect + github.com/stretchr/testify v1.7.0 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect go.etcd.io/etcd/api/v3 v3.5.0 // indirect @@ -93,18 +93,18 @@ require ( go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect - golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 // indirect + golang.org/x/mod v0.5.1 // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect golang.org/x/tools v0.1.9 // indirect - golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect howett.net/plist v1.0.0 // indirect ) @@ -116,8 +116,7 @@ require ( require ( github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect - github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/google/btree v1.0.0 // indirect diff --git a/go.sum b/go.sum index 29ffa7f2..15472e6f 100644 --- a/go.sum +++ b/go.sum @@ -53,9 +53,6 @@ github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiU github.com/Azure/azure-pipeline-go v0.2.2 h1:6oiIS9yaG6XCCzhgAgKFfIWyo4LLCiDhZot6ltoThhY= github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM= -github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0/go.mod h1:tPaiy8S5bQ+S5sOiDlINkp7+Ef339+Nz5L5XO+cnOHo= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= github.com/Azure/azure-storage-blob-go v0.8.0 h1:53qhf0Oxa0nOjgbDeeYPUeyiNmafAFEY95rZLK0Tj6o= github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0= @@ -86,7 +83,6 @@ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbt github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Flaque/filet v0.0.0-20201012163910-45f684403088/go.mod h1:TK+jB3mBs+8ZMWhU5BqZKnZWJ1MrLo8etNVg51ueTBo= @@ -176,10 +172,6 @@ github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnC github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.22.0-beta h1:LTDpDKUM5EeOFBPM8IXpinEcmZ6FWfNZbE3lfrfdnWo= github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= -github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= -github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= -github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= -github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce/go.mod h1:0DVlHczLPewLcPGEIeUEzfOJhqGPQ0mJJRDBtD307+o= @@ -307,8 +299,6 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= -github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -324,13 +314,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= +github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ= github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= -github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4= -github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= -github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= -github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE9iAUlWl9Al3M= github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= @@ -342,13 +327,10 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 github.com/dlclark/regexp2 v1.2.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= -github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= -github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/docker/docker v1.6.2/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker v20.10.11+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-events v0.0.0-20170721190031-9461782956ad/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= @@ -361,7 +343,6 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498/go.mod h1:Mw6PkjjMXWbTj+nnj4s3QPXq1jaT0s5pC0iFD4+BOAA= github.com/dop251/goja v0.0.0-20211011172007-d99e4b8cbf48/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= -github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dsnet/golib/memfile v1.0.0 h1:J9pUspY2bDCbF9o+YGwcf3uG6MdyITfh/Fk3/CaEiFs= github.com/dsnet/golib/memfile v1.0.0/go.mod h1:tXGNW9q3RwvWt1VV2qrRKlSSz0npnh12yftCSCy2T64= @@ -383,9 +364,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ethereum/go-ethereum v1.9.24/go.mod h1:JIfVb6esrqALTExdz9hRYvrP0xBDf6wCncIu1hNwHpM= +github.com/ethereum/go-ethereum v1.10.14 h1:EJ/ucQzFlgKgwblIwU8R6ABnZ9kgUnIG2+Q1tiSrt4M= github.com/ethereum/go-ethereum v1.10.14/go.mod h1:W3yfrFyL9C1pHcwY5hmRHVDaorTiQxhYBkKyu5mEDHw= -github.com/ethereum/go-ethereum v1.10.22 h1:HbEgsDo1YTGIf4KB/NNpn+XH+PiNJXUZ9ksRxiqWyMc= -github.com/ethereum/go-ethereum v1.10.22/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/eventials/go-tus v0.0.0-20220610120217-05d0564bb571 h1:0i+Y7klNNqXwzAQ2qlIWeZyiMtDB/rf5fSaFzIW7lsk= github.com/eventials/go-tus v0.0.0-20220610120217-05d0564bb571/go.mod h1:XYuK1S5+kS6FGhlIUFuZFPvWiSrOIoLk6+ro33Xce3Y= @@ -393,7 +373,6 @@ github.com/fatih/color v1.3.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= -github.com/fjl/gencodec v0.0.0-20220412091415-8bb9e558978c/go.mod h1:AzA8Lj6YtixmJWL+wkKoBGsLWy9gFrAzi4g+5bCKwpY= github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= @@ -406,7 +385,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA= -github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8= github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= @@ -495,8 +473,6 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v4 v4.3.0 h1:kHL1vqdqWNfATmA0FNMdmZNMyZI1U6O31X4rlIPoBog= -github.com/golang-jwt/jwt/v4 v4.3.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -587,7 +563,6 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -609,7 +584,6 @@ github.com/gosuri/uiprogress v0.0.1 h1:0kpv/XY/qTmFWl/SkaJykZXrBBzwwadmW8fRb7RJS github.com/gosuri/uiprogress v0.0.1/go.mod h1:C1RTYn4Sc7iEyf6j8ft5dyoZ4212h8G1ol9QQluh5+0= github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= -github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE= @@ -664,9 +638,8 @@ github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= +github.com/huin/goupnp v1.0.2 h1:RfGLP+h3mvisuWEyybxNq5Eft3NWhHLPeUN72kpKZoI= github.com/huin/goupnp v1.0.2/go.mod h1:0dxJBVBHqTMjIUMkESDTNgOOx/Mw5wYIfyFmdzSamkM= -github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= -github.com/huin/goupnp v1.0.3/go.mod h1:ZxNlw5WqJj6wSsRK5+YfflQGXYfccj5VgQsMNixHM7Y= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -689,9 +662,8 @@ github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bS github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= +github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 h1:6OvNmYgJyexcZ3pYbTI9jWx5tHo1Dee/tWbLMfPe2TA= github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= -github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= -github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jaypipes/ghw v0.9.0 h1:TWF4wNIGtZcgDJaiNcFgby5BR8s2ixcUe0ydxNO2McY= github.com/jaypipes/ghw v0.9.0/go.mod h1:dXMo19735vXOjpIBDyDYSp31sB2u4hrtRCMxInqQ64k= github.com/jaypipes/pcidb v1.0.0 h1:vtZIfkiCUE42oYbJS0TAq9XSfSmcsgo9IdxSm9qzYU8= @@ -731,7 +703,6 @@ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0= github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= github.com/karalabe/usb v0.0.0-20211005121534-4c5740d64559/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= -github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= @@ -761,17 +732,14 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e h1:R9agI6FLJVOmX7YPR7vA0Rp9ONpDzJbDYSgpNwRHJiQ= -github.com/livepeer/go-api-client v0.4.2-0.20230105141727-2a1044f1eb2e/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= -github.com/livepeer/go-api-client v0.4.2-0.20230123220918-f572338de3f0 h1:GqtMRVnhYnlBVDCL5pFo0Fcewv7cw1wXkG9DgsowfE8= -github.com/livepeer/go-api-client v0.4.2-0.20230123220918-f572338de3f0/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.2 h1:jfYY6lIpB6XOXSqJOKOc6eQ1VGlYJNh5W+7HFV0v6VE= +github.com/livepeer/go-api-client v0.4.2/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw= github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU= github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw= @@ -860,7 +828,6 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= @@ -1037,8 +1004,6 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521/go.mod h1:RvLn4FgxWubrpZHtQLnOf6EwhN2hEMusxZOhcW9H3UQ= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= -github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= @@ -1102,12 +1067,10 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= -github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= -github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= @@ -1135,11 +1098,8 @@ github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2 github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/urfave/cli v1.22.2 h1:gsqYFH8bb9ekPA12kRo0hfjngWQjkJPlN9R0N78BoUo= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -github.com/urfave/cli/v2 v2.10.2 h1:x3p8awjp/2arX+Nl/G2040AZpOCHS/eMJJ1/a+mye4Y= -github.com/urfave/cli/v2 v2.10.2/go.mod h1:f8iq5LtQ/bLxafbdBSLPPNsgaW0l/2fYYEHhAyPlwvo= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= @@ -1159,8 +1119,6 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1: github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= -github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= -github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1232,7 +1190,6 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1249,7 +1206,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1279,9 +1235,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 h1:LQmS1nU0twXLA96Kt7U9qtHJEbBk3z6Q0V4UXjZkpr4= -golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1324,7 +1279,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1339,15 +1293,13 @@ golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211108170745-6635138e15ea/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220607020251-c690dde0001d h1:4SFsTMi4UahlKoloni7L4eYzhFRifURQLw+yv0QDCx8= -golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1545,7 +1497,6 @@ golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191126055441-b0650ceb63d9/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= @@ -1579,15 +1530,13 @@ golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df h1:5Pf6pFKu98ODmgnpvkJ3kFUOQGGLIzLIkbzUHp47618= -golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.6.0/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU= @@ -1749,6 +1698,7 @@ gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76 gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0= gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= @@ -1763,9 +1713,8 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/internal/app/mistapiconnector/mistapiconnector_app.go b/internal/app/mistapiconnector/mistapiconnector_app.go index 2e5ed198..70173ad4 100644 --- a/internal/app/mistapiconnector/mistapiconnector_app.go +++ b/internal/app/mistapiconnector/mistapiconnector_app.go @@ -2,49 +2,33 @@ package mistapiconnector import ( "context" - "crypto/tls" "encoding/json" - "errors" "fmt" - "io/ioutil" + "io" "net/http" "net/url" "os" "os/signal" - "strconv" "strings" "sync" "syscall" "time" "github.com/golang/glog" + "github.com/livepeer/go-api-client" "github.com/livepeer/livepeer-data/pkg/data" "github.com/livepeer/livepeer-data/pkg/event" - "github.com/livepeer/stream-tester/apis/livepeer" "github.com/livepeer/stream-tester/apis/mist" - mistapi "github.com/livepeer/stream-tester/apis/mist" "github.com/livepeer/stream-tester/internal/metrics" "github.com/livepeer/stream-tester/internal/utils" "github.com/livepeer/stream-tester/model" amqp "github.com/rabbitmq/amqp091-go" - "go.etcd.io/etcd/client/pkg/v3/transport" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/concurrency" - "google.golang.org/grpc" ) const streamPlaybackPrefix = "playback_" -const traefikKeyPathRouters = `traefik/http/routers/` -const traefikKeyPathServices = `traefik/http/services/` -const traefikKeyPathMiddlewares = `traefik/http/middlewares/` const audioAlways = "always" const audioRecord = "record" const audioEnabledStreamSuffix = "rec" -const etcdDialTimeout = 5 * time.Second -const etcdAutoSyncInterval = 5 * time.Minute -const etcdSessionTTL = 10 // in seconds -const etcdSessionRecoverBackoff = 3 * time.Second -const etcdSessionRecoverTimeout = 2 * time.Minute const waitForPushError = 7 * time.Second const keepStreamAfterEnd = 15 * time.Second const statsCollectionPeriod = 10 * time.Second @@ -55,8 +39,6 @@ const eventMultistreamConnected = "multistream.connected" const eventMultistreamError = "multistream.error" const eventMultistreamDisconnected = "multistream.disconnected" -var playbackPrefixes = []string{"hls", "cmaf"} - type ( // IMac creates new Mist API Connector application IMac interface { @@ -65,29 +47,25 @@ type ( SrvShutCh() chan error } - etcdRevData struct { - revision int64 - entries []string - } - pushStatus struct { - target *livepeer.MultistreamTarget + target *api.MultistreamTarget profile string pushStartEmitted bool pushStopped bool - pushedBytes int64 - pushedMediaTime time.Duration + metrics *data.MultistreamMetrics } streamInfo struct { - id string + id string + isLazy bool + stream *api.Stream + startedAt time.Time + + mu sync.Mutex + done chan struct{} stopped bool multistreamStarted bool - stream *livepeer.CreateStreamResp - done chan struct{} - mu sync.Mutex pushStatus map[string]*pushStatus - startedAt time.Time } trackListDesc struct { @@ -114,17 +92,15 @@ type ( MacOptions struct { NodeID, MistHost string MistAPI *mist.API - LivepeerAPI *livepeer.API + LivepeerAPI *api.Client BalancerHost string CheckBandwidth bool RoutePrefix, PlaybackDomain, MistURL, SendAudio, BaseStreamName string - EtcdEndpoints []string - EtcdCaCert, EtcdCert, EtcdKey string - AMQPUrl, OwnRegion string - MistStreamSource string - MistHardcodedBroadcasters string - NoMistScrapeMetrics bool + AMQPUrl, OwnRegion string + MistStreamSource string + MistHardcodedBroadcasters string + NoMistScrapeMetrics bool } trackList map[string]*trackListDesc @@ -133,11 +109,12 @@ type ( ctx context.Context cancel context.CancelFunc mapi *mist.API - lapi *livepeer.API + lapi *api.Client balancerHost string srv *http.Server srvShutCh chan error mu sync.RWMutex + createStreamLock sync.Mutex mistHot string checkBandwidth bool routePrefix string @@ -145,17 +122,12 @@ type ( playbackDomain string sendAudio string baseStreamName string - useEtcd bool - etcdClient *clientv3.Client - etcdSession *concurrency.Session - etcdPub2rev map[string]etcdRevData // public key to revision of etcd keys - pub2info map[string]*streamInfo // public key to info + streamInfo map[string]*streamInfo // public key to info producer *event.AMQPProducer nodeID string ownRegion string mistStreamSource string mistHardcodedBroadcasters string - // pub2id map[string]string // public key to stream id } ) @@ -164,12 +136,7 @@ func NewMac(opts MacOptions) (IMac, error) { if opts.BalancerHost != "" && !strings.Contains(opts.BalancerHost, ":") { opts.BalancerHost = opts.BalancerHost + ":8042" // must set default port for Mist's Load Balancer } - useEtcd := false - var cli *clientv3.Client - var sess *concurrency.Session - var err error ctx, cancel := context.WithCancel(context.Background()) - var producer *event.AMQPProducer if opts.AMQPUrl != "" { pu, err := url.Parse(opts.AMQPUrl) @@ -193,67 +160,19 @@ func NewMac(opts MacOptions) (IMac, error) { } else { glog.Infof("AMQP url is empty!") } - - glog.Infof("etcd endpoints: %+v, len %d", opts.EtcdEndpoints, len(opts.EtcdEndpoints)) - if len(opts.EtcdEndpoints) > 0 { - var tcfg *tls.Config - if opts.EtcdCaCert != "" || opts.EtcdCert != "" || opts.EtcdKey != "" { - tlsifo := transport.TLSInfo{ - CertFile: opts.EtcdCert, - KeyFile: opts.EtcdKey, - TrustedCAFile: opts.EtcdCaCert, - } - tcfg, err = tlsifo.ClientConfig() - if err != nil { - cancel() - return nil, err - } - } - useEtcd = true - cli, err = clientv3.New(clientv3.Config{ - Endpoints: opts.EtcdEndpoints, - DialTimeout: etcdDialTimeout, - AutoSyncInterval: etcdAutoSyncInterval, - TLS: tcfg, - DialOptions: []grpc.DialOption{grpc.WithBlock()}, - }) - if err != nil { - err = fmt.Errorf("mist-api-connector: Error connecting etcd err=%w", err) - cancel() - return nil, err - } - syncCtx, syncCancel := context.WithTimeout(ctx, etcdDialTimeout) - err = cli.Sync(syncCtx) - syncCancel() - if err != nil { - err = fmt.Errorf("mist-api-connector: Error syncing etcd endpoints err=%w", err) - cancel() - return nil, err - } - sess, err = newEtcdSession(cli) - if err != nil { - cancel() - return nil, err - } - } mc := &mac{ - nodeID: opts.NodeID, - mistHot: opts.MistHost, - mapi: opts.MistAPI, - lapi: opts.LivepeerAPI, - checkBandwidth: opts.CheckBandwidth, - balancerHost: opts.BalancerHost, - // pub2id: make(map[string]string), // public key to stream id - pub2info: make(map[string]*streamInfo), // public key to info + nodeID: opts.NodeID, + mistHot: opts.MistHost, + mapi: opts.MistAPI, + lapi: opts.LivepeerAPI, + checkBandwidth: opts.CheckBandwidth, + balancerHost: opts.BalancerHost, + streamInfo: make(map[string]*streamInfo), routePrefix: opts.RoutePrefix, mistURL: opts.MistURL, playbackDomain: opts.PlaybackDomain, sendAudio: opts.SendAudio, baseStreamName: opts.BaseStreamName, - useEtcd: useEtcd, - etcdClient: cli, - etcdSession: sess, - etcdPub2rev: make(map[string]etcdRevData), // public key to revision of etcd keys srvShutCh: make(chan error), ctx: ctx, cancel: cancel, @@ -262,7 +181,6 @@ func NewMac(opts MacOptions) (IMac, error) { mistStreamSource: opts.MistStreamSource, mistHardcodedBroadcasters: opts.MistHardcodedBroadcasters, } - go mc.recoverSessionLoop() if producer != nil && !opts.NoMistScrapeMetrics { startMetricsCollector(ctx, statsCollectionPeriod, opts.NodeID, opts.OwnRegion, opts.MistAPI, producer, ownExchangeName, mc) } @@ -270,7 +188,7 @@ func NewMac(opts MacOptions) (IMac, error) { } // LivepeerProfiles2MistProfiles converts Livepeer's API profiles to Mist's ones -func LivepeerProfiles2MistProfiles(lps []livepeer.Profile) []mist.Profile { +func LivepeerProfiles2MistProfiles(lps []api.Profile) []mist.Profile { var res []mist.Profile for _, p := range lps { mp := mist.Profile{ @@ -310,10 +228,9 @@ func (mc *mac) triggerConnClose(w http.ResponseWriter, r *http.Request, lines [] if mc.baseStreamName != "" && strings.Contains(playbackID, "+") { playbackID = strings.Split(playbackID, "+")[1] } - mc.mu.Lock() - if info, has := mc.pub2info[playbackID]; has { + if info, ok := mc.getStreamInfoLogged(playbackID); ok { glog.Infof("Setting stream's protocol=%s manifestID=%s playbackID=%s active status to false", protocol, info.id, playbackID) - _, err := mc.lapi.SetActiveR(info.id, false, info.startedAt) + _, err := mc.lapi.SetActive(info.id, false, info.startedAt) if err != nil { glog.Error(err) } @@ -321,12 +238,9 @@ func (mc *mac) triggerConnClose(w http.ResponseWriter, r *http.Request, lines [] info.mu.Lock() info.stopped = true info.mu.Unlock() - go mc.removeInfoAfter(playbackID, info) + mc.removeInfoDelayed(playbackID, info.done) metrics.StopStream(true) - } else { - glog.Warningf("%s conn close stream playbackID=%s not found", protocol, playbackID) } - mc.mu.Unlock() } w.WriteHeader(http.StatusOK) w.Write([]byte("yes")) @@ -350,11 +264,15 @@ func (mc *mac) triggerDefaultStream(w http.ResponseWriter, r *http.Request, line // urip[2] = streamPlaybackPrefix + urip[2] playbackID := urip[2] streamNameInMist := streamPlaybackPrefix + playbackID - // check if stream is in our map of currently playing streams - mc.mu.Lock() - defer mc.mu.Unlock() // hold the lock until exit so that trigger to RTMP_REWRITE can't create - // another Mist stream in the same time - if info, has := mc.pub2info[playbackID]; has { + // hold the creation lock until exit so that another trigger to + // RTMP_REWRITE can't create a Mist stream at the same time + mc.createStreamLock.Lock() + defer mc.createStreamLock.Unlock() + + mc.mu.RLock() + info := mc.streamInfo[playbackID] + mc.mu.RUnlock() + if info != nil { info.mu.Lock() streamStopped := info.stopped info.mu.Unlock() @@ -431,54 +349,6 @@ func (mc *mac) triggerDefaultStream(w http.ResponseWriter, r *http.Request, line return true } -func (mc *mac) generateRouteKeys(stream *livepeer.CreateStreamResp) []string { - serviceName := mc.routePrefix + serviceNameFromMistURL(mc.mistURL) - wildcardPlaybackID := mc.wildcardPlaybackID(stream) - playbackID := mc.routePrefix + stream.PlaybackID - - keys := []string{ - traefikKeyPathServices + serviceName + "/loadbalancer/servers/0/url", - mc.mistURL, - traefikKeyPathServices + serviceName + "/loadbalancer/passhostheader", - "false", - } - - // Add a millisecond timestamp as the priority so new streams always win over old, stale streams - priority := strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10) - - for _, prefix := range playbackPrefixes { - playbackIDPrefix := fmt.Sprintf("%s-%s", playbackID, prefix) - keys = append(keys, - traefikKeyPathRouters+playbackIDPrefix+"/service", - serviceName, - traefikKeyPathRouters+playbackIDPrefix+"/priority", - priority, - ) - if mc.baseStreamName == "" { - rule := fmt.Sprintf("HostRegexp(`%s`) && PathPrefix(`/%s/%s/`)", mc.playbackDomain, prefix, stream.PlaybackID) - keys = append(keys, traefikKeyPathRouters+playbackIDPrefix+"/rule", rule) - } else { - rule := fmt.Sprintf("HostRegexp(`%s`) && (PathPrefix(`/%s/%s/`) || PathPrefix(`/%s/%s/`))", mc.playbackDomain, prefix, stream.PlaybackID, prefix, wildcardPlaybackID) - keys = append(keys, - traefikKeyPathRouters+playbackIDPrefix+"/rule", - rule, - traefikKeyPathRouters+playbackIDPrefix+"/middlewares/0", - playbackIDPrefix+"-1", - traefikKeyPathRouters+playbackIDPrefix+"/middlewares/1", - playbackIDPrefix+"-2", - - traefikKeyPathMiddlewares+playbackIDPrefix+"-1/stripprefix/prefixes/0", - fmt.Sprintf(`/%s/%s`, prefix, stream.PlaybackID), - traefikKeyPathMiddlewares+playbackIDPrefix+"-1/stripprefix/prefixes/1", - fmt.Sprintf(`/%s/%s`, prefix, wildcardPlaybackID), - traefikKeyPathMiddlewares+playbackIDPrefix+"-2/addprefix/prefix", - fmt.Sprintf(`/%s/%s`, prefix, wildcardPlaybackID), - ) - } - } - return keys -} - func (mc *mac) triggerPushRewrite(w http.ResponseWriter, r *http.Request, lines []string, rawRequest string) bool { if len(lines) != 3 { glog.Errorf("Expected 3 lines, got %d, request \n%s", len(lines), rawRequest) @@ -510,14 +380,6 @@ func (mc *mac) triggerPushRewrite(w http.ResponseWriter, r *http.Request, lines stream, err := mc.lapi.GetStreamByKey(streamKey) if err != nil || stream == nil { glog.Errorf("Error getting stream info from Livepeer API streamKey=%s err=%v", streamKey, err) - /* - if err == livepeer.ErrNotExists { - // mc.mapi.DeleteStreams(streamKey) - w.Write([]byte(lines[0])) - } else { - w.WriteHeader(http.StatusNotFound) - } - */ w.WriteHeader(http.StatusNotFound) w.Write([]byte("false")) return false @@ -541,21 +403,21 @@ func (mc *mac) triggerPushRewrite(w http.ResponseWriter, r *http.Request, lines if stream.PlaybackID != "" { mc.mu.Lock() - defer mc.mu.Unlock() - if info, has := mc.pub2info[stream.PlaybackID]; has { + if info, ok := mc.streamInfo[stream.PlaybackID]; ok { info.mu.Lock() - streamStopped := info.stopped + glog.Infof("Stream playbackID=%s stopped=%v already in map, removing its info", stream.PlaybackID, info.stopped) info.mu.Unlock() - glog.Infof("Stream playbackID=%s stopped=%v already in map, removing its info", stream.PlaybackID, streamStopped) - mc.removeInfo(stream.PlaybackID) + mc.removeInfoLocked(stream.PlaybackID) } - mc.pub2info[stream.PlaybackID] = &streamInfo{ + info := &streamInfo{ id: stream.ID, stream: stream, done: make(chan struct{}), pushStatus: make(map[string]*pushStatus), startedAt: time.Now(), } + mc.streamInfo[stream.PlaybackID] = info + mc.mu.Unlock() streamKey = stream.PlaybackID // streamKey = strings.ReplaceAll(streamKey, "-", "") if mc.balancerHost != "" { @@ -566,7 +428,7 @@ func (mc *mac) triggerPushRewrite(w http.ResponseWriter, r *http.Request, lines } else { responseName = mc.wildcardPlaybackID(stream) } - ok, err := mc.lapi.SetActiveR(stream.ID, true, mc.pub2info[stream.PlaybackID].startedAt) + ok, err := mc.lapi.SetActive(stream.ID, true, info.startedAt) if err != nil { glog.Error(err) } else if !ok { @@ -589,18 +451,6 @@ func (mc *mac) triggerPushRewrite(w http.ResponseWriter, r *http.Request, lines return true } } - if mc.useEtcd { - // now create routing rule in the etcd for HLS playback - err = mc.putEtcdKeys(mc.etcdSession, stream.PlaybackID, - mc.generateRouteKeys(stream)..., - ) - if err != nil { - glog.Errorf("Error creating etcd Traefik rule for playbackID=%s streamID=%s err=%v", stream.PlaybackID, stream.ID, err) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte("false")) - return true - } - } go mc.emitStreamStateEvent(stream, data.StreamState{Active: true}) w.Write([]byte(responseName)) metrics.StartStream() @@ -625,12 +475,17 @@ func (mc *mac) triggerLiveTrackList(w http.ResponseWriter, r *http.Request, line videoTracksNum := tl.CountVideoTracks() playbackID := mistStreamName2playbackID(lines[0]) glog.Infof("for video %s got %d video tracks", playbackID, videoTracksNum) - glog.Infof("===> pub %+v", mc.pub2info) - mc.mu.RLock() - defer mc.mu.RUnlock() - if info, ok := mc.pub2info[playbackID]; ok { - if len(info.stream.Multistream.Targets) > 0 && !info.multistreamStarted && videoTracksNum > 1 { + + if info, ok := mc.getStreamInfoLogged(playbackID); ok { + info.mu.Lock() + shouldStart := !info.multistreamStarted && + len(info.stream.Multistream.Targets) > 0 && videoTracksNum > 1 + if shouldStart { info.multistreamStarted = true + } + info.mu.Unlock() + + if shouldStart { mc.startMultistream(lines[0], playbackID, info) } } @@ -647,11 +502,7 @@ func (mc *mac) triggerPushOutStart(w http.ResponseWriter, r *http.Request, lines } go func() { playbackID := mistStreamName2playbackID(lines[0]) - // glog.Infof("for video %s got %d video tracks", playbackID, videoTracksNum) - glog.Infof("===> pub %+v", mc.pub2info) - mc.mu.RLock() - defer mc.mu.RUnlock() - if info, ok := mc.pub2info[playbackID]; ok { + if info, ok := mc.getStreamInfoLogged(playbackID); ok { info.mu.Lock() defer info.mu.Unlock() if pushInfo, ok := info.pushStatus[lines[1]]; ok { @@ -684,7 +535,7 @@ func (mc *mac) waitPush(info *streamInfo, pushInfo *pushStatus) { } } -func (mc *mac) emitStreamStateEvent(stream *livepeer.CreateStreamResp, state data.StreamState) { +func (mc *mac) emitStreamStateEvent(stream *api.Stream, state data.StreamState) { streamID := stream.ParentID if streamID == "" { streamID = stream.ID @@ -693,7 +544,7 @@ func (mc *mac) emitStreamStateEvent(stream *livepeer.CreateStreamResp, state dat mc.emitAmqpEvent(ownExchangeName, "stream.state."+streamID, stateEvt) } -func (mc *mac) emitWebhookEvent(stream *livepeer.CreateStreamResp, pushInfo *pushStatus, eventKey string) { +func (mc *mac) emitWebhookEvent(stream *api.Stream, pushInfo *pushStatus, eventKey string) { streamID, sessionID := stream.ParentID, stream.ID if streamID == "" { streamID = sessionID @@ -738,10 +589,7 @@ func (mc *mac) triggerPushEnd(w http.ResponseWriter, r *http.Request, lines []st go func() { playbackID := mistStreamName2playbackID(lines[1]) // glog.Infof("for video %s got %d video tracks", playbackID, videoTracksNum) - glog.Infof("===> pub %+v", mc.pub2info) - mc.mu.RLock() - defer mc.mu.RUnlock() - if info, ok := mc.pub2info[playbackID]; ok { + if info, ok := mc.getStreamInfoLogged(playbackID); ok { info.mu.Lock() defer info.mu.Unlock() if pushInfo, ok := info.pushStatus[lines[2]]; ok { @@ -767,7 +615,7 @@ func (mc *mac) handleDefaultStreamTrigger(w http.ResponseWriter, r *http.Request w.Write([]byte("false")) return } - b, err := ioutil.ReadAll(r.Body) + b, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte("false")) @@ -821,177 +669,36 @@ func (mc *mac) handleDefaultStreamTrigger(w http.ResponseWriter, r *http.Request } } -func (mc *mac) removeInfoAfter(playbackID string, info *streamInfo) { - select { - case <-info.done: - return - case <-time.After(keepStreamAfterEnd): - } - mc.mu.Lock() - mc.removeInfo(playbackID) - mc.mu.Unlock() -} - -// must be called inside mu.Lock -func (mc *mac) removeInfo(playbackID string) { - if info, ok := mc.pub2info[playbackID]; ok { - close(info.done) - delete(mc.pub2info, playbackID) - if mc.useEtcd { - mc.deleteEtcdKeys(playbackID) - } - } -} - -// putEtcdKeys puts keys in one transaction -func (mc *mac) putEtcdKeys(sess *concurrency.Session, playbackID string, kvs ...string) error { - if len(kvs) == 0 || len(kvs)%2 != 0 { - return errors.New("number of arguments should be even") - } - cmp := clientv3.Compare(clientv3.CreateRevision(kvs[0]), ">", -1) // basically noop - will always be true - thn := make([]clientv3.Op, 0, len(kvs)/2) - get := clientv3.OpGet(kvs[0]) - for i := 0; i < len(kvs); i += 2 { - thn = append(thn, clientv3.OpPut(kvs[i], kvs[i+1], clientv3.WithLease(sess.Lease()))) - } - ctx, cancel := context.WithTimeout(context.Background(), etcdDialTimeout) - resp, err := mc.etcdClient.Txn(ctx).If(cmp).Then(thn...).Else(get).Commit() - cancel() - if err != nil { - glog.Errorf("mist-api-connector: error putting keys for playbackID=%s err=%v", playbackID, err) - return err - } - if resp == nil { - return fmt.Errorf("mist-api-connector: error putting keys for playbackID=%s - nil response", playbackID) - } - if !resp.Succeeded { - panic("unexpected") - } - glog.Infof("for playbackID=%s created %d keys in etcd revision=%d", playbackID, len(kvs)/2, resp.Header.Revision) - mc.etcdPub2rev[playbackID] = etcdRevData{resp.Header.Revision, kvs} - return nil -} - -func (mc *mac) deleteEtcdKeys(playbackID string) { - etcdPlaybackID := mc.routePrefix + playbackID - if rev, ok := mc.etcdPub2rev[playbackID]; ok { - pathKey := traefikKeyPathRouters + etcdPlaybackID - // Just need to check the revision on one rule, use the first playbackPrefix - ruleKey := fmt.Sprintf("%s-%s/rule", pathKey, playbackPrefixes[0]) - cmp := clientv3.Compare(clientv3.ModRevision(ruleKey), "=", rev.revision) - ctx, cancel := context.WithTimeout(context.Background(), etcdDialTimeout) - thn := []clientv3.Op{ - clientv3.OpDelete(pathKey, clientv3.WithRange(pathKey+"~")), - } - if mc.baseStreamName != "" { - middleWaresKey := traefikKeyPathMiddlewares + etcdPlaybackID - thn = append(thn, - clientv3.OpDelete(middleWaresKey, clientv3.WithRange(middleWaresKey+"~")), - ) - } - get := clientv3.OpGet(ruleKey) - resp, err := mc.etcdClient.Txn(ctx).If(cmp).Then(thn...).Else(get).Commit() - cancel() - delete(mc.etcdPub2rev, playbackID) - if err != nil || resp == nil { - glog.Errorf("mist-api-connector: error deleting keys for playbackID=%s err=%v", playbackID, err) - } else { - if resp.Succeeded { - glog.Errorf("mist-api-connector: success deleting keys for playbackID=%s rev=%d", playbackID, rev.revision) - } else { - var curRev int64 - if len(resp.Responses) > 0 && len(resp.Responses[0].GetResponseRange().Kvs) > 0 { - curRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision - } - glog.Errorf("mist-api-connector: unsuccessful deleting keys for playbackID=%s myRev=%d curRev=%d pathKey=%s", - playbackID, rev.revision, curRev, pathKey) - } - } - } else { - glog.Errorf("mist-api-connector: etcd revision for stream playbackID=%s not found", playbackID) - } -} - -func (mc *mac) recoverSessionLoop() { - if mc.etcdClient == nil { - return - } - clientCtx := mc.etcdClient.Ctx() - for clientCtx.Err() == nil { +func (mc *mac) removeInfoDelayed(playbackID string, done chan struct{}) { + go func() { select { - case <-clientCtx.Done(): - // client closed, which means app shutted down - return - case <-mc.etcdSession.Done(): - } - glog.Infof("etcd session with lease=%d is lost, trying to recover", mc.etcdSession.Lease()) - - ctx, cancel := context.WithTimeout(clientCtx, etcdSessionRecoverTimeout) - err := mc.recoverEtcdSession(ctx) - cancel() - - if err != nil && clientCtx.Err() == nil { - glog.Errorf("mist-api-connector: unrecoverable etcd session. err=%q.", err) + case <-done: return + case <-time.After(keepStreamAfterEnd): + mc.removeInfo(playbackID) } - } -} - -func (mc *mac) recoverEtcdSession(ctx context.Context) error { - for { - err := mc.recoverEtcdSessionOnce() - if err == nil { - return nil - } - - select { - case <-time.After(etcdSessionRecoverBackoff): - glog.Errorf("mist-api-connector: Retrying etcd session recover. err=%q", err) - continue - case <-ctx.Done(): - return fmt.Errorf("mist-api-connector: Timeout recovering etcd session err=%w", err) - } - } + }() } -func (mc *mac) recoverEtcdSessionOnce() error { - sess, err := newEtcdSession(mc.etcdClient) - if err != nil { - return err - } - +func (mc *mac) removeInfo(playbackID string) { mc.mu.Lock() defer mc.mu.Unlock() - for playbackId, rev := range mc.etcdPub2rev { - err := mc.putEtcdKeys(sess, playbackId, rev.entries...) - if err != nil { - sess.Close() - return fmt.Errorf("mist-api-connector: Error re-creating etcd keys. playbackId=%q, err=%w", playbackId, err) - } - } - - mc.etcdSession.Close() - mc.etcdSession = sess - glog.Infof("Recovered etcd session. lease=%d", sess.Lease()) - return nil + mc.removeInfoLocked(playbackID) } -func newEtcdSession(etcdClient *clientv3.Client) (*concurrency.Session, error) { - glog.Infof("Starting new etcd session ttl=%d", etcdSessionTTL) - sess, err := concurrency.NewSession(etcdClient, concurrency.WithTTL(etcdSessionTTL)) - if err != nil { - glog.Errorf("Failed to start etcd session err=%q", err) - return nil, fmt.Errorf("mist-api-connector: Error creating etcd session err=%w", err) +// must be called inside mc.mu.Lock +func (mc *mac) removeInfoLocked(playbackID string) { + if info, ok := mc.streamInfo[playbackID]; ok { + close(info.done) + delete(mc.streamInfo, playbackID) } - glog.Infof("etcd got lease %d", sess.Lease()) - return sess, nil } -func (mc *mac) wildcardPlaybackID(stream *livepeer.CreateStreamResp) string { +func (mc *mac) wildcardPlaybackID(stream *api.Stream) string { return mc.baseNameForStream(stream) + "+" + stream.PlaybackID } -func (mc *mac) baseNameForStream(stream *livepeer.CreateStreamResp) string { +func (mc *mac) baseNameForStream(stream *api.Stream) string { baseName := mc.baseStreamName if mc.shouldEnableAudio(stream) { baseName += audioEnabledStreamSuffix @@ -999,7 +706,7 @@ func (mc *mac) baseNameForStream(stream *livepeer.CreateStreamResp) string { return baseName } -func (mc *mac) shouldEnableAudio(stream *livepeer.CreateStreamResp) bool { +func (mc *mac) shouldEnableAudio(stream *api.Stream) bool { audio := false if mc.sendAudio == audioAlways { audio = true @@ -1009,7 +716,7 @@ func (mc *mac) shouldEnableAudio(stream *livepeer.CreateStreamResp) bool { return audio } -func (mc *mac) createMistStream(streamName string, stream *livepeer.CreateStreamResp, skipTranscoding bool) error { +func (mc *mac) createMistStream(streamName string, stream *api.Stream, skipTranscoding bool) error { if len(stream.Presets) == 0 && len(stream.Profiles) == 0 { stream.Presets = append(stream.Presets, "P144p30fps16x9") } @@ -1025,10 +732,6 @@ func (mc *mac) createMistStream(streamName string, stream *livepeer.CreateStream } func (mc *mac) handleHealthcheck(w http.ResponseWriter, r *http.Request) { - if !mc.isEtcdSessionHealthy() { - w.WriteHeader(http.StatusServiceUnavailable) - return - } if config, err := mc.mapi.GetConfig(); err != nil || config == nil { glog.Errorf("Error getting mist config on healthcheck. err=%q", err) w.WriteHeader(http.StatusServiceUnavailable) @@ -1037,22 +740,9 @@ func (mc *mac) handleHealthcheck(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func (mc *mac) isEtcdSessionHealthy() bool { - if mc.etcdSession == nil { - return true - } - select { - case <-mc.etcdSession.Done(): - return false - default: - return true - } -} - func (mc *mac) webServerHandlers() *http.ServeMux { mux := http.NewServeMux() utils.AddPProfHandlers(mux) - // mux.Handle("/metrics", utils.InitPrometheusExporter("mistconnector")) mux.Handle("/metrics", metrics.Exporter) mux.HandleFunc("/_healthz", mc.handleHealthcheck) @@ -1078,8 +768,8 @@ func (mc *mac) StartServer(bindAddr string) error { return err } -func (mc *mac) addTrigger(triggers mistapi.TriggersMap, name, ownURI, def, params string, sync bool) bool { - nt := mistapi.Trigger{ +func (mc *mac) addTrigger(triggers mist.TriggersMap, name, ownURI, def, params string, sync bool) bool { + nt := mist.Trigger{ Default: def, Handler: ownURI, Sync: sync, @@ -1107,7 +797,7 @@ func (mc *mac) SetupTriggers(ownURI string) error { return err } if triggers == nil { - triggers = make(mistapi.TriggersMap) + triggers = make(mist.TriggersMap) } added := mc.addTrigger(triggers, "PUSH_REWRITE", ownURI, "000reallylongnonexistenstreamnamethatreallyshouldntexist000", "", true) // DEFAULT_STREAM needed when using Mist's load balancing @@ -1138,21 +828,13 @@ func (mc *mac) SetupTriggers(ownURI string) error { return err } -func serviceNameFromMistURL(murl string) string { - murl = strings.TrimPrefix(murl, "https://") - murl = strings.TrimPrefix(murl, "http://") - murl = strings.ReplaceAll(murl, ".", "-") - murl = strings.ReplaceAll(murl, "/", "-") - return murl -} - func (mc *mac) startMultistream(wildcardPlaybackID, playbackID string, info *streamInfo) { for i := range info.stream.Multistream.Targets { - go func(targetRef livepeer.MultistreamTargetRef) { + go func(targetRef api.MultistreamTargetRef) { glog.Infof("==> starting multistream %s", targetRef.ID) - target, err := mc.lapi.GetMultistreamTargetR(targetRef.ID) + target, pushURL, err := mc.getPushUrl(info.stream, &targetRef) if err != nil { - glog.Errorf("Error fetching multistream target. targetId=%s stream=%s err=%v", + glog.Errorf("Error building multistream target push URL. targetId=%s stream=%s err=%v", targetRef.ID, wildcardPlaybackID, err) return } else if target.Disabled { @@ -1160,55 +842,30 @@ func (mc *mac) startMultistream(wildcardPlaybackID, playbackID string, info *str targetRef.ID, wildcardPlaybackID) return } - // Find the actual parameters of the profile we're using - var videoSelector string - // Not actually the source. But the highest quality. - if targetRef.Profile == "source" { - videoSelector = "maxbps" - } else { - var prof *livepeer.Profile - for _, p := range info.stream.Profiles { - if p.Name == targetRef.Profile { - prof = &p - break - } - } - if prof == nil { - glog.Errorf("Error starting multistream to target. targetId=%s stream=%s err=couldn't find profile %s", - targetRef.ID, wildcardPlaybackID, targetRef.Profile) - return - } - videoSelector = fmt.Sprintf("~%dx%d", prof.Width, prof.Height) - } - join := "?" - if strings.Contains(target.URL, "?") { - join = "&" - } - audioSelector := "maxbps" - if targetRef.VideoOnly { - audioSelector = "silent" - } - // Inject ?video=~widthxheight to send the correct rendition - selectorURL := fmt.Sprintf("%s%svideo=%s&audio=%s", target.URL, join, videoSelector, audioSelector) + info.mu.Lock() - info.pushStatus[selectorURL] = &pushStatus{target: target, profile: targetRef.Profile} + info.pushStatus[pushURL] = &pushStatus{ + target: target, + profile: targetRef.Profile, + metrics: &data.MultistreamMetrics{}, + } + info.mu.Unlock() - err = mc.mapi.StartPush(wildcardPlaybackID, selectorURL) + err = mc.mapi.StartPush(wildcardPlaybackID, pushURL) if err != nil { glog.Errorf("Error starting multistream to target. targetId=%s stream=%s err=%v", targetRef.ID, wildcardPlaybackID, err) - delete(info.pushStatus, selectorURL) + info.mu.Lock() + delete(info.pushStatus, pushURL) info.mu.Unlock() return } - glog.Infof("Started multistream to target. targetId=%s stream=%s url=%s", wildcardPlaybackID, targetRef.ID, selectorURL) - info.mu.Unlock() + glog.Infof("Started multistream to target. targetId=%s stream=%s url=%s", wildcardPlaybackID, targetRef.ID, pushURL) }(info.stream.Multistream.Targets[i]) } } func (mc *mac) startSignalHandler() { exitc := make(chan os.Signal, 1) - // signal.Notify(exitc, syscall.SIGINT, syscall.SIGTERM, os.Interrupt, os.Kill) signal.Notify(exitc, syscall.SIGTERM, syscall.SIGINT) go func() { gotSig := <-exitc @@ -1224,70 +881,108 @@ func (mc *mac) startSignalHandler() { }() } +func (mc *mac) getPushUrl(stream *api.Stream, targetRef *api.MultistreamTargetRef) (*api.MultistreamTarget, string, error) { + target, err := mc.lapi.GetMultistreamTarget(targetRef.ID) + if err != nil { + return nil, "", fmt.Errorf("error fetching multistream target %s: %w", targetRef.ID, err) + } + // Find the actual parameters of the profile we're using + var videoSelector string + // Not actually the source. But the highest quality. + if targetRef.Profile == "source" { + videoSelector = "maxbps" + } else { + var prof *api.Profile + for _, p := range stream.Profiles { + if p.Name == targetRef.Profile { + prof = &p + break + } + } + if prof == nil { + return nil, "", fmt.Errorf("profile not found: %s", targetRef.Profile) + } + videoSelector = fmt.Sprintf("~%dx%d", prof.Width, prof.Height) + } + join := "?" + if strings.Contains(target.URL, "?") { + join = "&" + } + audioSelector := "maxbps" + if targetRef.VideoOnly { + audioSelector = "silent" + } + // Inject ?video=~widthxheight to send the correct rendition + return target, fmt.Sprintf("%s%svideo=%s&audio=%s", target.URL, join, videoSelector, audioSelector), nil +} + func (mc *mac) shutdown() { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - // start calling /setactve/false and sending events on active connections eagerly - deactivateGroup := &sync.WaitGroup{} - mc.deactiveAllStreams(ctx, deactivateGroup) - err := mc.srv.Shutdown(ctx) glog.Infof("Done shutting down server with err=%v", err) - if mc.useEtcd { - mc.etcdClient.Close() - } - deactivateGroup.Wait() mc.cancel() mc.srvShutCh <- err } -// deactiveAllStreams sends /setactive/false for all the active streams as well -// as AMQP events with the inactive state. -func (mc *mac) deactiveAllStreams(ctx context.Context, wg *sync.WaitGroup) { - mc.mu.Lock() - ids := make([]string, 0, len(mc.pub2info)) - streams := make([]*livepeer.CreateStreamResp, 0, len(mc.pub2info)) - for _, info := range mc.pub2info { - ids = append(ids, info.id) - streams = append(streams, info.stream) +func (mc *mac) getStreamInfoLogged(playbackID string) (*streamInfo, bool) { + info, err := mc.getStreamInfo(playbackID) + if err != nil { + glog.Errorf("Error getting stream info playbackID=%q err=%q", playbackID, err) + return nil, false } - mc.mu.Unlock() + return info, true +} - wg.Add(1) - go func() { - defer wg.Done() - for _, stream := range streams { - mc.emitStreamStateEvent(stream, data.StreamState{Active: false}) - } - err := mc.producer.Shutdown(ctx) - if err != nil { - glog.Errorf("Error shutting down AMQP producer err=%v", err) - } - }() +func (mc *mac) getStreamInfo(playbackID string) (*streamInfo, error) { + playbackID = mistStreamName2playbackID(playbackID) - if len(ids) == 0 { - return + mc.mu.RLock() + info := mc.streamInfo[playbackID] + mc.mu.RUnlock() + + if info != nil { + return info, nil } - wg.Add(1) - go func() { - defer wg.Done() - updated, err := mc.lapi.DeactivateMany(ids) + + glog.Infof("getStreamInfo: Fetching stream not found in memory. playbackID=%s", playbackID) + stream, err := mc.lapi.GetStreamByPlaybackID(playbackID) + if err != nil { + return nil, fmt.Errorf("error getting stream by playback ID %s: %w", playbackID, err) + } + + pushes := make(map[string]*pushStatus) + for _, ref := range stream.Multistream.Targets { + target, pushURL, err := mc.getPushUrl(stream, &ref) if err != nil { - glog.Errorf("Error setting many isActive to false ids=%+v err=%v", ids, err) - } else { - glog.Infof("Set many isActive to false ids=%+v rowCount=%d", ids, updated) + return nil, err } - }() -} + pushes[pushURL] = &pushStatus{ + target: target, + profile: ref.Profile, + // Assume setup was all successful + pushStartEmitted: true, + } + } -func (mc *mac) getStreamInfo(mistID string) *streamInfo { - playbackID := mistStreamName2playbackID(mistID) - mc.mu.RLock() - info := mc.pub2info[playbackID] - mc.mu.RUnlock() - return info + info = &streamInfo{ + id: stream.ID, + stream: stream, + isLazy: true, // flag it as a lazy stream info to avoid sending metrics + done: make(chan struct{}), + pushStatus: pushes, + // Assume setup was all successful + multistreamStarted: true, + } + glog.Infof("getStreamInfo: Created info lazily for stream. playbackID=%s id=%s numPushes=%d", playbackID, stream.ID, len(pushes)) + + mc.mu.Lock() + mc.streamInfo[playbackID] = info + mc.mu.Unlock() + + return info, nil } func (mc *mac) SrvShutCh() chan error { diff --git a/internal/app/mistapiconnector/stats_collector.go b/internal/app/mistapiconnector/stats_collector.go index 4c43fa1e..f3ccde93 100644 --- a/internal/app/mistapiconnector/stats_collector.go +++ b/internal/app/mistapiconnector/stats_collector.go @@ -13,7 +13,7 @@ import ( ) type infoProvider interface { - getStreamInfo(mistID string) *streamInfo + getStreamInfo(mistID string) (*streamInfo, error) } type metricsCollector struct { @@ -26,6 +26,7 @@ type metricsCollector struct { func startMetricsCollector(ctx context.Context, period time.Duration, nodeID, ownRegion string, mapi *mist.API, producer *event.AMQPProducer, amqpExchange string, infop infoProvider) { mc := &metricsCollector{nodeID, ownRegion, mapi, producer, amqpExchange, infop} + mc.collectMetricsLogged(ctx, period) go mc.mainLoop(ctx, period) } @@ -37,15 +38,19 @@ func (c *metricsCollector) mainLoop(loopCtx context.Context, period time.Duratio case <-loopCtx.Done(): return case <-ticker.C: - ctx, cancel := context.WithTimeout(loopCtx, period) - if err := c.collectMetrics(ctx); err != nil { - glog.Errorf("Error collecting mist metrics. err=%v", err) - } - cancel() + c.collectMetricsLogged(loopCtx, period) } } } +func (c *metricsCollector) collectMetricsLogged(ctx context.Context, timeout time.Duration) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + if err := c.collectMetrics(ctx); err != nil { + glog.Errorf("Error collecting mist metrics. err=%v", err) + } +} + func (c *metricsCollector) collectMetrics(ctx context.Context) error { defer func() { if rec := recover(); rec != nil { @@ -60,13 +65,19 @@ func (c *metricsCollector) collectMetrics(ctx context.Context) error { streamsMetrics := compileStreamMetrics(mistStats) for streamID, metrics := range streamsMetrics { - info := c.getStreamInfo(streamID) - if info == nil { - glog.Infof("Mist exported metrics from unknown stream. streamId=%q metrics=%+v", streamID, metrics) + info, err := c.getStreamInfo(streamID) + if err != nil { + return fmt.Errorf("error getting stream info for %s: %w", streamID, err) + } + if info.isLazy { + // avoid spamming metrics for playback-only catalyst instances. This means + // that if mapic restarts we will stop sending metrics from previous + // streams as well, but that's a minor issue (curr stream health is dying). + glog.Infof("Skipping metrics for lazily created stream info. streamId=%q metrics=%+v", streamID, metrics) continue } mseEvent := createMetricsEvent(c.nodeID, c.ownRegion, info, metrics) - err := c.producer.Publish(ctx, event.AMQPMessage{ + err = c.producer.Publish(ctx, event.AMQPMessage{ Exchange: c.amqpExchange, Key: fmt.Sprintf("stream.metrics.%s", info.stream.ID), Body: mseEvent, @@ -87,6 +98,10 @@ func createMetricsEvent(nodeID, region string, info *streamInfo, metrics *stream multistream := make([]*data.MultistreamTargetMetrics, len(metrics.pushes)) for i, push := range metrics.pushes { pushInfo := info.pushStatus[push.OriginalURI] + if pushInfo == nil { + glog.Infof("Mist exported metrics from unknown push. streamId=%q pushURL=%q", info.id, push.OriginalURI) + continue + } var metrics *data.MultistreamMetrics if push.Stats != nil { metrics = &data.MultistreamMetrics{ @@ -94,14 +109,16 @@ func createMetricsEvent(nodeID, region string, info *streamInfo, metrics *stream Bytes: push.Stats.Bytes, MediaTimeMs: push.Stats.MediaTime, } - if metrics.Bytes > pushInfo.pushedBytes { - census.IncMultistreamBytes(metrics.Bytes-pushInfo.pushedBytes, info.stream.PlaybackID) // manifestID === playbackID - pushInfo.pushedBytes = metrics.Bytes - } - if mediaTime := time.Duration(metrics.MediaTimeMs) * time.Millisecond; mediaTime > pushInfo.pushedMediaTime { - census.IncMultistreamTime(mediaTime-pushInfo.pushedMediaTime, info.stream.PlaybackID) - pushInfo.pushedMediaTime = mediaTime + if last := pushInfo.metrics; last != nil { + if metrics.Bytes > last.Bytes { + census.IncMultistreamBytes(metrics.Bytes-last.Bytes, info.stream.PlaybackID) // manifestID === playbackID + } + if metrics.MediaTimeMs > last.MediaTimeMs { + diff := time.Duration(metrics.MediaTimeMs-last.MediaTimeMs) * time.Millisecond + census.IncMultistreamTime(diff, info.stream.PlaybackID) + } } + pushInfo.metrics = metrics } multistream[i] = &data.MultistreamTargetMetrics{ Target: pushToMultistreamTargetInfo(pushInfo),