Skip to content

Commit

Permalink
segmenter: Fix error handling (#164)
Browse files Browse the repository at this point in the history
* testers: Return error from StartSegmenting(R)

* segmenter: Close out chan and fix first error

* segmenter: Make all fatal logs well behaved errors

Also create startSegmentingLoop helper function to avoid some of the
boilerplate repetition.

* api-transcoder: Fix only segmenter user ignoring errors

* segmenter: Support more than 2 streams

It is just for logs anyway but still

* segmenter: Make sure to close intermediate files

Not something that should be affecting task-runner,
but I saw that and why not fix it.

* segmenter: Avoid dead go-routines in segmenter

Might fix task-runner memory as well

* segmenter: ctype -> codecType
  • Loading branch information
victorges authored Jun 13, 2022
1 parent 00faa62 commit 6c4c022
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmd/api-transcoder/api-transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func transcode(apiKey, apiHost, src, dst string, presets []string, lprofile *liv
}
}
for _, outFile := range outFiles {
if err = outFile.Close(); err != nil {
if err := outFile.Close(); err != nil {
return err
}
}
Expand Down
98 changes: 64 additions & 34 deletions internal/testers/segmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -34,21 +33,20 @@ func StartSegmentingR(ctx context.Context, reader io.ReadSeekCloser, stopAtFileE
glog.Errorf("avutil.OpenRC err=%v", err)
return err
}
go segmentingLoop(ctx, "", inFile, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out)

return err
startSegmentingLoop(ctx, "", inFile, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out)
return nil
}

// StartSegmenting opens the media file at fileName and starts segmenting it
// asynchronously, sending each produced segment to out. It returns an error
// only if the file cannot be opened; all later errors are delivered through
// the out channel, which is closed when segmenting finishes.
func StartSegmenting(ctx context.Context, fileName string, stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration,
	useWallTime bool, out chan<- *model.HlsSegment) error {
	glog.Infof("Starting segmenting file %s", fileName)
	inFile, err := avutil.Open(fileName)
	if err != nil {
		// Return the error to the caller instead of glog.Fatal: callers are
		// expected to handle open failures gracefully.
		// Fixed message: this path calls avutil.Open, not avutil.OpenRC.
		glog.Errorf("avutil.Open err=%v", err)
		return err
	}
	startSegmentingLoop(ctx, fileName, inFile, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out)
	return nil
}

func createInMemoryTSMuxer() (av.Muxer, *bytes.Buffer) {
Expand Down Expand Up @@ -97,12 +95,28 @@ func (wt *Walltime) ModifyPacket(pkt *av.Packet, streams []av.CodecData, videoid
return
}

func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxCloser, stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration,
// startSegmentingLoop runs segmentingLoop in a background goroutine. It owns
// the out channel from here on: the channel is always closed when the loop
// ends, and any loop error is reported as a final segment carrying Err
// (unless the context is canceled first).
func startSegmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxCloser, stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration,
	useWallTime bool, out chan<- *model.HlsSegment) {
	go func() {
		defer close(out)
		loopErr := segmentingLoop(ctx, fileName, inFileReal, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out)
		if loopErr == nil {
			return
		}
		glog.Errorf("Error in segmenting loop. err=%+v", loopErr)
		// Best-effort delivery of the terminal error; give up on cancellation
		// so this goroutine can never leak.
		select {
		case <-ctx.Done():
		case out <- &model.HlsSegment{Err: loopErr}:
		}
	}()
}

