From 90a0bdcba92d88cce54da686efa51401f9d733e1 Mon Sep 17 00:00:00 2001 From: Tim Mwangi Date: Fri, 20 Oct 2023 09:11:37 -0700 Subject: [PATCH] feat: custom changes in configgrpc, confighttp and service --- FORK.md | 25 ++++++++++ config/configgrpc/configgrpc.go | 30 ++++++++++-- config/configgrpc/configgrpc_test.go | 19 ++++---- .../configgrpcclientdialoptionhandler_test.go | 46 +++++++++++++++++++ config/confighttp/confighttp.go | 4 +- config/confighttp/confighttp_test.go | 13 +++--- otelcol/collector.go | 12 +++++ 7 files changed, 127 insertions(+), 22 deletions(-) create mode 100644 FORK.md create mode 100644 config/configgrpc/configgrpcclientdialoptionhandler_test.go diff --git a/FORK.md b/FORK.md new file mode 100644 index 00000000000..3eb3993079b --- /dev/null +++ b/FORK.md @@ -0,0 +1,25 @@ +# How this fork works + +This fork was created to be able to early patch open-telemetry/opentelemetry-collector versions in cases it is hard to get the change in upstream right away. + +## How do we consume this library + +For every opentelemetry-collector release we create a new release including our own patches. For example for version v0.49.0 we in open-telemetry/opentelemetry-collector we will crease v0.49.0+patches. This make sure we stick to a version in our downstream dependencies. + +Whenever we need a new release on this repository we rebase the branch `latest+patches` version against the new release for open-telemetry/opentelemetry-collector and then get a new release. For example on version `v0.49.0` (asuming `origin` is `git@github.com:hypertrace/opentelemetry-collector.git` and `upstream` is `git@github.com:open-telemetry/opentelemetry-collector.git`): + +```bash +git fetch --all +git checkout latest+patches +git pull --rebase upstream refs/tags/v0.49.0 +# make lint test +git tag -a "v0.49.0+patches" -m "Release v0.49.0" +git push --tags +``` + +## What custom changes are in here +- In `config/configgrpc/configgrpc.go` we added the ability to add extra `ClientDialOptionHandler`. Also a unit test for this in `config/configgrpc/configgrpcclientdialoptionhandler_test.go`. Also commented out warning on servers starting `UnspecifiedHost` aka `0.0.0.0`. +- In `config/configgrpc/configgrpc_test.go` we commented a unit test checking for a warning on servers starting `UnspecifiedHost` aka `0.0.0.0`. +- In `config/confighttp/confighttp.go` we commented out warning on servers starting `UnspecifiedHost`. +- In ` config/confighttp/confighttp_test.go` we commented a unit test checking for a warning on servers starting `UnspecifiedHost`. +- In `service/collector.go` we added a `ConfigPostProcessor` interface so that we can intercept the final config and do more changes in the factories or pipelines. \ No newline at end of file diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 18526262317..5a6ae820871 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -31,12 +31,15 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/config/internal" "go.opentelemetry.io/collector/extension/auth" ) var errMetadataNotFound = errors.New("no request metadata found") +type ClientDialOptionHandler func() grpc.DialOption + +var clientOptionHandlerList = make([]ClientDialOptionHandler, 0) + // KeepaliveClientConfig exposes the keepalive.ClientParameters to be used by the exporter. // Refer to the original data-structure for the meaning of each parameter: // https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters @@ -88,6 +91,9 @@ type ClientConfig struct { // Auth configuration for outgoing RPCs. Auth *configauth.Authentication `mapstructure:"auth"` + + // SkipGlobalClientOption defines config if the global client interceptors need to be used + SkipGlobalClientOption bool `mapstructure:"skip_global_client_option"` } // KeepaliveServerConfig is the configuration for keepalive. @@ -260,6 +266,15 @@ func (gcs *ClientConfig) toDialOptions(host component.Host, settings component.T // Enable OpenTelemetry observability plugin. opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...))) + if !gcs.SkipGlobalClientOption { + for _, handler := range clientOptionHandlerList { + opt := handler() + if opt != nil { + opts = append(opts, opt) + } + } + } + return opts, nil } @@ -284,10 +299,11 @@ func (gss *ServerConfig) ToServerContext(_ context.Context, host component.Host, } func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) { - switch gss.NetAddr.Transport { - case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6": - internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint) - } + // Warning commented out for now. ENG-27501 + // switch gss.NetAddr.Transport { + // case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6": + // internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint) + // } var opts []grpc.ServerOption @@ -451,3 +467,7 @@ func authStreamServerInterceptor(srv any, stream grpc.ServerStream, _ *grpc.Stre return handler(srv, wrapServerStream(ctx, stream)) } + +func RegisterClientDialOptionHandlers(handlers ...ClientDialOptionHandler) { + clientOptionHandlerList = append(clientOptionHandlerList, handlers...) +} diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index 7d646af2b3e..bd952d9f21e 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -407,15 +407,16 @@ func TestGRPCServerWarning(t *testing.T) { settings ServerConfig len int }{ - { - settings: ServerConfig{ - NetAddr: confignet.AddrConfig{ - Endpoint: "0.0.0.0:1234", - Transport: "tcp", - }, - }, - len: 1, - }, + // Test commented out for now. ENG-27501 + // { + // settings: ServerConfig{ + // NetAddr: confignet.NetAddr{ + // Endpoint: "0.0.0.0:1234", + // Transport: "tcp", + // }, + // }, + // len: 1, + // }, { settings: ServerConfig{ NetAddr: confignet.AddrConfig{ diff --git a/config/configgrpc/configgrpcclientdialoptionhandler_test.go b/config/configgrpc/configgrpcclientdialoptionhandler_test.go new file mode 100644 index 00000000000..8e185ca4bd4 --- /dev/null +++ b/config/configgrpc/configgrpcclientdialoptionhandler_test.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This is a test added by us. +package configgrpc + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/obsreport/obsreporttest" +) + +func TestRegisterClientDialOptionHandler(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(component.NewID("component")) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + gcs := &GRPCClientSettings{} + opts, err := gcs.toDialOptions( + &mockHost{ext: map[component.ID]extension.Extension{}}, + tt.TelemetrySettings, + ) + require.NoError(t, err) + + defaultOptsLen := len(opts) + + RegisterClientDialOptionHandlers(func() grpc.DialOption { + return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(ctx, method, req, reply, cc, opts...) + }) + }) + gcs = &GRPCClientSettings{} + opts, err = gcs.toDialOptions( + &mockHost{ext: map[component.ID]extension.Extension{}}, + tt.TelemetrySettings, + ) + assert.NoError(t, err) + assert.Len(t, opts, defaultOptsLen+1) +} diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 81a45e4de5d..bbeb4e24101 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/config/internal" "go.opentelemetry.io/collector/extension/auth" ) @@ -327,7 +326,8 @@ func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error) // ToServer creates an http.Server from settings object. func (hss *ServerConfig) ToServer(host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) { - internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint) + // Warning commented out for now. ENG-27501 + // internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint) serverOpts := &toServerOptions{} for _, o := range opts { diff --git a/config/confighttp/confighttp_test.go b/config/confighttp/confighttp_test.go index fc90f8f7caf..6473e62ada8 100644 --- a/config/confighttp/confighttp_test.go +++ b/config/confighttp/confighttp_test.go @@ -535,12 +535,13 @@ func TestHTTPServerWarning(t *testing.T) { settings ServerConfig len int }{ - { - settings: ServerConfig{ - Endpoint: "0.0.0.0:0", - }, - len: 1, - }, + // Test commented out for now. ENG-27501 + // { + // settings: ServerConfig{ + // Endpoint: "0.0.0.0:0", + // }, + // len: 1, + // }, { settings: ServerConfig{ Endpoint: "127.0.0.1:0", diff --git a/otelcol/collector.go b/otelcol/collector.go index 3fd5cb2ca31..27d84bd0693 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -153,6 +153,13 @@ func (col *Collector) Shutdown() { } } +// ConfigPostProcessor allows to intercept the final config and do changes in the factories or +// pipelines. For further information refer to the issue: +// https://github.com/open-telemetry/opentelemetry-collector/issues/3023 +type ConfigPostProcessor interface { + Process(c *Config) +} + // setupConfigurationComponents loads the config and starts the components. If all the steps succeeds it // sets the col.service with the service currently running. func (col *Collector) setupConfigurationComponents(ctx context.Context) error { @@ -182,6 +189,11 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return fmt.Errorf("invalid configuration: %w", err) } + if postProcessor, ok := col.set.ConfigProvider.(ConfigPostProcessor); ok { + // Here we modify the config to be able to manipulate processors and pipelines + postProcessor.Process(cfg) + } + col.service, err = service.New(ctx, service.Settings{ BuildInfo: col.set.BuildInfo, CollectorConf: conf,