Skip to content

Commit

Permalink
feat: custom changes in configgrpc, confighttp and service
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-mwangi committed Mar 14, 2024
1 parent af71d1d commit 90a0bdc
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 22 deletions.
25 changes: 25 additions & 0 deletions FORK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# How this fork works

This fork was created to be able to early patch open-telemetry/opentelemetry-collector versions in cases it is hard to get the change in upstream right away.

## How do we consume this library

For every opentelemetry-collector release we create a new release including our own patches. For example for version v0.49.0 we in open-telemetry/opentelemetry-collector we will crease v0.49.0+patches. This make sure we stick to a version in our downstream dependencies.

Whenever we need a new release on this repository we rebase the branch `latest+patches` version against the new release for open-telemetry/opentelemetry-collector and then get a new release. For example on version `v0.49.0` (asuming `origin` is `[email protected]:hypertrace/opentelemetry-collector.git` and `upstream` is `[email protected]:open-telemetry/opentelemetry-collector.git`):

```bash
git fetch --all
git checkout latest+patches
git pull --rebase upstream refs/tags/v0.49.0
# make lint test
git tag -a "v0.49.0+patches" -m "Release v0.49.0"
git push --tags
```

## What custom changes are in here
- In `config/configgrpc/configgrpc.go` we added the ability to add extra `ClientDialOptionHandler`. Also a unit test for this in `config/configgrpc/configgrpcclientdialoptionhandler_test.go`. Also commented out warning on servers starting `UnspecifiedHost` aka `0.0.0.0`.
- In `config/configgrpc/configgrpc_test.go` we commented a unit test checking for a warning on servers starting `UnspecifiedHost` aka `0.0.0.0`.
- In `config/confighttp/confighttp.go` we commented out warning on servers starting `UnspecifiedHost`.
- In ` config/confighttp/confighttp_test.go` we commented a unit test checking for a warning on servers starting `UnspecifiedHost`.
- In `service/collector.go` we added a `ConfigPostProcessor` interface so that we can intercept the final config and do more changes in the factories or pipelines.
30 changes: 25 additions & 5 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/internal"
"go.opentelemetry.io/collector/extension/auth"
)

var errMetadataNotFound = errors.New("no request metadata found")

type ClientDialOptionHandler func() grpc.DialOption

var clientOptionHandlerList = make([]ClientDialOptionHandler, 0)

// KeepaliveClientConfig exposes the keepalive.ClientParameters to be used by the exporter.
// Refer to the original data-structure for the meaning of each parameter:
// https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters
Expand Down Expand Up @@ -88,6 +91,9 @@ type ClientConfig struct {

// Auth configuration for outgoing RPCs.
Auth *configauth.Authentication `mapstructure:"auth"`

// SkipGlobalClientOption defines config if the global client interceptors need to be used
SkipGlobalClientOption bool `mapstructure:"skip_global_client_option"`
}

// KeepaliveServerConfig is the configuration for keepalive.
Expand Down Expand Up @@ -260,6 +266,15 @@ func (gcs *ClientConfig) toDialOptions(host component.Host, settings component.T
// Enable OpenTelemetry observability plugin.
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...)))

if !gcs.SkipGlobalClientOption {
for _, handler := range clientOptionHandlerList {
opt := handler()
if opt != nil {
opts = append(opts, opt)
}
}
}

return opts, nil
}

Expand All @@ -284,10 +299,11 @@ func (gss *ServerConfig) ToServerContext(_ context.Context, host component.Host,
}

func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
switch gss.NetAddr.Transport {
case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6":
internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint)
}
// Warning commented out for now. ENG-27501
// switch gss.NetAddr.Transport {
// case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6":
// internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint)
// }

var opts []grpc.ServerOption

Expand Down Expand Up @@ -451,3 +467,7 @@ func authStreamServerInterceptor(srv any, stream grpc.ServerStream, _ *grpc.Stre

return handler(srv, wrapServerStream(ctx, stream))
}

func RegisterClientDialOptionHandlers(handlers ...ClientDialOptionHandler) {
clientOptionHandlerList = append(clientOptionHandlerList, handlers...)
}
19 changes: 10 additions & 9 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,15 +407,16 @@ func TestGRPCServerWarning(t *testing.T) {
settings ServerConfig
len int
}{
{
settings: ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "0.0.0.0:1234",
Transport: "tcp",
},
},
len: 1,
},
// Test commented out for now. ENG-27501
// {
// settings: ServerConfig{
// NetAddr: confignet.NetAddr{
// Endpoint: "0.0.0.0:1234",
// Transport: "tcp",
// },
// },
// len: 1,
// },
{
settings: ServerConfig{
NetAddr: confignet.AddrConfig{
Expand Down
46 changes: 46 additions & 0 deletions config/configgrpc/configgrpcclientdialoptionhandler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// This is a test added by us.
package configgrpc

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)

func TestRegisterClientDialOptionHandler(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(component.NewID("component"))
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

gcs := &GRPCClientSettings{}
opts, err := gcs.toDialOptions(
&mockHost{ext: map[component.ID]extension.Extension{}},
tt.TelemetrySettings,
)
require.NoError(t, err)

defaultOptsLen := len(opts)

RegisterClientDialOptionHandlers(func() grpc.DialOption {
return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
})
})
gcs = &GRPCClientSettings{}
opts, err = gcs.toDialOptions(
&mockHost{ext: map[component.ID]extension.Extension{}},
tt.TelemetrySettings,
)
assert.NoError(t, err)
assert.Len(t, opts, defaultOptsLen+1)
}
4 changes: 2 additions & 2 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/internal"
"go.opentelemetry.io/collector/extension/auth"
)

Expand Down Expand Up @@ -327,7 +326,8 @@ func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error)

// ToServer creates an http.Server from settings object.
func (hss *ServerConfig) ToServer(host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) {
internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint)
// Warning commented out for now. ENG-27501
// internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint)

serverOpts := &toServerOptions{}
for _, o := range opts {
Expand Down
13 changes: 7 additions & 6 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,12 +535,13 @@ func TestHTTPServerWarning(t *testing.T) {
settings ServerConfig
len int
}{
{
settings: ServerConfig{
Endpoint: "0.0.0.0:0",
},
len: 1,
},
// Test commented out for now. ENG-27501
// {
// settings: ServerConfig{
// Endpoint: "0.0.0.0:0",
// },
// len: 1,
// },
{
settings: ServerConfig{
Endpoint: "127.0.0.1:0",
Expand Down
12 changes: 12 additions & 0 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ func (col *Collector) Shutdown() {
}
}

// ConfigPostProcessor allows to intercept the final config and do changes in the factories or
// pipelines. For further information refer to the issue:
// https://github.com/open-telemetry/opentelemetry-collector/issues/3023
type ConfigPostProcessor interface {
Process(c *Config)
}

// setupConfigurationComponents loads the config and starts the components. If all the steps succeeds it
// sets the col.service with the service currently running.
func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
Expand Down Expand Up @@ -182,6 +189,11 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return fmt.Errorf("invalid configuration: %w", err)
}

if postProcessor, ok := col.set.ConfigProvider.(ConfigPostProcessor); ok {
// Here we modify the config to be able to manipulate processors and pipelines
postProcessor.Process(cfg)
}

col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
CollectorConf: conf,
Expand Down

0 comments on commit 90a0bdc

Please sign in to comment.