Skip to content

Commit

Permalink
fix cleaning of recordings in case of multiple recordDeleteAfter valu…
Browse files Browse the repository at this point in the history
…es (#3557)
  • Loading branch information
aler9 committed Sep 5, 2024
1 parent 6e9d79c commit a84a4a8
Show file tree
Hide file tree
Showing 34 changed files with 738 additions and 588 deletions.
39 changes: 30 additions & 9 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpp"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/recordstore"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
"github.com/bluenviron/mediamtx/internal/servers/hls"
"github.com/bluenviron/mediamtx/internal/servers/rtmp"
Expand Down Expand Up @@ -56,6 +56,27 @@ func paramName(ctx *gin.Context) (string, bool) {
return name[1:], true
}

func recordingsOfPath(
pathConf *conf.Path,
pathName string,
) *defs.APIRecording {
ret := &defs.APIRecording{
Name: pathName,
}

segments, _ := recordstore.FindSegments(pathConf, pathName)

ret.Segments = make([]*defs.APIRecordingSegment, len(segments))

for i, seg := range segments {
ret.Segments[i] = &defs.APIRecordingSegment{
Start: seg.Start,
}
}

return ret
}

// PathManager contains methods used by the API and Metrics server.
type PathManager interface {
APIPathsList() (*defs.APIPathList, error)
Expand Down Expand Up @@ -1062,7 +1083,7 @@ func (a *API) onRecordingsList(ctx *gin.Context) {
c := a.Conf
a.mutex.RUnlock()

pathNames := getAllPathsWithRecordings(c.Paths)
pathNames := recordstore.FindAllPathsWithSegments(c.Paths)

data := defs.APIRecordingList{}

Expand All @@ -1077,8 +1098,8 @@ func (a *API) onRecordingsList(ctx *gin.Context) {
data.Items = make([]*defs.APIRecording, len(pathNames))

for i, pathName := range pathNames {
_, pathConf, _, _ := conf.FindPathConf(c.Paths, pathName)
data.Items[i] = recordingEntry(pathConf, pathName)
pathConf, _, _ := conf.FindPathConf(c.Paths, pathName)
data.Items[i] = recordingsOfPath(pathConf, pathName)
}

ctx.JSON(http.StatusOK, data)
Expand All @@ -1095,13 +1116,13 @@ func (a *API) onRecordingsGet(ctx *gin.Context) {
c := a.Conf
a.mutex.RUnlock()

_, pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}

ctx.JSON(http.StatusOK, recordingEntry(pathConf, pathName))
ctx.JSON(http.StatusOK, recordingsOfPath(pathConf, pathName))
}

func (a *API) onRecordingDeleteSegment(ctx *gin.Context) {
Expand All @@ -1117,18 +1138,18 @@ func (a *API) onRecordingDeleteSegment(ctx *gin.Context) {
c := a.Conf
a.mutex.RUnlock()

_, pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}

pathFormat := record.PathAddExtension(
pathFormat := recordstore.PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)

segmentPath := record.Path{
segmentPath := recordstore.Path{
Start: start,
}.Encode(pathFormat)

Expand Down
1 change: 1 addition & 0 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func TestRecordingsList(t *testing.T) {
cnf := tempConf(t, "pathDefaults:\n"+
" recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+
"paths:\n"+
" mypath1:\n"+
" all_others:\n")

api := API{
Expand Down
137 changes: 0 additions & 137 deletions internal/api/record.go

This file was deleted.

14 changes: 7 additions & 7 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,38 @@ func srtCheckPassphrase(passphrase string) error {
}

// FindPathConf returns the configuration corresponding to the given path name.
func FindPathConf(pathConfs map[string]*Path, name string) (string, *Path, []string, error) {
func FindPathConf(pathConfs map[string]*Path, name string) (*Path, []string, error) {

Check warning on line 49 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L49

Added line #L49 was not covered by tests
err := isValidPathName(name)
if err != nil {
return "", nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name)
return nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name)

Check warning on line 52 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L52

Added line #L52 was not covered by tests
}

// normal path
if pathConf, ok := pathConfs[name]; ok {
return name, pathConf, nil, nil
return pathConf, nil, nil

Check warning on line 57 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L57

Added line #L57 was not covered by tests
}

// regular expression-based path
for pathConfName, pathConf := range pathConfs {
if pathConf.Regexp != nil && pathConfName != "all" && pathConfName != "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
return pathConf, m, nil

Check warning on line 65 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L65

Added line #L65 was not covered by tests
}
}
}

// all_others
// process all_others after every other entry
for pathConfName, pathConf := range pathConfs {
if pathConfName == "all" || pathConfName == "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
return pathConf, m, nil

Check warning on line 75 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L75

Added line #L75 was not covered by tests
}
}
}

return "", nil, nil, fmt.Errorf("path '%s' is not configured", name)
return nil, nil, fmt.Errorf("path '%s' is not configured", name)

Check warning on line 80 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L80

Added line #L80 was not covered by tests
}

// Path is a path configuration.
Expand Down
51 changes: 9 additions & 42 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os/signal"
"path/filepath"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -25,7 +24,7 @@ import (
"github.com/bluenviron/mediamtx/internal/metrics"
"github.com/bluenviron/mediamtx/internal/playback"
"github.com/bluenviron/mediamtx/internal/pprof"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/recordcleaner"
"github.com/bluenviron/mediamtx/internal/rlimit"
"github.com/bluenviron/mediamtx/internal/servers/hls"
"github.com/bluenviron/mediamtx/internal/servers/rtmp"
Expand All @@ -44,38 +43,6 @@ var defaultConfPaths = []string{
"/etc/mediamtx/mediamtx.yml",
}

func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
out := make(map[record.CleanerEntry]struct{})

for _, pa := range paths {
if pa.Record && pa.RecordDeleteAfter != 0 {
entry := record.CleanerEntry{
Path: pa.RecordPath,
Format: pa.RecordFormat,
DeleteAfter: time.Duration(pa.RecordDeleteAfter),
}
out[entry] = struct{}{}
}
}

out2 := make([]record.CleanerEntry, len(out))
i := 0

for v := range out {
out2[i] = v
i++
}

sort.Slice(out2, func(i, j int) bool {
if out2[i].Path != out2[j].Path {
return out2[i].Path < out2[j].Path
}
return out2[i].DeleteAfter < out2[j].DeleteAfter
})

return out2
}

var cli struct {
Version bool `help:"print version"`
Confpath string `arg:"" default:""`
Expand All @@ -92,7 +59,7 @@ type Core struct {
authManager *auth.Manager
metrics *metrics.Metrics
pprof *pprof.PPROF
recordCleaner *record.Cleaner
recordCleaner *recordcleaner.Cleaner
playbackServer *playback.Server
pathManager *pathManager
rtspServer *rtsp.Server
Expand Down Expand Up @@ -333,12 +300,10 @@ func (p *Core) createResources(initial bool) error {
p.pprof = i
}

cleanerEntries := gatherCleanerEntries(p.conf.Paths)
if len(cleanerEntries) != 0 &&
p.recordCleaner == nil {
p.recordCleaner = &record.Cleaner{
Entries: cleanerEntries,
Parent: p,
if p.recordCleaner == nil {
p.recordCleaner = &recordcleaner.Cleaner{
PathConfs: p.conf.Paths,
Parent: p,
}
p.recordCleaner.Initialize()
}
Expand Down Expand Up @@ -707,8 +672,10 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeLogger

closeRecorderCleaner := newConf == nil ||
!reflect.DeepEqual(gatherCleanerEntries(newConf.Paths), gatherCleanerEntries(p.conf.Paths)) ||
closeLogger
if !closeRecorderCleaner && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.recordCleaner.ReloadPathConfs(newConf.Paths)
}

closePlaybackServer := newConf == nil ||
newConf.Playback != p.conf.Playback ||
Expand Down
Loading

0 comments on commit a84a4a8

Please sign in to comment.