Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs support on Kafka receiver #2944

Merged
merged 4 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Available metric receivers (sorted alphabetically):
Available log receivers (sorted alphabetically):

- [OTLP Receiver](otlpreceiver/README.md)
- [Kafka Receiver](kafkareceiver/README.md)

The [contrib repository](https://github.com/open-telemetry/opentelemetry-collector-contrib)
has more receivers that can be added to custom builds of the collector.
Expand Down
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Kafka receiver receives traces from Kafka. Message payload encoding is configurable.

Supported pipeline types: traces
Supported pipeline types: traces, logs

## Getting Started

Expand All @@ -13,7 +13,7 @@ The following settings are required:
The following settings can be optionally configured:

- `brokers` (default = localhost:9092): The list of kafka brokers
- `topic` (default = otlp_spans): The name of the kafka topic to export to
- `topic` (default = otlp_spans): The name of the kafka topic to read from
- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings:
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
Expand Down
33 changes: 30 additions & 3 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,30 @@ func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOpt
}
}

// WithAddLogsUnmarshallers adds logsUnmarshallers.
func WithAddLogsUnmarshallers(encodingMarshaller map[string]logsUnmarshaller) FactoryOption {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Names are inconsistent, probably worth updating the previous one to include traces.

Also I would change the API to:

func WithLogsUnmarshallers(encoding string, unmarshaller logsUnmarshaller) FactoryOption {}

Or even consider to add a name func on the interface to simplify usage.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there is an Encoding so we can do:

func WithLogsUnmarshallers(unmarshallers logsUnmarshaller...) FactoryOption {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is actually consistent with the traces' method here:

// WithAddUnmarshallers adds marshallers.
func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOption {
return func(factory *kafkaReceiverFactory) {
for encoding, unmarshaller := range encodingMarshaller {
factory.unmarshalers[encoding] = unmarshaller
}
}
}

I'd like to keep the parameters consistent with trace's method and there are multiple encodings and return one factory in traces' method. But I'd like to rename the method name to be consistent in a subsequent PR. Thoughts?

return func(factory *kafkaReceiverFactory) {
for encoding, unmarshaller := range encodingMarshaller {
factory.logsUnmarshaller[encoding] = unmarshaller
}
}
}

// NewFactory creates Kafka receiver factory.
func NewFactory(options ...FactoryOption) component.ReceiverFactory {
f := &kafkaReceiverFactory{
unmarshalers: defaultUnmarshallers(),
unmarshalers: defaultUnmarshallers(),
logsUnmarshaller: defaultLogsUnmarshallers(),
}
for _, o := range options {
o(f)
}
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithTraces(f.createTracesReceiver))
receiverhelper.WithTraces(f.createTracesReceiver),
receiverhelper.WithLogs(f.createLogsReceiver),
)
}

func createDefaultConfig() config.Receiver {
Expand All @@ -89,7 +101,8 @@ func createDefaultConfig() config.Receiver {
}

type kafkaReceiverFactory struct {
unmarshalers map[string]Unmarshaller
unmarshalers map[string]Unmarshaller
logsUnmarshaller map[string]logsUnmarshaller
}

func (f *kafkaReceiverFactory) createTracesReceiver(
Expand All @@ -105,3 +118,17 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
}
return r, nil
}

func (f *kafkaReceiverFactory) createLogsReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg config.Receiver,
nextConsumer consumer.Logs,
) (component.LogsReceiver, error) {
c := cfg.(*Config)
r, err := newLogsReceiver(*c, params, f.logsUnmarshaller, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}
65 changes: 60 additions & 5 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestCreateTracesReceiver_error(t *testing.T) {
}

func TestWithUnmarshallers(t *testing.T) {
unmarshaller := &customUnamarshaller{}
unmarshaller := &customUnmarshaller{}
f := NewFactory(WithAddUnmarshallers(map[string]Unmarshaller{unmarshaller.Encoding(): unmarshaller}))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
Expand All @@ -80,15 +80,70 @@ func TestWithUnmarshallers(t *testing.T) {
})
}

