-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Completed test coverage for entry and agent packages * Made a database package and improved tests * Improved pipeline test coverage * Fixed open database in offsets * Moved operator.Duration to helper.Duration * Increased test coverage for the plugin package * Fixed graph command * Improved JSON test coverage * Improved regex parser test coverage * Added tests to persister * Improved operator package test coverage * Improved drop_output test coverage * Improved noop operator test coverage
- Loading branch information
1 parent
f8965c2
commit 60b8b03
Showing
55 changed files
with
2,039 additions
and
547 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,133 +1,47 @@ | ||
package agent | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
"sync" | ||
"time" | ||
|
||
"github.com/observiq/stanza/errors" | ||
"github.com/observiq/stanza/operator" | ||
"github.com/observiq/stanza/database" | ||
"github.com/observiq/stanza/pipeline" | ||
"go.etcd.io/bbolt" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// LogAgent is an entity that handles log monitoring. | ||
type LogAgent struct { | ||
database operator.Database | ||
pipeline *pipeline.Pipeline | ||
database database.Database | ||
pipeline pipeline.Pipeline | ||
|
||
startOnce sync.Once | ||
stopOnce sync.Once | ||
|
||
*zap.SugaredLogger | ||
} | ||
|
||
// Start will start the log monitoring process. | ||
// Start will start the log monitoring process | ||
func (a *LogAgent) Start() (err error) { | ||
a.startOnce.Do(func() { | ||
err = a.pipeline.Start() | ||
if err != nil { | ||
return | ||
} | ||
a.Info("Agent started") | ||
}) | ||
return | ||
} | ||
|
||
// Stop will stop the log monitoring process. | ||
func (a *LogAgent) Stop() { | ||
// Stop will stop the log monitoring process | ||
func (a *LogAgent) Stop() (err error) { | ||
a.stopOnce.Do(func() { | ||
a.pipeline.Stop() | ||
a.database.Close() | ||
a.Info("Agent stopped") | ||
}) | ||
} | ||
|
||
// OpenDatabase will open and create a database. | ||
func OpenDatabase(file string) (operator.Database, error) { | ||
if file == "" { | ||
return operator.NewStubDatabase(), nil | ||
} | ||
|
||
if _, err := os.Stat(filepath.Dir(file)); err != nil { | ||
if os.IsNotExist(err) { | ||
err := os.MkdirAll(filepath.Dir(file), 0755) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating database directory: %s", err) | ||
} | ||
} else { | ||
return nil, err | ||
err = a.pipeline.Stop() | ||
if err != nil { | ||
return | ||
} | ||
} | ||
|
||
options := &bbolt.Options{Timeout: 1 * time.Second} | ||
return bbolt.Open(file, 0666, options) | ||
} | ||
|
||
// LogAgentBuilder is a construct used to build a log agent | ||
type LogAgentBuilder struct { | ||
cfg *Config | ||
logger *zap.SugaredLogger | ||
pluginDir string | ||
databaseFile string | ||
defaultOutput operator.Operator | ||
} | ||
|
||
// NewBuilder creates a new LogAgentBuilder | ||
func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { | ||
return &LogAgentBuilder{ | ||
cfg: cfg, | ||
logger: logger, | ||
} | ||
} | ||
|
||
// WithPluginDir adds the specified plugin directory when building a log agent | ||
func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { | ||
b.pluginDir = pluginDir | ||
return b | ||
} | ||
|
||
// WithDatabaseFile adds the specified database file when building a log agent | ||
func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { | ||
b.databaseFile = databaseFile | ||
return b | ||
} | ||
|
||
// WithDefaultOutput adds a default output when building a log agent | ||
func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { | ||
b.defaultOutput = defaultOutput | ||
return b | ||
} | ||
|
||
// Build will build a new log agent using the values defined on the builder | ||
func (b *LogAgentBuilder) Build() (*LogAgent, error) { | ||
database, err := OpenDatabase(b.databaseFile) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open database") | ||
} | ||
|
||
registry, err := operator.NewPluginRegistry(b.pluginDir) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "load plugin registry") | ||
} | ||
|
||
buildContext := operator.BuildContext{ | ||
Logger: b.logger, | ||
PluginRegistry: registry, | ||
Database: database, | ||
} | ||
|
||
pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &LogAgent{ | ||
pipeline: pipeline, | ||
database: database, | ||
SugaredLogger: b.logger, | ||
}, nil | ||
err = a.database.Close() | ||
if err != nil { | ||
return | ||
} | ||
}) | ||
return | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,95 @@ | ||
package agent | ||
|
||
import ( | ||
"os" | ||
"path/filepath" | ||
"runtime" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/observiq/stanza/testutil" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap" | ||
) | ||
|
||
func TestNewLogAgent(t *testing.T) { | ||
mockCfg := Config{} | ||
mockLogger := zap.NewNop().Sugar() | ||
mockPluginDir := "/some/path/plugins" | ||
mockDatabaseFile := "" | ||
agent, err := NewBuilder(&mockCfg, mockLogger). | ||
WithPluginDir(mockPluginDir). | ||
WithDatabaseFile(mockDatabaseFile). | ||
Build() | ||
func TestStartAgentSuccess(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
pipeline.On("Start").Return(nil) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
} | ||
err := agent.Start() | ||
require.NoError(t, err) | ||
pipeline.AssertCalled(t, "Start") | ||
} | ||
|
||
func TestStartAgentFailure(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
failure := fmt.Errorf("failed to start pipeline") | ||
pipeline.On("Start").Return(failure) | ||
|
||
require.Equal(t, mockLogger, agent.SugaredLogger) | ||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
} | ||
err := agent.Start() | ||
require.Error(t, err, failure) | ||
pipeline.AssertCalled(t, "Start") | ||
} | ||
|
||
func TestOpenDatabase(t *testing.T) { | ||
t.Run("Simple", func(t *testing.T) { | ||
tempDir := testutil.NewTempDir(t) | ||
db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) | ||
require.NoError(t, err) | ||
require.NotNil(t, db) | ||
}) | ||
|
||
t.Run("NonexistantPathIsCreated", func(t *testing.T) { | ||
tempDir := testutil.NewTempDir(t) | ||
db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) | ||
require.NoError(t, err) | ||
require.NotNil(t, db) | ||
}) | ||
|
||
t.Run("BadPermissions", func(t *testing.T) { | ||
if runtime.GOOS == "windows" { | ||
t.Skip("Windows does not have the same kind of file permissions") | ||
} | ||
tempDir := testutil.NewTempDir(t) | ||
err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) | ||
require.NoError(t, err) | ||
db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) | ||
require.Error(t, err) | ||
require.Nil(t, db) | ||
}) | ||
func TestStopAgentSuccess(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
pipeline.On("Stop").Return(nil) | ||
database := &testutil.Database{} | ||
database.On("Close").Return(nil) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
database: database, | ||
} | ||
err := agent.Stop() | ||
require.NoError(t, err) | ||
pipeline.AssertCalled(t, "Stop") | ||
database.AssertCalled(t, "Close") | ||
} | ||
|
||
func TestStopAgentPipelineFailure(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
failure := fmt.Errorf("failed to start pipeline") | ||
pipeline.On("Stop").Return(failure) | ||
database := &testutil.Database{} | ||
database.On("Close").Return(nil) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
database: database, | ||
} | ||
err := agent.Stop() | ||
require.Error(t, err, failure) | ||
pipeline.AssertCalled(t, "Stop") | ||
database.AssertNotCalled(t, "Close") | ||
} | ||
|
||
func TestStopAgentDatabaseFailure(t *testing.T) { | ||
logger := zap.NewNop().Sugar() | ||
pipeline := &testutil.Pipeline{} | ||
pipeline.On("Stop").Return(nil) | ||
database := &testutil.Database{} | ||
failure := fmt.Errorf("failed to close database") | ||
database.On("Close").Return(failure) | ||
|
||
agent := LogAgent{ | ||
SugaredLogger: logger, | ||
pipeline: pipeline, | ||
database: database, | ||
} | ||
err := agent.Stop() | ||
require.Error(t, err, failure) | ||
pipeline.AssertCalled(t, "Stop") | ||
database.AssertCalled(t, "Close") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package agent | ||
|
||
import ( | ||
"github.com/observiq/stanza/database" | ||
"github.com/observiq/stanza/errors" | ||
"github.com/observiq/stanza/operator" | ||
"github.com/observiq/stanza/plugin" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// LogAgentBuilder is a construct used to build a log agent | ||
type LogAgentBuilder struct { | ||
cfg *Config | ||
logger *zap.SugaredLogger | ||
pluginDir string | ||
databaseFile string | ||
defaultOutput operator.Operator | ||
} | ||
|
||
// NewBuilder creates a new LogAgentBuilder | ||
func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { | ||
return &LogAgentBuilder{ | ||
cfg: cfg, | ||
logger: logger, | ||
} | ||
} | ||
|
||
// WithPluginDir adds the specified plugin directory when building a log agent | ||
func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { | ||
b.pluginDir = pluginDir | ||
return b | ||
} | ||
|
||
// WithDatabaseFile adds the specified database file when building a log agent | ||
func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { | ||
b.databaseFile = databaseFile | ||
return b | ||
} | ||
|
||
// WithDefaultOutput adds a default output when building a log agent | ||
func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { | ||
b.defaultOutput = defaultOutput | ||
return b | ||
} | ||
|
||
// Build will build a new log agent using the values defined on the builder | ||
func (b *LogAgentBuilder) Build() (*LogAgent, error) { | ||
db, err := database.OpenDatabase(b.databaseFile) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open database") | ||
} | ||
|
||
registry, err := plugin.NewPluginRegistry(b.pluginDir) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "load plugin registry") | ||
} | ||
|
||
buildContext := operator.BuildContext{ | ||
Logger: b.logger, | ||
Database: db, | ||
} | ||
|
||
pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, registry, b.defaultOutput) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &LogAgent{ | ||
pipeline: pipeline, | ||
database: db, | ||
SugaredLogger: b.logger, | ||
}, nil | ||
} |
Oops, something went wrong.