Skip to content

Commit

Permalink
[AMLII-2110] Fix Path Errors in Integrations Launcher (#30032)
Browse files Browse the repository at this point in the history
  • Loading branch information
soberpeach authored Oct 16, 2024
1 parent b0ca9e6 commit abaca87
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 42 deletions.
34 changes: 21 additions & 13 deletions pkg/logs/launchers/integration/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Launcher struct {
// fileInfo stores information about each file that is needed in order to keep
// track of the combined and overall disk usage by the logs files
type fileInfo struct {
filename string
fileWithPath string
lastModified time.Time
size int64
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *Launcher) run() {
s.integrationToFile[cfg.IntegrationID] = logFile
}

filetypeSource := s.makeFileSource(source, logFile.filename)
filetypeSource := s.makeFileSource(source, logFile.fileWithPath)
s.sources.AddSource(filetypeSource)
}
}
Expand Down Expand Up @@ -179,8 +179,17 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
// Ensure the individual file doesn't exceed integrations_logs_files_max_size
// Add 1 because we write the \n at the end as well
logSize := int64(len(log.Log)) + 1

if logSize > s.fileSizeMax {
ddLog.Warnf("Individual log size (%d bytes) is larger than maximum allowable file size (%d bytes), skipping writing to log file: %s", logSize, s.fileSizeMax, log.Log)
return
} else if logSize > s.combinedUsageMax {
ddLog.Warnf("Individual log size (%d bytes) is larger than maximum allowable file size (%d bytes), skipping writing to log file: %s", logSize, s.combinedUsageMax, log.Log)
return
}

if fileToUpdate.size+logSize > s.fileSizeMax {
file, err := os.Create(fileToUpdate.filename)
file, err := os.Create(fileToUpdate.fileWithPath)
if err != nil {
ddLog.Error("Failed to delete and remake oversize file:", err)
return
Expand Down Expand Up @@ -211,7 +220,7 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
return
}

file, err := os.Create(leastRecentlyModifiedFile.filename)
file, err := os.Create(leastRecentlyModifiedFile.fileWithPath)
if err != nil {
ddLog.Error("Error creating log file:", err)
continue
Expand All @@ -223,7 +232,7 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
}
}

err := s.writeLogToFileFunction(filepath.Join(s.runPath, fileToUpdate.filename), log.Log)
err := s.writeLogToFileFunction(fileToUpdate.fileWithPath, log.Log)
if err != nil {
ddLog.Warn("Error writing log to file:", err)
return
Expand All @@ -236,12 +245,11 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
}

func (s *Launcher) deleteFile(file *fileInfo) error {
filename := filepath.Join(s.runPath, file.filename)
err := os.Remove(filename)
err := os.Remove(file.fileWithPath)
if err != nil {
return err
}
ddLog.Info("Successfully deleted log file:", filename)
ddLog.Info("Successfully deleted log file:", file.fileWithPath)

s.combinedUsageSize -= file.size

Expand Down Expand Up @@ -321,7 +329,7 @@ func (s *Launcher) createFile(source string) (*fileInfo, error) {
}

fileInfo := &fileInfo{
filename: filepath,
fileWithPath: filepath,
lastModified: time.Now(),
size: 0,
}
Expand Down Expand Up @@ -349,8 +357,8 @@ func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio
diskReserved := float64(usage.Total) * (1 - usageRatio)
diskAvailable := int64(usage.Available) - int64(math.Ceil(diskReserved))

if diskAvailable < 0 {
ddLog.Warn("Available disk calculated as less than 0: ", diskAvailable, ". Disk reserved:", diskReserved)
if diskAvailable <= 0 {
ddLog.Warnf("Available disk calculated as %d bytes, disk reserved is %f bytes. Check %s and make sure there is enough free space on disk", diskAvailable, diskReserved, "integrations_logs_disk_ratio")
diskAvailable = 0
}

Expand All @@ -370,12 +378,12 @@ func (s *Launcher) scanInitialFiles(dir string) error {
}

fileInfo := &fileInfo{
filename: info.Name(),
fileWithPath: filepath.Join(dir, info.Name()),
size: info.Size(),
lastModified: info.ModTime(),
}

integrationID := fileNameToID(fileInfo.filename)
integrationID := fileNameToID(fileInfo.fileWithPath)

s.integrationToFile[integrationID] = fileInfo
s.combinedUsageSize += info.Size()
Expand Down
69 changes: 40 additions & 29 deletions pkg/logs/launchers/integration/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (suite *LauncherTestSuite) TestSendLog() {
assert.Equal(suite.T(), foundSource.Config.Type, config.FileType)
assert.Equal(suite.T(), foundSource.Config.Source, "foo")
assert.Equal(suite.T(), foundSource.Config.Service, "bar")
expectedPath := filepath.Join(suite.s.runPath, suite.s.integrationToFile[id].filename)
expectedPath := suite.s.integrationToFile[id].fileWithPath

assert.Equal(suite.T(), logSample, <-fileLogChan)
assert.Equal(suite.T(), expectedPath, <-filepathChan)
Expand All @@ -113,8 +113,8 @@ func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileCreated() {
suite.s.combinedUsageMax = 0

filename := "sample_integration_123.log"
filepath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(filepath)
fileWithPath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(fileWithPath)
assert.Nil(suite.T(), err)

file.Close()
Expand Down Expand Up @@ -143,44 +143,53 @@ func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileNotCreated() {
}

func (suite *LauncherTestSuite) TestSmallCombinedUsageMax() {
suite.s.combinedUsageMax = 10
suite.s.combinedUsageMax = 15

filename := "sample_integration_123.log"
filepath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(filepath)
fileWithPath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(fileWithPath)
assert.Nil(suite.T(), err)

file.Close()

suite.s.Start(nil, nil, nil, nil)

// Launcher should write this log
writtenLog := "sample"
shortLog := "sample"
integrationLog := integrations.IntegrationLog{
Log: writtenLog,
Log: shortLog,
IntegrationID: "sample_integration:123",
}
suite.s.receiveLogs(integrationLog)
fileStat, err := os.Stat(filepath)
fileStat, err := os.Stat(fileWithPath)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), fileStat.Size(), int64(len(writtenLog)+1))
assert.Equal(suite.T(), fileStat.Size(), int64(len(shortLog)+1))

// Launcher should delete file for this log
unwrittenLog := "sample log two"
// Launcher should delete and remake the log file for this log since it would break combinedUsageMax threshold
longLog := "sample log two"
integrationLogTwo := integrations.IntegrationLog{
Log: unwrittenLog,
Log: longLog,
IntegrationID: "sample_integration:123",
}
suite.s.receiveLogs(integrationLogTwo)
_, err = os.Stat(fileWithPath)
assert.Nil(suite.T(), err)

_, err = os.Stat(filepath)
assert.True(suite.T(), os.IsNotExist(err))
// Launcher should skip writing this log since it's larger than combinedUsageMax
unwrittenLog := "this log is too long"
unwrittenIntegrationLog := integrations.IntegrationLog{
Log: unwrittenLog,
IntegrationID: "sample_integration:123",
}
suite.s.receiveLogs(unwrittenIntegrationLog)
_, err = os.Stat(fileWithPath)
assert.Nil(suite.T(), err)

// Remake the file
suite.s.receiveLogs(integrationLog)
fileStat, err = os.Stat(filepath)
fileStat, err = os.Stat(fileWithPath)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), fileStat.Size(), int64(len(writtenLog)+1))
assert.Equal(suite.T(), fileStat.Size(), int64(len(shortLog)+1))
}

