From f07a50a2e397d4a8b56226c667fe1949f191139e Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Thu, 5 Jan 2023 18:03:02 -0800 Subject: [PATCH] [receiver/carbonreceiver] fix carbon receiver server start order --- .../change_carbon_receiver_start_order.yaml | 16 +++++ receiver/carbonreceiver/receiver.go | 16 ++--- receiver/carbonreceiver/receiver_test.go | 64 +++++++++++++------ 3 files changed, 67 insertions(+), 29 deletions(-) create mode 100644 .chloggen/change_carbon_receiver_start_order.yaml diff --git a/.chloggen/change_carbon_receiver_start_order.yaml b/.chloggen/change_carbon_receiver_start_order.yaml new file mode 100644 index 000000000000..0be6f8fe76a4 --- /dev/null +++ b/.chloggen/change_carbon_receiver_start_order.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: carbonreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Create the carbon receiver server when the `Start` method is called, and only close it if created. + +# One or more tracking issues related to the change +issues: [17404] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/carbonreceiver/receiver.go b/receiver/carbonreceiver/receiver.go index 1d85903389f4..8dadadc5ad77 100644 --- a/receiver/carbonreceiver/receiver.go +++ b/receiver/carbonreceiver/receiver.go @@ -74,13 +74,6 @@ func New( return nil, err } - // This should be the last one built, or if any other error is raised after - // it, the server should be closed. - server, err := buildTransportServer(config) - if err != nil { - return nil, err - } - rep, err := newReporter(set) if err != nil { return nil, err @@ -90,7 +83,6 @@ func New( settings: set, config: &config, nextConsumer: nextConsumer, - server: server, reporter: rep, parser: parser, } @@ -113,6 +105,11 @@ func buildTransportServer(config Config) (transport.Server, error) { // By convention the consumer of the received data is set when the receiver // instance is created. func (r *carbonReceiver) Start(_ context.Context, host component.Host) error { + server, err := buildTransportServer(*r.config) + if err != nil { + return err + } + r.server = server go func() { if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter); err != nil { host.ReportFatalError(err) @@ -124,5 +121,8 @@ func (r *carbonReceiver) Start(_ context.Context, host component.Host) error { // Shutdown tells the receiver that should stop reception, // giving it a chance to perform any necessary clean-up. func (r *carbonReceiver) Shutdown(context.Context) error { + if r.server == nil { + return nil + } return r.server.Close() } diff --git a/receiver/carbonreceiver/receiver_test.go b/receiver/carbonreceiver/receiver_test.go index 48cc0d5e9e13..82761308afe4 100644 --- a/receiver/carbonreceiver/receiver_test.go +++ b/receiver/carbonreceiver/receiver_test.go @@ -84,43 +84,68 @@ func Test_carbonreceiver_New(t *testing.T) { wantErr: errEmptyEndpoint, }, { - name: "invalid_transport", + name: "regex_parser", args: args{ config: Config{ NetAddr: confignet.NetAddr{ Endpoint: "localhost:2003", - Transport: "unknown_transp", + Transport: "tcp", }, Parser: &protocol.Config{ - Type: "plaintext", - Config: &protocol.PlaintextConfig{}, + Type: "regex", + Config: &protocol.RegexParserConfig{ + Rules: []*protocol.RegexRule{ + { + Regexp: `(?P[^.]*)\.test`, + }, + }, + }, }, }, nextConsumer: consumertest.NewNop(), }, - wantErr: errors.New("unsupported transport \"unknown_transp\""), }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := New(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer) + assert.Equal(t, tt.wantErr, err) + if err == nil { + require.NotNil(t, got) + assert.NoError(t, got.Shutdown(context.Background())) + } else { + assert.Nil(t, got) + } + }) + } +} + +func Test_carbonreceiver_Start(t *testing.T) { + type args struct { + config Config + nextConsumer consumer.Metrics + } + tests := []struct { + name string + args args + wantErr error + }{ { - name: "regex_parser", + name: "invalid_transport", args: args{ config: Config{ NetAddr: confignet.NetAddr{ Endpoint: "localhost:2003", - Transport: "tcp", + Transport: "unknown_transp", }, Parser: &protocol.Config{ - Type: "regex", - Config: &protocol.RegexParserConfig{ - Rules: []*protocol.RegexRule{ - { - Regexp: `(?P[^.]*)\.test`, - }, - }, - }, + Type: "plaintext", + Config: &protocol.PlaintextConfig{}, }, }, nextConsumer: consumertest.NewNop(), }, + wantErr: errors.New("unsupported transport \"unknown_transp\""), }, { name: "negative_tcp_idle_timeout", @@ -144,13 +169,10 @@ func Test_carbonreceiver_New(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := New(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer) + require.NoError(t, err) + err = got.Start(context.Background(), componenttest.NewNopHost()) assert.Equal(t, tt.wantErr, err) - if err == nil { - require.NotNil(t, got) - assert.NoError(t, got.Shutdown(context.Background())) - } else { - assert.Nil(t, got) - } + assert.NoError(t, got.Shutdown(context.Background())) }) } }