Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] Use storage extension only if specified #13418

Merged
merged 4 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions extension/storage/storagetest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,18 @@ func (p *TestClient) Close(_ context.Context) error {
return os.WriteFile(p.storageFile, contents, os.FileMode(0600))
}

// Kind of component that is using the storage client
func (p *TestClient) Kind() component.Kind {
return p.kind
}
const clientCreatorID = "client_creator_id"

// ID of component that is using the storage client
func (p *TestClient) ID() config.ComponentID {
return p.id
func setCreatorID(ctx context.Context, client storage.Client, creatorID config.ComponentID) error {
return client.Set(ctx, clientCreatorID, []byte(creatorID.String()))
}

// Name assigned to the storage client
func (p *TestClient) Name() string {
return p.name
// CreatorID is the config.ComponentID of the extension that created the component
func CreatorID(ctx context.Context, client storage.Client) (config.ComponentID, error) {
idBytes, err := client.Get(ctx, clientCreatorID)
if err != nil || idBytes == nil {
return config.ComponentID{}, err
}

return config.NewComponentIDFromString(string(idBytes))
}
25 changes: 17 additions & 8 deletions extension/storage/storagetest/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,29 @@ var testStorageType config.Type = "test_storage"
type TestStorage struct {
config.ExtensionSettings
storageDir string
clients []*TestClient
}

// Ensure this storage extension implements the appropriate interface
var _ storage.Extension = (*TestStorage)(nil)

func NewStorageID(name string) config.ComponentID {
return config.NewComponentIDWithName(testStorageType, name)
}

// NewInMemoryStorageExtension creates a TestStorage extension
func NewInMemoryStorageExtension(name string) *TestStorage {
return &TestStorage{
ExtensionSettings: config.NewExtensionSettings(
config.NewComponentIDWithName(testStorageType, name),
NewStorageID(name),
),
clients: []*TestClient{},
}
}

// NewFileBackedStorageExtension creates a TestStorage extension
func NewFileBackedStorageExtension(name string, storageDir string) *TestStorage {
return &TestStorage{
ExtensionSettings: config.NewExtensionSettings(
config.NewComponentIDWithName(testStorageType, name),
NewStorageID(name),
),
storageDir: storageDir,
}
Expand All @@ -65,11 +67,14 @@ func (s *TestStorage) Shutdown(ctx context.Context) error {
}

// GetClient returns a storage client for an individual component
func (s *TestStorage) GetClient(_ context.Context, kind component.Kind, ent config.ComponentID, name string) (storage.Client, error) {
func (s *TestStorage) GetClient(ctx context.Context, kind component.Kind, ent config.ComponentID, name string) (storage.Client, error) {
var client *TestClient
if s.storageDir == "" {
return NewInMemoryClient(kind, ent, name), nil
client = NewInMemoryClient(kind, ent, name)
} else {
client = NewFileBackedClient(kind, ent, name, s.storageDir)
}
return NewFileBackedClient(kind, ent, name, s.storageDir), nil
return client, setCreatorID(ctx, client, s.ID())
}

var nonStorageType config.Type = "non_storage"
Expand All @@ -83,11 +88,15 @@ type NonStorage struct {
// Ensure this extension implements the appropriate interface
var _ component.Extension = (*NonStorage)(nil)

func NewNonStorageID(name string) config.ComponentID {
return config.NewComponentIDWithName(nonStorageType, name)
}

// NewNonStorageExtension creates a NonStorage extension
func NewNonStorageExtension(name string) *NonStorage {
return &NonStorage{
ExtensionSettings: config.NewExtensionSettings(
config.NewComponentIDWithName(nonStorageType, name),
NewNonStorageID(name),
),
}
}
Expand Down
16 changes: 15 additions & 1 deletion extension/storage/storagetest/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"go.opentelemetry.io/collector/extension/experimental/storage"
)

func TestID(t *testing.T) {
require.Equal(t, NewStorageID("test"), NewInMemoryStorageExtension("test").ID())
require.Equal(t, NewStorageID("test"), NewFileBackedStorageExtension("test", t.TempDir()).ID())
require.Equal(t, NewNonStorageID("test"), NewNonStorageExtension("test").ID())
}

func TestInMemoryLifecycle(t *testing.T) {
ext := NewInMemoryStorageExtension("test")
require.Equal(t, config.NewComponentIDWithName(testStorageType, "test"), ext.ID())
Expand All @@ -38,13 +44,17 @@ func TestFileBackedLifecycle(t *testing.T) {
runExtensionLifecycle(t, ext, true)
}

func runExtensionLifecycle(t *testing.T, ext storage.Extension, expectPersistence bool) {
func runExtensionLifecycle(t *testing.T, ext *TestStorage, expectPersistence bool) {
ctx := context.Background()
require.NoError(t, ext.Start(ctx, componenttest.NewNopHost()))

clientOne, err := ext.GetClient(ctx, component.KindProcessor, config.NewComponentID("foo"), "client_one")
require.NoError(t, err)

creatorID, err := CreatorID(ctx, clientOne)
require.NoError(t, err)
require.Equal(t, ext.ID(), creatorID)

// Write a value, confirm it is saved
require.NoError(t, clientOne.Set(ctx, "foo", []byte("bar")))
fooVal, err := clientOne.Get(ctx, "foo")
Expand All @@ -70,6 +80,10 @@ func runExtensionLifecycle(t *testing.T, ext storage.Extension, expectPersistenc
clientTwo, err := ext.GetClient(ctx, component.KindProcessor, config.NewComponentID("foo"), "client_one")
require.NoError(t, err)

creatorID, err = CreatorID(ctx, clientTwo)
require.NoError(t, err)
require.Equal(t, ext.ID(), creatorID)

// Check if the value is accessible from another client
fooVal, err = clientTwo.Get(ctx, "foo2")
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions extension/storage/storagetest/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func NewStorageHost() *StorageHost {
}
}

func (h *StorageHost) WithExtension(id config.ComponentID, ext component.Extension) *StorageHost {
h.extensions[id] = ext
return h
}

func (h *StorageHost) WithInMemoryStorageExtension(name string) *StorageHost {
ext := NewInMemoryStorageExtension(name)
h.extensions[ext.ID()] = ext
Expand Down
5 changes: 3 additions & 2 deletions pkg/stanza/adapter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
// BaseConfig is the common configuration of a stanza-based receiver
type BaseConfig struct {
config.ReceiverSettings `mapstructure:",squash"`
Operators OperatorConfigs `mapstructure:"operators"`
Converter ConverterConfig `mapstructure:"converter"`
Operators OperatorConfigs `mapstructure:"operators"`
Converter ConverterConfig `mapstructure:"converter"`
StorageID *config.ComponentID `mapstructure:"storage"`
}

// OperatorConfigs is an alias that allows for unmarshaling outside of mapstructure
Expand Down
1 change: 1 addition & 0 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) component.CreateLogsRec
logger: params.Logger,
converter: converter,
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
}, nil
}
}
14 changes: 8 additions & 6 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ type receiver struct {
wg sync.WaitGroup
cancel context.CancelFunc

pipe pipeline.Pipeline
emitter *LogEmitter
consumer consumer.Logs
pipe pipeline.Pipeline
emitter *LogEmitter
consumer consumer.Logs
converter *Converter
logger *zap.Logger
obsrecv *obsreport.Receiver

storageID *config.ComponentID
storageClient storage.Client
converter *Converter
logger *zap.Logger
obsrecv *obsreport.Receiver
}

// Ensure this receiver adheres to required interface
Expand Down
32 changes: 15 additions & 17 deletions pkg/stanza/adapter/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,37 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

func GetStorageClient(ctx context.Context, id config.ComponentID, componentKind component.Kind, host component.Host) (storage.Client, error) {
var storageExtension storage.Extension
if host != nil {
for _, ext := range host.GetExtensions() {
if se, ok := ext.(storage.Extension); ok {
if storageExtension != nil {
return nil, errors.New("multiple storage extensions found")
}
storageExtension = se
}
}
func GetStorageClient(ctx context.Context, host component.Host, storageID *config.ComponentID, componentID config.ComponentID) (storage.Client, error) {
if storageID == nil {
return storage.NewNopClient(), nil
}

if storageExtension == nil {
return storage.NewNopClient(), nil
extension, ok := host.GetExtensions()[*storageID]
if !ok {
return nil, fmt.Errorf("storage extension '%s' not found", storageID)
}

storageExtension, ok := extension.(storage.Extension)
if !ok {
return nil, fmt.Errorf("non-storage extension '%s' found", storageID)
}

return storageExtension.GetClient(ctx, componentKind, id, "")
return storageExtension.GetClient(ctx, component.KindReceiver, componentID, "")

}

func (r *receiver) setStorageClient(ctx context.Context, host component.Host) error {
client, err := GetStorageClient(ctx, r.id, component.KindReceiver, host)
client, err := GetStorageClient(ctx, host, r.storageID, r.id)
if err != nil {
return err
}

r.storageClient = client
return nil
}
55 changes: 45 additions & 10 deletions pkg/stanza/adapter/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
)

func TestStorage(t *testing.T) {
ctx := context.Background()
r := createReceiver(t)

storageExt := storagetest.NewFileBackedStorageExtension("test", t.TempDir())
host := storagetest.NewStorageHost().
WithFileBackedStorageExtension("test", t.TempDir())
WithExtension(storageExt.ID(), storageExt)

id := storageExt.ID()
r := createReceiver(t, id)
require.NoError(t, r.Start(ctx, host))

myBytes := []byte("my_value")

require.NoError(t, r.storageClient.Set(ctx, "key", myBytes))
val, err := r.storageClient.Get(ctx, "key")
require.NoError(t, err)
Expand All @@ -46,7 +50,7 @@ func TestStorage(t *testing.T) {
require.NoError(t, e.Shutdown(ctx))
}

r = createReceiver(t)
r = createReceiver(t, id)
err = r.Start(ctx, host)
require.NoError(t, err)

Expand All @@ -70,17 +74,47 @@ func TestStorage(t *testing.T) {
require.Equal(t, "client closed", err.Error())
}

func TestFailOnMultipleStorageExtensions(t *testing.T) {
r := createReceiver(t)
func TestFindCorrectStorageExtension(t *testing.T) {
correctStoragedExt := storagetest.NewInMemoryStorageExtension("want")
id := correctStoragedExt.ID()
r := createReceiver(t, id)
host := storagetest.NewStorageHost().
WithInMemoryStorageExtension("one").
WithInMemoryStorageExtension("two")
WithNonStorageExtension("one").
WithFileBackedStorageExtension("foo", t.TempDir()).
WithExtension(id, correctStoragedExt).
WithFileBackedStorageExtension("bar", t.TempDir()).
WithNonStorageExtension("two")

err := r.Start(context.Background(), host)
require.NoError(t, err)
require.NotNil(t, r.storageClient)

clientCreatorID, err := storagetest.CreatorID(context.Background(), r.storageClient)
require.NoError(t, err)
require.Equal(t, id, clientCreatorID)
}

func TestFailOnMissingStorageExtension(t *testing.T) {
id := config.NewComponentIDWithName("test", "missing")
r := createReceiver(t, id)
err := r.Start(context.Background(), storagetest.NewStorageHost())
require.Error(t, err)
require.Equal(t, "storage client: storage extension 'test/missing' not found", err.Error())
}

func TestFailOnNonStorageExtension(t *testing.T) {
nonStorageExt := storagetest.NewNonStorageExtension("non")
id := nonStorageExt.ID()
r := createReceiver(t, id)
host := storagetest.NewStorageHost().
WithExtension(id, nonStorageExt)

err := r.Start(context.Background(), host)
require.Error(t, err)
require.Equal(t, "storage client: multiple storage extensions found", err.Error())
require.Equal(t, "storage client: non-storage extension 'non_storage/non' found", err.Error())
}

func createReceiver(t *testing.T) *receiver {
func createReceiver(t *testing.T, storageID config.ComponentID) *receiver {
params := component.ReceiverCreateSettings{
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
}
Expand All @@ -97,5 +131,6 @@ func createReceiver(t *testing.T) *receiver {

r, ok := logsReceiver.(*receiver)
require.True(t, ok)
r.storageID = &storageID
return r
}
3 changes: 2 additions & 1 deletion receiver/filelogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Tails and parses logs from files.
| `exclude` | [] | A list of file glob patterns to exclude from reading |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `multiline` | | A `multiline` configuration block. See below for more details |
| `force_flush_period` | `500ms` | Time since last read of data from file, after which currently buffered log should be send to pipeline. Takes [duration](../../pkg/stanza/docs/types/duration.md) as value. Zero means waiting for new data forever |
| `force_flush_period` | `500ms` | Time since last read of data from file, after which currently buffered log should be send to pipeline. Takes `time.Duration` (e.g. `10s`, `1m`, or `500ms`) as value. Zero means waiting for new data forever |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `include_file_name` | `true` | Whether to add the file name as the attribute `log.file.name`. |
| `include_file_path` | `false` | Whether to add the file path as the attribute `log.file.path`. |
Expand All @@ -30,6 +30,7 @@ Tails and parses logs from files.
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
| `converter` | <pre lang="jsonp">{<br> max_flush_count: 100,<br> flush_interval: 100ms,<br> worker_count: max(1,runtime.NumCPU()/4)<br>}</pre> | A map of `key: value` pairs to configure the [`entry.Entry`][entry_link] to [`plog.LogRecord`][pdata_logrecord_link] converter, more info can be found [here][converter_link] |
| `storage` | | The ID of a storage extension. The extension will be used to store file checkpoints, which allows the receiver to pick up where it left off in the case of a collector restart. |

[entry_link]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/entry/entry.go
[pdata_logrecord_link]: https://github.com/open-telemetry/opentelemetry-collector/blob/v0.40.0/model/pdata/generated_log.go#L553-L564
Expand Down
Loading