-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Completed test coverage for entry and agent packages
- Loading branch information
1 parent
7a52d76
commit 16358e3
Showing
22 changed files
with
1,028 additions
and
367 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,133 +1,47 @@ | ||
package agent | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
"sync" | ||
"time" | ||
|
||
"github.com/observiq/stanza/errors" | ||
"github.com/observiq/stanza/operator" | ||
"github.com/observiq/stanza/pipeline" | ||
"go.etcd.io/bbolt" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// LogAgent is an entity that handles log monitoring. | ||
type LogAgent struct { | ||
database operator.Database | ||
pipeline *pipeline.Pipeline | ||
pipeline pipeline.Pipeline | ||
|
||
startOnce sync.Once | ||
stopOnce sync.Once | ||
|
||
*zap.SugaredLogger | ||
} | ||
|
||
// Start will start the log monitoring process. | ||
// Start will start the log monitoring process | ||
func (a *LogAgent) Start() (err error) { | ||
a.startOnce.Do(func() { | ||
err = a.pipeline.Start() | ||
if err != nil { | ||
return | ||
} | ||
a.Info("Agent started") | ||
}) | ||
return | ||
} | ||
|
||
// Stop will stop the log monitoring process. | ||
func (a *LogAgent) Stop() { | ||
// Stop will stop the log monitoring process | ||
func (a *LogAgent) Stop() (err error) { | ||
a.stopOnce.Do(func() { | ||
a.pipeline.Stop() | ||
a.database.Close() | ||
a.Info("Agent stopped") | ||
}) | ||
} | ||
|
||
// OpenDatabase will open and create a database. | ||
func OpenDatabase(file string) (operator.Database, error) { | ||
if file == "" { | ||
return operator.NewStubDatabase(), nil | ||
} | ||
|
||
if _, err := os.Stat(filepath.Dir(file)); err != nil { | ||
if os.IsNotExist(err) { | ||
err := os.MkdirAll(filepath.Dir(file), 0755) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating database directory: %s", err) | ||
} | ||
} else { | ||
return nil, err | ||
err = a.pipeline.Stop() | ||
if err != nil { | ||
return | ||
} | ||
} | ||
|
||
options := &bbolt.Options{Timeout: 1 * time.Second} | ||
return bbolt.Open(file, 0666, options) | ||
} | ||
|
||
// LogAgentBuilder is a construct used to build a log agent | ||
type LogAgentBuilder struct { | ||
cfg *Config | ||
logger *zap.SugaredLogger | ||
pluginDir string | ||
databaseFile string | ||
defaultOutput operator.Operator | ||
} | ||
|
||
// NewBuilder creates a new LogAgentBuilder | ||
func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { | ||
return &LogAgentBuilder{ | ||
cfg: cfg, | ||
logger: logger, | ||
} | ||
} | ||
|
||
// WithPluginDir adds the specified plugin directory when building a log agent | ||
func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { | ||
b.pluginDir = pluginDir | ||
return b | ||
} | ||
|
||
// WithDatabaseFile adds the specified database file when building a log agent | ||
func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { | ||
b.databaseFile = databaseFile | ||
return b | ||
} | ||
|
||
// WithDefaultOutput adds a default output when building a log agent | ||
func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { | ||
b.defaultOutput = defaultOutput | ||
return b | ||
} | ||
|
||
// Build will build a new log agent using the values defined on the builder | ||
func (b *LogAgentBuilder) Build() (*LogAgent, error) { | ||
database, err := OpenDatabase(b.databaseFile) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open database") | ||
} | ||
|
||
registry, err := operator.NewPluginRegistry(b.pluginDir) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "load plugin registry") | ||
} | ||
|
||
buildContext := operator.BuildContext{ | ||
Logger: b.logger, | ||
PluginRegistry: registry, | ||
Database: database, | ||
} | ||
|
||
pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &LogAgent{ | ||
pipeline: pipeline, | ||
database: database, | ||
SugaredLogger: b.logger, | ||
}, nil | ||
err = a.database.Close() | ||
if err != nil { | ||
return | ||
} | ||
}) | ||
return | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,95 @@ | ||
package agent | ||
|
||
import ( | ||
"os" | ||
"path/filepath" | ||
"runtime" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/observiq/stanza/testutil" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap" | ||
) | ||
|
||
func TestNewLogAgent(t *testing.T) { | ||
mockCfg := Config{} | ||
mockLogger := zap.NewNop().Sugar() | ||
mockPluginDir := "/some/path/plugins" | ||
mockDatabaseFile := "" | ||
agent, err := NewBuilder(&mockCfg, mockLogger). | ||
WithPluginDir(mockPluginDir). | ||
WithDatabaseFile(mockDatabaseFile). | ||
Build() | ||
func TestStartAgentSuccess(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
pipeline.On("Start").Return(nil) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
} | ||
err := agent.Start() | ||
require.NoError(t, err) | ||
pipeline.AssertCalled(t, "Start") | ||
} | ||
|
||
func TestStartAgentFailure(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
failure := fmt.Errorf("failed to start pipeline") | ||
pipeline.On("Start").Return(failure) | ||
|
||
require.Equal(t, mockLogger, agent.SugaredLogger) | ||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
} | ||
err := agent.Start() | ||
require.Error(t, err, failure) | ||
pipeline.AssertCalled(t, "Start") | ||
} | ||
|
||
func TestOpenDatabase(t *testing.T) { | ||
t.Run("Simple", func(t *testing.T) { | ||
tempDir := testutil.NewTempDir(t) | ||
db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) | ||
require.NoError(t, err) | ||
require.NotNil(t, db) | ||
}) | ||
|
||
t.Run("NonexistantPathIsCreated", func(t *testing.T) { | ||
tempDir := testutil.NewTempDir(t) | ||
db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) | ||
require.NoError(t, err) | ||
require.NotNil(t, db) | ||
}) | ||
|
||
t.Run("BadPermissions", func(t *testing.T) { | ||
if runtime.GOOS == "windows" { | ||
t.Skip("Windows does not have the same kind of file permissions") | ||
} | ||
tempDir := testutil.NewTempDir(t) | ||
err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) | ||
require.NoError(t, err) | ||
db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) | ||
require.Error(t, err) | ||
require.Nil(t, db) | ||
}) | ||
func TestStopAgentSuccess(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
pipeline.On("Stop").Return(nil) | ||
database := &testutil.Database{} | ||
database.On("Close").Return(nil) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
database: database, | ||
} | ||
err := agent.Stop() | ||
require.NoError(t, err) | ||
pipeline.AssertCalled(t, "Stop") | ||
database.AssertCalled(t, "Close") | ||
} | ||
|
||
func TestStopAgentPipelineFailure(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
failure := fmt.Errorf("failed to start pipeline") | ||
pipeline.On("Stop").Return(failure) | ||
database := &testutil.Database{} | ||
database.On("Close").Return(nil) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
database: database, | ||
} | ||
err := agent.Stop() | ||
require.Error(t, err, failure) | ||
pipeline.AssertCalled(t, "Stop") | ||
database.AssertNotCalled(t, "Close") | ||
} | ||
|
||
func TestStopAgentDatabaseFailure(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
pipeline.On("Stop").Return(nil) | ||
database := &testutil.Database{} | ||
failure := fmt.Errorf("failed to close database") | ||
database.On("Close").Return(failure) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
database: database, | ||
} | ||
err := agent.Stop() | ||
require.Error(t, err, failure) | ||
pipeline.AssertCalled(t, "Stop") | ||
database.AssertCalled(t, "Close") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package agent | ||
|
||
import ( | ||
"github.com/observiq/stanza/errors" | ||
"github.com/observiq/stanza/operator" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// LogAgentBuilder is a construct used to build a log agent | ||
type LogAgentBuilder struct { | ||
cfg *Config | ||
logger *zap.SugaredLogger | ||
pluginDir string | ||
databaseFile string | ||
defaultOutput operator.Operator | ||
} | ||
|
||
// NewBuilder creates a new LogAgentBuilder | ||
func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { | ||
return &LogAgentBuilder{ | ||
cfg: cfg, | ||
logger: logger, | ||
} | ||
} | ||
|
||
// WithPluginDir adds the specified plugin directory when building a log agent | ||
func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { | ||
b.pluginDir = pluginDir | ||
return b | ||
} | ||
|
||
// WithDatabaseFile adds the specified database file when building a log agent | ||
func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { | ||
b.databaseFile = databaseFile | ||
return b | ||
} | ||
|
||
// WithDefaultOutput adds a default output when building a log agent | ||
func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { | ||
b.defaultOutput = defaultOutput | ||
return b | ||
} | ||
|
||
// Build will build a new log agent using the values defined on the builder | ||
func (b *LogAgentBuilder) Build() (*LogAgent, error) { | ||
database, err := operator.OpenDatabase(b.databaseFile) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open database") | ||
} | ||
|
||
registry, err := operator.NewPluginRegistry(b.pluginDir) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "load plugin registry") | ||
} | ||
|
||
buildContext := operator.BuildContext{ | ||
Logger: b.logger, | ||
PluginRegistry: registry, | ||
Database: database, | ||
} | ||
|
||
pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &LogAgent{ | ||
pipeline: pipeline, | ||
database: database, | ||
SugaredLogger: b.logger, | ||
}, nil | ||
} |
Oops, something went wrong.