Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cleaning of recordings in case of multiple recordDeleteAfter values (#3557) #3741

Merged
merged 1 commit into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpp"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/recordstore"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
"github.com/bluenviron/mediamtx/internal/servers/hls"
"github.com/bluenviron/mediamtx/internal/servers/rtmp"
Expand Down Expand Up @@ -56,6 +56,27 @@ func paramName(ctx *gin.Context) (string, bool) {
return name[1:], true
}

func recordingsOfPath(
pathConf *conf.Path,
pathName string,
) *defs.APIRecording {
ret := &defs.APIRecording{
Name: pathName,
}

segments, _ := recordstore.FindSegments(pathConf, pathName)

ret.Segments = make([]*defs.APIRecordingSegment, len(segments))

for i, seg := range segments {
ret.Segments[i] = &defs.APIRecordingSegment{
Start: seg.Start,
}
}

return ret
}

// PathManager contains methods used by the API and Metrics server.
type PathManager interface {
APIPathsList() (*defs.APIPathList, error)
Expand Down Expand Up @@ -1062,7 +1083,7 @@ func (a *API) onRecordingsList(ctx *gin.Context) {
c := a.Conf
a.mutex.RUnlock()

pathNames := getAllPathsWithRecordings(c.Paths)
pathNames := recordstore.FindAllPathsWithSegments(c.Paths)

data := defs.APIRecordingList{}

Expand All @@ -1077,8 +1098,8 @@ func (a *API) onRecordingsList(ctx *gin.Context) {
data.Items = make([]*defs.APIRecording, len(pathNames))

for i, pathName := range pathNames {
_, pathConf, _, _ := conf.FindPathConf(c.Paths, pathName)
data.Items[i] = recordingEntry(pathConf, pathName)
pathConf, _, _ := conf.FindPathConf(c.Paths, pathName)
data.Items[i] = recordingsOfPath(pathConf, pathName)
}

ctx.JSON(http.StatusOK, data)
Expand All @@ -1095,13 +1116,13 @@ func (a *API) onRecordingsGet(ctx *gin.Context) {
c := a.Conf
a.mutex.RUnlock()

_, pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}

ctx.JSON(http.StatusOK, recordingEntry(pathConf, pathName))
ctx.JSON(http.StatusOK, recordingsOfPath(pathConf, pathName))
}

func (a *API) onRecordingDeleteSegment(ctx *gin.Context) {
Expand All @@ -1117,18 +1138,18 @@ func (a *API) onRecordingDeleteSegment(ctx *gin.Context) {
c := a.Conf
a.mutex.RUnlock()

_, pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}

pathFormat := record.PathAddExtension(
pathFormat := recordstore.PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)

segmentPath := record.Path{
segmentPath := recordstore.Path{
Start: start,
}.Encode(pathFormat)

Expand Down
1 change: 1 addition & 0 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func TestRecordingsList(t *testing.T) {
cnf := tempConf(t, "pathDefaults:\n"+
" recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+
"paths:\n"+
" mypath1:\n"+
" all_others:\n")

api := API{
Expand Down
137 changes: 0 additions & 137 deletions internal/api/record.go

This file was deleted.

14 changes: 7 additions & 7 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,38 @@
}

// FindPathConf returns the configuration corresponding to the given path name.
func FindPathConf(pathConfs map[string]*Path, name string) (string, *Path, []string, error) {
func FindPathConf(pathConfs map[string]*Path, name string) (*Path, []string, error) {

Check warning on line 49 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L49

Added line #L49 was not covered by tests
err := isValidPathName(name)
if err != nil {
return "", nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name)
return nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name)

Check warning on line 52 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L52

Added line #L52 was not covered by tests
}

// normal path
if pathConf, ok := pathConfs[name]; ok {
return name, pathConf, nil, nil
return pathConf, nil, nil

Check warning on line 57 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L57

Added line #L57 was not covered by tests
}

// regular expression-based path
for pathConfName, pathConf := range pathConfs {
if pathConf.Regexp != nil && pathConfName != "all" && pathConfName != "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
return pathConf, m, nil

Check warning on line 65 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L65

Added line #L65 was not covered by tests
}
}
}

// all_others
// process all_others after every other entry
for pathConfName, pathConf := range pathConfs {
if pathConfName == "all" || pathConfName == "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
return pathConf, m, nil

Check warning on line 75 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L75

Added line #L75 was not covered by tests
}
}
}

return "", nil, nil, fmt.Errorf("path '%s' is not configured", name)
return nil, nil, fmt.Errorf("path '%s' is not configured", name)

Check warning on line 80 in internal/conf/path.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/path.go#L80

Added line #L80 was not covered by tests
}

// Path is a path configuration.
Expand Down
51 changes: 9 additions & 42 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os/signal"
"path/filepath"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -25,7 +24,7 @@ import (
"github.com/bluenviron/mediamtx/internal/metrics"
"github.com/bluenviron/mediamtx/internal/playback"
"github.com/bluenviron/mediamtx/internal/pprof"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/recordcleaner"
"github.com/bluenviron/mediamtx/internal/rlimit"
"github.com/bluenviron/mediamtx/internal/servers/hls"
"github.com/bluenviron/mediamtx/internal/servers/rtmp"
Expand All @@ -44,38 +43,6 @@ var defaultConfPaths = []string{
"/etc/mediamtx/mediamtx.yml",
}

func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
out := make(map[record.CleanerEntry]struct{})

for _, pa := range paths {
if pa.Record && pa.RecordDeleteAfter != 0 {
entry := record.CleanerEntry{
Path: pa.RecordPath,
Format: pa.RecordFormat,
DeleteAfter: time.Duration(pa.RecordDeleteAfter),
}
out[entry] = struct{}{}
}
}

out2 := make([]record.CleanerEntry, len(out))
i := 0

for v := range out {
out2[i] = v
i++
}

sort.Slice(out2, func(i, j int) bool {
if out2[i].Path != out2[j].Path {
return out2[i].Path < out2[j].Path
}
return out2[i].DeleteAfter < out2[j].DeleteAfter
})

return out2
}

var cli struct {
Version bool `help:"print version"`
Confpath string `arg:"" default:""`
Expand All @@ -92,7 +59,7 @@ type Core struct {
authManager *auth.Manager
metrics *metrics.Metrics
pprof *pprof.PPROF
recordCleaner *record.Cleaner
recordCleaner *recordcleaner.Cleaner
playbackServer *playback.Server
pathManager *pathManager
rtspServer *rtsp.Server
Expand Down Expand Up @@ -333,12 +300,10 @@ func (p *Core) createResources(initial bool) error {
p.pprof = i
}

cleanerEntries := gatherCleanerEntries(p.conf.Paths)
if len(cleanerEntries) != 0 &&
p.recordCleaner == nil {
p.recordCleaner = &record.Cleaner{
Entries: cleanerEntries,
Parent: p,
if p.recordCleaner == nil {
p.recordCleaner = &recordcleaner.Cleaner{
PathConfs: p.conf.Paths,
Parent: p,
}
p.recordCleaner.Initialize()
}
Expand Down Expand Up @@ -707,8 +672,10 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeLogger

closeRecorderCleaner := newConf == nil ||
!reflect.DeepEqual(gatherCleanerEntries(newConf.Paths), gatherCleanerEntries(p.conf.Paths)) ||
closeLogger
if !closeRecorderCleaner && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.recordCleaner.ReloadPathConfs(newConf.Paths)
}

closePlaybackServer := newConf == nil ||
newConf.Playback != p.conf.Playback ||
Expand Down
Loading
Loading