From abaca877ff92778b5d049a46cd3fc777c7d4da5a Mon Sep 17 00:00:00 2001 From: Lucas Liseth <36653792+soberpeach@users.noreply.github.com> Date: Wed, 16 Oct 2024 15:52:10 -0400 Subject: [PATCH] [AMLII-2110] Fix Path Errors in Integrations Launcher (#30032) --- pkg/logs/launchers/integration/launcher.go | 34 +++++---- .../launchers/integration/launcher_test.go | 69 +++++++++++-------- 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/pkg/logs/launchers/integration/launcher.go b/pkg/logs/launchers/integration/launcher.go index b822e90eabad7..de88863114936 100644 --- a/pkg/logs/launchers/integration/launcher.go +++ b/pkg/logs/launchers/integration/launcher.go @@ -50,7 +50,7 @@ type Launcher struct { // fileInfo stores information about each file that is needed in order to keep // track of the combined and overall disk usage by the logs files type fileInfo struct { - filename string + fileWithPath string lastModified time.Time size int64 } @@ -149,7 +149,7 @@ func (s *Launcher) run() { s.integrationToFile[cfg.IntegrationID] = logFile } - filetypeSource := s.makeFileSource(source, logFile.filename) + filetypeSource := s.makeFileSource(source, logFile.fileWithPath) s.sources.AddSource(filetypeSource) } } @@ -179,8 +179,17 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) { // Ensure the individual file doesn't exceed integrations_logs_files_max_size // Add 1 because we write the \n at the end as well logSize := int64(len(log.Log)) + 1 + + if logSize > s.fileSizeMax { + ddLog.Warnf("Individual log size (%d bytes) is larger than maximum allowable file size (%d bytes), skipping writing to log file: %s", logSize, s.fileSizeMax, log.Log) + return + } else if logSize > s.combinedUsageMax { + ddLog.Warnf("Individual log size (%d bytes) is larger than maximum allowable file size (%d bytes), skipping writing to log file: %s", logSize, s.combinedUsageMax, log.Log) + return + } + if fileToUpdate.size+logSize > s.fileSizeMax { - file, err := os.Create(fileToUpdate.filename) + file, err := os.Create(fileToUpdate.fileWithPath) if err != nil { ddLog.Error("Failed to delete and remake oversize file:", err) return @@ -211,7 +220,7 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) { return } - file, err := os.Create(leastRecentlyModifiedFile.filename) + file, err := os.Create(leastRecentlyModifiedFile.fileWithPath) if err != nil { ddLog.Error("Error creating log file:", err) continue @@ -223,7 +232,7 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) { } } - err := s.writeLogToFileFunction(filepath.Join(s.runPath, fileToUpdate.filename), log.Log) + err := s.writeLogToFileFunction(fileToUpdate.fileWithPath, log.Log) if err != nil { ddLog.Warn("Error writing log to file:", err) return @@ -236,12 +245,11 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) { } func (s *Launcher) deleteFile(file *fileInfo) error { - filename := filepath.Join(s.runPath, file.filename) - err := os.Remove(filename) + err := os.Remove(file.fileWithPath) if err != nil { return err } - ddLog.Info("Successfully deleted log file:", filename) + ddLog.Info("Successfully deleted log file:", file.fileWithPath) s.combinedUsageSize -= file.size @@ -321,7 +329,7 @@ func (s *Launcher) createFile(source string) (*fileInfo, error) { } fileInfo := &fileInfo{ - filename: filepath, + fileWithPath: filepath, lastModified: time.Now(), size: 0, } @@ -349,8 +357,8 @@ func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio diskReserved := float64(usage.Total) * (1 - usageRatio) diskAvailable := int64(usage.Available) - int64(math.Ceil(diskReserved)) - if diskAvailable < 0 { - ddLog.Warn("Available disk calculated as less than 0: ", diskAvailable, ". Disk reserved:", diskReserved) + if diskAvailable <= 0 { + ddLog.Warnf("Available disk calculated as %d bytes, disk reserved is %f bytes. Check %s and make sure there is enough free space on disk", diskAvailable, diskReserved, "integrations_logs_disk_ratio") diskAvailable = 0 } @@ -370,12 +378,12 @@ func (s *Launcher) scanInitialFiles(dir string) error { } fileInfo := &fileInfo{ - filename: info.Name(), + fileWithPath: filepath.Join(dir, info.Name()), size: info.Size(), lastModified: info.ModTime(), } - integrationID := fileNameToID(fileInfo.filename) + integrationID := fileNameToID(fileInfo.fileWithPath) s.integrationToFile[integrationID] = fileInfo s.combinedUsageSize += info.Size() diff --git a/pkg/logs/launchers/integration/launcher_test.go b/pkg/logs/launchers/integration/launcher_test.go index 21d5c293ff860..652f6479019de 100644 --- a/pkg/logs/launchers/integration/launcher_test.go +++ b/pkg/logs/launchers/integration/launcher_test.go @@ -93,7 +93,7 @@ func (suite *LauncherTestSuite) TestSendLog() { assert.Equal(suite.T(), foundSource.Config.Type, config.FileType) assert.Equal(suite.T(), foundSource.Config.Source, "foo") assert.Equal(suite.T(), foundSource.Config.Service, "bar") - expectedPath := filepath.Join(suite.s.runPath, suite.s.integrationToFile[id].filename) + expectedPath := suite.s.integrationToFile[id].fileWithPath assert.Equal(suite.T(), logSample, <-fileLogChan) assert.Equal(suite.T(), expectedPath, <-filepathChan) @@ -113,8 +113,8 @@ func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileCreated() { suite.s.combinedUsageMax = 0 filename := "sample_integration_123.log" - filepath := filepath.Join(suite.s.runPath, filename) - file, err := os.Create(filepath) + fileWithPath := filepath.Join(suite.s.runPath, filename) + file, err := os.Create(fileWithPath) assert.Nil(suite.T(), err) file.Close() @@ -143,11 +143,11 @@ func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileNotCreated() { } func (suite *LauncherTestSuite) TestSmallCombinedUsageMax() { - suite.s.combinedUsageMax = 10 + suite.s.combinedUsageMax = 15 filename := "sample_integration_123.log" - filepath := filepath.Join(suite.s.runPath, filename) - file, err := os.Create(filepath) + fileWithPath := filepath.Join(suite.s.runPath, filename) + file, err := os.Create(fileWithPath) assert.Nil(suite.T(), err) file.Close() @@ -155,32 +155,41 @@ func (suite *LauncherTestSuite) TestSmallCombinedUsageMax() { suite.s.Start(nil, nil, nil, nil) // Launcher should write this log - writtenLog := "sample" + shortLog := "sample" integrationLog := integrations.IntegrationLog{ - Log: writtenLog, + Log: shortLog, IntegrationID: "sample_integration:123", } suite.s.receiveLogs(integrationLog) - fileStat, err := os.Stat(filepath) + fileStat, err := os.Stat(fileWithPath) assert.Nil(suite.T(), err) - assert.Equal(suite.T(), fileStat.Size(), int64(len(writtenLog)+1)) + assert.Equal(suite.T(), fileStat.Size(), int64(len(shortLog)+1)) - // Launcher should delete file for this log - unwrittenLog := "sample log two" + // Launcher should delete and remake the log file for this log since it would break combinedUsageMax threshold + longLog := "sample log two" integrationLogTwo := integrations.IntegrationLog{ - Log: unwrittenLog, + Log: longLog, IntegrationID: "sample_integration:123", } suite.s.receiveLogs(integrationLogTwo) + _, err = os.Stat(fileWithPath) + assert.Nil(suite.T(), err) - _, err = os.Stat(filepath) - assert.True(suite.T(), os.IsNotExist(err)) + // Launcher should skip writing this log since it's larger than combinedUsageMax + unwrittenLog := "this log is too long" + unwrittenIntegrationLog := integrations.IntegrationLog{ + Log: unwrittenLog, + IntegrationID: "sample_integration:123", + } + suite.s.receiveLogs(unwrittenIntegrationLog) + _, err = os.Stat(fileWithPath) + assert.Nil(suite.T(), err) // Remake the file suite.s.receiveLogs(integrationLog) - fileStat, err = os.Stat(filepath) + fileStat, err = os.Stat(fileWithPath) assert.Nil(suite.T(), err) - assert.Equal(suite.T(), fileStat.Size(), int64(len(writtenLog)+1)) + assert.Equal(suite.T(), fileStat.Size(), int64(len(shortLog)+1)) } func (suite *LauncherTestSuite) TestWriteLogToFile() { @@ -215,12 +224,12 @@ func (suite *LauncherTestSuite) TestWriteMultipleLogsToFile() { // TestDeleteFile tests that deleteFile properly deletes the correct file func (suite *LauncherTestSuite) TestDeleteFile() { filename := "testfile.log" - filepath := filepath.Join(suite.s.runPath, filename) - file, err := os.Create(filepath) - fileinfo := &fileInfo{filename: filename, size: int64(0)} + fileWithPath := filepath.Join(suite.s.runPath, filename) + file, err := os.Create(fileWithPath) + fileinfo := &fileInfo{fileWithPath: fileWithPath, size: int64(0)} assert.Nil(suite.T(), err) - info, err := os.Stat(filepath) + info, err := os.Stat(fileWithPath) assert.Nil(suite.T(), err) assert.Equal(suite.T(), int64(0), info.Size(), "Newly created file size not zero") @@ -229,14 +238,14 @@ func (suite *LauncherTestSuite) TestDeleteFile() { file.Write(data) file.Close() - info, err = os.Stat(filepath) + info, err = os.Stat(fileWithPath) assert.Nil(suite.T(), err) assert.Equal(suite.T(), int64(2*1024*1024), info.Size()) err = suite.s.deleteFile(fileinfo) assert.Nil(suite.T(), err) - _, err = os.Stat(filepath) + _, err = os.Stat(fileWithPath) assert.True(suite.T(), os.IsNotExist(err)) } @@ -281,8 +290,8 @@ func (suite *LauncherTestSuite) TestFileExceedsSingleFileLimit() { suite.s.fileSizeMax = oneMB filename := "sample_integration_123.log" - filepath := filepath.Join(suite.s.runPath, filename) - file, err := os.Create(filepath) + fileWithPath := filepath.Join(suite.s.runPath, filename) + file, err := os.Create(fileWithPath) assert.Nil(suite.T(), err) file.Write(make([]byte, oneMB)) @@ -308,7 +317,8 @@ func (suite *LauncherTestSuite) TestScanInitialFiles() { filename := "sample_integration_123.log" fileSize := int64(1 * 1024 * 1024) - file, err := os.Create(filepath.Join(suite.s.runPath, filename)) + fileWithPath := filepath.Join(suite.s.runPath, filename) + file, err := os.Create(fileWithPath) assert.Nil(suite.T(), err) data := make([]byte, fileSize) @@ -320,7 +330,7 @@ func (suite *LauncherTestSuite) TestScanInitialFiles() { actualFileInfo := suite.s.integrationToFile[fileID] assert.NotEmpty(suite.T(), suite.s.integrationToFile) - assert.Equal(suite.T(), actualFileInfo.filename, filename) + assert.Equal(suite.T(), actualFileInfo.fileWithPath, fileWithPath) assert.Equal(suite.T(), fileSize, actualFileInfo.size) assert.Equal(suite.T(), fileSize, suite.s.combinedUsageSize) } @@ -331,7 +341,8 @@ func (suite *LauncherTestSuite) TestCreateFileAfterScanInitialFile() { filename := "sample_integration_123.log" fileSize := int64(1 * 1024 * 1024) - file, err := os.Create(filepath.Join(suite.s.runPath, filename)) + fileWithPath := filepath.Join(suite.s.runPath, filename) + file, err := os.Create(fileWithPath) assert.Nil(suite.T(), err) data := make([]byte, fileSize) @@ -343,7 +354,7 @@ func (suite *LauncherTestSuite) TestCreateFileAfterScanInitialFile() { scannedFile := suite.s.integrationToFile[fileID] assert.NotEmpty(suite.T(), suite.s.integrationToFile) - assert.Equal(suite.T(), filename, scannedFile.filename) + assert.Equal(suite.T(), fileWithPath, scannedFile.fileWithPath) assert.Equal(suite.T(), fileSize, scannedFile.size) assert.Equal(suite.T(), fileSize, suite.s.combinedUsageSize)