Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/jaegerremotesampling] Tie in the strategy storages #8818

Merged
merged 3 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
generated code (#5270)
- Add `make crosslink` target to ensure replace statements are included in `go.mod` for all transitive dependencies within repository (#8822)
- `filestorageextension`: Change bbolt DB settings for better performance (#9004)
- `jaegerremotesamplingextension`: Add local and remote sampling stores (#8818)

### 🛑 Breaking changes 🛑

Expand Down
15 changes: 10 additions & 5 deletions extension/jaegerremotesampling/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@ Note that the port `14250` will clash with the Jaeger Receiver. When both are us

Although this extension is derived from Jaeger, it can be used by any clients who can consume this standard, such as the [OpenTelemetry Java SDK](https://github.com/open-telemetry/opentelemetry-java/tree/v1.9.1/sdk-extensions/jaeger-remote-sampler).

At this moment, the `reload_interval` option is only effective for the `file` source. In the future, this property will be used to control a local cache for a `remote` source.

The `file` source can be used to load files from the local file system or from remote HTTP/S sources. The `remote` source must be used with a gRPC server that provides a Jaeger remote sampling service.

## Configuration

```yaml
extensions:
jaegerremotesampling:
grpc:
pmm-sumo marked this conversation as resolved.
Show resolved Hide resolved
endpoint: :15251
source:
remote:
endpoint: jaeger-collector:14250
jaegerremotesampling/1:
http:
endpoint: :5878
source:
file: /etc/otel/sampling_strategies.json
reload_interval: 1s
file: /etc/otelcol/sampling_strategies.json
jaegerremotesampling/2:
source:
reload_interval: 1s
file: http://jaeger.example.com/sampling_strategies.json
```

A sampling strategy file could look like:
Expand Down
4 changes: 3 additions & 1 deletion extension/jaegerremotesampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package jaegerremotesampling
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -60,7 +61,8 @@ func TestLoadConfig(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}},
Source: Source{
File: "/etc/otel/sampling_strategies.json",
ReloadInterval: time.Second,
File: "/etc/otelcol/sampling_strategies.json",
},
},
ext1)
Expand Down
86 changes: 70 additions & 16 deletions extension/jaegerremotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,102 @@ package jaegerremotesampling // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"fmt"

grpcStore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal"
)

var _ component.Extension = (*jrsExtension)(nil)

type jrsExtension struct {
httpServer component.Component
cfg *Config
telemetry component.TelemetrySettings

httpServer component.Component
samplingStore strategystore.StrategyStore

closers []func() error
}

func newExtension(cfg *Config, telemetry component.TelemetrySettings) (*jrsExtension, error) {
// TODO(jpkroehling): get a proper instance
cfgMgr := internal.NewClientConfigManager()
ext := &jrsExtension{}
func newExtension(cfg *Config, telemetry component.TelemetrySettings) *jrsExtension {
jrse := &jrsExtension{
cfg: cfg,
telemetry: telemetry,
}
return jrse
}

if cfg.HTTPServerSettings != nil {
httpServer, err := internal.NewHTTP(telemetry, *cfg.HTTPServerSettings, cfgMgr)
func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {
// the config validation will take care of ensuring we have one and only one of the following about the
// source of the sampling config:
// - remote (gRPC)
// - local file
// we can then use a simplified logic here to assign the appropriate store
if jrse.cfg.Source.File != "" {
opts := static.Options{
StrategiesFile: jrse.cfg.Source.File,
ReloadInterval: jrse.cfg.Source.ReloadInterval,
}
ss, err := static.NewStrategyStore(opts, jrse.telemetry.Logger)
if err != nil {
return nil, err
return fmt.Errorf("failed to create the local file strategy store: %v", err)
}
ext.httpServer = httpServer

// there's a Close function on the concrete type, which is not visible to us...
// how can we close it then?
jrse.samplingStore = ss
}

return ext, nil
}
if jrse.cfg.Source.Remote != nil {
opts, err := jrse.cfg.Source.Remote.ToDialOptions(host, jrse.telemetry)
if err != nil {
return fmt.Errorf("error while setting up the remote sampling source: %v", err)
}
conn, err := grpc.Dial(jrse.cfg.Source.Remote.Endpoint, opts...)
if err != nil {
return fmt.Errorf("error while connecting to the remote sampling source: %v", err)
}

jrse.samplingStore = grpcStore.NewConfigManager(conn)
jrse.closers = append(jrse.closers, func() error {
return conn.Close()
})
}

if jrse.cfg.HTTPServerSettings != nil {
httpServer, err := internal.NewHTTP(jrse.telemetry, *jrse.cfg.HTTPServerSettings, jrse.samplingStore)
if err != nil {
return fmt.Errorf("error while creating the HTTP server: %v", err)
}
jrse.httpServer = httpServer
}

func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {
// then we start our own server interfaces, starting with the HTTP one
err := jrse.httpServer.Start(ctx, host)
if err != nil {
return err
return fmt.Errorf("error while starting the HTTP server: %v", err)
}

return nil
}

func (jrse *jrsExtension) Shutdown(ctx context.Context) error {
err := jrse.httpServer.Shutdown(ctx)
if err != nil {
return err
// we probably don't want to break whenever an error occurs, we want to continue and close the other resources
if err := jrse.httpServer.Shutdown(ctx); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err))
}

for _, closer := range jrse.closers {
if err := closer(); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the sampling store", zap.Error(err))
}
}

return nil
Expand Down
96 changes: 43 additions & 53 deletions extension/jaegerremotesampling/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,89 +16,79 @@ package jaegerremotesampling

import (
"context"
"errors"
"fmt"
"net"
"path/filepath"
"testing"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"google.golang.org/grpc"
)

func TestNewExtension(t *testing.T) {
// test
e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
cfg := createDefaultConfig().(*Config)
cfg.Source.File = filepath.Join("testdata", "strategy.json")
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())

// verify
assert.NotNil(t, e)
}

