Skip to content

Commit

Permalink
Use the new marshaler/unmarshaler in Kafka receiver (#3402)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jun 14, 2021
1 parent 3050c8e commit 49a5753
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 123 deletions.
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestWithTracesUnmarshalers(t *testing.T) {
require.NotNil(t, receiver)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpTracesPbUnmarshaler).Encoding()
cfg.Encoding = defaultEncoding
receiver, err := f.CreateTracesReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil)
require.NoError(t, err)
assert.NotNil(t, receiver)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestWithLogsUnmarshalers(t *testing.T) {
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpLogsPbUnmarshaler).Encoding()
cfg.Encoding = defaultEncoding
exporter, err := f.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil)
require.NoError(t, err)
assert.NotNil(t, exporter)
Expand Down
13 changes: 7 additions & 6 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/kafkaexporter"
"go.opentelemetry.io/collector/internal/otlp"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport"
)
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
defer view.Unregister(views...)

c := tracesConsumerGroupHandler{
unmarshaler: &otlpTracesPbUnmarshaler{},
unmarshaler: newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestTracesConsumerGroupHandler(t *testing.T) {

func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
c := tracesConsumerGroupHandler{
unmarshaler: &otlpTracesPbUnmarshaler{},
unmarshaler: newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
Expand All @@ -196,7 +197,7 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
consumerError := errors.New("failed to consumer")
c := tracesConsumerGroupHandler{
unmarshaler: &otlpTracesPbUnmarshaler{},
unmarshaler: newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
Expand Down Expand Up @@ -314,7 +315,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
defer view.Unregister(views...)

c := logsConsumerGroupHandler{
unmarshaler: &otlpLogsPbUnmarshaler{},
unmarshaler: newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
Expand Down Expand Up @@ -356,7 +357,7 @@ func TestLogsConsumerGroupHandler(t *testing.T) {

func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
c := logsConsumerGroupHandler{
unmarshaler: &otlpLogsPbUnmarshaler{},
unmarshaler: newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
Expand All @@ -381,7 +382,7 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
consumerError := errors.New("failed to consumer")
c := logsConsumerGroupHandler{
unmarshaler: &otlpLogsPbUnmarshaler{},
unmarshaler: newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
Expand Down
45 changes: 0 additions & 45 deletions receiver/kafkareceiver/otlp_unmarshaler.go

This file was deleted.

66 changes: 0 additions & 66 deletions receiver/kafkareceiver/otlp_unmarshaler_test.go

This file was deleted.

51 changes: 51 additions & 0 deletions receiver/kafkareceiver/pdata_unmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkareceiver

import (
"go.opentelemetry.io/collector/internal/model"
)

type pdataLogsUnmarshaler struct {
model.LogsUnmarshaler
encoding string
}

func (p pdataLogsUnmarshaler) Encoding() string {
return p.encoding
}

func newPdataLogsUnmarshaler(unmarshaler model.LogsUnmarshaler, encoding string) LogsUnmarshaler {
return pdataLogsUnmarshaler{
LogsUnmarshaler: unmarshaler,
encoding: encoding,
}
}

type pdataTracesUnmarshaler struct {
model.TracesUnmarshaler
encoding string
}

func (p pdataTracesUnmarshaler) Encoding() string {
return p.encoding
}

func newPdataTracesUnmarshaler(unmarshaler model.TracesUnmarshaler, encoding string) TracesUnmarshaler {
return pdataTracesUnmarshaler{
TracesUnmarshaler: unmarshaler,
encoding: encoding,
}
}
33 changes: 33 additions & 0 deletions receiver/kafkareceiver/pdata_unmarshaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkareceiver

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/internal/otlp"
)

func TestNewPdataTracesUnmarshaler(t *testing.T) {
um := newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), "test")
assert.Equal(t, "test", um.Encoding())
}

func TestNewPdataLogsUnmarshaler(t *testing.T) {
um := newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), "test")
assert.Equal(t, "test", um.Encoding())
}
9 changes: 5 additions & 4 deletions receiver/kafkareceiver/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafkareceiver

import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/otlp"
)

// TracesUnmarshaler deserializes the message body.
Expand All @@ -38,14 +39,14 @@ type LogsUnmarshaler interface {

// defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler.
func defaultTracesUnmarshalers() map[string]TracesUnmarshaler {
otlp := &otlpTracesPbUnmarshaler{}
otlpPb := newPdataTracesUnmarshaler(otlp.NewProtobufTracesUnmarshaler(), defaultEncoding)
jaegerProto := jaegerProtoSpanUnmarshaler{}
jaegerJSON := jaegerJSONSpanUnmarshaler{}
zipkinProto := zipkinProtoSpanUnmarshaler{}
zipkinJSON := zipkinJSONSpanUnmarshaler{}
zipkinThrift := zipkinThriftSpanUnmarshaler{}
return map[string]TracesUnmarshaler{
otlp.Encoding(): otlp,
otlpPb.Encoding(): otlpPb,
jaegerProto.Encoding(): jaegerProto,
jaegerJSON.Encoding(): jaegerJSON,
zipkinProto.Encoding(): zipkinProto,
Expand All @@ -55,8 +56,8 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler {
}

func defaultLogsUnmarshalers() map[string]LogsUnmarshaler {
otlp := &otlpLogsPbUnmarshaler{}
otlpPb := newPdataLogsUnmarshaler(otlp.NewProtobufLogsUnmarshaler(), defaultEncoding)
return map[string]LogsUnmarshaler{
otlp.Encoding(): otlp,
otlpPb.Encoding(): otlpPb,
}
}

0 comments on commit 49a5753

Please sign in to comment.