[receiver/kafkareceiver] Support configuration of initial offset strategy (#21408)

[receiver/kafkareceiver] support configuration of initial offset strategy
ueisele authored May 12, 2023
1 parent 0e43390 commit f0bc322
Showing 9 changed files with 140 additions and 21 deletions.
16 changes: 16 additions & 0 deletions .chloggen/feat_kafkareceiver-initialoffset.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support configuration of initial offset strategy to allow consuming from the latest or earliest offset

# One or more tracking issues related to the change
issues: [14976]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions receiver/kafkareceiver/README.md
@@ -34,6 +34,7 @@ The following settings can be optionally configured:
- `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
- `auth`
- `plain_text`
- `username`: The username to use.
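For illustration, a minimal collector configuration exercising the new `initial_offset` setting could look like the sketch below. All values other than `initial_offset` mirror the receiver's documented defaults; the broker address and topic are placeholders, not part of this change.

```yaml
receivers:
  kafka:
    brokers:
      - localhost:9092        # default broker; replace with your cluster
    topic: otlp_spans         # default traces topic
    group_id: otel-collector  # consumer group whose committed offsets are honored first
    client_id: otel-collector
    # New in this change: where to start when the group has no committed offset.
    # Must be `latest` (the default) or `earliest`; any other value fails receiver
    # creation with an "invalid initial offset" error.
    initial_offset: earliest
```

Once the consumer group has committed offsets, `initial_offset` has no effect; consumption resumes from the committed position.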
8 changes: 8 additions & 0 deletions receiver/kafkareceiver/config.go
@@ -56,6 +56,9 @@ type Config struct {
GroupID string `mapstructure:"group_id"`
// The consumer client ID that receiver will use (default "otel-collector")
ClientID string `mapstructure:"client_id"`
// The initial offset to use if no offset was previously committed.
// Must be `latest` or `earliest` (default "latest").
InitialOffset string `mapstructure:"initial_offset"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
@@ -70,6 +73,11 @@ type Config struct {
MessageMarking MessageMarking `mapstructure:"message_marking"`
}

const (
offsetLatest string = "latest"
offsetEarliest string = "earliest"
)

var _ component.Config = (*Config)(nil)

// Validate checks the receiver configuration is valid
22 changes: 12 additions & 10 deletions receiver/kafkareceiver/config_test.go
@@ -43,11 +43,12 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "latest",
Authentication: kafkaexporter.Authentication{
TLS: &configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
@@ -74,11 +75,12 @@ func TestLoadConfig(t *testing.T) {

id: component.NewIDWithName(metadata.Type, "logs"),
expected: &Config{
Topic: "logs",
Encoding: "direct",
Brokers: []string{"coffee:123", "foobar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
Topic: "logs",
Encoding: "direct",
Brokers: []string{"coffee:123", "foobar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "earliest",
Authentication: kafkaexporter.Authentication{
TLS: &configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
22 changes: 12 additions & 10 deletions receiver/kafkareceiver/factory.go
@@ -28,11 +28,12 @@ import (
)

const (
defaultTopic = "otlp_spans"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "otel-collector"
defaultGroupID = defaultClientID
defaultTopic = "otlp_spans"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "otel-collector"
defaultGroupID = defaultClientID
defaultInitialOffset = offsetLatest

// default from sarama.NewConfig()
defaultMetadataRetryMax = 3
@@ -100,11 +101,12 @@ func NewFactory(options ...FactoryOption) receiver.Factory {

func createDefaultConfig() component.Config {
return &Config{
Topic: defaultTopic,
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
GroupID: defaultGroupID,
Topic: defaultTopic,
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
GroupID: defaultGroupID,
InitialOffset: defaultInitialOffset,
Metadata: kafkaexporter.Metadata{
Full: defaultMetadataFull,
Retry: kafkaexporter.MetadataRetry{
1 change: 1 addition & 0 deletions receiver/kafkareceiver/factory_test.go
@@ -35,6 +35,7 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.Equal(t, defaultTopic, cfg.Topic)
assert.Equal(t, defaultGroupID, cfg.GroupID)
assert.Equal(t, defaultClientID, cfg.ClientID)
assert.Equal(t, defaultInitialOffset, cfg.InitialOffset)
}

func TestCreateTracesReceiver(t *testing.T) {
30 changes: 29 additions & 1 deletion receiver/kafkareceiver/kafka_receiver.go
@@ -36,6 +36,7 @@ const (
)

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
var errInvalidInitialOffset = fmt.Errorf("invalid initial offset")

// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
@@ -96,6 +97,11 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil {
c.Consumer.Offsets.Initial = initialOffset
} else {
return nil, err
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
@@ -184,7 +190,11 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval

if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil {
c.Consumer.Offsets.Initial = initialOffset
} else {
return nil, err
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
@@ -273,6 +283,11 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil {
c.Consumer.Offsets.Initial = initialOffset
} else {
return nil, err
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
@@ -627,3 +642,16 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
}
}
}

func toSaramaInitialOffset(initialOffset string) (int64, error) {
switch initialOffset {
case offsetEarliest:
return sarama.OffsetOldest, nil
case offsetLatest:
fallthrough
case "":
return sarama.OffsetNewest, nil
default:
return 0, errInvalidInitialOffset
}
}
60 changes: 60 additions & 0 deletions receiver/kafkareceiver/kafka_receiver_test.go
@@ -82,6 +82,17 @@ func TestNewTracesReceiver_err_auth_type(t *testing.T) {
assert.Nil(t, r)
}

func TestNewTracesReceiver_initial_offset_err(t *testing.T) {
c := Config{
InitialOffset: "foo",
Encoding: defaultEncoding,
}
r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop())
require.Error(t, err)
assert.Nil(t, r)
assert.EqualError(t, err, errInvalidInitialOffset.Error())
}

func TestTracesReceiverStart(t *testing.T) {
c := kafkaTracesConsumer{
nextConsumer: consumertest.NewNop(),
@@ -328,6 +339,17 @@ func TestNewMetricsExporter_err_auth_type(t *testing.T) {
assert.Nil(t, r)
}

func TestNewMetricsReceiver_initial_offset_err(t *testing.T) {
c := Config{
InitialOffset: "foo",
Encoding: defaultEncoding,
}
r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop())
require.Error(t, err)
assert.Nil(t, r)
assert.EqualError(t, err, errInvalidInitialOffset.Error())
}

func TestMetricsReceiverStart(t *testing.T) {
c := kafkaMetricsConsumer{
nextConsumer: consumertest.NewNop(),
@@ -572,6 +594,17 @@ func TestNewLogsExporter_err_auth_type(t *testing.T) {
assert.Nil(t, r)
}

func TestNewLogsReceiver_initial_offset_err(t *testing.T) {
c := Config{
InitialOffset: "foo",
Encoding: defaultEncoding,
}
r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop())
require.Error(t, err)
assert.Nil(t, r)
assert.EqualError(t, err, errInvalidInitialOffset.Error())
}

func TestLogsReceiverStart(t *testing.T) {
c := kafkaLogsConsumer{
nextConsumer: consumertest.NewNop(),
@@ -775,6 +808,33 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
wg.Wait()
}

func TestToSaramaInitialOffset_earliest(t *testing.T) {
saramaInitialOffset, err := toSaramaInitialOffset(offsetEarliest)

require.NoError(t, err)
assert.Equal(t, sarama.OffsetOldest, saramaInitialOffset)
}

func TestToSaramaInitialOffset_latest(t *testing.T) {
saramaInitialOffset, err := toSaramaInitialOffset(offsetLatest)

require.NoError(t, err)
assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
}

func TestToSaramaInitialOffset_default(t *testing.T) {
saramaInitialOffset, err := toSaramaInitialOffset("")

require.NoError(t, err)
assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset)
}

func TestToSaramaInitialOffset_invalid(t *testing.T) {
_, err := toSaramaInitialOffset("other")

assert.Equal(t, err, errInvalidInitialOffset)
}

type testConsumerGroupClaim struct {
messageChan chan *sarama.ConsumerMessage
}
1 change: 1 addition & 0 deletions receiver/kafkareceiver/testdata/config.yaml
@@ -22,6 +22,7 @@ kafka/logs:
- "foobar:456"
client_id: otel-collector
group_id: otel-collector
initial_offset: earliest
auth:
tls:
ca_file: ca.pem