type customUnamarshaller struct {
func TestCreateLogsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{logsUnmarshaller: defaultLogsUnmarshallers()}
r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateLogsReceiver_error(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.ProtocolVersion = "2.0.0"
// disable contacting broker at startup
cfg.Metadata.Full = false
f := kafkaReceiverFactory{logsUnmarshaller: defaultLogsUnmarshallers()}
r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestWithLogsUnmarshallers(t *testing.T) {
unmarshaller := &customLogsUnmarshaller{}
f := NewFactory(WithAddLogsUnmarshallers(map[string]logsUnmarshaller{unmarshaller.Encoding(): unmarshaller}))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
cfg.ProtocolVersion = "2.0.0"

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = unmarshaller.Encoding()
exporter, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpLogsPbUnmarshaller).Encoding()
exporter, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
assert.NotNil(t, exporter)
})
}

var _ Unmarshaller = (*customUnamarshaller)(nil)
type customUnmarshaller struct {
}

type customLogsUnmarshaller struct {
}

var _ Unmarshaller = (*customUnmarshaller)(nil)

func (c customUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) {
panic("implement me")
}

func (c customUnmarshaller) Encoding() string {
return "custom"
}

func (c customUnamarshaller) Unmarshal([]byte) (pdata.Traces, error) {
func (c customLogsUnmarshaller) Unmarshal([]byte) (pdata.Logs, error) {
panic("implement me")
}

func (c customUnamarshaller) Encoding() string {
func (c customLogsUnmarshaller) Encoding() string {
return "custom"
}
143 changes: 143 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,20 @@ type kafkaConsumer struct {
logger *zap.Logger
}

// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
type kafkaLogsConsumer struct {
name string
consumerGroup sarama.ConsumerGroup
nextConsumer consumer.Logs
topics []string
cancelConsumeLoop context.CancelFunc
unmarshaller logsUnmarshaller

logger *zap.Logger
}

var _ component.Receiver = (*kafkaConsumer)(nil)
var _ component.Receiver = (*kafkaLogsConsumer)(nil)

func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.Traces) (*kafkaConsumer, error) {
unmarshaller := unmarshalers[config.Encoding]
Expand Down Expand Up @@ -121,6 +134,77 @@ func (c *kafkaConsumer) Shutdown(context.Context) error {
return c.consumerGroup.Close()
}

func newLogsReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]logsUnmarshaller, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
unmarshaller := unmarshalers[config.Encoding]
if unmarshaller == nil {
return nil, errUnrecognizedEncoding
}

c := sarama.NewConfig()
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}
if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
if err != nil {
return nil, err
}
return &kafkaLogsConsumer{
name: config.Name(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaller: unmarshaller,
logger: params.Logger,
}, nil
}

func (c *kafkaLogsConsumer) Start(context.Context, component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel
logsConsumerGroup := &logsConsumerGroupHandler{
name: c.name,
logger: c.logger,
unmarshaller: c.unmarshaller,
nextConsumer: c.nextConsumer,
ready: make(chan bool),
}
go c.consumeLoop(ctx, logsConsumerGroup)
<-logsConsumerGroup.ready
return nil
}

func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
c.logger.Info("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
}
}
}

func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
c.cancelConsumeLoop()
return c.consumerGroup.Close()
}

type consumerGroupHandler struct {
name string
unmarshaller Unmarshaller
Expand All @@ -131,7 +215,18 @@ type consumerGroupHandler struct {
logger *zap.Logger
}

type logsConsumerGroupHandler struct {
name string
unmarshaller logsUnmarshaller
nextConsumer consumer.Logs
ready chan bool
readyCloser sync.Once

logger *zap.Logger
}

var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil)

func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
Expand Down Expand Up @@ -179,3 +274,51 @@ func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
}
return nil
}

func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
})
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1))
return nil
}

func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1))
return nil
}

func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
for message := range claim.Messages() {
c.logger.Debug("Kafka message claimed",
zap.String("value", string(message.Value)),
zap.Time("timestamp", message.Timestamp),
zap.String("topic", message.Topic))
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.name, transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, c.name, transport)
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))

logs, err := c.unmarshaller.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshall message", zap.Error(err))
return err
}

err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaller.Encoding(), logs.LogRecordCount(), err)
if err != nil {
return err
}
}
return nil
}
Loading