diff --git a/receiver/README.md b/receiver/README.md
index 98d182849f5..426f4a81453 100644
--- a/receiver/README.md
+++ b/receiver/README.md
@@ -24,6 +24,7 @@ Available metric receivers (sorted alphabetically):
 Available log receivers (sorted alphabetically):
 
+- [Kafka Receiver](kafkareceiver/README.md)
 - [OTLP Receiver](otlpreceiver/README.md)
 
 The [contrib repository](https://github.com/open-telemetry/opentelemetry-collector-contrib) has more receivers that can be added to custom builds of the collector.
diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index f371ce08df3..61e2300d9b6 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -2,7 +2,7 @@
 
-Kafka receiver receives traces from Kafka. Message payload encoding is configurable.
+Kafka receiver receives traces and logs from Kafka. Message payload encoding is configurable.
 
-Supported pipeline types: traces
+Supported pipeline types: traces, logs
 
 ## Getting Started
 
@@ -13,7 +13,7 @@ The following settings are required:
 
 The following settings can be optionally configured:
 - `brokers` (default = localhost:9092): The list of kafka brokers
-- `topic` (default = otlp_spans): The name of the kafka topic to export to
+- `topic` (default = otlp_spans): The name of the kafka topic to read from
-- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings:
+- `encoding` (default = otlp_proto): The encoding of the payload read from kafka. Available encodings:
-  - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`.
+  - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest` (traces) or `ExportLogsServiceRequest` (logs).
   - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
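For orientation, a minimal collector configuration wiring the receiver into a logs pipeline might look like the sketch below. The `otlp_logs` topic, the `2.0.0` protocol version, and the `logging` exporter are illustrative choices, not defaults (the topic still defaults to `otlp_spans`):

```yaml
receivers:
  kafka:
    protocol_version: 2.0.0
    brokers: ["localhost:9092"]
    topic: otlp_logs      # hypothetical topic name; the default remains otlp_spans
    encoding: otlp_proto

exporters:
  logging:

service:
  pipelines:
    logs:
      receivers: [kafka]
      exporters: [logging]
```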
diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go
index 481c4e51a65..fe637883d2b 100644
--- a/receiver/kafkareceiver/factory.go
+++ b/receiver/kafkareceiver/factory.go
@@ -53,10 +53,20 @@ func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOpt
 	}
 }
 
+// WithAddLogsUnmarshallers adds LogsUnmarshallers.
+func WithAddLogsUnmarshallers(encodingMarshaller map[string]LogsUnmarshaller) FactoryOption {
+	return func(factory *kafkaReceiverFactory) {
+		for encoding, unmarshaller := range encodingMarshaller {
+			factory.logsUnmarshallers[encoding] = unmarshaller
+		}
+	}
+}
+
 // NewFactory creates Kafka receiver factory.
 func NewFactory(options ...FactoryOption) component.ReceiverFactory {
 	f := &kafkaReceiverFactory{
-		unmarshalers: defaultUnmarshallers(),
+		unmarshalers:      defaultUnmarshallers(),
+		logsUnmarshallers: defaultLogsUnmarshallers(),
 	}
 	for _, o := range options {
 		o(f)
@@ -64,7 +74,9 @@ func NewFactory(options ...FactoryOption) component.ReceiverFactory {
 	return receiverhelper.NewFactory(
 		typeStr,
 		createDefaultConfig,
-		receiverhelper.WithTraces(f.createTracesReceiver))
+		receiverhelper.WithTraces(f.createTracesReceiver),
+		receiverhelper.WithLogs(f.createLogsReceiver),
+	)
 }
 
 func createDefaultConfig() config.Receiver {
@@ -89,7 +101,8 @@ func createDefaultConfig() config.Receiver {
 }
 
 type kafkaReceiverFactory struct {
-	unmarshalers map[string]Unmarshaller
+	unmarshalers      map[string]Unmarshaller
+	logsUnmarshallers map[string]LogsUnmarshaller
 }
 
 func (f *kafkaReceiverFactory) createTracesReceiver(
@@ -105,3 +118,17 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
 	}
 	return r, nil
 }
+
+func (f *kafkaReceiverFactory) createLogsReceiver(
+	_ context.Context,
+	params component.ReceiverCreateParams,
+	cfg config.Receiver,
+	nextConsumer consumer.Logs,
+) (component.LogsReceiver, error) {
+	c := cfg.(*Config)
+	r, err := newLogsReceiver(*c, params, f.logsUnmarshallers, nextConsumer)
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
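To show the new extension point in use: a custom collector build could register an additional logs encoding through `WithAddLogsUnmarshallers`. A minimal sketch, assuming a hypothetical `otlp_proto_gzip` encoding that gunzips the payload before delegating to the same OTLP decoding as the default unmarshaller:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"io/ioutil"

	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/receiver/kafkareceiver"
)

// gzipOTLPLogsUnmarshaller is hypothetical: it gunzips the message body and
// then reuses the OTLP protobuf decoding used by the default encoding.
type gzipOTLPLogsUnmarshaller struct{}

func (gzipOTLPLogsUnmarshaller) Unmarshal(body []byte) (pdata.Logs, error) {
	zr, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		return pdata.NewLogs(), err
	}
	defer zr.Close()
	raw, err := ioutil.ReadAll(zr)
	if err != nil {
		return pdata.NewLogs(), err
	}
	return pdata.LogsFromOtlpProtoBytes(raw)
}

func (gzipOTLPLogsUnmarshaller) Encoding() string { return "otlp_proto_gzip" }

func main() {
	// Register the extra encoding alongside the defaults; a real build would
	// pass this factory into the collector's component list.
	factory := kafkareceiver.NewFactory(
		kafkareceiver.WithAddLogsUnmarshallers(map[string]kafkareceiver.LogsUnmarshaller{
			"otlp_proto_gzip": gzipOTLPLogsUnmarshaller{},
		}),
	)
	_ = factory
}
```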
diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go
index 8eac6c083ec..bd6b0fb6b1c 100644
--- a/receiver/kafkareceiver/factory_test.go
+++ b/receiver/kafkareceiver/factory_test.go
@@ -59,7 +59,7 @@ func TestCreateTracesReceiver_error(t *testing.T) {
 }
 
 func TestWithUnmarshallers(t *testing.T) {
-	unmarshaller := &customUnamarshaller{}
+	unmarshaller := &customUnmarshaller{}
 	f := NewFactory(WithAddUnmarshallers(map[string]Unmarshaller{unmarshaller.Encoding(): unmarshaller}))
 	cfg := createDefaultConfig().(*Config)
 	// disable contacting broker
@@ -80,15 +80,71 @@ func TestWithUnmarshallers(t *testing.T) {
 	})
 }
 
-type customUnamarshaller struct {
+func TestCreateLogsReceiver(t *testing.T) {
+	cfg := createDefaultConfig().(*Config)
+	cfg.Brokers = []string{"invalid:9092"}
+	cfg.ProtocolVersion = "2.0.0"
+	f := kafkaReceiverFactory{logsUnmarshallers: defaultLogsUnmarshallers()}
+	r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
+	// no available broker
+	require.Error(t, err)
+	assert.Nil(t, r)
+}
+
+func TestCreateLogsReceiver_error(t *testing.T) {
+	cfg := createDefaultConfig().(*Config)
+	cfg.ProtocolVersion = "2.0.0"
+	// disable contacting broker at startup
+	cfg.Metadata.Full = false
+	f := kafkaReceiverFactory{logsUnmarshallers: defaultLogsUnmarshallers()}
+	r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
+	require.NoError(t, err)
+	assert.NotNil(t, r)
+}
+
+func TestWithLogsUnmarshallers(t *testing.T) {
+	unmarshaller := &customLogsUnmarshaller{}
+	f := NewFactory(WithAddLogsUnmarshallers(map[string]LogsUnmarshaller{unmarshaller.Encoding(): unmarshaller}))
+	cfg := createDefaultConfig().(*Config)
+	// disable contacting broker
+	cfg.Metadata.Full = false
+	cfg.ProtocolVersion = "2.0.0"
+
+	t.Run("custom_encoding", func(t *testing.T) {
+		cfg.Encoding = unmarshaller.Encoding()
+		receiver, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
+		require.NoError(t, err)
+		require.NotNil(t, receiver)
+	})
+	t.Run("default_encoding", func(t *testing.T) {
+		cfg.Encoding = new(otlpLogsPbUnmarshaller).Encoding()
+		receiver, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
+		require.NoError(t, err)
+		assert.NotNil(t, receiver)
+	})
 }
 
-var _ Unmarshaller = (*customUnamarshaller)(nil)
+type customUnmarshaller struct {
+}
+
+type customLogsUnmarshaller struct {
+}
+
+var _ Unmarshaller = (*customUnmarshaller)(nil)
+var _ LogsUnmarshaller = (*customLogsUnmarshaller)(nil)
+
+func (c customUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) {
+	panic("implement me")
+}
+
+func (c customUnmarshaller) Encoding() string {
+	return "custom"
+}
 
-func (c customUnamarshaller) Unmarshal([]byte) (pdata.Traces, error) {
+func (c customLogsUnmarshaller) Unmarshal([]byte) (pdata.Logs, error) {
 	panic("implement me")
 }
 
-func (c customUnamarshaller) Encoding() string {
+func (c customLogsUnmarshaller) Encoding() string {
 	return "custom"
 }
diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index c811b2234ed..65825a1118a 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -48,7 +48,20 @@ type kafkaConsumer struct {
 	logger *zap.Logger
 }
 
+// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
+type kafkaLogsConsumer struct {
+	name              string
+	consumerGroup     sarama.ConsumerGroup
+	nextConsumer      consumer.Logs
+	topics            []string
+	cancelConsumeLoop context.CancelFunc
+	unmarshaller      LogsUnmarshaller
+
+	logger *zap.Logger
+}
+
 var _ component.Receiver = (*kafkaConsumer)(nil)
+var _ component.Receiver = (*kafkaLogsConsumer)(nil)
 
 func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.Traces) (*kafkaConsumer, error) {
 	unmarshaller := unmarshalers[config.Encoding]
@@ -121,6 +134,77 @@ func (c *kafkaConsumer) Shutdown(context.Context) error {
 	return c.consumerGroup.Close()
 }
 
+func newLogsReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]LogsUnmarshaller, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
+	unmarshaller := unmarshalers[config.Encoding]
+	if unmarshaller == nil {
+		return nil, errUnrecognizedEncoding
+	}
+
+	c := sarama.NewConfig()
+	c.ClientID = config.ClientID
+	c.Metadata.Full = config.Metadata.Full
+	c.Metadata.Retry.Max = config.Metadata.Retry.Max
+	c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+	if config.ProtocolVersion != "" {
+		version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
+		if err != nil {
+			return nil, err
+		}
+		c.Version = version
+	}
+	if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
+		return nil, err
+	}
+	client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
+	if err != nil {
+		return nil, err
+	}
+	return &kafkaLogsConsumer{
+		name:          config.Name(),
+		consumerGroup: client,
+		topics:        []string{config.Topic},
+		nextConsumer:  nextConsumer,
+		unmarshaller:  unmarshaller,
+		logger:        params.Logger,
+	}, nil
+}
+
+func (c *kafkaLogsConsumer) Start(context.Context, component.Host) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancelConsumeLoop = cancel
+	logsConsumerGroup := &logsConsumerGroupHandler{
+		name:         c.name,
+		logger:       c.logger,
+		unmarshaller: c.unmarshaller,
+		nextConsumer: c.nextConsumer,
+		ready:        make(chan bool),
+	}
+	go c.consumeLoop(ctx, logsConsumerGroup)
+	<-logsConsumerGroup.ready
+	return nil
+}
+
+func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
+	for {
+		// `Consume` should be called inside an infinite loop; when a
+		// server-side rebalance happens, the consumer session needs to be
+		// recreated to get the new claims.
+		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
+			c.logger.Error("Error from consumer", zap.Error(err))
+		}
+		// check if context was cancelled, signaling that the consumer should stop
+		if ctx.Err() != nil {
+			c.logger.Info("Consumer stopped", zap.Error(ctx.Err()))
+			return ctx.Err()
+		}
+	}
+}
+
+func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
+	c.cancelConsumeLoop()
+	return c.consumerGroup.Close()
+}
+
 type consumerGroupHandler struct {
 	name         string
 	unmarshaller Unmarshaller
@@ -131,7 +215,18 @@ type consumerGroupHandler struct {
 	logger *zap.Logger
 }
 
+type logsConsumerGroupHandler struct {
+	name         string
+	unmarshaller LogsUnmarshaller
+	nextConsumer consumer.Logs
+	ready        chan bool
+	readyCloser  sync.Once
+
+	logger *zap.Logger
+}
+
 var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil)
+var _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil)
 
 func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
 	c.readyCloser.Do(func() {
@@ -180,3 +275,51 @@ func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 	}
 	return nil
 }
+
+func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+	c.readyCloser.Do(func() {
+		close(c.ready)
+	})
+	statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
+	_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1))
+	return nil
+}
+
+func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
+	statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
+	_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1))
+	return nil
+}
+
+func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
+	for message := range claim.Messages() {
+		c.logger.Debug("Kafka message claimed",
+			zap.String("value", string(message.Value)),
+			zap.Time("timestamp", message.Timestamp),
+			zap.String("topic", message.Topic))
+		session.MarkMessage(message, "")
+
+		ctx := obsreport.ReceiverContext(session.Context(), c.name, transport)
+		ctx = obsreport.StartTraceDataReceiveOp(ctx, c.name, transport)
+		statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
+		_ = stats.RecordWithTags(ctx, statsTags,
+			statMessageCount.M(1),
+			statMessageOffset.M(message.Offset),
+			statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))
+
+		logs, err := c.unmarshaller.Unmarshal(message.Value)
+		if err != nil {
+			c.logger.Error("failed to unmarshal message", zap.Error(err))
+			return err
+		}
+
+		err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
+		// TODO: obsreport has no logs-specific receive op yet; reuse the trace one for now.
+		obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaller.Encoding(), logs.LogRecordCount(), err)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
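The `Start`/`Setup` interplay above is easy to miss: `Start` blocks on the handler's `ready` channel, so it only returns once the consumer group session has been set up, and the `sync.Once` guard keeps a later rebalance (which re-runs `Setup`) from closing the channel twice. A standalone sketch of just that handshake, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// handler stands in for the sarama.ConsumerGroupHandler above.
type handler struct {
	ready       chan bool
	readyCloser sync.Once
}

// setup plays the role of Setup: it may run once per rebalance, but the
// ready channel must be closed exactly once, hence the sync.Once.
func (h *handler) setup() {
	h.readyCloser.Do(func() { close(h.ready) })
}

func main() {
	h := &handler{ready: make(chan bool)}
	go func() {
		for i := 0; i < 3; i++ { // simulate repeated rebalances
			time.Sleep(10 * time.Millisecond)
			h.setup()
		}
	}()
	<-h.ready // like Start: return only after the first session is set up
	fmt.Println("consumer group session established")
}
```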
"go.opentelemetry.io/collector/internal/testdata" ) func TestNewReceiver_version_err(t *testing.T) { @@ -228,6 +229,198 @@ func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) { wg.Wait() } +func TestNewLogsReceiver_version_err(t *testing.T) { + c := Config{ + Encoding: defaultEncoding, + ProtocolVersion: "none", + } + r, err := newLogsReceiver(c, component.ReceiverCreateParams{}, defaultLogsUnmarshallers(), consumertest.NewNop()) + assert.Error(t, err) + assert.Nil(t, r) +} + +func TestNewLogsReceiver_encoding_err(t *testing.T) { + c := Config{ + Encoding: "foo", + } + r, err := newLogsReceiver(c, component.ReceiverCreateParams{}, defaultLogsUnmarshallers(), consumertest.NewNop()) + require.Error(t, err) + assert.Nil(t, r) + assert.EqualError(t, err, errUnrecognizedEncoding.Error()) +} + +func TestNewLogsExporter_err_auth_type(t *testing.T) { + c := Config{ + ProtocolVersion: "2.0.0", + Authentication: kafkaexporter.Authentication{ + TLS: &configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "/doesnotexist", + }, + }, + }, + Encoding: defaultEncoding, + Metadata: kafkaexporter.Metadata{ + Full: false, + }, + } + r, err := newLogsReceiver(c, component.ReceiverCreateParams{}, defaultLogsUnmarshallers(), consumertest.NewNop()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to load TLS config") + assert.Nil(t, r) +} + +func TestLogsReceiverStart(t *testing.T) { + testClient := testConsumerGroup{once: &sync.Once{}} + c := kafkaLogsConsumer{ + nextConsumer: consumertest.NewNop(), + logger: zap.NewNop(), + consumerGroup: testClient, + } + + err := c.Start(context.Background(), nil) + require.NoError(t, err) + c.Shutdown(context.Background()) +} + +func TestLogsReceiverStartConsume(t *testing.T) { + testClient := testConsumerGroup{once: &sync.Once{}} + c := kafkaLogsConsumer{ + nextConsumer: consumertest.NewNop(), + logger: zap.NewNop(), + consumerGroup: testClient, + } + ctx, cancelFunc := context.WithCancel(context.Background()) + c.cancelConsumeLoop = cancelFunc + c.Shutdown(context.Background()) + err := c.consumeLoop(ctx, &logsConsumerGroupHandler{ + ready: make(chan bool), + }) + assert.EqualError(t, err, context.Canceled.Error()) +} + +func TestLogsReceiver_error(t *testing.T) { + zcore, logObserver := observer.New(zapcore.ErrorLevel) + logger := zap.New(zcore) + + expectedErr := fmt.Errorf("handler error") + testClient := testConsumerGroup{once: &sync.Once{}, err: expectedErr} + c := kafkaLogsConsumer{ + nextConsumer: consumertest.NewNop(), + logger: logger, + consumerGroup: testClient, + } + + err := c.Start(context.Background(), nil) + require.NoError(t, err) + c.Shutdown(context.Background()) + waitUntil(func() bool { + return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0 + }, 100, time.Millisecond*100) + assert.True(t, logObserver.FilterField(zap.Error(expectedErr)).Len() > 0) +} + +func TestLogsConsumerGroupHandler(t *testing.T) { + views := MetricViews() + view.Register(views...) + defer view.Unregister(views...) 
+
+	c := logsConsumerGroupHandler{
+		unmarshaller: &otlpLogsPbUnmarshaller{},
+		logger:       zap.NewNop(),
+		ready:        make(chan bool),
+		nextConsumer: consumertest.NewNop(),
+	}
+
+	testSession := testConsumerGroupSession{}
+	err := c.Setup(testSession)
+	require.NoError(t, err)
+	_, ok := <-c.ready
+	assert.False(t, ok)
+	viewData, err := view.RetrieveData(statPartitionStart.Name())
+	require.NoError(t, err)
+	assert.Equal(t, 1, len(viewData))
+	distData := viewData[0].Data.(*view.SumData)
+	assert.Equal(t, float64(1), distData.Value)
+
+	err = c.Cleanup(testSession)
+	require.NoError(t, err)
+	viewData, err = view.RetrieveData(statPartitionClose.Name())
+	require.NoError(t, err)
+	assert.Equal(t, 1, len(viewData))
+	distData = viewData[0].Data.(*view.SumData)
+	assert.Equal(t, float64(1), distData.Value)
+
+	groupClaim := testConsumerGroupClaim{
+		messageChan: make(chan *sarama.ConsumerMessage),
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		err = c.ConsumeClaim(testSession, groupClaim)
+		require.NoError(t, err)
+		wg.Done()
+	}()
+
+	groupClaim.messageChan <- &sarama.ConsumerMessage{}
+	close(groupClaim.messageChan)
+	wg.Wait()
+}
+
+func TestLogsConsumerGroupHandler_error_unmarshall(t *testing.T) {
+	c := logsConsumerGroupHandler{
+		unmarshaller: &otlpLogsPbUnmarshaller{},
+		logger:       zap.NewNop(),
+		ready:        make(chan bool),
+		nextConsumer: consumertest.NewNop(),
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	groupClaim := &testConsumerGroupClaim{
+		messageChan: make(chan *sarama.ConsumerMessage),
+	}
+	go func() {
+		err := c.ConsumeClaim(testConsumerGroupSession{}, groupClaim)
+		require.Error(t, err)
+		wg.Done()
+	}()
+	groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
+	close(groupClaim.messageChan)
+	wg.Wait()
+}
+
+func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
+	consumerError := errors.New("failed to consume")
+	c := logsConsumerGroupHandler{
+		unmarshaller: &otlpLogsPbUnmarshaller{},
+		logger:       zap.NewNop(),
+		ready:        make(chan bool),
+		nextConsumer: consumertest.NewErr(consumerError),
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	groupClaim := &testConsumerGroupClaim{
+		messageChan: make(chan *sarama.ConsumerMessage),
+	}
+	go func() {
+		e := c.ConsumeClaim(testConsumerGroupSession{}, groupClaim)
+		assert.EqualError(t, e, consumerError.Error())
+		wg.Done()
+	}()
+
+	ld := testdata.GenerateLogDataOneLog()
+	bts, err := ld.ToOtlpProtoBytes()
+	require.NoError(t, err)
+	groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
+	close(groupClaim.messageChan)
+	wg.Wait()
+}
+
 type testConsumerGroupClaim struct {
 	messageChan chan *sarama.ConsumerMessage
 }
diff --git a/receiver/kafkareceiver/otlp_unmarshaller.go b/receiver/kafkareceiver/otlp_unmarshaller.go
index c4ba90d3908..a6dd53f5326 100644
--- a/receiver/kafkareceiver/otlp_unmarshaller.go
+++ b/receiver/kafkareceiver/otlp_unmarshaller.go
@@ -30,3 +30,16 @@ func (p *otlpTracesPbUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error)
 func (*otlpTracesPbUnmarshaller) Encoding() string {
 	return defaultEncoding
 }
+
+type otlpLogsPbUnmarshaller struct {
+}
+
+var _ LogsUnmarshaller = (*otlpLogsPbUnmarshaller)(nil)
+
+func (p *otlpLogsPbUnmarshaller) Unmarshal(bytes []byte) (pdata.Logs, error) {
+	return pdata.LogsFromOtlpProtoBytes(bytes)
+}
+
+func (*otlpLogsPbUnmarshaller) Encoding() string {
+	return defaultEncoding
+}
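For producers targeting the default `otlp_proto` encoding, the message payload is the OTLP protobuf serialization of `pdata.Logs` — the exact inverse of the unmarshaller above. A small round-trip sketch (an empty `Logs` keeps it minimal):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/consumer/pdata"
)

func main() {
	// The bytes a producer publishes to the topic for the default encoding
	// are the OTLP protobuf serialization of a pdata.Logs value.
	ld := pdata.NewLogs()
	payload, err := ld.ToOtlpProtoBytes()
	if err != nil {
		panic(err)
	}
	// The receiver-side unmarshaller inverts this step.
	back, err := pdata.LogsFromOtlpProtoBytes(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.LogRecordCount()) // 0 for the empty Logs above
}
```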
diff --git a/receiver/kafkareceiver/otlp_unmarshaller_test.go b/receiver/kafkareceiver/otlp_unmarshaller_test.go
index cd3a77b5f09..2259e1f8aaf 100644
--- a/receiver/kafkareceiver/otlp_unmarshaller_test.go
+++ b/receiver/kafkareceiver/otlp_unmarshaller_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"go.opentelemetry.io/collector/consumer/pdata"
+	"go.opentelemetry.io/collector/internal/testdata"
 )
 
 func TestUnmarshallOTLP(t *testing.T) {
@@ -44,3 +45,23 @@ func TestUnmarshallOTLP_error(t *testing.T) {
 	_, err := p.Unmarshal([]byte("+$%"))
 	assert.Error(t, err)
 }
+
+func TestUnmarshallOTLPLogs(t *testing.T) {
+	ld := testdata.GenerateLogDataOneLog()
+
+	bts, err := ld.ToOtlpProtoBytes()
+	require.NoError(t, err)
+
+	p := otlpLogsPbUnmarshaller{}
+	got, err := p.Unmarshal(bts)
+	require.NoError(t, err)
+
+	assert.Equal(t, ld, got)
+	assert.Equal(t, "otlp_proto", p.Encoding())
+}
+
+func TestUnmarshallOTLPLogs_error(t *testing.T) {
+	p := otlpLogsPbUnmarshaller{}
+	_, err := p.Unmarshal([]byte("+$%"))
+	assert.Error(t, err)
+}
diff --git a/receiver/kafkareceiver/unmarshaller.go b/receiver/kafkareceiver/unmarshaller.go
index 25680fa0632..57973b75a0d 100644
--- a/receiver/kafkareceiver/unmarshaller.go
+++ b/receiver/kafkareceiver/unmarshaller.go
@@ -27,6 +27,15 @@ type Unmarshaller interface {
 	Encoding() string
 }
 
+// LogsUnmarshaller deserializes the message body.
+type LogsUnmarshaller interface {
+	// Unmarshal deserializes the message body into logs.
+	Unmarshal([]byte) (pdata.Logs, error)
+
+	// Encoding of the serialized messages.
+	Encoding() string
+}
+
 // defaultUnmarshallers returns map of supported encodings with Unmarshaller.
 func defaultUnmarshallers() map[string]Unmarshaller {
 	otlp := &otlpTracesPbUnmarshaller{}
@@ -44,3 +53,11 @@ func defaultUnmarshallers() map[string]Unmarshaller {
 		zipkinThrift.Encoding(): zipkinThrift,
 	}
 }
+
+// defaultLogsUnmarshallers returns map of supported encodings with LogsUnmarshaller.
+func defaultLogsUnmarshallers() map[string]LogsUnmarshaller {
+	otlp := &otlpLogsPbUnmarshaller{}
+	return map[string]LogsUnmarshaller{
+		otlp.Encoding(): otlp,
+	}
+}
diff --git a/receiver/kafkareceiver/unmarshaller_test.go b/receiver/kafkareceiver/unmarshaller_test.go
index 952eb9ced22..8c6b39ed834 100644
--- a/receiver/kafkareceiver/unmarshaller_test.go
+++ b/receiver/kafkareceiver/unmarshaller_test.go
@@ -40,3 +40,18 @@ func TestDefaultUnMarshaller(t *testing.T) {
 		})
 	}
 }
+
+func TestDefaultLogsUnMarshaller(t *testing.T) {
+	expectedEncodings := []string{
+		"otlp_proto",
+	}
+	unmarshallers := defaultLogsUnmarshallers()
+	assert.Equal(t, len(expectedEncodings), len(unmarshallers))
+	for _, e := range expectedEncodings {
+		t.Run(e, func(t *testing.T) {
+			m, ok := unmarshallers[e]
+			require.True(t, ok)
+			assert.NotNil(t, m)
+		})
+	}
+}
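If a custom encoding such as the hypothetical `otlp_proto_gzip` sketch earlier in this diff were added, its unit test could follow the same pattern as `TestUnmarshallOTLPLogs`. A sketch, assuming the `gzipOTLPLogsUnmarshaller` type from that example lives in the package under test:

```go
package kafkareceiver

import (
	"bytes"
	"compress/gzip"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/consumer/pdata"
)

func TestUnmarshalGzipOTLPLogs(t *testing.T) {
	ld := pdata.NewLogs()
	raw, err := ld.ToOtlpProtoBytes()
	require.NoError(t, err)

	// Compress the OTLP payload the way a producer using this hypothetical
	// encoding would before publishing to the topic.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, err = zw.Write(raw)
	require.NoError(t, err)
	require.NoError(t, zw.Close())

	p := gzipOTLPLogsUnmarshaller{}
	got, err := p.Unmarshal(buf.Bytes())
	require.NoError(t, err)
	assert.Equal(t, ld, got)
}
```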