Skip to content

Commit

Permalink
Updated configgrpc ToDialOptions and confighttp ToClient apis to take…
Browse files Browse the repository at this point in the history
… extensions configuration map (#3340)

This PR is a port of the configfrpc's ToDialOptions() and confighttp ToClient() from PR #3128  in a piece meal fashion. 

The following are the changes
- Refactored configgrpc.PerRPCAuth as extension implementing configauth.GrpcClientAuthenticator
- Plugged in extensions configuration to all the grpc based clients in the core (OTLP, OpenCensus, Jaeger, JaegerReceiver)
- Plugged in extensions configuration to all the HTTP based clients in the core (Zipkin, OTLPHTTP)

Link to tracking Issue:
 #3282 #3276

Testing:
Unit tests, [manual test described (for only oidc)](#3128 (comment))
  • Loading branch information
pavankrish123 authored Jun 3, 2021
1 parent dc7899b commit 5369d7e
Show file tree
Hide file tree
Showing 24 changed files with 605 additions and 110 deletions.
43 changes: 23 additions & 20 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ type GRPCClientSettings struct {
// The headers associated with gRPC requests.
Headers map[string]string `mapstructure:"headers"`

// PerRPCAuth parameter configures the client to send authentication data on a per-RPC basis.
PerRPCAuth *PerRPCAuthConfig `mapstructure:"per_rpc_auth"`

// Sets the balancer in grpclb_policy to discover the servers. Default is pick_first.
// https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md
BalancerName string `mapstructure:"balancer_name"`

// Auth configuration for outgoing RPCs.
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`
}

// KeepaliveServerConfig is the configuration for keepalive.
Expand All @@ -108,15 +108,6 @@ type KeepaliveServerConfig struct {
EnforcementPolicy *KeepaliveEnforcementPolicy `mapstructure:"enforcement_policy,omitempty"`
}

// PerRPCAuthConfig specifies how the Per-RPC authentication data should be obtained.
type PerRPCAuthConfig struct {
// AuthType represents the authentication type to use. Currently, only 'bearer' is supported.
AuthType string `mapstructure:"type,omitempty"`

// BearerToken specifies the bearer token to use for every RPC.
BearerToken string `mapstructure:"bearer_token,omitempty"`
}

// KeepaliveServerParameters allow configuration of the keepalive.ServerParameters.
// The same default values as keepalive.ServerParameters are applicable and get applied by the server.
// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details.
Expand Down Expand Up @@ -168,7 +159,7 @@ type GRPCServerSettings struct {
}

// ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC.
func (gcs *GRPCClientSettings) ToDialOptions() ([]grpc.DialOption, error) {
func (gcs *GRPCClientSettings) ToDialOptions(ext map[config.ComponentID]component.Extension) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if gcs.Compression != "" {
if compressionKey := GetGRPCCompressionKey(gcs.Compression); compressionKey != CompressionUnsupported {
Expand Down Expand Up @@ -205,14 +196,26 @@ func (gcs *GRPCClientSettings) ToDialOptions() ([]grpc.DialOption, error) {
opts = append(opts, keepAliveOption)
}

if gcs.PerRPCAuth != nil {
if strings.EqualFold(gcs.PerRPCAuth.AuthType, PerRPCAuthTypeBearer) {
sToken := gcs.PerRPCAuth.BearerToken
token := BearerToken(sToken)
opts = append(opts, grpc.WithPerRPCCredentials(token))
} else {
return nil, fmt.Errorf("unsupported per-RPC auth type %q", gcs.PerRPCAuth.AuthType)
if gcs.Auth != nil {
if ext == nil {
return nil, fmt.Errorf("no extensions configuration available")
}

componentID, cperr := config.NewIDFromString(gcs.Auth.AuthenticatorName)
if cperr != nil {
return nil, cperr
}

grpcAuthenticator, cerr := configauth.GetGRPCClientAuthenticator(ext, componentID)
if cerr != nil {
return nil, cerr
}

perRPCCredentials, perr := grpcAuthenticator.PerRPCCredentials()
if perr != nil {
return nil, err
}
opts = append(opts, grpc.WithPerRPCCredentials(perRPCCredentials))
}

if gcs.BalancerName != "" {
Expand Down
79 changes: 40 additions & 39 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestDefaultGrpcClientSettings(t *testing.T) {
Insecure: true,
},
}
opts, err := gcs.ToDialOptions()
opts, err := gcs.ToDialOptions(map[config.ComponentID]component.Extension{})
assert.NoError(t, err)
assert.Len(t, opts, 1)
}
Expand All @@ -64,12 +64,17 @@ func TestAllGrpcClientSettings(t *testing.T) {
ReadBufferSize: 1024,
WriteBufferSize: 1024,
WaitForReady: true,
PerRPCAuth: nil,
BalancerName: "round_robin",
Auth: &configauth.Authentication{AuthenticatorName: "testauth"},
}
opts, err := gcs.ToDialOptions()

ext := map[config.ComponentID]component.Extension{
config.NewID("testauth"): &configauth.MockClientAuthenticator{},
}

opts, err := gcs.ToDialOptions(ext)
assert.NoError(t, err)
assert.Len(t, opts, 6)
assert.Len(t, opts, 7)
}

func TestDefaultGrpcServerSettings(t *testing.T) {
Expand Down Expand Up @@ -137,6 +142,7 @@ func TestGRPCClientSettingsError(t *testing.T) {
tests := []struct {
settings GRPCClientSettings
err string
ext map[config.ComponentID]component.Extension
}{
{
err: "^failed to load TLS config: failed to load CA CertPool: failed to load CA /doesnt/exist:",
Expand Down Expand Up @@ -192,10 +198,36 @@ func TestGRPCClientSettingsError(t *testing.T) {
BalancerName: "test",
},
},
{
err: "idStr must have non empty type",
settings: GRPCClientSettings{
Endpoint: "localhost:1234",
Auth: &configauth.Authentication{},
},
ext: map[config.ComponentID]component.Extension{
config.NewID("mock"): &configauth.MockClientAuthenticator{},
},
},
{
err: "failed to resolve authenticator \"doesntexist\": authenticator not found",
settings: GRPCClientSettings{
Endpoint: "localhost:1234",
Auth: &configauth.Authentication{AuthenticatorName: "doesntexist"},
},
ext: map[config.ComponentID]component.Extension{},
},
{
err: "no extensions configuration available",
settings: GRPCClientSettings{
Endpoint: "localhost:1234",
Auth: &configauth.Authentication{AuthenticatorName: "doesntexist"},
},
ext: nil,
},
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
opts, err := test.settings.ToDialOptions()
opts, err := test.settings.ToDialOptions(test.ext)
assert.Nil(t, opts)
assert.Error(t, err)
assert.Regexp(t, test.err, err)
Expand All @@ -210,9 +242,8 @@ func TestUseSecure(t *testing.T) {
Compression: "",
TLSSetting: configtls.TLSClientSetting{},
Keepalive: nil,
PerRPCAuth: nil,
}
dialOpts, err := gcs.ToDialOptions()
dialOpts, err := gcs.ToDialOptions(map[config.ComponentID]component.Extension{})
assert.NoError(t, err)
assert.Equal(t, len(dialOpts), 1)
}
Expand Down Expand Up @@ -431,7 +462,7 @@ func TestHttpReception(t *testing.T) {
Endpoint: ln.Addr().String(),
TLSSetting: *tt.tlsClientCreds,
}
clientOpts, errClient := gcs.ToDialOptions()
clientOpts, errClient := gcs.ToDialOptions(map[config.ComponentID]component.Extension{})
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
Expand Down Expand Up @@ -478,7 +509,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
Insecure: true,
},
}
clientOpts, errClient := gcs.ToDialOptions()
clientOpts, errClient := gcs.ToDialOptions(map[config.ComponentID]component.Extension{})
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
Expand All @@ -497,36 +528,6 @@ func (gts *grpcTraceServer) Export(context.Context, *otelcol.ExportTraceServiceR
return &otelcol.ExportTraceServiceResponse{}, nil
}

func TestWithPerRPCAuthBearerToken(t *testing.T) {
// prepare
// test
gcs := &GRPCClientSettings{
PerRPCAuth: &PerRPCAuthConfig{
AuthType: "bearer",
BearerToken: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
},
}
dialOpts, err := gcs.ToDialOptions()

// verify
assert.NoError(t, err)
assert.Len(t, dialOpts, 2) // WithInsecure and WithPerRPCCredentials
}

func TestWithPerRPCAuthInvalidAuthType(t *testing.T) {
// test
gcs := &GRPCClientSettings{
PerRPCAuth: &PerRPCAuthConfig{
AuthType: "non-existing",
},
}
dialOpts, err := gcs.ToDialOptions()

// verify
assert.Error(t, err)
assert.Nil(t, dialOpts)
}

// tempSocketName provides a temporary Unix socket name for testing.
func tempSocketName(t *testing.T) string {
tmpfile, err := ioutil.TempFile("", "sock")
Expand Down
32 changes: 30 additions & 2 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package confighttp

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"time"

"github.com/rs/cors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
)
Expand Down Expand Up @@ -49,10 +53,13 @@ type HTTPClientSettings struct {

// Custom Round Tripper to allow for individual components to intercept HTTP requests
CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error)

// Auth configuration for outgoing HTTP calls.
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`
}

// ToClient creates an HTTP client.
func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) {
func (hcs *HTTPClientSettings) ToClient(ext map[config.ComponentID]component.Extension) (*http.Client, error) {
tlsCfg, err := hcs.TLSSetting.LoadTLSConfig()
if err != nil {
return nil, err
Expand All @@ -76,6 +83,27 @@ func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) {
}
}

if hcs.Auth != nil {
if ext == nil {
return nil, fmt.Errorf("extensions configuration not found")
}

componentID, cperr := config.NewIDFromString(hcs.Auth.AuthenticatorName)
if cperr != nil {
return nil, cperr
}

httpCustomAuthRoundTripper, aerr := configauth.GetHTTPClientAuthenticator(ext, componentID)
if aerr != nil {
return nil, aerr
}

clientTransport, err = httpCustomAuthRoundTripper.RoundTripper(clientTransport)
if err != nil {
return nil, err
}
}

if hcs.CustomRoundTripper != nil {
clientTransport, err = hcs.CustomRoundTripper(clientTransport)
if err != nil {
Expand All @@ -89,7 +117,7 @@ func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) {
}, nil
}

// Custom RoundTripper that add headers.
// Custom RoundTripper that adds headers.
type headerRoundTripper struct {
transport http.RoundTripper
headers map[string]string
Expand Down
Loading

0 comments on commit 5369d7e

Please sign in to comment.