func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxCloser,
stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration,
useWallTime bool, out chan<- *model.HlsSegment) error {

var err error
var streams []av.CodecData
var videoidx, audioidx int8
streamTypes := map[int8]string{}

ts := &timeShifter{}
filters := pktque.Filters{ts, &pktque.FixTime{MakeIncrement: true}}
Expand All @@ -111,22 +125,28 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
}
inFile := &pktque.FilterDemuxer{Demuxer: inFileReal, Filter: filters}
if streams, err = inFile.Streams(); err != nil {
msg := fmt.Sprintf("Can't get info about file: '%+v', isNoAudio %v isNoVideo %v", err, errors.Is(err, jerrors.ErrNoAudioInfoFound), errors.Is(err, jerrors.ErrNoVideoInfoFound))
if !(errors.Is(err, jerrors.ErrNoAudioInfoFound) || errors.Is(err, jerrors.ErrNoVideoInfoFound)) {
glog.Fatal(msg)
}
fmt.Println(msg)
panic(msg)
glog.Errorf("Can't get info about file err=%q, isNoAudio=%v isNoVideo=%v stack=%+v", err, errors.Is(err, jerrors.ErrNoAudioInfoFound), errors.Is(err, jerrors.ErrNoVideoInfoFound), err)
return err
}
for i, st := range streams {
if st.Type().IsAudio() {
audioidx = int8(i)
codecType := "unknown"
if codec := st.Type(); codec.IsAudio() {
codecType = "audio"
} else if codec.IsVideo() {
codecType = "video"
}
if st.Type().IsVideo() {
videoidx = int8(i)
streamTypes[int8(i)] = codecType
}
glog.V(model.VERBOSE).Infof("Stream types=%+v", streamTypes)

// sendSegment delivers seg on the out channel, returning false if the
// context is canceled before the send completes.
sendSegment := func(seg *model.HlsSegment) bool {
	select {
	// BUG FIX: previously this sent &model.HlsSegment{Err: err}, which
	// discarded the actual segment (the seg parameter was unused) and
	// emitted a segment carrying a stale error value instead.
	case out <- seg:
		return true
	case <-ctx.Done():
		return false
	}
}
glog.V(model.VERBOSE).Infof("Video stream index %d, audio stream index %d\n", videoidx, audioidx)

seqNo := 0
// var curPTS time.Duration
Expand All @@ -142,12 +162,12 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
segFile, buf := createInMemoryTSMuxer()
err = segFile.WriteHeader(streams)
if err != nil {
glog.Fatal(err)
return err
}
if firstFramePacket != nil {
err = segFile.WritePacket(*firstFramePacket)
if err != nil {
glog.Fatal(err)
return err
}
prevPTS = firstFramePacket.Time
firstFramePacket = nil
Expand All @@ -158,7 +178,8 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
for {
select {
case <-ctx.Done():
return
// don't return ctx.Err() as that would send the error in the out channel
return nil
default:
}
pkt, rerr = inFile.ReadPacket()
Expand All @@ -169,7 +190,7 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
}
break
}
glog.Fatal(rerr)
return rerr
}
lastPacket = pkt

Expand All @@ -182,29 +203,30 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
// This matches segmenter algorithm used in ffmpeg
if pkt.IsKeyFrame && pkt.Time >= time.Duration(seqNo+1)*segLen {
firstFramePacket = &pkt
glog.V(model.VERBOSE).Infof("Packet Is Keyframe %v Is Audio %v Is Video %v PTS %s sinc prev %s seqNo %d\n", pkt.IsKeyFrame, pkt.Idx == audioidx, pkt.Idx == videoidx, pkt.Time,
glog.V(model.VERBOSE).Infof("Packet isKeyframe=%v codecType=%v PTS=%s sincPrev=%s seqNo=%d", pkt.IsKeyFrame, streamTypes[pkt.Idx], pkt.Time,
pkt.Time-prevPTS, seqNo+1)
// prevPTS = pkt.Time
curDur = pkt.Time - prevPTS
break
}
err = segFile.WritePacket(pkt)
if err != nil {
glog.Fatal(err)
return err
}
}
err = segFile.WriteTrailer()
if err != nil {
glog.Fatal(err)
return err
}
if rerr == io.EOF && stopAfter > 0 && (prevPTS+curDur) < stopAfter && len(fileName) > 0 {
// re-open same file and stream it again
firstFramePacket = nil
ts.timeShift = lastPacket.Time + 30*time.Millisecond
inf, err := avutil.Open(fileName)
if err != nil {
glog.Fatal(err)
return err
}
defer inf.Close()
inFile.Demuxer = inf
// rs.counter.currentSegments = 0
inFile.Streams()
Expand All @@ -221,7 +243,9 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
Duration: curDur,
Data: buf.Bytes(),
}
out <- hlsSeg
if !sendSegment(hlsSeg) {
return nil
}
prevPTS = lastPacket.Time
} else {
seqNo--
Expand All @@ -241,7 +265,9 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
Duration: curDur,
Data: buf.Bytes(),
}
out <- hlsSeg
if !sendSegment(hlsSeg) {
return nil
}
sent = 0
}
}
Expand All @@ -257,7 +283,9 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
SeqNo: seqNo + 1 + sent,
Pts: prevPTS + curDur,
}
out <- hlsSeg
if !sendSegment(hlsSeg) {
return nil
}
break
}
}
Expand All @@ -267,10 +295,12 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo
SeqNo: seqNo + 1,
Pts: prevPTS + curDur,
}
out <- hlsSeg
if !sendSegment(hlsSeg) {
return nil
}
break
}
seqNo++
}
return
return nil
}

0 comments on commit 6c4c022

Please sign in to comment.