Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling - added option for providing a custom handler #5

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ It provides a handy interface to handle streams

# Example usage

#### Start a simple stream
```go
import "github.com/riltech/streamer"

func errorHandler(err error, id string) {
fmt.Printf("Error with %s stream: %s", id, err)
}

func main() {
stream, id := streamer.NewStream(
"rtsp://admin:[email protected]:447/Streaming/Channel/2", // URI of raw RTSP stream
Expand All @@ -34,9 +39,49 @@ func main() {
MaxSize: 500, // Maximum size of a log in megabytes
},
25*time.Second, // Time to wait before declaring a stream start failed
errorHandler, // Invoked once the processes exits with an error. Can be nil
)

// Returns a waitGroup where the stream checking the underlying process for a successful start
stream.Start().Wait()
}
```

#### Restart a stream on error
```go
import "github.com/riltech/streamer"

func main() {
streams := map[string]streamer.IStream

errorHandler := func (err error, id string) {
stream, ok := streams[id]
if !ok {
fmt.Errorf("%s stream not found", id)
return
}
stream.Restart().Wait()
}

stream, id := streamer.NewStream(
"rtsp://admin:[email protected]:447/Streaming/Channel/2", // URI of raw RTSP stream
"./videos", // Directory where to store video chunks and indexes. Should exist already
true, // Indicates if stream should be keeping files after it is stopped or clean the directory
true, // Indicates if Audio should be enabled or not
streamer.ProcessLoggingOpts{
Enabled: true, // Indicates if process logging is enabled
Compress: true, // Indicates if logs should be compressed
Directory: "/tmp/logs/stream", // Directory to store logs
MaxAge: 0, // Max age for a log. 0 is infinite
MaxBackups: 2, // Maximum backups to keep
MaxSize: 500, // Maximum size of a log in megabytes
},
25*time.Second, // Time to wait before declaring a stream start failed
errorHandler, // Invoked once the processes exits with an error
)

streams[id] = stream
// Returns a waitGroup where the stream checking the underlying process for a successful start
stream.Start().Wait()
}
```
46 changes: 26 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@ type IStream interface {

// Stream describes a given host's streaming
type Stream struct {
ID string `json:"id"`
Path string `json:"path"`
Running bool `json:"running"`
CMD *exec.Cmd `json:"-"`
Process IProcess `json:"-"`
Mux *sync.Mutex `json:"-"`
Streak *hotstreak.Hotstreak `json:"-"`
OriginalURI string `json:"-"`
StorePath string `json:"-"`
KeepFiles bool `json:"-"`
LoggingOpts *ProcessLoggingOpts `json:"-"`
Logger *lumberjack.Logger `json:"-"`
WaitTimeOut time.Duration `json:"-"`
ID string `json:"id"`
Path string `json:"path"`
Running bool `json:"running"`
CMD *exec.Cmd `json:"-"`
Process IProcess `json:"-"`
Mux *sync.Mutex `json:"-"`
Streak *hotstreak.Hotstreak `json:"-"`
OriginalURI string `json:"-"`
StorePath string `json:"-"`
KeepFiles bool `json:"-"`
LoggingOpts *ProcessLoggingOpts `json:"-"`
Logger *lumberjack.Logger `json:"-"`
WaitTimeOut time.Duration `json:"-"`
errorHandler func(err error, id string)
}

// Type check
Expand All @@ -51,6 +52,7 @@ func NewStream(
audio bool,
loggingOpts ProcessLoggingOpts,
waitTimeOut time.Duration,
errorHandler func(err error, id string),
) (*Stream, string) {
id := uuid.New().String()
path := fmt.Sprintf("%s/%s", storingDirectory, id)
Expand Down Expand Up @@ -88,12 +90,13 @@ func NewStream(
HotWait: time.Minute * 2,
ActiveWait: time.Minute * 4,
}).Activate(),
OriginalURI: URI,
KeepFiles: keepFiles,
LoggingOpts: &loggingOpts,
Logger: cmdLogger,
Running: false,
WaitTimeOut: waitTimeOut,
OriginalURI: URI,
KeepFiles: keepFiles,
LoggingOpts: &loggingOpts,
Logger: cmdLogger,
Running: false,
WaitTimeOut: waitTimeOut,
errorHandler: errorHandler,
}
logrus.Debugf("%s store path created | Stream", stream.StorePath)
return &stream, id
Expand All @@ -113,12 +116,15 @@ func (strm *Stream) Start() *sync.WaitGroup {
go func() {
logrus.Debugf("%s is starting FFMPEG process | Stream", strm.ID)
if err := strm.CMD.Run(); err != nil {
strm.Running = false
if strm.errorHandler != nil {
strm.errorHandler(err, strm.ID)
}
once.Do(func() {
logrus.Errorf("%s process could not start. | Stream\n Error: %s",
strm.ID,
err,
)
strm.Running = false
strm.Mux.Unlock()
wg.Done()
})
Expand Down