diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b5a054a208..214a4901915 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ - Add config settings for component telemetry (#2148) - Use net.SplitHostPort for IPv6 support in `prometheus` receiver (#2154) - Add --log-format command line option (default to "console") #2177. +- Change default OTLP/gRPC port number to 4317, also continue receiving on legacy port + 55680 during transition period (#2104). ## 🧰 Bug fixes 🧰 diff --git a/receiver/otlpreceiver/README.md b/receiver/otlpreceiver/README.md index b1d442800ab..259590ac00c 100644 --- a/receiver/otlpreceiver/README.md +++ b/receiver/otlpreceiver/README.md @@ -25,9 +25,9 @@ receivers: The following settings are configurable: -- `endpoint` (default = 0.0.0.0:55680): host:port to which the receiver is - going to receive data. The valid syntax is described at - https://github.com/grpc/grpc/blob/master/doc/naming.md. +- `endpoint` (default = 0.0.0.0:4317 for grpc protocol, 0.0.0.0:55681 http protocol): + host:port to which the receiver is going to receive data. The valid syntax is + described at https://github.com/grpc/grpc/blob/master/doc/naming.md. ## Advanced Configuration diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index 0c3129c5424..dd177adabb8 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -82,7 +82,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, ReadBufferSize: 512 * 1024, @@ -112,7 +112,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, MaxRecvMsgSizeMiB: 32, @@ -139,7 +139,7 @@ func TestLoadConfig(t *testing.T) { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:4317", Transport: "tcp", }, TLSSetting: &configtls.TLSServerSetting{ diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 60659cd6ee0..75553059d16 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/spf13/viper" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -37,6 +38,10 @@ const ( protoGRPC = "grpc" protoHTTP = "http" protocolsFieldName = "protocols" + + defaultGRPCEndpoint = "0.0.0.0:4317" + defaultHTTPEndpoint = "0.0.0.0:55681" + legacyGRPCEndpoint = "0.0.0.0:55680" ) func NewFactory() component.ReceiverFactory { @@ -59,14 +64,14 @@ func createDefaultConfig() configmodels.Receiver { Protocols: Protocols{ GRPC: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: "0.0.0.0:55680", + Endpoint: defaultGRPCEndpoint, Transport: "tcp", }, // We almost write 0 bytes, so no need to tune WriteBufferSize. ReadBufferSize: 512 * 1024, }, HTTP: &confighttp.HTTPServerSettings{ - Endpoint: "0.0.0.0:55681", + Endpoint: defaultHTTPEndpoint, }, }, } @@ -117,11 +122,11 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) // CreateTracesReceiver creates a trace receiver based on provided config. func createTraceReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TracesConsumer, ) (component.TracesReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -134,11 +139,11 @@ func createTraceReceiver( // CreateMetricsReceiver creates a metrics receiver based on provided config. func createMetricsReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -151,11 +156,11 @@ func createMetricsReceiver( // CreateLogReceiver creates a log receiver based on provided config. func createLogReceiver( ctx context.Context, - _ component.ReceiverCreateParams, + params component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.LogsConsumer, ) (component.LogsReceiver, error) { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, params.Logger) if err != nil { return nil, err } @@ -165,7 +170,7 @@ func createLogReceiver( return r, nil } -func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { +func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceiver, error) { rCfg := cfg.(*Config) // There must be one receiver for both metrics and traces. We maintain a map of @@ -176,7 +181,7 @@ func createReceiver(cfg configmodels.Receiver) (*otlpReceiver, error) { if !ok { var err error // We don't have a receiver, so create one. - receiver, err = newOtlpReceiver(rCfg) + receiver, err = newOtlpReceiver(rCfg, logger) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 0b8d8e66077..6601e0ee84a 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -22,10 +22,12 @@ import ( "sync" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "go.uber.org/zap" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" @@ -49,14 +51,17 @@ type otlpReceiver struct { stopOnce sync.Once startServerOnce sync.Once + + logger *zap.Logger } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, logger *zap.Logger) (*otlpReceiver, error) { r := &otlpReceiver{ - cfg: cfg, + cfg: cfg, + logger: logger, } if cfg.GRPC != nil { opts, err := cfg.GRPC.ToServerOption() @@ -84,6 +89,70 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) { return r, nil } +func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host component.Host) error { + r.logger.Info("Starting GRPC server on endpoint " + cfg.NetAddr.Endpoint) + var gln net.Listener + gln, err := cfg.ToListener() + if err != nil { + return err + } + go func() { + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + host.ReportFatalError(errGrpc) + } + }() + return nil +} + +func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host component.Host) error { + r.logger.Info("Starting HTTP server on endpoint " + cfg.Endpoint) + var hln net.Listener + hln, err := r.cfg.HTTP.ToListener() + if err != nil { + return err + } + go func() { + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() + return nil +} + +func (r *otlpReceiver) startProtocolServers(host component.Host) error { + var err error + if r.cfg.GRPC != nil { + err = r.startGRPCServer(r.cfg.GRPC, host) + if err != nil { + return err + } + if r.cfg.GRPC.NetAddr.Endpoint == defaultGRPCEndpoint { + r.logger.Info("Setting up a second GRPC listener on legacy endpoint " + legacyGRPCEndpoint) + + // Copy the config. + cfgLegacyGRPC := r.cfg.GRPC + // And use the legacy endpoint. + cfgLegacyGRPC.NetAddr.Endpoint = legacyGRPCEndpoint + err = r.startGRPCServer(cfgLegacyGRPC, host) + if err != nil { + return err + } + } + } + if r.cfg.HTTP != nil { + r.serverHTTP = r.cfg.HTTP.ToServer( + r.gatewayMux, + confighttp.WithErrorHandler(errorHandler), + ) + err = r.startHTTPServer(r.cfg.HTTP, host) + if err != nil { + return err + } + } + + return err +} + // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { @@ -93,34 +162,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { var err error r.startServerOnce.Do(func() { - if r.cfg.GRPC != nil { - var gln net.Listener - gln, err = r.cfg.GRPC.ToListener() - if err != nil { - return - } - go func() { - if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { - host.ReportFatalError(errGrpc) - } - }() - } - if r.cfg.HTTP != nil { - r.serverHTTP = r.cfg.HTTP.ToServer( - r.gatewayMux, - confighttp.WithErrorHandler(errorHandler), - ) - var hln net.Listener - hln, err = r.cfg.HTTP.ToListener() - if err != nil { - return - } - go func() { - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { - host.ReportFatalError(errHTTP) - } - }() - } + err = r.startProtocolServers(host) }) return err } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index d4e4e916314..f4a29e422e2 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -30,6 +30,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -698,7 +699,7 @@ func TestGRPCInvalidTLSCredentials(t *testing.T) { } // TLS is resolved during Creation of the receiver for GRPC. - _, err := createReceiver(cfg) + _, err := createReceiver(cfg, zap.NewNop()) assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } @@ -745,7 +746,7 @@ func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.TracesConsumer, } func newReceiver(t *testing.T, factory component.ReceiverFactory, cfg *Config, tc consumer.TracesConsumer, mc consumer.MetricsConsumer) *otlpReceiver { - r, err := createReceiver(cfg) + r, err := createReceiver(cfg, zap.NewNop()) require.NoError(t, err) if tc != nil { params := component.ReceiverCreateParams{}