diff --git a/Gopkg.lock b/Gopkg.lock index 986204e..6092c2d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,12 +2,12 @@ [[projects]] - digest = "1:81ec042f928b92023ad84215c95fb5d14c4597e92c6124202a720788ebab47c1" + digest = "1:2fbc7e912355e631e427254ed0a489218f6edc3972b6d7420823ae4105ae01a2" name = "github.com/Roverr/hotstreak" packages = ["."] pruneopts = "UT" - revision = "1b1a61f52d3211f278c8b8e9d03315a85f44505d" - version = "v1.0.0" + revision = "421918a793cc43dff3c1e31005614dcf19d84651" + version = "v1.1.0" [[projects]] digest = "1:582b704bebaa06b48c29b0cec224a6058a09c86883aaddabde889cd1a5f73e1b" @@ -18,12 +18,12 @@ version = "v1.1.1" [[projects]] - digest = "1:31e761d97c76151dde79e9d28964a812c46efc5baee4085b86f68f0c654450de" + digest = "1:09cb61dc19af93deae01587e2fdb1c081e0bf48f1a5ad5fa24f48750dc57dce8" name = "github.com/konsorten/go-windows-terminal-sequences" packages = ["."] pruneopts = "UT" - revision = "f55edac94c9bbba5d6182a4be46d86a2c9b5b50e" - version = "v1.0.2" + revision = "edb144dfd453055e1e49a3d8b410a660b5a87613" + version = "v1.0.3" [[projects]] digest = "1:c805e517269b0ba4c21ded5836019ed7d16953d4026cb7d00041d039c7906be9" @@ -34,20 +34,23 @@ version = "v2.1" [[projects]] - digest = "1:04457f9f6f3ffc5fea48e71d62f2ca256637dee0a04d710288e27e05c8b41976" + digest = "1:05eebdd5727fea23083fce0d98d307d70c86baed644178e81608aaa9f09ea469" name = "github.com/sirupsen/logrus" packages = ["."] pruneopts = "UT" - revision = "839c75faf7f98a33d445d181f3018b5c3409a45e" - version = "v1.4.2" + revision = "60c74ad9be0d874af0ab0daef6ab07c5c5911f0d" + version = "v1.6.0" [[projects]] branch = "master" - digest = "1:72b7c210f8cfe1431d2f300fbf37f25e52aa77324b05ab6b698483054033803e" + digest = "1:020620a097c2bfd056c8db7d31a69ea2cfed874ce985763dcd9ae00f9fa5f74b" name = "golang.org/x/sys" - packages = ["unix"] + packages = [ + "internal/unsafeheader", + "unix", + ] pruneopts = "UT" - revision = "d101bd2416d505c0448a6ce8a282482678040a89" + revision = "fe76b779f299728f3bd63f77ea2c815504229c3b" [solve-meta] analyzer-name = "dep" diff --git a/README.md b/README.md index 4170106..c997a28 100644 --- a/README.md +++ b/README.md @@ -16,9 +16,14 @@ It provides a handy interface to handle streams # Example usage +#### Start a simple stream ```go import "github.com/riltech/streamer" +func errorHandler(err error, id string) { + fmt.Printf("Error with %s stream: %s", id, err) +} + func main() { stream, id := streamer.NewStream( "rtsp://admin:password@host.dyndns.org:447/Streaming/Channel/2", // URI of raw RTSP stream @@ -34,9 +39,49 @@ func main() { MaxSize: 500, // Maximum size of a log in megabytes }, 25*time.Second, // Time to wait before declaring a stream start failed + errorHandler, // Invoked once the processes exits with an error. Can be nil ) // Returns a waitGroup where the stream checking the underlying process for a successful start stream.Start().Wait() } ``` + +#### Restart a stream on error +```go +import "github.com/riltech/streamer" + +func main() { + streams := map[string]streamer.IStream + + errorHandler := func (err error, id string) { + stream, ok := streams[id] + if !ok { + fmt.Errorf("%s stream not found", id) + return + } + stream.Restart().Wait() + } + + stream, id := streamer.NewStream( + "rtsp://admin:password@host.dyndns.org:447/Streaming/Channel/2", // URI of raw RTSP stream + "./videos", // Directory where to store video chunks and indexes. Should exist already + true, // Indicates if stream should be keeping files after it is stopped or clean the directory + true, // Indicates if Audio should be enabled or not + streamer.ProcessLoggingOpts{ + Enabled: true, // Indicates if process logging is enabled + Compress: true, // Indicates if logs should be compressed + Directory: "/tmp/logs/stream", // Directory to store logs + MaxAge: 0, // Max age for a log. 0 is infinite + MaxBackups: 2, // Maximum backups to keep + MaxSize: 500, // Maximum size of a log in megabytes + }, + 25*time.Second, // Time to wait before declaring a stream start failed + errorHandler, // Invoked once the processes exits with an error + ) + + streams[id] = stream + // Returns a waitGroup where the stream checking the underlying process for a successful start + stream.Start().Wait() +} +``` diff --git a/main.go b/main.go index bb2409c..e4d07dd 100644 --- a/main.go +++ b/main.go @@ -25,19 +25,20 @@ type IStream interface { // Stream describes a given host's streaming type Stream struct { - ID string `json:"id"` - Path string `json:"path"` - Running bool `json:"running"` - CMD *exec.Cmd `json:"-"` - Process IProcess `json:"-"` - Mux *sync.Mutex `json:"-"` - Streak *hotstreak.Hotstreak `json:"-"` - OriginalURI string `json:"-"` - StorePath string `json:"-"` - KeepFiles bool `json:"-"` - LoggingOpts *ProcessLoggingOpts `json:"-"` - Logger *lumberjack.Logger `json:"-"` - WaitTimeOut time.Duration `json:"-"` + ID string `json:"id"` + Path string `json:"path"` + Running bool `json:"running"` + CMD *exec.Cmd `json:"-"` + Process IProcess `json:"-"` + Mux *sync.Mutex `json:"-"` + Streak *hotstreak.Hotstreak `json:"-"` + OriginalURI string `json:"-"` + StorePath string `json:"-"` + KeepFiles bool `json:"-"` + LoggingOpts *ProcessLoggingOpts `json:"-"` + Logger *lumberjack.Logger `json:"-"` + WaitTimeOut time.Duration `json:"-"` + errorHandler func(err error, id string) } // Type check @@ -51,6 +52,7 @@ func NewStream( audio bool, loggingOpts ProcessLoggingOpts, waitTimeOut time.Duration, + errorHandler func(err error, id string), ) (*Stream, string) { id := uuid.New().String() path := fmt.Sprintf("%s/%s", storingDirectory, id) @@ -88,12 +90,13 @@ func NewStream( HotWait: time.Minute * 2, ActiveWait: time.Minute * 4, }).Activate(), - OriginalURI: URI, - KeepFiles: keepFiles, - LoggingOpts: &loggingOpts, - Logger: cmdLogger, - Running: false, - WaitTimeOut: waitTimeOut, + OriginalURI: URI, + KeepFiles: keepFiles, + LoggingOpts: &loggingOpts, + Logger: cmdLogger, + Running: false, + WaitTimeOut: waitTimeOut, + errorHandler: errorHandler, } logrus.Debugf("%s store path created | Stream", stream.StorePath) return &stream, id @@ -113,12 +116,15 @@ func (strm *Stream) Start() *sync.WaitGroup { go func() { logrus.Debugf("%s is starting FFMPEG process | Stream", strm.ID) if err := strm.CMD.Run(); err != nil { + strm.Running = false + if strm.errorHandler != nil { + strm.errorHandler(err, strm.ID) + } once.Do(func() { logrus.Errorf("%s process could not start. | Stream\n Error: %s", strm.ID, err, ) - strm.Running = false strm.Mux.Unlock() wg.Done() })