Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMLII-2060] Add more robust error handling for files #29817

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions pkg/logs/launchers/integration/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"errors"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -67,10 +68,21 @@ func NewLauncher(sources *sources.LogSources, integrationsLogsComp integrations.

logsTotalUsageSetting := datadogConfig.GetInt64("logs_config.integrations_logs_total_usage") * 1024 * 1024
logsUsageRatio := datadogConfig.GetFloat64("logs_config.integrations_logs_disk_ratio")
maxDiskUsage, err := computeMaxDiskUsage(runPath, logsTotalUsageSetting, logsUsageRatio)
if err != nil {
ddLog.Warn("Unable to compute integrations logs max disk usage, using default value of 100 MB:", err)

var maxDiskUsage int64
if logsUsageRatio > 1 || logsUsageRatio < 0 {
ddLog.Warn("Logs usage ratio setting must be between 0 and 1, current value is:", logsUsageRatio, ". Falling back to integrations_logs_total_usage setting.")
maxDiskUsage = logsTotalUsageSetting
} else {
maxDiskUsage, err = computeMaxDiskUsage(runPath, logsTotalUsageSetting, logsUsageRatio)
if err != nil {
ddLog.Warn("Unable to compute integrations logs max disk usage, falling back to integrations_logs_total_usage setting:", err)
maxDiskUsage = logsTotalUsageSetting
}

if maxDiskUsage == 0 {
ddLog.Warn("No space available to store logs. Logs from integrations will be dropped. Please allocate space for logs to be stored.")
}
}

soberpeach marked this conversation as resolved.
Show resolved Hide resolved
return &Launcher{
Expand Down Expand Up @@ -109,6 +121,9 @@ func (s *Launcher) run() {
for {
select {
case cfg := <-s.addedConfigs:
if s.combinedUsageMax == 0 {
continue
}

sources, err := ad.CreateSources(cfg.Config)
if err != nil {
Expand Down Expand Up @@ -140,6 +155,10 @@ func (s *Launcher) run() {
}

case log := <-s.integrationsLogsChan:
if s.combinedUsageMax == 0 {
continue
}

s.receiveLogs(log)
case <-s.stop:
return
Expand Down Expand Up @@ -181,11 +200,15 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
// deleting files until total usage falls below the set maximum
for s.combinedUsageSize+logSize > s.combinedUsageMax {
leastRecentlyModifiedFile := s.getLeastRecentlyModifiedFile()
if leastRecentlyModifiedFile == nil {
ddLog.Error("Could not determine least recently modified file, skipping writing log to file.")
return
}

err := s.deleteFile(leastRecentlyModifiedFile)
if err != nil {
ddLog.Error("Error deleting log file:", err)
continue
return
}

file, err := os.Create(leastRecentlyModifiedFile.filename)
Expand All @@ -212,7 +235,6 @@ func (s *Launcher) receiveLogs(log integrations.IntegrationLog) {
fileToUpdate.size += logSize
}

// deleteFile deletes the given file
func (s *Launcher) deleteFile(file *fileInfo) error {
filename := filepath.Join(s.runPath, file.filename)
err := os.Remove(filename)
Expand Down Expand Up @@ -315,7 +337,7 @@ func (s *Launcher) integrationLogFilePath(id string) string {
return logFilePath
}

// computerDiskUsageMax computes the max disk space the launcher can use based
// computeMaxDiskUsage computes the max disk space the launcher can use based
// off the integrations_logs_disk_ratio and integrations_logs_total_usage
// settings
func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio float64) (int64, error) {
Expand All @@ -327,6 +349,11 @@ func computeMaxDiskUsage(runPath string, logsTotalUsageSetting int64, usageRatio
diskReserved := float64(usage.Total) * (1 - usageRatio)
diskAvailable := int64(usage.Available) - int64(math.Ceil(diskReserved))

if diskAvailable < 0 {
ddLog.Warn("Available disk calculated as less than 0: ", diskAvailable, ". Disk reserved:", diskReserved)
diskAvailable = 0
}

return min(logsTotalUsageSetting, diskAvailable), nil
}

Expand Down Expand Up @@ -363,6 +390,11 @@ func (s *Launcher) scanInitialFiles(dir string) error {
for s.combinedUsageSize > s.combinedUsageMax {
leastRecentlyModifiedFile := s.getLeastRecentlyModifiedFile()

if leastRecentlyModifiedFile == nil {
ddLog.Error("Could not determine least recently modified file")
return errors.New("getLeastRecentlyModifiedFile returned nil when trying to delete files")
}

err = s.deleteFile(leastRecentlyModifiedFile)
if err != nil {
ddLog.Warn("Error deleting log file:", err)
Expand Down
98 changes: 97 additions & 1 deletion pkg/logs/launchers/integration/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (suite *LauncherTestSuite) TestFileCreation() {
}

func (suite *LauncherTestSuite) TestSendLog() {

mockConf := &integration.Config{}
mockConf.Provider = "container"
mockConf.LogsConfig = integration.Data(`[{"type": "integration", "source": "foo", "service": "bar"}]`)
Expand Down Expand Up @@ -99,6 +98,90 @@ func (suite *LauncherTestSuite) TestSendLog() {
assert.Equal(suite.T(), expectedPath, <-filepathChan)
}

// TestNegativeCombinedUsageMax ensures errors in combinedUsageMax don't result
// in panics from `deleteFile`. With a negative maximum the usage check in
// scanInitialFiles can never be satisfied, so the scan is expected to give up
// and return an error rather than try to delete a file that does not exist.
func (suite *LauncherTestSuite) TestNegativeCombinedUsageMax() {
	suite.s.combinedUsageMax = -1

	err := suite.s.scanInitialFiles(suite.s.runPath)
	assert.Error(suite.T(), err)
}

// TestZeroCombinedUsageMaxFileCreated ensures the launcher won't panic when
// combinedUsageMax is zero and a log file for the integration already exists
// on disk. Realistically the launcher would never reach receiveLogs in this
// state, since run() skips incoming logs when combinedUsageMax is zero; the
// test calls receiveLogs directly to cover that path anyway.
func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileCreated() {
	suite.s.combinedUsageMax = 0

	// Named logPath (not filepath) so the path/filepath package is not shadowed.
	logPath := filepath.Join(suite.s.runPath, "sample_integration_123.log")
	file, err := os.Create(logPath)
	if !assert.NoError(suite.T(), err) {
		// Without the pre-existing file the scenario under test does not exist.
		return
	}
	assert.NoError(suite.T(), file.Close())

	suite.s.Start(nil, nil, nil, nil)

	integrationLog := integrations.IntegrationLog{
		Log:           "sample log",
		IntegrationID: "sample_integration:123",
	}

	// No assertion follows: the test passes as long as this call returns.
	suite.s.receiveLogs(integrationLog)
}

// TestZeroCombinedUsageMaxFileNotCreated calls receiveLogs with
// combinedUsageMax set to zero and no log file created beforehand. It makes no
// assertions; it passes as long as receiveLogs returns without panicking.
func (suite *LauncherTestSuite) TestZeroCombinedUsageMaxFileNotCreated() {
	suite.s.combinedUsageMax = 0
	suite.s.Start(nil, nil, nil, nil)

	suite.s.receiveLogs(integrations.IntegrationLog{
		Log:           "sample log",
		IntegrationID: "sample_integration:123",
	})
}

// TestSmallCombinedUsageMax walks receiveLogs through a 10-byte budget: a log
// that fits is written to the file, a log that is larger than the whole budget
// leaves the file deleted, and a later log that fits recreates the file.
func (suite *LauncherTestSuite) TestSmallCombinedUsageMax() {
	suite.s.combinedUsageMax = 10

	// Named logPath (not filepath) so the path/filepath package is not shadowed.
	logPath := filepath.Join(suite.s.runPath, "sample_integration_123.log")
	file, err := os.Create(logPath)
	if !assert.NoError(suite.T(), err) {
		return
	}
	assert.NoError(suite.T(), file.Close())

	suite.s.Start(nil, nil, nil, nil)

	// Launcher should write this log
	writtenLog := "sample"
	integrationLog := integrations.IntegrationLog{
		Log:           writtenLog,
		IntegrationID: "sample_integration:123",
	}
	// NOTE(review): the +1 presumably accounts for a newline appended on
	// write — confirm against the launcher's write function.
	expectedSize := int64(len(writtenLog) + 1)

	suite.s.receiveLogs(integrationLog)
	fileStat, err := os.Stat(logPath)
	// Only inspect fileStat when Stat succeeded; it is nil otherwise.
	if assert.NoError(suite.T(), err) {
		assert.Equal(suite.T(), expectedSize, fileStat.Size())
	}

	// Launcher should delete file for this log
	unwrittenLog := "sample log two"
	integrationLogTwo := integrations.IntegrationLog{
		Log:           unwrittenLog,
		IntegrationID: "sample_integration:123",
	}
	suite.s.receiveLogs(integrationLogTwo)

	_, err = os.Stat(logPath)
	assert.True(suite.T(), os.IsNotExist(err))

	// Remake the file
	suite.s.receiveLogs(integrationLog)
	fileStat, err = os.Stat(logPath)
	if assert.NoError(suite.T(), err) {
		assert.Equal(suite.T(), expectedSize, fileStat.Size())
	}
}

func (suite *LauncherTestSuite) TestWriteLogToFile() {
logText := "hello world"
err := suite.s.writeLogToFileFunction(suite.testPath, logText)
Expand Down Expand Up @@ -458,3 +541,16 @@ func TestReadOnlyFileSystem(t *testing.T) {
// send a second log to make sure the launcher isn't blocking
integrationsComp.SendLog(logSample, id)
}

// TestCombinedDiskUsageFallback ensures the launcher falls back to the
// logsTotalUsageSetting if there is an error in the logsUsageRatio. The ratio
// is set to -1, outside the accepted 0..1 range, so NewLauncher is expected to
// use integrations_logs_total_usage (given in MB) converted to bytes.
func TestCombinedDiskUsageFallback(t *testing.T) {
	totalUsage := 100
	// NOTE(review): these overrides are written to the shared Datadog config
	// and are not restored afterwards — confirm no other test depends on the
	// defaults.
	pkgconfigsetup.Datadog().SetWithoutSource("logs_config.integrations_logs_disk_ratio", -1)
	pkgconfigsetup.Datadog().SetWithoutSource("logs_config.integrations_logs_total_usage", totalUsage)

	integrationsComp := integrationsmock.Mock()
	s := NewLauncher(sources.NewLogSources(), integrationsComp)

	// Expected value first, actual second, so failure output is labelled correctly.
	assert.Equal(t, int64(totalUsage*1024*1024), s.combinedUsageMax)
}
Loading