Skip to content

Commit

Permalink
[AMLII-2060] Add more robust error handling for files (#29817)
Browse files Browse the repository at this point in the history
Co-authored-by: Brian Floersch <[email protected]>
  • Loading branch information
soberpeach and gh123man authored Oct 4, 2024
1 parent 4dc6ed6 commit 66190f7
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 7 deletions.
44 changes: 38 additions & 6 deletions pkg/logs/launchers/integration/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"errors"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -67,10 +68,21 @@ func NewLauncher(sources *sources.LogSources, integrationsLogsComp integrations.

logsTotalUsageSetting := datadogConfig.GetInt64("logs_config.integrations_logs_total_usage") * 1024 * 1024
logsUsageRatio := datadogConfig.GetFloat64("logs_config.integrations_logs_disk_ratio")
maxDiskUsage, err := computeMaxDiskUsage(runPath, logsTotalUsageSetting, logsUsageRatio)
if err != nil {
ddLog.Warn("Unable to compute integrations logs max disk usage, using default value of 100 MB:", err)

var maxDiskUsage int64
if logsUsageRatio > 1 || logsUsageRatio < 0 {
ddLog.Warn("Logs usage ratio setting must be between 0 and 1, current value is:", logsUsageRatio, ". Falling back to integrations_logs_total_usage setting.")
maxDiskUsage = logsTotalUsageSetting
} else {
maxDiskUsage, err = computeMaxDiskUsage(runPath, logsTotalUsageSetting, logsUsageRatio)
if err != nil {
ddLog.Warn("Unable to compute integrations logs max disk usage, falling back to integrations_logs_total_usage setting:", err)
maxDiskUsage = logsTotalUsageSetting
}

if maxDiskUsage == 0 {
ddLog.Warn("No space available to store logs. Logs from integrations will be dropped. Please allocate space for logs to be stored.")
}
}

return &Launcher{
Expand Down Expand Up @@ -109,6 +121,9 @@ func (s *Launcher) run() {
for {
select {
case cfg := <-s.addedConfigs:
if s.combinedUsageMax == 0 {
continue
}

sources, err := ad.CreateSources(cfg.Config)
if err != nil {
Expand Down Expand Up @@ -140,6 +155,10 @@ func (s *Launcher) run() {
}

case log := <-s.integrationsLogsChan:
if s.combinedUsageMax == 0 {
continue
}

s.receiveLogs(log)
case <-s.stop:
return
Expand Down Expand Up @@ -181,11 +200,15 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
// deleting files until total usage falls below the set maximum
for s.combinedUsageSize+logSize > s.combinedUsageMax {
leastRecentlyModifiedFile := s.getLeastRecentlyModifiedFile()
if leastRecentlyModifiedFile == nil {
ddLog.Error("Could not determine least recently modified file, skipping writing log to file.")
return
}

err := s.deleteFile(leastRecentlyModifiedFile)
if err != nil {
ddLog.Error("Error deleting log file:", err)
continue
return
}

file, err := os.Create(leastRecentlyModifiedFile.filename)
Expand All @@ -212,7 +235,6 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
fileToUpdate.size += logSize
}

// deleteFile deletes the given file
func (s *Launcher) deleteFile(file *fileInfo) error {
filename := filepath.Join(s.runPath, file.filename)
err := os.Remove(filename)
Expand Down Expand Up @@ -315,7 +337,7 @@ func (s *Launcher) integrationLogFilePath(id string) string {
return logFilePath
}

// computerDiskUsageMax computes the max disk space the launcher can use based
// computeDiskUsageMax computes the max disk space the launcher can use based
// off the integrations_logs_disk_ratio and integrations_logs_total_usage
// settings
func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio float64) (int64, error) {
Expand All @@ -327,6 +349,11 @@ func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio
diskReserved := float64(usage.Total) * (1 - usageRatio)
diskAvailable := int64(usage.Available) - int64(math.Ceil(diskReserved))

if diskAvailable < 0 {
ddLog.Warn("Available disk calculated as less than 0: ", diskAvailable, ". Disk reserved:", diskReserved)
diskAvailable = 0
}

return min(logsTotalUsageSetting, diskAvailable), nil
}

Expand Down Expand Up @@ -363,6 +390,11 @@ func (s *Launcher) scanInitialFiles(dir string) error {
for s.combinedUsageSize > s.combinedUsageMax {
leastRecentlyModifiedFile := s.getLeastRecentlyModifiedFile()

if leastRecentlyModifiedFile == nil {
ddLog.Error("Could not determine least recently modified file")
return errors.New("getLeastRecentlyModifiedFile returned nil when trying to delete files")
}

err = s.deleteFile(leastRecentlyModifiedFile)
if err != nil {
ddLog.Warn("Error deleting log file:", err)
Expand Down
98 changes: 97 additions & 1 deletion pkg/logs/launchers/integration/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (suite *LauncherTestSuite) TestFileCreation() {
}

func (suite *LauncherTestSuite) TestSendLog() {

mockConf := &integration.Config{}
mockConf.Provider = "container"
mockConf.LogsConfig = integration.Data(`[{"type": "integration", "source": "foo", "service": "bar"}]`)
Expand Down Expand Up @@ -99,6 +98,90 @@ func (suite *LauncherTestSuite) TestSendLog() {
assert.Equal(suite.T(), expectedPath, <-filepathChan)
}

// TestNegativeCombinedUsageMax ensures errors in combinedUsageMax don't result
// in panics from `deleteFile`
func (suite *LauncherTestSuite) TestNegativeCombinedUsageMax() {
suite.s.combinedUsageMax = -1
err := suite.s.scanInitialFiles(suite.s.runPath)
assert.NotNil(suite.T(), err)
}

// TestZeroCombinedUsageMax ensures the launcher won't panic when
// combinedUsageMax is zero. Realistically the launcher would never run receiveLogs since there is a check for
func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileCreated() {
suite.s.combinedUsageMax = 0

filename := "sample_integration_123.log"
filepath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(filepath)
assert.Nil(suite.T(), err)

file.Close()

suite.s.Start(nil, nil, nil, nil)

integrationLog := integrations.IntegrationLog{
Log: "sample log",
IntegrationID: "sample_integration:123",
}

suite.s.receiveLogs(integrationLog)
}

func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileNotCreated() {
suite.s.combinedUsageMax = 0

suite.s.Start(nil, nil, nil, nil)

integrationLog := integrations.IntegrationLog{
Log: "sample log",
IntegrationID: "sample_integration:123",
}

suite.s.receiveLogs(integrationLog)
}

func (suite *LauncherTestSuite) TestSmallCombinedUsageMax() {
suite.s.combinedUsageMax = 10

filename := "sample_integration_123.log"
filepath := filepath.Join(suite.s.runPath, filename)
file, err := os.Create(filepath)
assert.Nil(suite.T(), err)

file.Close()

suite.s.Start(nil, nil, nil, nil)

// Launcher should write this log
writtenLog := "sample"
integrationLog := integrations.IntegrationLog{
Log: writtenLog,
IntegrationID: "sample_integration:123",
}
suite.s.receiveLogs(integrationLog)
fileStat, err := os.Stat(filepath)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), fileStat.Size(), int64(len(writtenLog)+1))

// Launcher should delete file for this log
unwrittenLog := "sample log two"
integrationLogTwo := integrations.IntegrationLog{
Log: unwrittenLog,
IntegrationID: "sample_integration:123",
}
suite.s.receiveLogs(integrationLogTwo)

_, err = os.Stat(filepath)
assert.True(suite.T(), os.IsNotExist(err))

// Remake the file
suite.s.receiveLogs(integrationLog)
fileStat, err = os.Stat(filepath)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), fileStat.Size(), int64(len(writtenLog)+1))
}

func (suite *LauncherTestSuite) TestWriteLogToFile() {
logText := "hello world"
err := suite.s.writeLogToFileFunction(suite.testPath, logText)
Expand Down Expand Up @@ -458,3 +541,16 @@ func TestReadOnlyFileSystem(t *testing.T) {
// send a second log to make sure the launcher isn't blocking
integrationsComp.SendLog(logSample, id)
}

// TestCombinedDiskUsageFallback ensures the launcher falls back to the
// logsTotalUsageSetting if there is an error in the logsUsageRatio
func TestCombinedDiskUsageFallback(t *testing.T) {
totalUsage := 100
pkgconfigsetup.Datadog().SetWithoutSource("logs_config.integrations_logs_disk_ratio", -1)
pkgconfigsetup.Datadog().SetWithoutSource("logs_config.integrations_logs_total_usage", totalUsage)

integrationsComp := integrationsmock.Mock()
s := NewLauncher(sources.NewLogSources(), integrationsComp)

assert.Equal(t, s.combinedUsageMax, int64(totalUsage*1024*1024))
}

0 comments on commit 66190f7

Please sign in to comment.