func TestStartAndShutdown(t *testing.T) {
func TestStartAndShutdownLocalFile(t *testing.T) {
// prepare
e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
cfg := createDefaultConfig().(*Config)
cfg.Source.File = filepath.Join("testdata", "strategy.json")

e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
require.NoError(t, err)
require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))

// test and verify
assert.NoError(t, e.Shutdown(context.Background()))
}

func TestFailedToStartHTTPServer(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this test just not needed anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of: previously, I had the ability to provide an HTTP server, making it trivial to test a failure scenario. Now, I moved the entire initialization logic to the start function, so that an HTTP server is created there. Failure testing is now more complicated.

// prepare
errBooBoo := errors.New("the server made a boo boo")

e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
func TestStartAndShutdownRemote(t *testing.T) {
// prepare the socket the mock server will listen at
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

e.httpServer = &mockComponent{
StartFunc: func(_ context.Context, _ component.Host) error {
return errBooBoo
},
// create the mock server
server := grpc.NewServer()
go func() {
err = server.Serve(lis)
require.NoError(t, err)
}()

// register the service
api_v2.RegisterSamplingManagerServer(server, &samplingServer{})

// create the config, pointing to the mock server
cfg := createDefaultConfig().(*Config)
cfg.Source.Remote = &configgrpc.GRPCClientSettings{
Endpoint: fmt.Sprintf("localhost:%d", lis.Addr().(*net.TCPAddr).Port),
WaitForReady: true,
}

// test and verify
assert.Equal(t, errBooBoo, e.Start(context.Background(), componenttest.NewNopHost()))
}

func TestFailedToShutdownHTTPServer(t *testing.T) {
// prepare
errBooBoo := errors.New("the server made a boo boo")

e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
// create the extension
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
require.NoError(t, err)

e.httpServer = &mockComponent{
ShutdownFunc: func(_ context.Context) error {
return errBooBoo
},
}
require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))

// test and verify
assert.Equal(t, errBooBoo, e.Shutdown(context.Background()))
}

type mockComponent struct {
StartFunc func(_ context.Context, _ component.Host) error
ShutdownFunc func(_ context.Context) error
// test
assert.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, e.Shutdown(context.Background()))
}

func (s *mockComponent) Start(ctx context.Context, host component.Host) error {
if s.StartFunc == nil {
return nil
}

return s.StartFunc(ctx, host)
type samplingServer struct {
api_v2.UnimplementedSamplingManagerServer
}

func (s *mockComponent) Shutdown(ctx context.Context) error {
if s.ShutdownFunc == nil {
return nil
}

return s.ShutdownFunc(ctx)
func (s samplingServer) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
}, nil
}
3 changes: 2 additions & 1 deletion extension/jaegerremotesampling/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ func createDefaultConfig() config.Extension {
Endpoint: ":14250",
},
},
Source: Source{},
}
}

func createExtension(_ context.Context, set component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
return newExtension(cfg.(*Config), set.TelemetrySettings)
return newExtension(cfg.(*Config), set.TelemetrySettings), nil
}
Loading