Skip to content

Commit

Permalink
Add more robust error handling for files
Browse files Browse the repository at this point in the history
  • Loading branch information
soberpeach committed Oct 4, 2024
1 parent 8d63f09 commit 8bebadc
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
38 changes: 33 additions & 5 deletions pkg/logs/launchers/integration/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"errors"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -67,10 +68,17 @@ func NewLauncher(sources *sources.LogSources, integrationsLogsComp integrations.

logsTotalUsageSetting := datadogConfig.GetInt64("logs_config.integrations_logs_total_usage") * 1024 * 1024
logsUsageRatio := datadogConfig.GetFloat64("logs_config.integrations_logs_disk_ratio")
maxDiskUsage, err := computeMaxDiskUsage(runPath, logsTotalUsageSetting, logsUsageRatio)
if err != nil {
ddLog.Warn("Unable to compute integrations logs max disk usage, using default value of 100 MB:", err)

var maxDiskUsage int64
if logsUsageRatio > 1 || logsUsageRatio < 0 {
ddLog.Warn("Logs usage ratio setting must be between 0 and 1, current value is:", logsUsageRatio, ". Falling back to integrations_logs_total_usage setting.")
maxDiskUsage = logsTotalUsageSetting
} else {
maxDiskUsage, err = computeMaxDiskUsage(runPath, logsTotalUsageSetting, logsUsageRatio)
if err != nil {
ddLog.Warn("Unable to compute integrations logs max disk usage, falling back to integrations_logs_total_usage setting:", err)
maxDiskUsage = logsTotalUsageSetting
}
}

return &Launcher{
Expand Down Expand Up @@ -109,6 +117,9 @@ func (s *Launcher) run() {
for {
select {
case cfg := <-s.addedConfigs:
if s.combinedUsageMax == 0 {
continue
}

sources, err := ad.CreateSources(cfg.Config)
if err != nil {
Expand Down Expand Up @@ -140,6 +151,10 @@ func (s *Launcher) run() {
}

case log := <-s.integrationsLogsChan:
if s.combinedUsageMax == 0 {
continue
}

s.receiveLogs(log)
case <-s.stop:
return
Expand Down Expand Up @@ -181,6 +196,10 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
// deleting files until total usage falls below the set maximum
for s.combinedUsageSize+logSize > s.combinedUsageMax {
leastRecentlyModifiedFile := s.getLeastRecentlyModifiedFile()
if leastRecentlyModifiedFile == nil {
ddLog.Warn("getLeastRecentlyModifiedFile returned nil, skipping writing log to file.")
return
}

err := s.deleteFile(leastRecentlyModifiedFile)
if err != nil {
Expand Down Expand Up @@ -212,7 +231,6 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
fileToUpdate.size += logSize
}

// deleteFile deletes the given file
func (s *Launcher) deleteFile(file *fileInfo) error {
filename := filepath.Join(s.runPath, file.filename)
err := os.Remove(filename)
Expand Down Expand Up @@ -315,7 +333,7 @@ func (s *Launcher) integrationLogFilePath(id string) string {
return logFilePath
}

// computerDiskUsageMax computes the max disk space the launcher can use based
// computeDiskUsageMax computes the max disk space the launcher can use based
// off the integrations_logs_disk_ratio and integrations_logs_total_usage
// settings
func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio float64) (int64, error) {
Expand All @@ -327,6 +345,11 @@ func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio
diskReserved := float64(usage.Total) * (1 - usageRatio)
diskAvailable := int64(usage.Available) - int64(math.Ceil(diskReserved))

if diskAvailable < 0 {
ddLog.Warn("Available disk calculated as less than 0: ", diskAvailable, ". Disk reserved:", diskReserved)
diskAvailable = 0
}

return min(logsTotalUsageSetting, diskAvailable), nil
}

Expand Down Expand Up @@ -363,6 +386,11 @@ func (s *Launcher) scanInitialFiles(dir string) error {
for s.combinedUsageSize > s.combinedUsageMax {
leastRecentlyModifiedFile := s.getLeastRecentlyModifiedFile()

if leastRecentlyModifiedFile == nil {
ddLog.Warn("getLeastRecentlyModifiedFile returned nil.")
return errors.New("getLeastRecentlyModifiedFile returned nil when trying to delete files")
}

err = s.deleteFile(leastRecentlyModifiedFile)
if err != nil {
ddLog.Warn("Error deleting log file:", err)
Expand Down
21 changes: 21 additions & 0 deletions pkg/logs/launchers/integration/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ func (suite *LauncherTestSuite) TestDeleteFile() {
assert.True(suite.T(), os.IsNotExist(err))
}

// TestNegativeCombinedUsageMax ensures errors in combinedUsageMax don't result
// in panics from `deleteFile`
func (suite *LauncherTestSuite) TestNegativeCombinedUsageMax() {
suite.s.combinedUsageMax = -1
err := suite.s.scanInitialFiles(suite.s.runPath)
assert.NotNil(suite.T(), err)
}

// TestIntegrationLogFilePath ensures the filepath for the logs files are correct
func (suite *LauncherTestSuite) TestIntegrationLogFilePath() {
id := "123456789"
Expand Down Expand Up @@ -458,3 +466,16 @@ func TestReadOnlyFileSystem(t *testing.T) {
// send a second log to make sure the launcher isn't blocking
integrationsComp.SendLog(logSample, id)
}

// TestCombinedDiskUsageFallback ensures the launcher falls back to the
// logsTotalUsageSetting if there is an error in the logsUsageRatio
func TestCombinedDiskUsageFallback(t *testing.T) {
totalUsage := 100
pkgconfigsetup.Datadog().SetWithoutSource("logs_config.integrations_logs_disk_ratio", -1)
pkgconfigsetup.Datadog().SetWithoutSource("logs_config.integrations_logs_total_usage", totalUsage)

integrationsComp := integrationsmock.Mock()
s := NewLauncher(sources.NewLogSources(), integrationsComp)

assert.Equal(t, s.combinedUsageMax, int64(totalUsage*1024*1024))
}

0 comments on commit 8bebadc

Please sign in to comment.