From 49a575364244eb2f281e611c16b00b074271c03e Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 14 Jun 2021 09:17:56 +0300 Subject: [PATCH] Use the new marshaler/unmarshaler in Kafka receiver (#3402) Signed-off-by: Bogdan Drutu --- receiver/kafkareceiver/factory_test.go | 4 +- receiver/kafkareceiver/kafka_receiver_test.go | 13 ++-- receiver/kafkareceiver/otlp_unmarshaler.go | 45 ------------- .../kafkareceiver/otlp_unmarshaler_test.go | 66 ------------------- receiver/kafkareceiver/pdata_unmarshaler.go | 51 ++++++++++++++ .../kafkareceiver/pdata_unmarshaler_test.go | 33 ++++++++++ receiver/kafkareceiver/unmarshaler.go | 9 +-- 7 files changed, 98 insertions(+), 123 deletions(-) delete mode 100644 receiver/kafkareceiver/otlp_unmarshaler.go delete mode 100644 receiver/kafkareceiver/otlp_unmarshaler_test.go create mode 100644 receiver/kafkareceiver/pdata_unmarshaler.go create mode 100644 receiver/kafkareceiver/pdata_unmarshaler_test.go diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 5a74beb6258..36ce7cd82fc 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -73,7 +73,7 @@ func TestWithTracesUnmarshalers(t *testing.T) { require.NotNil(t, receiver) }) t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = new(otlpTracesPbUnmarshaler).Encoding() + cfg.Encoding = defaultEncoding receiver, err := f.CreateTracesReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil) require.NoError(t, err) assert.NotNil(t, receiver) @@ -117,7 +117,7 @@ func TestWithLogsUnmarshalers(t *testing.T) { require.NotNil(t, exporter) }) t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = new(otlpLogsPbUnmarshaler).Encoding() + cfg.Encoding = defaultEncoding exporter, err := f.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil) require.NoError(t, err) assert.NotNil(t, exporter) diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 033288043b3..fd73bda72fc 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/kafkaexporter" + "go.opentelemetry.io/collector/internal/otlp" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/obsreport" ) @@ -129,7 +130,7 @@ func TestTracesConsumerGroupHandler(t *testing.T) { defer view.Unregister(views...) c := tracesConsumerGroupHandler{ - unmarshaler: &otlpTracesPbUnmarshaler{}, + unmarshaler: newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), @@ -171,7 +172,7 @@ func TestTracesConsumerGroupHandler(t *testing.T) { func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { c := tracesConsumerGroupHandler{ - unmarshaler: &otlpTracesPbUnmarshaler{}, + unmarshaler: newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), @@ -196,7 +197,7 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consumer") c := tracesConsumerGroupHandler{ - unmarshaler: &otlpTracesPbUnmarshaler{}, + unmarshaler: newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewErr(consumerError), @@ -314,7 +315,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) { defer view.Unregister(views...) c := logsConsumerGroupHandler{ - unmarshaler: &otlpLogsPbUnmarshaler{}, + unmarshaler: newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), @@ -356,7 +357,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) { func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { c := logsConsumerGroupHandler{ - unmarshaler: &otlpLogsPbUnmarshaler{}, + unmarshaler: newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), @@ -381,7 +382,7 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consumer") c := logsConsumerGroupHandler{ - unmarshaler: &otlpLogsPbUnmarshaler{}, + unmarshaler: newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewErr(consumerError), diff --git a/receiver/kafkareceiver/otlp_unmarshaler.go b/receiver/kafkareceiver/otlp_unmarshaler.go deleted file mode 100644 index cca9208a8cf..00000000000 --- a/receiver/kafkareceiver/otlp_unmarshaler.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2020 The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafkareceiver - -import ( - "go.opentelemetry.io/collector/consumer/pdata" -) - -type otlpTracesPbUnmarshaler struct { -} - -var _ TracesUnmarshaler = (*otlpTracesPbUnmarshaler)(nil) - -func (p *otlpTracesPbUnmarshaler) Unmarshal(bytes []byte) (pdata.Traces, error) { - return pdata.TracesFromOtlpProtoBytes(bytes) -} - -func (*otlpTracesPbUnmarshaler) Encoding() string { - return defaultEncoding -} - -type otlpLogsPbUnmarshaler struct { -} - -var _ LogsUnmarshaler = (*otlpLogsPbUnmarshaler)(nil) - -func (p *otlpLogsPbUnmarshaler) Unmarshal(bytes []byte) (pdata.Logs, error) { - return pdata.LogsFromOtlpProtoBytes(bytes) -} - -func (*otlpLogsPbUnmarshaler) Encoding() string { - return defaultEncoding -} diff --git a/receiver/kafkareceiver/otlp_unmarshaler_test.go b/receiver/kafkareceiver/otlp_unmarshaler_test.go deleted file mode 100644 index e1ec4dc36ef..00000000000 --- a/receiver/kafkareceiver/otlp_unmarshaler_test.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2020 The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafkareceiver - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/testdata" -) - -func TestUnmarshalOTLPTraces(t *testing.T) { - td := pdata.NewTraces() - td.ResourceSpans().AppendEmpty().Resource().Attributes().InsertString("foo", "bar") - - expected, err := td.ToOtlpProtoBytes() - require.NoError(t, err) - - p := otlpTracesPbUnmarshaler{} - got, err := p.Unmarshal(expected) - require.NoError(t, err) - - assert.Equal(t, td, got) - assert.Equal(t, "otlp_proto", p.Encoding()) -} - -func TestUnmarshalOTLPTraces_error(t *testing.T) { - p := otlpTracesPbUnmarshaler{} - _, err := p.Unmarshal([]byte("+$%")) - assert.Error(t, err) -} - -func TestUnmarshalOTLPLogs(t *testing.T) { - ld := testdata.GenerateLogsOneLogRecord() - - expected, err := ld.ToOtlpProtoBytes() - require.NoError(t, err) - - p := otlpLogsPbUnmarshaler{} - got, err := p.Unmarshal(expected) - require.NoError(t, err) - - assert.Equal(t, ld, got) - assert.Equal(t, "otlp_proto", p.Encoding()) -} - -func TestUnmarshalOTLPLogs_error(t *testing.T) { - p := otlpLogsPbUnmarshaler{} - _, err := p.Unmarshal([]byte("+$%")) - assert.Error(t, err) -} diff --git a/receiver/kafkareceiver/pdata_unmarshaler.go b/receiver/kafkareceiver/pdata_unmarshaler.go new file mode 100644 index 00000000000..23474a5b0df --- /dev/null +++ b/receiver/kafkareceiver/pdata_unmarshaler.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "go.opentelemetry.io/collector/internal/model" +) + +type pdataLogsUnmarshaler struct { + model.LogsUnmarshaler + encoding string +} + +func (p pdataLogsUnmarshaler) Encoding() string { + return p.encoding +} + +func newPdataLogsUnmarshaler(unmarshaler model.LogsUnmarshaler, encoding string) LogsUnmarshaler { + return pdataLogsUnmarshaler{ + LogsUnmarshaler: unmarshaler, + encoding: encoding, + } +} + +type pdataTracesUnmarshaler struct { + model.TracesUnmarshaler + encoding string +} + +func (p pdataTracesUnmarshaler) Encoding() string { + return p.encoding +} + +func newPdataTracesUnmarshaler(unmarshaler model.TracesUnmarshaler, encoding string) TracesUnmarshaler { + return pdataTracesUnmarshaler{ + TracesUnmarshaler: unmarshaler, + encoding: encoding, + } +} diff --git a/receiver/kafkareceiver/pdata_unmarshaler_test.go b/receiver/kafkareceiver/pdata_unmarshaler_test.go new file mode 100644 index 00000000000..9f5d75b2eff --- /dev/null +++ b/receiver/kafkareceiver/pdata_unmarshaler_test.go @@ -0,0 +1,33 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/internal/otlp" +) + +func TestNewPdataTracesUnmarshaler(t *testing.T) { + um := newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), "test") + assert.Equal(t, "test", um.Encoding()) +} + +func TestNewPdataLogsUnmarshaler(t *testing.T) { + um := newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), "test") + assert.Equal(t, "test", um.Encoding()) +} diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index f15e6b90074..f5aed35108b 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -16,6 +16,7 @@ package kafkareceiver import ( "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/otlp" ) // TracesUnmarshaler deserializes the message body. @@ -38,14 +39,14 @@ type LogsUnmarshaler interface { // defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { - otlp := &otlpTracesPbUnmarshaler{} + otlpPb := newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding) jaegerProto := jaegerProtoSpanUnmarshaler{} jaegerJSON := jaegerJSONSpanUnmarshaler{} zipkinProto := zipkinProtoSpanUnmarshaler{} zipkinJSON := zipkinJSONSpanUnmarshaler{} zipkinThrift := zipkinThriftSpanUnmarshaler{} return map[string]TracesUnmarshaler{ - otlp.Encoding(): otlp, + otlpPb.Encoding(): otlpPb, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, zipkinProto.Encoding(): zipkinProto, @@ -55,8 +56,8 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { } func defaultLogsUnmarshalers() map[string]LogsUnmarshaler { - otlp := &otlpLogsPbUnmarshaler{} + otlpPb := newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding) return map[string]LogsUnmarshaler{ - otlp.Encoding(): otlp, + otlpPb.Encoding(): otlpPb, } }