Skip to content

Commit

Permalink
[pkg/stanza] Use storage extension only if specified (#13418)
Browse files Browse the repository at this point in the history
* [pkg/stanza] Use storage extension only if specified

* [pkg/stanza] Use storage extension only if specified

Previous behavior allowed stanza-based receivers to automatically
detect and use a storage extension. This implicit behavior was not
clear and precluded the abilty to choose one of multiple storage
extensions. This change adds a 'storage' field to stanza-based
receivers and requires the user to name the extension that should
be used.

* Remove dead link

* Export a GetStorageClient function from pkg/stanza/adapter
  • Loading branch information
djaglowski authored Aug 31, 2022
1 parent 36681a0 commit 178e885
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 66 deletions.
21 changes: 11 additions & 10 deletions extension/storage/storagetest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,18 @@ func (p *TestClient) Close(_ context.Context) error {
return os.WriteFile(p.storageFile, contents, os.FileMode(0600))
}

// Kind of component that is using the storage client
func (p *TestClient) Kind() component.Kind {
return p.kind
}
const clientCreatorID = "client_creator_id"

// ID of component that is using the storage client
func (p *TestClient) ID() config.ComponentID {
return p.id
func setCreatorID(ctx context.Context, client storage.Client, creatorID config.ComponentID) error {
return client.Set(ctx, clientCreatorID, []byte(creatorID.String()))
}

// Name assigned to the storage client
func (p *TestClient) Name() string {
return p.name
// CreatorID is the config.ComponentID of the extension that created the component
func CreatorID(ctx context.Context, client storage.Client) (config.ComponentID, error) {
idBytes, err := client.Get(ctx, clientCreatorID)
if err != nil || idBytes == nil {
return config.ComponentID{}, err
}

return config.NewComponentIDFromString(string(idBytes))
}
25 changes: 17 additions & 8 deletions extension/storage/storagetest/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,29 @@ var testStorageType config.Type = "test_storage"
type TestStorage struct {
config.ExtensionSettings
storageDir string
clients []*TestClient
}

// Ensure this storage extension implements the appropriate interface
var _ storage.Extension = (*TestStorage)(nil)

func NewStorageID(name string) config.ComponentID {
return config.NewComponentIDWithName(testStorageType, name)
}

// NewInMemoryStorageExtension creates a TestStorage extension
func NewInMemoryStorageExtension(name string) *TestStorage {
return &TestStorage{
ExtensionSettings: config.NewExtensionSettings(
config.NewComponentIDWithName(testStorageType, name),
NewStorageID(name),
),
clients: []*TestClient{},
}
}

// NewFileBackedStorageExtension creates a TestStorage extension
func NewFileBackedStorageExtension(name string, storageDir string) *TestStorage {
return &TestStorage{
ExtensionSettings: config.NewExtensionSettings(
config.NewComponentIDWithName(testStorageType, name),
NewStorageID(name),
),
storageDir: storageDir,
}
Expand All @@ -65,11 +67,14 @@ func (s *TestStorage) Shutdown(ctx context.Context) error {
}

// GetClient returns a storage client for an individual component
func (s *TestStorage) GetClient(_ context.Context, kind component.Kind, ent config.ComponentID, name string) (storage.Client, error) {
func (s *TestStorage) GetClient(ctx context.Context, kind component.Kind, ent config.ComponentID, name string) (storage.Client, error) {
var client *TestClient
if s.storageDir == "" {
return NewInMemoryClient(kind, ent, name), nil
client = NewInMemoryClient(kind, ent, name)
} else {
client = NewFileBackedClient(kind, ent, name, s.storageDir)
}
return NewFileBackedClient(kind, ent, name, s.storageDir), nil
return client, setCreatorID(ctx, client, s.ID())
}

var nonStorageType config.Type = "non_storage"
Expand All @@ -83,11 +88,15 @@ type NonStorage struct {
// Ensure this extension implements the appropriate interface
var _ component.Extension = (*NonStorage)(nil)

func NewNonStorageID(name string) config.ComponentID {
return config.NewComponentIDWithName(nonStorageType, name)
}

// NewNonStorageExtension creates a NonStorage extension
func NewNonStorageExtension(name string) *NonStorage {
return &NonStorage{
ExtensionSettings: config.NewExtensionSettings(
config.NewComponentIDWithName(nonStorageType, name),
NewNonStorageID(name),
),
}
}
Expand Down
16 changes: 15 additions & 1 deletion extension/storage/storagetest/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"go.opentelemetry.io/collector/extension/experimental/storage"
)

func TestID(t *testing.T) {
require.Equal(t, NewStorageID("test"), NewInMemoryStorageExtension("test").ID())
require.Equal(t, NewStorageID("test"), NewFileBackedStorageExtension("test", t.TempDir()).ID())
require.Equal(t, NewNonStorageID("test"), NewNonStorageExtension("test").ID())
}

func TestInMemoryLifecycle(t *testing.T) {
ext := NewInMemoryStorageExtension("test")
require.Equal(t, config.NewComponentIDWithName(testStorageType, "test"), ext.ID())
Expand All @@ -38,13 +44,17 @@ func TestFileBackedLifecycle(t *testing.T) {
runExtensionLifecycle(t, ext, true)
}

func runExtensionLifecycle(t *testing.T, ext storage.Extension, expectPersistence bool) {
func runExtensionLifecycle(t *testing.T, ext *TestStorage, expectPersistence bool) {
ctx := context.Background()
require.NoError(t, ext.Start(ctx, componenttest.NewNopHost()))

clientOne, err := ext.GetClient(ctx, component.KindProcessor, config.NewComponentID("foo"), "client_one")
require.NoError(t, err)

creatorID, err := CreatorID(ctx, clientOne)
require.NoError(t, err)
require.Equal(t, ext.ID(), creatorID)

// Write a value, confirm it is saved
require.NoError(t, clientOne.Set(ctx, "foo", []byte("bar")))
fooVal, err := clientOne.Get(ctx, "foo")
Expand All @@ -70,6 +80,10 @@ func runExtensionLifecycle(t *testing.T, ext storage.Extension, expectPersistenc
clientTwo, err := ext.GetClient(ctx, component.KindProcessor, config.NewComponentID("foo"), "client_one")
require.NoError(t, err)

creatorID, err = CreatorID(ctx, clientTwo)
require.NoError(t, err)
require.Equal(t, ext.ID(), creatorID)

// Check if the value is accessible from another client
fooVal, err = clientTwo.Get(ctx, "foo2")
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions extension/storage/storagetest/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func NewStorageHost() *StorageHost {
}
}

func (h *StorageHost) WithExtension(id config.ComponentID, ext component.Extension) *StorageHost {
h.extensions[id] = ext
return h
}

func (h *StorageHost) WithInMemoryStorageExtension(name string) *StorageHost {
ext := NewInMemoryStorageExtension(name)
h.extensions[ext.ID()] = ext
Expand Down
5 changes: 3 additions & 2 deletions pkg/stanza/adapter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
// BaseConfig is the common configuration of a stanza-based receiver
type BaseConfig struct {
config.ReceiverSettings `mapstructure:",squash"`
Operators OperatorConfigs `mapstructure:"operators"`
Converter ConverterConfig `mapstructure:"converter"`
Operators OperatorConfigs `mapstructure:"operators"`
Converter ConverterConfig `mapstructure:"converter"`
StorageID *config.ComponentID `mapstructure:"storage"`
}

// OperatorConfigs is an alias that allows for unmarshaling outside of mapstructure
Expand Down
1 change: 1 addition & 0 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) component.CreateLogsRec
logger: params.Logger,
converter: converter,
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
}, nil
}
}
14 changes: 8 additions & 6 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ type receiver struct {
wg sync.WaitGroup
cancel context.CancelFunc

pipe pipeline.Pipeline
emitter *LogEmitter
consumer consumer.Logs
pipe pipeline.Pipeline
emitter *LogEmitter
consumer consumer.Logs
converter *Converter
logger *zap.Logger
obsrecv *obsreport.Receiver

storageID *config.ComponentID
storageClient storage.Client
converter *Converter
logger *zap.Logger
obsrecv *obsreport.Receiver
}

// Ensure this receiver adheres to required interface
Expand Down
32 changes: 15 additions & 17 deletions pkg/stanza/adapter/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,37 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

func GetStorageClient(ctx context.Context, id config.ComponentID, componentKind component.Kind, host component.Host) (storage.Client, error) {
var storageExtension storage.Extension
if host != nil {
for _, ext := range host.GetExtensions() {
if se, ok := ext.(storage.Extension); ok {
if storageExtension != nil {
return nil, errors.New("multiple storage extensions found")
}
storageExtension = se
}
}
func GetStorageClient(ctx context.Context, host component.Host, storageID *config.ComponentID, componentID config.ComponentID) (storage.Client, error) {
if storageID == nil {
return storage.NewNopClient(), nil
}

if storageExtension == nil {
return storage.NewNopClient(), nil
extension, ok := host.GetExtensions()[*storageID]
if !ok {
return nil, fmt.Errorf("storage extension '%s' not found", storageID)
}

storageExtension, ok := extension.(storage.Extension)
if !ok {
return nil, fmt.Errorf("non-storage extension '%s' found", storageID)
}

return storageExtension.GetClient(ctx, componentKind, id, "")
return storageExtension.GetClient(ctx, component.KindReceiver, componentID, "")

}

func (r *receiver) setStorageClient(ctx context.Context, host component.Host) error {
client, err := GetStorageClient(ctx, r.id, component.KindReceiver, host)
client, err := GetStorageClient(ctx, host, r.storageID, r.id)
if err != nil {
return err
}

r.storageClient = client
return nil
}
55 changes: 45 additions & 10 deletions pkg/stanza/adapter/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
)

func TestStorage(t *testing.T) {
ctx := context.Background()
r := createReceiver(t)

storageExt := storagetest.NewFileBackedStorageExtension("test", t.TempDir())
host := storagetest.NewStorageHost().
WithFileBackedStorageExtension("test", t.TempDir())
WithExtension(storageExt.ID(), storageExt)

id := storageExt.ID()
r := createReceiver(t, id)
require.NoError(t, r.Start(ctx, host))

myBytes := []byte("my_value")

require.NoError(t, r.storageClient.Set(ctx, "key", myBytes))
val, err := r.storageClient.Get(ctx, "key")
require.NoError(t, err)
Expand All @@ -46,7 +50,7 @@ func TestStorage(t *testing.T) {
require.NoError(t, e.Shutdown(ctx))
}

r = createReceiver(t)
r = createReceiver(t, id)
err = r.Start(ctx, host)
require.NoError(t, err)

Expand All @@ -70,17 +74,47 @@ func TestStorage(t *testing.T) {
require.Equal(t, "client closed", err.Error())
}

func TestFailOnMultipleStorageExtensions(t *testing.T) {
r := createReceiver(t)
func TestFindCorrectStorageExtension(t *testing.T) {
correctStoragedExt := storagetest.NewInMemoryStorageExtension("want")
id := correctStoragedExt.ID()
r := createReceiver(t, id)
host := storagetest.NewStorageHost().
WithInMemoryStorageExtension("one").
WithInMemoryStorageExtension("two")
WithNonStorageExtension("one").
WithFileBackedStorageExtension("foo", t.TempDir()).
WithExtension(id, correctStoragedExt).
WithFileBackedStorageExtension("bar", t.TempDir()).
WithNonStorageExtension("two")

err := r.Start(context.Background(), host)
require.NoError(t, err)
require.NotNil(t, r.storageClient)

clientCreatorID, err := storagetest.CreatorID(context.Background(), r.storageClient)
require.NoError(t, err)
require.Equal(t, id, clientCreatorID)
}

func TestFailOnMissingStorageExtension(t *testing.T) {
id := config.NewComponentIDWithName("test", "missing")
r := createReceiver(t, id)
err := r.Start(context.Background(), storagetest.NewStorageHost())
require.Error(t, err)
require.Equal(t, "storage client: storage extension 'test/missing' not found", err.Error())
}

func TestFailOnNonStorageExtension(t *testing.T) {
nonStorageExt := storagetest.NewNonStorageExtension("non")
id := nonStorageExt.ID()
r := createReceiver(t, id)
host := storagetest.NewStorageHost().
WithExtension(id, nonStorageExt)

err := r.Start(context.Background(), host)
require.Error(t, err)
require.Equal(t, "storage client: multiple storage extensions found", err.Error())
require.Equal(t, "storage client: non-storage extension 'non_storage/non' found", err.Error())
}

func createReceiver(t *testing.T) *receiver {
func createReceiver(t *testing.T, storageID config.ComponentID) *receiver {
params := component.ReceiverCreateSettings{
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
}
Expand All @@ -97,5 +131,6 @@ func createReceiver(t *testing.T) *receiver {

r, ok := logsReceiver.(*receiver)
require.True(t, ok)
r.storageID = &storageID
return r
}
11 changes: 8 additions & 3 deletions receiver/filelogreceiver/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,20 @@ func TestStorage(t *testing.T) {

logsDir := t.TempDir()
storageDir := t.TempDir()
extID := storagetest.NewFileBackedStorageExtension("test", storageDir).ID()

f := NewFactory()

cfg := rotationTestConfig(logsDir)
cfg.Converter.MaxFlushCount = 1
cfg.Converter.FlushInterval = time.Millisecond
cfg.Operators = nil // not testing processing, just read the lines
cfg.StorageID = &extID

logger := newRecallLogger(t, logsDir)

host := storagetest.NewStorageHost().WithFileBackedStorageExtension("test", storageDir)
ext := storagetest.NewFileBackedStorageExtension("test", storageDir)
host := storagetest.NewStorageHost().WithExtension(ext.ID(), ext)
sink := new(consumertest.LogsSink)
rcvr, err := f.CreateLogsReceiver(ctx, componenttest.NewNopReceiverCreateSettings(), cfg, sink)
require.NoError(t, err, "failed to create receiver")
Expand Down Expand Up @@ -80,7 +83,8 @@ func TestStorage(t *testing.T) {
logger.log(fmt.Sprintf(baseLog, 4))

// Start the components again
host = storagetest.NewStorageHost().WithFileBackedStorageExtension("test", storageDir)
ext = storagetest.NewFileBackedStorageExtension("test", storageDir)
host = storagetest.NewStorageHost().WithExtension(ext.ID(), ext)
rcvr, err = f.CreateLogsReceiver(ctx, componenttest.NewNopReceiverCreateSettings(), cfg, sink)
require.NoError(t, err, "failed to create receiver")
require.NoError(t, rcvr.Start(ctx, host))
Expand Down Expand Up @@ -124,7 +128,8 @@ func TestStorage(t *testing.T) {
logger.log(fmt.Sprintf(baseLog, 9))

// Start the components again
host = storagetest.NewStorageHost().WithFileBackedStorageExtension("test", storageDir)
ext = storagetest.NewFileBackedStorageExtension("test", storageDir)
host = storagetest.NewStorageHost().WithExtension(ext.ID(), ext)
rcvr, err = f.CreateLogsReceiver(ctx, componenttest.NewNopReceiverCreateSettings(), cfg, sink)
require.NoError(t, err, "failed to create receiver")
require.NoError(t, rcvr.Start(ctx, host))
Expand Down
Loading

0 comments on commit 178e885

Please sign in to comment.