func (suite *LauncherTestSuite) TestWriteLogToFile() {
Expand Down Expand Up @@ -215,12 +224,12 @@ func (suite *LauncherTestSuite) TestWriteMultipleLogsToFile() {
// TestDeleteFile tests that deleteFile properly deletes the correct file
func (suite *LauncherTestSuite) TestDeleteFile() {
filename := "testfile.log"
filepath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(filepath)
fileinfo := &fileInfo{filename: filename, size: int64(0)}
fileWithPath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(fileWithPath)
fileinfo := &fileInfo{fileWithPath: fileWithPath, size: int64(0)}
assert.Nil(suite.T(), err)

info, err := os.Stat(filepath)
info, err := os.Stat(fileWithPath)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), int64(0), info.Size(), "Newly created file size not zero")

Expand All @@ -229,14 +238,14 @@ func (suite *LauncherTestSuite) TestDeleteFile() {
file.Write(data)
file.Close()

info, err = os.Stat(filepath)
info, err = os.Stat(fileWithPath)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), int64(2*1024*1024), info.Size())

err = suite.s.deleteFile(fileinfo)
assert.Nil(suite.T(), err)

_, err = os.Stat(filepath)
_, err = os.Stat(fileWithPath)
assert.True(suite.T(), os.IsNotExist(err))
}

Expand Down Expand Up @@ -281,8 +290,8 @@ func (suite *LauncherTestSuite) TestFileExceedsSingleFileLimit() {
suite.s.fileSizeMax = oneMB

filename := "sample_integration_123.log"
filepath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(filepath)
fileWithPath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(fileWithPath)
assert.Nil(suite.T(), err)

file.Write(make([]byte, oneMB))
Expand All @@ -308,7 +317,8 @@ func (suite *LauncherTestSuite) TestScanInitialFiles() {
filename := "sample_integration_123.log"
fileSize := int64(1 * 1024 * 1024)

file, err := os.Create(filepath.Join(suite.s.runPath, filename))
fileWithPath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(fileWithPath)
assert.Nil(suite.T(), err)

data := make([]byte, fileSize)
Expand All @@ -320,7 +330,7 @@ func (suite *LauncherTestSuite) TestScanInitialFiles() {
actualFileInfo := suite.s.integrationToFile[fileID]

assert.NotEmpty(suite.T(), suite.s.integrationToFile)
assert.Equal(suite.T(), actualFileInfo.filename, filename)
assert.Equal(suite.T(), actualFileInfo.fileWithPath, fileWithPath)
assert.Equal(suite.T(), fileSize, actualFileInfo.size)
assert.Equal(suite.T(), fileSize, suite.s.combinedUsageSize)
}
Expand All @@ -331,7 +341,8 @@ func (suite *LauncherTestSuite) TestCreateFileAfterScanInitialFile() {
filename := "sample_integration_123.log"
fileSize := int64(1 * 1024 * 1024)

file, err := os.Create(filepath.Join(suite.s.runPath, filename))
fileWithPath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(fileWithPath)
assert.Nil(suite.T(), err)

data := make([]byte, fileSize)
Expand All @@ -343,7 +354,7 @@ func (suite *LauncherTestSuite) TestCreateFileAfterScanInitialFile() {
scannedFile := suite.s.integrationToFile[fileID]

assert.NotEmpty(suite.T(), suite.s.integrationToFile)
assert.Equal(suite.T(), filename, scannedFile.filename)
assert.Equal(suite.T(), fileWithPath, scannedFile.fileWithPath)
assert.Equal(suite.T(), fileSize, scannedFile.size)
assert.Equal(suite.T(), fileSize, suite.s.combinedUsageSize)

Expand Down

0 comments on commit abaca87

Please sign in to comment.