Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Remove database, provide Persistor at Start #93

Merged
merged 5 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-log-collection/database"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
)

// LogAgent is an entity that handles log monitoring.
type LogAgent struct {
database database.Database
pipeline pipeline.Pipeline

startOnce sync.Once
Expand All @@ -35,9 +34,9 @@ type LogAgent struct {
}

// Start will start the log monitoring process
func (a *LogAgent) Start() (err error) {
func (a *LogAgent) Start(persister operator.Persister) (err error) {
a.startOnce.Do(func() {
err = a.pipeline.Start()
err = a.pipeline.Start(persister)
if err != nil {
return
}
Expand All @@ -52,11 +51,6 @@ func (a *LogAgent) Stop() (err error) {
if err != nil {
return
}

err = a.database.Close()
if err != nil {
return
}
})
return
}
41 changes: 8 additions & 33 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,84 +27,59 @@ import (
func TestStartAgentSuccess(t *testing.T) {
logger := zap.NewNop().Sugar()
pipeline := &testutil.Pipeline{}
pipeline.On("Start").Return(nil)
persister := testutil.NewMockPersister("test")
pipeline.On("Start", persister).Return(nil)

agent := LogAgent{
SugaredLogger: logger,
pipeline: pipeline,
}
err := agent.Start()
err := agent.Start(persister)
require.NoError(t, err)
pipeline.AssertCalled(t, "Start")
pipeline.AssertCalled(t, "Start", persister)
}

func TestStartAgentFailure(t *testing.T) {
logger := zap.NewNop().Sugar()
pipeline := &testutil.Pipeline{}
persister := testutil.NewMockPersister("test")
failure := fmt.Errorf("failed to start pipeline")
pipeline.On("Start").Return(failure)
pipeline.On("Start", persister).Return(failure)

agent := LogAgent{
SugaredLogger: logger,
pipeline: pipeline,
}
err := agent.Start()
err := agent.Start(persister)
require.Error(t, err, failure)
pipeline.AssertCalled(t, "Start")
pipeline.AssertCalled(t, "Start", persister)
}

func TestStopAgentSuccess(t *testing.T) {
logger := zap.NewNop().Sugar()
pipeline := &testutil.Pipeline{}
pipeline.On("Stop").Return(nil)
database := &testutil.Database{}
database.On("Close").Return(nil)

agent := LogAgent{
SugaredLogger: logger,
pipeline: pipeline,
database: database,
}
err := agent.Stop()
require.NoError(t, err)
pipeline.AssertCalled(t, "Stop")
database.AssertCalled(t, "Close")
}

func TestStopAgentPipelineFailure(t *testing.T) {
logger := zap.NewNop().Sugar()
pipeline := &testutil.Pipeline{}
failure := fmt.Errorf("failed to start pipeline")
pipeline.On("Stop").Return(failure)
database := &testutil.Database{}
database.On("Close").Return(nil)

agent := LogAgent{
SugaredLogger: logger,
pipeline: pipeline,
database: database,
}
err := agent.Stop()
require.Error(t, err, failure)
pipeline.AssertCalled(t, "Stop")
database.AssertNotCalled(t, "Close")
}

func TestStopAgentDatabaseFailure(t *testing.T) {
logger := zap.NewNop().Sugar()
pipeline := &testutil.Pipeline{}
pipeline.On("Stop").Return(nil)
database := &testutil.Database{}
failure := fmt.Errorf("failed to close database")
database.On("Close").Return(failure)

agent := LogAgent{
SugaredLogger: logger,
pipeline: pipeline,
database: database,
}
err := agent.Stop()
require.Error(t, err, failure)
pipeline.AssertCalled(t, "Stop")
database.AssertCalled(t, "Close")
}
29 changes: 3 additions & 26 deletions agent/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
package agent

import (
"fmt"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/open-telemetry/opentelemetry-log-collection/database"
"github.com/open-telemetry/opentelemetry-log-collection/errors"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/plugin"
Expand All @@ -33,8 +31,6 @@ type LogAgentBuilder struct {
config *Config
logger *zap.SugaredLogger
pluginDir string
namespace string
databaseFile string
defaultOutput operator.Operator
}

Expand Down Expand Up @@ -63,13 +59,6 @@ func (b *LogAgentBuilder) WithConfig(cfg *Config) *LogAgentBuilder {
return b
}

// WithDatabaseFile adds the specified database file when building a log agent
func (b *LogAgentBuilder) WithDatabaseFile(databaseFile, namespace string) *LogAgentBuilder {
b.databaseFile = databaseFile
b.namespace = namespace
return b
}

// WithDefaultOutput adds a default output when building a log agent
func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder {
b.defaultOutput = defaultOutput
Expand All @@ -78,15 +67,6 @@ func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *Lo

// Build will build a new log agent using the values defined on the builder
func (b *LogAgentBuilder) Build() (*LogAgent, error) {
if b.databaseFile != "" && b.namespace == "" {
return nil, fmt.Errorf("use of database requires namespace")
}

db, err := database.OpenDatabase(b.databaseFile)
if err != nil {
return nil, errors.Wrap(err, "open database")
}

if b.pluginDir != "" {
if errs := plugin.RegisterPlugins(b.pluginDir, operator.DefaultRegistry); len(errs) != 0 {
b.logger.Errorw("Got errors parsing plugins", "errors", errs)
Expand All @@ -100,10 +80,11 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) {
return nil, errors.NewError("agent cannot be built without WithConfig or WithConfigFiles", "")
}
if len(b.configFiles) > 0 {
b.config, err = NewConfigFromGlobs(b.configFiles)
cfgs, err := NewConfigFromGlobs(b.configFiles)
if err != nil {
return nil, errors.Wrap(err, "read configs from globs")
}
b.config = cfgs
}

sampledLogger := b.logger.Desugar().WithOptions(
Expand All @@ -112,10 +93,7 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) {
}),
).Sugar()

buildContext := operator.NewBuildContext(db, sampledLogger)
if b.namespace != "" {
buildContext = buildContext.WithSubNamespace(b.namespace)
}
buildContext := operator.NewBuildContext(sampledLogger)

pipeline, err := b.config.Pipeline.BuildPipeline(buildContext, b.defaultOutput)
if err != nil {
Expand All @@ -124,7 +102,6 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) {

return &LogAgent{
pipeline: pipeline,
database: db,
SugaredLogger: b.logger,
}, nil
}
45 changes: 0 additions & 45 deletions agent/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package agent

import (
"io/ioutil"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -29,60 +27,17 @@ func TestBuildAgentSuccess(t *testing.T) {
mockCfg := Config{}
mockLogger := zap.NewNop().Sugar()
mockPluginDir := "/some/path/plugins"
mockDatabaseFile := filepath.Join(testutil.NewTempDir(t), "test.db")
mockNamespace := "mynamespace"
mockOutput := testutil.NewFakeOutput(t)

agent, err := NewBuilder(mockLogger).
WithConfig(&mockCfg).
WithPluginDir(mockPluginDir).
WithDefaultOutput(mockOutput).
WithDatabaseFile(mockDatabaseFile, mockNamespace).
Build()
require.NoError(t, err)
require.Equal(t, mockLogger, agent.SugaredLogger)
}

func TestBuildAgentFailureOnDatabase(t *testing.T) {
tempDir := testutil.NewTempDir(t)
invalidDatabaseFile := filepath.Join(tempDir, "test.db")
err := ioutil.WriteFile(invalidDatabaseFile, []byte("invalid"), 0755)
require.NoError(t, err)

mockCfg := Config{}
mockLogger := zap.NewNop().Sugar()
mockPluginDir := "/some/path/plugins"
mockDatabaseFile := invalidDatabaseFile
mockNamespace := "mynamespace"
mockOutput := testutil.NewFakeOutput(t)

agent, err := NewBuilder(mockLogger).
WithConfig(&mockCfg).
WithPluginDir(mockPluginDir).
WithDatabaseFile(mockDatabaseFile, mockNamespace).
WithDefaultOutput(mockOutput).
Build()
require.Error(t, err)
require.Nil(t, agent)
}

func TestBuildAgentFailureOnNamespace(t *testing.T) {
mockCfg := Config{}
mockLogger := zap.NewNop().Sugar()
mockPluginDir := "/some/path/plugins"
mockDatabaseFile := filepath.Join(testutil.NewTempDir(t), "test.db")
mockNamespace := ""
mockOutput := testutil.NewFakeOutput(t)

_, err := NewBuilder(mockLogger).
WithConfig(&mockCfg).
WithPluginDir(mockPluginDir).
WithDatabaseFile(mockDatabaseFile, mockNamespace).
WithDefaultOutput(mockOutput).
Build()
require.Error(t, err)
}

func TestBuildAgentFailureOnPluginRegistry(t *testing.T) {
mockCfg := Config{}
mockLogger := zap.NewNop().Sugar()
Expand Down
77 changes: 0 additions & 77 deletions database/database.go

This file was deleted.

Loading