From 29859f2053f789098401b1e21d218327b530b4da Mon Sep 17 00:00:00 2001 From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Date: Thu, 19 Nov 2020 12:14:18 -0500 Subject: [PATCH] Change default OTLP/gRPC port number (#2104) This implements specification change https://github.com/open-telemetry/opentelemetry-specification/pull/1221 To make transition to new port numbers less painful OTLP receiver will also accept data on the legacy port numbers when it is configured to use the default endpoint. Users who use the default Collector config can continue sending data to the legacy ports and have a graceful period to update their senders to start sending to the new ports. Note that OTLP/HTTP continues using a separate port number from OTLP/gRPC. There is separate work in progress to use one port for both. --- CHANGELOG.md | 2 + receiver/otlpreceiver/README.md | 6 +- receiver/otlpreceiver/config_test.go | 6 +- receiver/otlpreceiver/factory.go | 25 ++++--- receiver/otlpreceiver/otlp.go | 102 +++++++++++++++++++-------- receiver/otlpreceiver/otlp_test.go | 5 +- 6 files changed, 98 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b5a054a208..214a4901915 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ - Add config settings for component telemetry (#2148) - Use net.SplitHostPort for IPv6 support in `prometheus` receiver (#2154) - Add --log-format command line option (default to "console") #2177. +- Change default OTLP/gRPC port number to 4317, also continue receiving on legacy port + 55680 during transition period (#2104). ## 🧰 Bug fixes 🧰 diff --git a/receiver/otlpreceiver/README.md b/receiver/otlpreceiver/README.md index b1d442800ab..259590ac00c 100644 --- a/receiver/otlpreceiver/README.md +++ b/receiver/otlpreceiver/README.md @@ -25,9 +25,9 @@ receivers: The following settings are configurable: -- `endpoint` (default = 0.0.0.0:55680): host:port to which the receiver is - going to receive data. The valid syntax is described at - https://github.com/grpc/grpc/blob/master/doc/naming.md. +- `endpoint` (default = 0.0.0.0:4317 for grpc protocol, 0.0.0.0:55681 http protocol): + host:port to which the receiver is going to receive data. The valid syntax is + described at https://github.com/grpc/grpc/blob/master/doc/naming.md. ## Advanced Configuration diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index 0c3129c5424..dd177adabb8 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -82,7 +82,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, ReadBufferSize: 512 * 1024, @@ -112,7 +112,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, MaxRecvMsgSizeMiB: 32, @@ -139,7 +139,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, TLSSetting: &configtls.TLSServerSetting{ diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 60659cd6ee0..75553059d16 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/spf13/viper" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -37,6 +38,10 @@ const ( protoGRPC = "grpc" protoHTTP = "http" protocolsFieldName = "protocols" + + defaultGRPCEndpoint = "0.0.0.0:4317" + defaultHTTPEndpoint = "0.0.0.0:55681" + legacyGRPCEndpoint = "0.0.0.0:55680" ) func NewFactory() component.ReceiverFactory { @@ -59,14 +64,14 @@ func createDefaultConfig() configmodels.Receiver { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: defaultGRPCEndpoint, Transport: "tcp", }, // We almost write 0 bytes, so no need to tune WriteBufferSize. ReadBufferSize: 512 * 1024, }, HTTP: &confighttp.HTTPServerSettings{ - Endpoint: "0.0.0.0:55681", + Endpoint: defaultHTTPEndpoint, }, }, } @@ -117,11 +122,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) // CreateTracesReceiver creates a trace receiver based on provided config. func createTraceReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TracesConsumer, ) (component.TracesReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -134,11 +139,11 @@ func createTraceReceiver( // CreateMetricsReceiver creates a metrics receiver based on provided config. func createMetricsReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -151,11 +156,11 @@ func createMetricsReceiver( // CreateLogReceiver creates a log receiver based on provided config. func createLogReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.LogsConsumer, ) (component.LogsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -165,7 +170,7 @@ func createLogReceiver( return r, nil } -func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { +func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -176,7 +181,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { if !ok { var err error // We don't have a receiver, so create one. - receiver, err = newOtlpReceiver(rCfg) + receiver, err = newOtlpReceiver(rCfg, logger) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 0b8d8e66077..6601e0ee84a 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -22,10 +22,12 @@ import ( "sync" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "go.uber.org/zap" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" @@ -49,14 +51,17 @@ type otlpReceiver struct { stopOnce sync.Once startServerOnce sync.Once + + logger *zap.Logger } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) { r := &otlpReceiver{ - cfg: cfg, + cfg: cfg, + logger: logger, } if cfg.GRPC != nil { opts, err := cfg.GRPC.ToServerOption() @@ -84,6 +89,70 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { return r, nil } +func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error { + r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint) + var gln net.Listener + gln, err := cfg.ToListener() + if err != nil { + return err + } + go func() { + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + host.ReportFatalError(errGrpc) + } + }() + return nil +} + +func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error { + r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint) + var hln net.Listener + hln, err := r.cfg.HTTP.ToListener() + if err != nil { + return err + } + go func() { + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() + return nil +} + +func (r *otlpReceiver) startProtocolServers(host component.Host) error { + var err error + if r.cfg.GRPC != nil { + err = r.startGRPCServer(r.cfg.GRPC, host) + if err != nil { + return err + } + if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint { + r.logger.Info("Setting up a second GRPC listener on legacy endpoint " + legacyGRPCEndpoint) + + // Copy the config. + cfgLegacyGRPC := r.cfg.GRPC + // And use the legacy endpoint. + cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint + err = r.startGRPCServer(cfgLegacyGRPC, host) + if err != nil { + return err + } + } + } + if r.cfg.HTTP != nil { + r.serverHTTP = r.cfg.HTTP.ToServer( + r.gatewayMux, + confighttp.WithErrorHandler(errorHandler), + ) + err = r.startHTTPServer(r.cfg.HTTP, host) + if err != nil { + return err + } + } + + return err +} + // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { @@ -93,34 +162,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { var err error r.startServerOnce.Do(func() { - if r.cfg.GRPC != nil { - var gln net.Listener - gln, err = r.cfg.GRPC.ToListener() - if err != nil { - return - } - go func() { - if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { - host.ReportFatalError(errGrpc) - } - }() - } - if r.cfg.HTTP != nil { - r.serverHTTP = r.cfg.HTTP.ToServer( - r.gatewayMux, - confighttp.WithErrorHandler(errorHandler), - ) - var hln net.Listener - hln, err = r.cfg.HTTP.ToListener() - if err != nil { - return - } - go func() { - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { - host.ReportFatalError(errHTTP) - } - }() - } + err = r.startProtocolServers(host) }) return err } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index d4e4e916314..f4a29e422e2 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -30,6 +30,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) { } // TLS is resolved during Creation of the receiver for GRPC. - _, err := createReceiver(cfg) + _, err := createReceiver(cfg, zap.NewNop()) assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer, } func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, zap.NewNop()) require.NoError(t, err) if tc != nil { params := component.ReceiverCreateParams{}