Skip to content

Commit

Permalink
vod: Allow video manipulation logic to be used as a library (#134)
Browse files Browse the repository at this point in the history
* api-transcoder: TOD (transcoding-on-demand) command line tool

* move `HlsSegment` to `model` package

* add public `StartSegmenting`

* Segmenter for reading from memory

* dockerfile: Fix base builder image alpine version

ffmpeg has been bumped to major 5 on alpine 3.16, which
makes our build break since we call some old functions.

Tried a looooot of fixes for this and this seems to be the
most worth it for now.

Co-authored-by: Victor Elias <[email protected]>
  • Loading branch information
darkdarkdragon and victorges authored Jun 9, 2022
1 parent d426981 commit 251def1
Show file tree
Hide file tree
Showing 11 changed files with 676 additions and 88 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,10 @@ tags
# Support for Project snippet scope

# End of https://www.toptal.com/developers/gitignore/api/emacs,vim,go,visualstudiocode
/lapi
/loadtester
/mapi
/mist-api-connector
/testdriver
/recordtester
/api-transcoder
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.15-alpine as builder
FROM golang:1.17-alpine3.15 as builder

RUN apk add --no-cache make gcc musl-dev linux-headers git

Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ connector:
monitor:
go build -ldflags="$(ldflags)" -o "$(GO_BUILD_DIR)" cmd/stream-monitor/stream-monitor.go

.PHONY: api-transcoder
api-transcoder:
go build -ldflags="$(ldflags)" cmd/api-transcoder/api-transcoder.go

.PHONY: docker
docker:
docker build -t livepeer/streamtester:latest --build-arg version=$(shell git describe --dirty) .
Expand Down
120 changes: 117 additions & 3 deletions apis/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime"
"mime/multipart"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"

Expand All @@ -20,7 +25,7 @@ import (
)

// ErrNotExists returned if stream is not found
var ErrNotExists = errors.New("Stream does not exists")
var ErrNotExists = errors.New("stream does not exists")

const httpTimeout = 4 * time.Second
const setActiveTimeout = 1500 * time.Millisecond
Expand Down Expand Up @@ -54,6 +59,7 @@ type (
accessToken string
presets []string
httpClient *http.Client
broadcasters []string
}

geoResp struct {
Expand Down Expand Up @@ -88,7 +94,7 @@ type (
Name string `json:"name,omitempty"`
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
Bitrate int `json:"bitrate,omitempty"`
Bitrate int `json:"bitrate"`
Fps int `json:"fps"`
FpsDen int `json:"fpsDen,omitempty"`
Gop string `json:"gop,omitempty"`
Expand Down Expand Up @@ -369,7 +375,7 @@ func (lapi *API) DeleteStream(id string) error {
return err
}
if resp.StatusCode != 204 {
return fmt.Errorf("Error deleting stream %s: status is %s", id, resp.Status)
return fmt.Errorf("error deleting stream %s: status is %s", id, resp.Status)
}
return nil
}
Expand Down Expand Up @@ -801,6 +807,114 @@ func (lapi *API) GetMultistreamTargetR(id string) (*MultistreamTarget, error) {
}
}

func (lapi *API) PushSegment(sid string, seqNo int, dur time.Duration, segData []byte) ([][]byte, error) {
var err error
if len(lapi.broadcasters) == 0 {
lapi.broadcasters, err = lapi.Broadcasters()
if err != nil {
return nil, err
}
if len(lapi.broadcasters) == 0 {
return nil, fmt.Errorf("no broadcasters available")
}
}
urlToUp := fmt.Sprintf("%s/live/%s/%d.ts", lapi.broadcasters[0], sid, seqNo)
var body io.Reader
body = bytes.NewReader(segData)
req, err := uhttp.NewRequest("POST", urlToUp, body)
if err != nil {
panic(err)
}
req.Header.Set("Accept", "multipart/mixed")
req.Header.Set("Content-Duration", strconv.FormatInt(dur.Milliseconds(), 10))
postStarted := time.Now()
resp, err := lapi.httpClient.Do(req)
postTook := time.Since(postStarted)
var timedout bool
var status string
if err != nil {
uerr := err.(*url.Error)
timedout = uerr.Timeout()
}
if resp != nil {
status = resp.Status
}
glog.V(model.DEBUG).Infof("Post segment manifest=%s seqNo=%d dur=%s took=%s timed_out=%v status='%v' err=%v",
sid, seqNo, dur, postTook, timedout, status, err)
if err != nil {
return nil, err
}
glog.V(model.VERBOSE).Infof("Got transcoded manifest=%s seqNo=%d resp status=%s reading body started", sid, seqNo, resp.Status)
if resp.StatusCode != http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
glog.V(model.DEBUG).Infof("Got manifest=%s seqNo=%d resp status=%s error in body $%s", sid, seqNo, resp.Status, string(b))
return nil, fmt.Errorf("transcode error %s: %s", resp.Status, b)
}
started := time.Now()
mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
glog.Error("Error getting mime type ", err, sid)
panic(err)
// return
}
glog.V(model.VERBOSE).Infof("mediaType=%s params=%+v", mediaType, params)
if glog.V(model.VVERBOSE) {
for k, v := range resp.Header {
glog.Infof("Header '%s': '%s'", k, v)
}
}
var segments [][]byte
var urls []string
if mediaType == "multipart/mixed" {
mr := multipart.NewReader(resp.Body, params["boundary"])
for {
p, merr := mr.NextPart()
if merr == io.EOF {
break
}
if merr != nil {
glog.Error("Could not process multipart part ", merr, sid)
err = merr
break
}
mediaType, _, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
if err != nil {
glog.Error("Error getting mime type ", err, sid)
for k, v := range p.Header {
glog.Infof("Header '%s': '%s'", k, v)
}
}
body, merr := ioutil.ReadAll(p)
if merr != nil {
glog.Errorf("error reading body manifest=%s seqNo=%d err=%v", sid, seqNo, merr)
err = merr
break
}
if mediaType == "application/vnd+livepeer.uri" {
urls = append(urls, string(body))
} else {
var v glog.Level = model.DEBUG
if len(body) < 5 {
v = 0
}
glog.V(v).Infof("Read back segment for manifest=%s seqNo=%d profile=%d len=%d bytes", sid, seqNo, len(segments), len(body))
segments = append(segments, body)
}
}
}
took := time.Since(started)
glog.V(model.VERBOSE).Infof("Reading body back for manifest=%s seqNo=%d took=%s profiles=%d", sid, seqNo, took, len(segments))
// glog.Infof("Body: %s", string(tbody))

if err != nil {
httpErr := fmt.Errorf(`error reading http request body for manifest=%s seqNo=%d err=%w`, sid, seqNo, err)
glog.Error(httpErr)
return nil, err
}
return segments, nil
}

func Timedout(e error) bool {
t, ok := e.(interface {
Timeout() bool
Expand Down
Loading

0 comments on commit 251def1

Please sign in to comment.