Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change log retention period to one week #2403

Merged
merged 5 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@ type LogFileManager struct {
filesystem volume_filesystem.VolumeFilesystem

time logs_clock.LogsClock

logRetentionPeriodInWeeks int
}

func NewLogFileManager(
kurtosisBackend backend_interface.KurtosisBackend,
filesystem volume_filesystem.VolumeFilesystem,
time logs_clock.LogsClock) *LogFileManager {
time logs_clock.LogsClock,
logRetentionPeriodInWeeks int) *LogFileManager {
return &LogFileManager{
kurtosisBackend: kurtosisBackend,
filesystem: filesystem,
time: time,
kurtosisBackend: kurtosisBackend,
filesystem: filesystem,
time: time,
logRetentionPeriodInWeeks: logRetentionPeriodInWeeks,
}
}

Expand Down Expand Up @@ -125,7 +129,7 @@ func (manager *LogFileManager) CreateLogFiles(ctx context.Context) error {
// RemoveLogsBeyondRetentionPeriod implements the Job cron interface. It removes logs a week older than the log retention period.
func (manager *LogFileManager) RemoveLogsBeyondRetentionPeriod() {
// compute the next oldest week
year, weekToRemove := manager.time.Now().Add(time.Duration(-volume_consts.LogRetentionPeriodInWeeks) * oneWeek).ISOWeek()
year, weekToRemove := manager.time.Now().Add(time.Duration(-manager.logRetentionPeriodInWeeks) * oneWeek).ISOWeek()

// remove directory for that week
oldLogsDirPath := getLogsDirPathForWeek(year, weekToRemove)
Expand All @@ -146,7 +150,7 @@ func (manager *LogFileManager) RemoveAllLogs() error {

func (manager *LogFileManager) RemoveEnclaveLogs(enclaveUuid string) error {
currentTime := manager.time.Now()
for i := 0; i < volume_consts.LogRetentionPeriodInWeeks; i++ {
for i := 0; i < manager.logRetentionPeriodInWeeks; i++ {
year, week := currentTime.Add(time.Duration(-i) * oneWeek).ISOWeek()
enclaveLogsDirPathForWeek := getEnclaveLogsDirPath(year, week, enclaveUuid)
if err := manager.filesystem.RemoveAll(enclaveLogsDirPathForWeek); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestRemoveLogsBeyondRetentionPeriod(t *testing.T) {
_, _ = mockFs.Create(week1filepath)
_, _ = mockFs.Create(week2filepath)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
logFileManager.RemoveLogsBeyondRetentionPeriod() // should remove week 49 logs

_, err := mockFs.Stat(week49filepath)
Expand All @@ -69,7 +69,7 @@ func TestRemoveEnclaveLogs(t *testing.T) {
_, _ = mockFs.Create(week52filepath)
_, _ = mockFs.Create(week52filepathDiffService)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
err := logFileManager.RemoveEnclaveLogs(testEnclaveUuid) // should remove only all log files for enclave one

require.NoError(t, err)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestRemoveAllLogs(t *testing.T) {
_, _ = mockFs.Create(week52filepath)
_, _ = mockFs.Create(week52filepathDiffService)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
err := logFileManager.RemoveAllLogs()

require.NoError(t, err)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestCreateLogFiles(t *testing.T) {
expectedServiceNameFilePath := getFilepathStr(2022, 52, testEnclaveUuid, testUserService1Name)
expectedServiceShortUuidFilePath := getFilepathStr(2022, 52, testEnclaveUuid, uuid_generator.ShortenedUUIDString(testUserService1Uuid))

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
err := logFileManager.CreateLogFiles(ctx)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
testUserService2Uuid = "test-user-service-2"
testUserService3Uuid = "test-user-service-3"

retentionPeriodInWeeksForTesting = 5

utcFormat = time.RFC3339
defaultUTCTimestampStr = "2023-09-06T00:35:15-04:00"
logLine1 = "{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"2023-09-06T00:35:15-04:00\"}"
Expand All @@ -43,7 +45,7 @@ const (
notFoundedFilterText = "it shouldn't be found in the log lines"
firstMatchRegexFilterStr = "Starting.*idempotently'"

testTimeOut = 2 * time.Second
testTimeOut = 2 * time.Minute
doNotFollowLogs = false

defaultYear = 2023
Expand Down Expand Up @@ -128,7 +130,7 @@ func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) {

underlyingFs := createFilledPerWeekFilesystem(startingWeek)
mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -211,7 +213,7 @@ func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) {

underlyingFs := createEmptyPerWeekFilesystem(startingWeek)
mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -316,7 +318,7 @@ func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t *
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -403,7 +405,7 @@ func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -470,7 +472,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -539,7 +541,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -590,7 +592,7 @@ func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ const (
// [.../28/d3e8832d671f/61830789f03a.json] is the file containing logs from service with uuid 61830789f03a, in enclave with uuid d3e8832d671f,
// in the 28th week of the current year
type PerWeekStreamLogsStrategy struct {
time logs_clock.LogsClock
time logs_clock.LogsClock
logRetentionPeriodInWeeks int
}

func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock) *PerWeekStreamLogsStrategy {
func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock, logRetentionPeriodInWeeks int) *PerWeekStreamLogsStrategy {
return &PerWeekStreamLogsStrategy{
time: time,
time: time,
logRetentionPeriodInWeeks: logRetentionPeriodInWeeks,
}
}

Expand All @@ -55,7 +57,7 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
shouldReturnAllLogs bool,
numLogLines uint32,
) {
paths, err := strategy.getLogFilePaths(fs, volume_consts.LogRetentionPeriodInWeeks, string(enclaveUuid), string(serviceUuid))
paths, err := strategy.getLogFilePaths(fs, strategy.logRetentionPeriodInWeeks, string(enclaveUuid), string(serviceUuid))
if err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred retrieving log file paths for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid)
return
Expand All @@ -68,11 +70,11 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
serviceUuid, enclaveUuid)
return
}
if len(paths) > volume_consts.LogRetentionPeriodInWeeks {
if len(paths) > strategy.logRetentionPeriodInWeeks {
logrus.Warnf(
`We expected to retrieve logs going back '%v' weeks, but instead retrieved logs going back '%v' weeks.
This means logs past the retention period are being returned, likely a bug in Kurtosis.`,
volume_consts.LogRetentionPeriodInWeeks, len(paths))
strategy.logRetentionPeriodInWeeks, len(paths))
}

logsReader, files, err := getLogsReader(fs, paths)
Expand Down Expand Up @@ -347,7 +349,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(

// Returns true if [logLine] has no timestamp
func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) {
retentionPeriod := strategy.time.Now().Add(time.Duration(-volume_consts.LogRetentionPeriodInWeeks) * oneWeek)
retentionPeriod := strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek)
timestamp := logLine.GetTimestamp()
return timestamp.After(retentionPeriod), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
testEnclaveUuid = "test-enclave"
testUserService1Uuid = "test-user-service-1"

defaultRetentionPeriodInWeeks = volume_consts.LogRetentionPeriodInWeeks
retentionPeriodInWeeksForTesting = 5

defaultYear = 2023
defaultDay = 0 // sunday
Expand Down Expand Up @@ -54,8 +54,8 @@ func TestGetLogFilePaths(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand Down Expand Up @@ -91,8 +91,8 @@ func TestGetLogFilePathsAcrossNewYear(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand Down Expand Up @@ -128,8 +128,8 @@ func TestGetLogFilePathsAcrossNewYearWith53Weeks(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(2016, currentWeek, 1)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestGetLogFilePathsWithDiffRetentionPeriod(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriod, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
Expand Down Expand Up @@ -192,11 +192,11 @@ func TestGetLogFilePathsReturnsAllAvailableWeeks(t *testing.T) {
currentWeek := 2

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Less(t, len(logFilePaths), defaultRetentionPeriodInWeeks)
require.Less(t, len(logFilePaths), retentionPeriodInWeeksForTesting)
for i, filePath := range expectedLogFilePaths {
require.Equal(t, filePath, logFilePaths[i])
}
Expand All @@ -217,8 +217,8 @@ func TestGetLogFilePathsReturnsCorrectPathsIfWeeksMissingInBetween(t *testing.T)
currentWeek := 3

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Len(t, logFilePaths, 1)
Expand Down Expand Up @@ -246,8 +246,8 @@ func TestGetLogFilePathsReturnsCorrectPathsIfCurrentWeekHasNoLogsYet(t *testing.
week2filepath,
}

strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand All @@ -264,7 +264,7 @@ func TestIsWithinRetentionPeriod(t *testing.T) {

// week 41 would put the log line outside the retention period
mockTime := logs_clock.NewMockLogsClock(2023, 41, 0)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

timestamp, err := parseTimestampFromJsonLogLine(jsonLogLine)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const (

EndOfJsonLine = "}"

// promise to keep 4 weeks of logs for users, but store an additional week for safety
LogRetentionPeriodInWeeks = 5
// promise to keep 1 weeks of logs for users
LogRetentionPeriodInWeeks = 1

RemoveLogsWaitHours = 6 * time.Hour

Expand Down
5 changes: 3 additions & 2 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package main
import (
"context"
"fmt"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_consts"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -174,7 +175,7 @@ func runMain() error {
// TODO: Move log file management into LogsDatabaseClient
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()
logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, realTime)
logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, realTime, volume_consts.LogRetentionPeriodInWeeks)
logFileManager.StartLogFileManagement(ctx)

enclaveManager, err := getEnclaveManager(
Expand Down Expand Up @@ -389,7 +390,7 @@ func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosi
case args.KurtosisBackendType_Docker:
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()
perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime)
perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime, volume_consts.LogRetentionPeriodInWeeks)
logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamLogsStrategy)
case args.KurtosisBackendType_Kubernetes:
logsDatabaseClient = kurtosis_backend.NewKurtosisBackendLogsDatabaseClient(kurtosisBackend)
Expand Down
Loading