
Add logs support on Kafka receiver (#2944)
* Add logs support on Kafka receiver

* Update README

* feedback - public logsUnmarshallers
sincejune authored Apr 20, 2021
1 parent 5db1af0 commit 0a2ea1b
Showing 10 changed files with 494 additions and 10 deletions.
1 change: 1 addition & 0 deletions receiver/README.md
@@ -24,6 +24,7 @@ Available metric receivers (sorted alphabetically):
Available log receivers (sorted alphabetically):

- [OTLP Receiver](otlpreceiver/README.md)
- [Kafka Receiver](kafkareceiver/README.md)

The [contrib repository](https://github.com/open-telemetry/opentelemetry-collector-contrib)
has more receivers that can be added to custom builds of the collector.
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/README.md
@@ -2,7 +2,7 @@

Kafka receiver receives traces from Kafka. Message payload encoding is configurable.

-Supported pipeline types: traces
+Supported pipeline types: traces, logs

## Getting Started

@@ -13,7 +13,7 @@ The following settings are required:
The following settings can be optionally configured:

- `brokers` (default = localhost:9092): The list of kafka brokers
-- `topic` (default = otlp_spans): The name of the kafka topic to export to
+- `topic` (default = otlp_spans): The name of the kafka topic to read from
- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings:
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
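
For reference, the documented settings correspond to exported fields on the receiver's `Config`, so they can also be set programmatically. The sketch below is minimal and assumes the `go.opentelemetry.io/collector/receiver/kafkareceiver` import path of the collector at the time of this commit; the `otlp_logs` topic name is hypothetical.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/receiver/kafkareceiver"
)

func main() {
	// Start from the factory's default config and override the documented settings.
	factory := kafkareceiver.NewFactory()
	cfg := factory.CreateDefaultConfig().(*kafkareceiver.Config)
	cfg.Brokers = []string{"localhost:9092"} // `brokers`
	cfg.Topic = "otlp_logs"                  // `topic` to read from (hypothetical name; default is otlp_spans)
	cfg.Encoding = "otlp_proto"              // `encoding` of the consumed payload
	fmt.Printf("kafka receiver config: %+v\n", cfg)
}
```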
33 changes: 30 additions & 3 deletions receiver/kafkareceiver/factory.go
@@ -53,18 +53,30 @@ func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOpt
}
}

// WithAddLogsUnmarshallers adds LogsUnmarshallers.
func WithAddLogsUnmarshallers(encodingMarshaller map[string]LogsUnmarshaller) FactoryOption {
return func(factory *kafkaReceiverFactory) {
for encoding, unmarshaller := range encodingMarshaller {
factory.logsUnmarshaller[encoding] = unmarshaller
}
}
}

// NewFactory creates Kafka receiver factory.
func NewFactory(options ...FactoryOption) component.ReceiverFactory {
f := &kafkaReceiverFactory{
-unmarshalers: defaultUnmarshallers(),
+unmarshalers:     defaultUnmarshallers(),
+logsUnmarshaller: defaultLogsUnmarshallers(),
}
for _, o := range options {
o(f)
}
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
-receiverhelper.WithTraces(f.createTracesReceiver))
+receiverhelper.WithTraces(f.createTracesReceiver),
+receiverhelper.WithLogs(f.createLogsReceiver),
+)
}

func createDefaultConfig() config.Receiver {
@@ -89,7 +101,8 @@ func createDefaultConfig() config.Receiver {
}

type kafkaReceiverFactory struct {
-unmarshalers map[string]Unmarshaller
+unmarshalers     map[string]Unmarshaller
+logsUnmarshaller map[string]LogsUnmarshaller
}

func (f *kafkaReceiverFactory) createTracesReceiver(
@@ -105,3 +118,17 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
}
return r, nil
}

func (f *kafkaReceiverFactory) createLogsReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg config.Receiver,
nextConsumer consumer.Logs,
) (component.LogsReceiver, error) {
c := cfg.(*Config)
r, err := newLogsReceiver(*c, params, f.logsUnmarshaller, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}
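
To show what the new `WithAddLogsUnmarshallers` option enables, here is a registration sketch for a custom logs encoding. The `textLogsUnmarshaller` type and the `text` encoding name are made up for illustration, and the `pdata`/`kafkareceiver` import paths are assumed to match the collector version of this commit; the unmarshalling body is deliberately left trivial.

```go
package main

import (
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/receiver/kafkareceiver"
)

// textLogsUnmarshaller is a hypothetical encoding that would turn each Kafka
// message payload into log records. It satisfies LogsUnmarshaller by providing
// Unmarshal and Encoding, mirroring customLogsUnmarshaller in the tests below.
type textLogsUnmarshaller struct{}

func (textLogsUnmarshaller) Unmarshal(payload []byte) (pdata.Logs, error) {
	// Decode `payload` into log records here; returning empty logs keeps the
	// sketch independent of the pdata construction API, which varies by version.
	return pdata.NewLogs(), nil
}

func (textLogsUnmarshaller) Encoding() string {
	return "text"
}

func main() {
	// Register the custom encoding alongside the defaults.
	factory := kafkareceiver.NewFactory(
		kafkareceiver.WithAddLogsUnmarshallers(map[string]kafkareceiver.LogsUnmarshaller{
			"text": textLogsUnmarshaller{},
		}),
	)
	_ = factory // pass the factory to the collector's component set as usual
}
```

The map key is what selects the unmarshaller at runtime: `newLogsReceiver` below looks up `config.Encoding` in this map, while `Encoding()` is reported alongside the received data.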
65 changes: 60 additions & 5 deletions receiver/kafkareceiver/factory_test.go
@@ -59,7 +59,7 @@ func TestCreateTracesReceiver_error(t *testing.T) {
}

func TestWithUnmarshallers(t *testing.T) {
-unmarshaller := &customUnamarshaller{}
+unmarshaller := &customUnmarshaller{}
f := NewFactory(WithAddUnmarshallers(map[string]Unmarshaller{unmarshaller.Encoding(): unmarshaller}))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
@@ -80,15 +80,70 @@
})
}

-type customUnamarshaller struct {
func TestCreateLogsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{logsUnmarshaller: defaultLogsUnmarshallers()}
r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateLogsReceiver_error(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.ProtocolVersion = "2.0.0"
// disable contacting broker at startup
cfg.Metadata.Full = false
f := kafkaReceiverFactory{logsUnmarshaller: defaultLogsUnmarshallers()}
r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestWithLogsUnmarshallers(t *testing.T) {
unmarshaller := &customLogsUnmarshaller{}
f := NewFactory(WithAddLogsUnmarshallers(map[string]LogsUnmarshaller{unmarshaller.Encoding(): unmarshaller}))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
cfg.ProtocolVersion = "2.0.0"

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = unmarshaller.Encoding()
exporter, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpLogsPbUnmarshaller).Encoding()
exporter, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
assert.NotNil(t, exporter)
})
}

-var _ Unmarshaller = (*customUnamarshaller)(nil)
+type customUnmarshaller struct {
}

type customLogsUnmarshaller struct {
}

var _ Unmarshaller = (*customUnmarshaller)(nil)

func (c customUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) {
panic("implement me")
}

func (c customUnmarshaller) Encoding() string {
return "custom"
}

-func (c customUnamarshaller) Unmarshal([]byte) (pdata.Traces, error) {
+func (c customLogsUnmarshaller) Unmarshal([]byte) (pdata.Logs, error) {
panic("implement me")
}

-func (c customUnamarshaller) Encoding() string {
+func (c customLogsUnmarshaller) Encoding() string {
return "custom"
}
143 changes: 143 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
@@ -48,7 +48,20 @@ type kafkaConsumer struct {
logger *zap.Logger
}

// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
type kafkaLogsConsumer struct {
name string
consumerGroup sarama.ConsumerGroup
nextConsumer consumer.Logs
topics []string
cancelConsumeLoop context.CancelFunc
unmarshaller LogsUnmarshaller

logger *zap.Logger
}

var _ component.Receiver = (*kafkaConsumer)(nil)
var _ component.Receiver = (*kafkaLogsConsumer)(nil)

func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.Traces) (*kafkaConsumer, error) {
unmarshaller := unmarshalers[config.Encoding]
@@ -121,6 +134,77 @@ func (c *kafkaConsumer) Shutdown(context.Context) error {
return c.consumerGroup.Close()
}

func newLogsReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]LogsUnmarshaller, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
unmarshaller := unmarshalers[config.Encoding]
if unmarshaller == nil {
return nil, errUnrecognizedEncoding
}

c := sarama.NewConfig()
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}
if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
if err != nil {
return nil, err
}
return &kafkaLogsConsumer{
name: config.Name(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaller: unmarshaller,
logger: params.Logger,
}, nil
}

func (c *kafkaLogsConsumer) Start(context.Context, component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel
logsConsumerGroup := &logsConsumerGroupHandler{
name: c.name,
logger: c.logger,
unmarshaller: c.unmarshaller,
nextConsumer: c.nextConsumer,
ready: make(chan bool),
}
go c.consumeLoop(ctx, logsConsumerGroup)
<-logsConsumerGroup.ready
return nil
}

func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
c.logger.Info("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
}
}
}

func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
c.cancelConsumeLoop()
return c.consumerGroup.Close()
}
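
A lifecycle sketch under stated assumptions: it drives the Start/Shutdown contract implemented above through the public factory API. The nop consumer and host helpers (`consumertest.NewNop`, `componenttest.NewNopHost`) are assumed to exist in this collector version (substitute whatever your version provides), and a broker reachable at `localhost:9092` is assumed, since `Start` blocks until the group handler signals readiness.

```go
package main

import (
	"context"
	"log"

	"go.uber.org/zap"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/receiver/kafkareceiver"
)

func main() {
	factory := kafkareceiver.NewFactory()
	cfg := factory.CreateDefaultConfig().(*kafkareceiver.Config)
	cfg.Brokers = []string{"localhost:9092"}
	cfg.ProtocolVersion = "2.0.0"

	// CreateLogsReceiver builds the sarama consumer group (newLogsReceiver above).
	params := component.ReceiverCreateParams{Logger: zap.NewNop()}
	rcv, err := factory.CreateLogsReceiver(context.Background(), params, cfg, consumertest.NewNop())
	if err != nil {
		log.Fatal(err)
	}

	// Start launches consumeLoop in a goroutine and waits for the handler's Setup.
	if err := rcv.Start(context.Background(), componenttest.NewNopHost()); err != nil {
		log.Fatal(err)
	}

	// Shutdown cancels the consume loop and closes the consumer group.
	if err := rcv.Shutdown(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```

In a real deployment the collector service performs this wiring itself; the sketch only makes the lifecycle implemented by `kafkaLogsConsumer` explicit.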

type consumerGroupHandler struct {
name string
unmarshaller Unmarshaller
@@ -131,7 +215,18 @@ type consumerGroupHandler struct {
logger *zap.Logger
}

type logsConsumerGroupHandler struct {
name string
unmarshaller LogsUnmarshaller
nextConsumer consumer.Logs
ready chan bool
readyCloser sync.Once

logger *zap.Logger
}

var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil)

func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
@@ -180,3 +275,51 @@ func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
}
return nil
}

func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
})
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1))
return nil
}

func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1))
return nil
}

func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
for message := range claim.Messages() {
c.logger.Debug("Kafka message claimed",
zap.String("value", string(message.Value)),
zap.Time("timestamp", message.Timestamp),
zap.String("topic", message.Topic))
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.name, transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, c.name, transport)
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))

logs, err := c.unmarshaller.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshall message", zap.Error(err))
return err
}

err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaller.Encoding(), logs.LogRecordCount(), err)
if err != nil {
return err
}
}
return nil
}