Skip to content

Commit

Permalink
added dbstorage to contrib (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pratyush Kumar authored Nov 7, 2024
1 parent f49d403 commit 838e6c7
Show file tree
Hide file tree
Showing 11 changed files with 485 additions and 0 deletions.
77 changes: 77 additions & 0 deletions extension/opsramplogsdbstorage/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//go:build !freebsd

package opsramplogsdbstorage

import (
"context"
"errors"
"fmt"

"github.com/syndtr/goleveldb/leveldb"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

type dbStorageClient struct {
prefix string
db *leveldb.DB
}

func newClient(db *leveldb.DB, prefix string) (*dbStorageClient, error) {
return &dbStorageClient{
prefix: prefix,
db: db,
}, nil
}

// Get will retrieve data from storage that corresponds to the specified key
func (c *dbStorageClient) Get(_ context.Context, key string) ([]byte, error) {
dbKey := fmt.Sprintf("%s_%s", c.prefix, key)

b, err := c.db.Get([]byte(dbKey), nil)
if err == nil {
return b, nil
}
return nil, nil
}

// Set will store data. The data can be retrieved using the same key
func (c *dbStorageClient) Set(_ context.Context, key string, value []byte) error {
dbKey := fmt.Sprintf("%s_%s", c.prefix, key)

return c.db.Put([]byte(dbKey), value, nil)
}

// Delete will delete data associated with the specified key
func (c *dbStorageClient) Delete(_ context.Context, key string) error {
dbKey := fmt.Sprintf("%s_%s", c.prefix, key)

return c.db.Delete([]byte(dbKey), nil)
}

// Batch executes the specified operations in order. Get operation results are updated in place
func (c *dbStorageClient) Batch(ctx context.Context, ops ...storage.Operation) error {
var err error
for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value, err = c.Get(ctx, op.Key)
case storage.Set:
err = c.Set(ctx, op.Key, op.Value)
case storage.Delete:
err = c.Delete(ctx, op.Key)
default:
return errors.New("wrong operation type")
}

if err != nil {
return err
}
}
return err
}

// Close will close the database
func (c *dbStorageClient) Close(_ context.Context) error {
c.db.Close()
return nil
}
18 changes: 18 additions & 0 deletions extension/opsramplogsdbstorage/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package opsramplogsdbstorage

import (
"fmt"
)

// Config defines configuration for dbstorage extension.
type Config struct {
DBPath string `mapstructure:"db_path, omitempty"`
}

func (cfg *Config) Validate() error {
if cfg.DBPath == "" {
return fmt.Errorf("missing db_path")
}

return nil
}
3 changes: 3 additions & 0 deletions extension/opsramplogsdbstorage/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//go:generate mdatagen metadata.yaml

package opsramplogsdbstorage
98 changes: 98 additions & 0 deletions extension/opsramplogsdbstorage/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//go:build !freebsd

package opsramplogsdbstorage

import (
"context"
"errors"
"fmt"
"os"
"strings"

"go.opentelemetry.io/collector/extension"

"github.com/syndtr/goleveldb/leveldb"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"
)

type databaseStorage struct {
dbPath string
logger *zap.Logger
db *leveldb.DB
}

// Ensure this storage extension implements the appropriate interface
var _ storage.Extension = (*databaseStorage)(nil)

func newDBStorage(logger *zap.Logger, config *Config) (extension.Extension, error) {
return &databaseStorage{
dbPath: config.DBPath,
logger: logger,
}, nil
}

// Start opens a connection to the database
func (ds *databaseStorage) Start(context.Context, component.Host) error {
db, err := leveldb.OpenFile(ds.dbPath, nil)
if err != nil {
return err
}

ds.db = db
return nil
}

// Shutdown closes the connection to the database
func (ds *databaseStorage) Shutdown(context.Context) (shutdownErr error) {
defer func() {
if panicErr := recover(); panicErr != nil {
shutdownErr = fmt.Errorf("handiling panic error: %v", panicErr)
// remove ds.dbpath
if err := os.RemoveAll(ds.dbPath); err != nil {
shutdownErr = fmt.Errorf("handiling panic error: %v and Directory %v removing error: %v", panicErr, ds.dbPath, err)
}
}
}()
// ds.db nil return error
if ds.db == nil {
if err := os.RemoveAll(ds.dbPath); err != nil {
shutdownErr = fmt.Errorf("directory %v removing error: %v", ds.dbPath, err)
}
return shutdownErr
}
shutdownErr = ds.db.Close()
if errors.Is(shutdownErr, leveldb.ErrClosed) {
return nil
}

return shutdownErr
}

// GetClient returns a storage client for an individual component
func (ds *databaseStorage) GetClient(_ context.Context, kind component.Kind, ent component.ID, name string) (storage.Client, error) {
var fullName string
if name == "" {
fullName = fmt.Sprintf("%s_%s_%s", kindString(kind), ent.Type(), ent.Name())
} else {
fullName = fmt.Sprintf("%s_%s_%s_%s", kindString(kind), ent.Type(), ent.Name(), name)
}
fullName = strings.ReplaceAll(fullName, " ", "")
return newClient(ds.db, fullName)
}

func kindString(k component.Kind) string {
switch k {
case component.KindReceiver:
return "receiver"
case component.KindProcessor:
return "processor"
case component.KindExporter:
return "exporter"
case component.KindExtension:
return "extension"
default:
return "other" // not expected
}
}
34 changes: 34 additions & 0 deletions extension/opsramplogsdbstorage/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build !freebsd

package opsramplogsdbstorage

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/opsramplogsdbstorage/internal/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
)

// NewFactory creates a factory for DBStorage extension.
func NewFactory() extension.Factory {
return extension.NewFactory(
metadata.Type,
createDefaultConfig,
createExtension,
metadata.LogsStability,
)
}

func createDefaultConfig() component.Config {
return &Config{}
}

func createExtension(
_ context.Context,
params extension.Settings,
cfg component.Config,
) (extension.Extension, error) {
return newDBStorage(params.Logger, cfg.(*Config))
}
38 changes: 38 additions & 0 deletions extension/opsramplogsdbstorage/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions extension/opsramplogsdbstorage/generated_package_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions extension/opsramplogsdbstorage/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/extension/opsramplogsdbstorage

go 1.22.0

require (
github.com/stretchr/testify v1.9.0
github.com/syndtr/goleveldb v1.0.0
go.opentelemetry.io/collector/component v0.113.0
go.opentelemetry.io/collector/confmap v1.19.0
go.opentelemetry.io/collector/extension v0.113.0
go.opentelemetry.io/collector/extension/experimental/storage v0.113.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.113.0 // indirect
go.opentelemetry.io/collector/pdata v1.19.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 838e6c7

Please sign in to comment.