diff --git a/.chloggen/feat_kafkareceiver-initialoffset.yaml b/.chloggen/feat_kafkareceiver-initialoffset.yaml new file mode 100755 index 000000000000..367e072c7017 --- /dev/null +++ b/.chloggen/feat_kafkareceiver-initialoffset.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support configuration of initial offset strategy to allow consuming form latest or earliest offset + +# One or more tracking issues related to the change +issues: [14976] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index c121de2472d2..00ebcc0646fb 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -34,6 +34,7 @@ The following settings can be optionally configured: - `raw`: (logs only) the payload's bytes are inserted as the body of a log record. - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from - `client_id` (default = otel-collector): The consumer client ID that receiver will use +- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`. - `auth` - `plain_text` - `username`: The username to use. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 063060caffd8..86a18bcdd803 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -56,6 +56,9 @@ type Config struct { GroupID string `mapstructure:"group_id"` // The consumer client ID that receiver will use (default "otel-collector") ClientID string `mapstructure:"client_id"` + // The initial offset to use if no offset was previously committed. + // Must be `latest` or `earliest` (default "latest"). + InitialOffset string `mapstructure:"initial_offset"` // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. @@ -70,6 +73,11 @@ type Config struct { MessageMarking MessageMarking `mapstructure:"message_marking"` } +const ( + offsetLatest string = "latest" + offsetEarliest string = "earliest" +) + var _ component.Config = (*Config)(nil) // Validate checks the receiver configuration is valid diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 2c49b9396980..cc156eaaac4e 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -43,11 +43,12 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ - Topic: "spans", - Encoding: "otlp_proto", - Brokers: []string{"foo:123", "bar:456"}, - ClientID: "otel-collector", - GroupID: "otel-collector", + Topic: "spans", + Encoding: "otlp_proto", + Brokers: []string{"foo:123", "bar:456"}, + ClientID: "otel-collector", + GroupID: "otel-collector", + InitialOffset: "latest", Authentication: kafkaexporter.Authentication{ TLS: &configtls.TLSClientSetting{ TLSSetting: configtls.TLSSetting{ @@ -74,11 +75,12 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "logs"), expected: &Config{ - Topic: "logs", - Encoding: "direct", - Brokers: []string{"coffee:123", "foobar:456"}, - ClientID: "otel-collector", - GroupID: "otel-collector", + Topic: "logs", + Encoding: "direct", + Brokers: []string{"coffee:123", "foobar:456"}, + ClientID: "otel-collector", + GroupID: "otel-collector", + InitialOffset: "earliest", Authentication: kafkaexporter.Authentication{ TLS: &configtls.TLSClientSetting{ TLSSetting: configtls.TLSSetting{ diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index 214bd3a1fadf..e87face937d2 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -28,11 +28,12 @@ import ( ) const ( - defaultTopic = "otlp_spans" - defaultEncoding = "otlp_proto" - defaultBroker = "localhost:9092" - defaultClientID = "otel-collector" - defaultGroupID = defaultClientID + defaultTopic = "otlp_spans" + defaultEncoding = "otlp_proto" + defaultBroker = "localhost:9092" + defaultClientID = "otel-collector" + defaultGroupID = defaultClientID + defaultInitialOffset = offsetLatest // default from sarama.NewConfig() defaultMetadataRetryMax = 3 @@ -100,11 +101,12 @@ func NewFactory(options ...FactoryOption) receiver.Factory { func createDefaultConfig() component.Config { return &Config{ - Topic: defaultTopic, - Encoding: defaultEncoding, - Brokers: []string{defaultBroker}, - ClientID: defaultClientID, - GroupID: defaultGroupID, + Topic: defaultTopic, + Encoding: defaultEncoding, + Brokers: []string{defaultBroker}, + ClientID: defaultClientID, + GroupID: defaultGroupID, + InitialOffset: defaultInitialOffset, Metadata: kafkaexporter.Metadata{ Full: defaultMetadataFull, Retry: kafkaexporter.MetadataRetry{ diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 79b9474d474b..d295c63491b8 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -35,6 +35,7 @@ func TestCreateDefaultConfig(t *testing.T) { assert.Equal(t, defaultTopic, cfg.Topic) assert.Equal(t, defaultGroupID, cfg.GroupID) assert.Equal(t, defaultClientID, cfg.ClientID) + assert.Equal(t, defaultInitialOffset, cfg.InitialOffset) } func TestCreateTracesReceiver(t *testing.T) { diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 38c0c9095595..d7d57cd4388a 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -36,6 +36,7 @@ const ( ) var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") +var errInvalidInitialOffset = fmt.Errorf("invalid initial offset") // kafkaTracesConsumer uses sarama to consume and handle messages from kafka. type kafkaTracesConsumer struct { @@ -96,6 +97,11 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval + if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil { + c.Consumer.Offsets.Initial = initialOffset + } else { + return nil, err + } if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { @@ -184,7 +190,11 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval - + if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil { + c.Consumer.Offsets.Initial = initialOffset + } else { + return nil, err + } if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { @@ -273,6 +283,11 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval + if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil { + c.Consumer.Offsets.Initial = initialOffset + } else { + return nil, err + } if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { @@ -627,3 +642,16 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess } } } + +func toSaramaInitialOffset(initialOffset string) (int64, error) { + switch initialOffset { + case offsetEarliest: + return sarama.OffsetOldest, nil + case offsetLatest: + fallthrough + case "": + return sarama.OffsetNewest, nil + default: + return 0, errInvalidInitialOffset + } +} diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index eb72745a0ffc..46afd9e33c87 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -82,6 +82,17 @@ func TestNewTracesReceiver_err_auth_type(t *testing.T) { assert.Nil(t, r) } +func TestNewTracesReceiver_initial_offset_err(t *testing.T) { + c := Config{ + InitialOffset: "foo", + Encoding: defaultEncoding, + } + r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop()) + require.Error(t, err) + assert.Nil(t, r) + assert.EqualError(t, err, errInvalidInitialOffset.Error()) +} + func TestTracesReceiverStart(t *testing.T) { c := kafkaTracesConsumer{ nextConsumer: consumertest.NewNop(), @@ -328,6 +339,17 @@ func TestNewMetricsExporter_err_auth_type(t *testing.T) { assert.Nil(t, r) } +func TestNewMetricsReceiver_initial_offset_err(t *testing.T) { + c := Config{ + InitialOffset: "foo", + Encoding: defaultEncoding, + } + r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop()) + require.Error(t, err) + assert.Nil(t, r) + assert.EqualError(t, err, errInvalidInitialOffset.Error()) +} + func TestMetricsReceiverStart(t *testing.T) { c := kafkaMetricsConsumer{ nextConsumer: consumertest.NewNop(), @@ -572,6 +594,17 @@ func TestNewLogsExporter_err_auth_type(t *testing.T) { assert.Nil(t, r) } +func TestNewLogsReceiver_initial_offset_err(t *testing.T) { + c := Config{ + InitialOffset: "foo", + Encoding: defaultEncoding, + } + r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop()) + require.Error(t, err) + assert.Nil(t, r) + assert.EqualError(t, err, errInvalidInitialOffset.Error()) +} + func TestLogsReceiverStart(t *testing.T) { c := kafkaLogsConsumer{ nextConsumer: consumertest.NewNop(), @@ -775,6 +808,33 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { wg.Wait() } +func TestToSaramaInitialOffset_earliest(t *testing.T) { + saramaInitialOffset, err := toSaramaInitialOffset(offsetEarliest) + + require.NoError(t, err) + assert.Equal(t, sarama.OffsetOldest, saramaInitialOffset) +} + +func TestToSaramaInitialOffset_latest(t *testing.T) { + saramaInitialOffset, err := toSaramaInitialOffset(offsetLatest) + + require.NoError(t, err) + assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset) +} + +func TestToSaramaInitialOffset_default(t *testing.T) { + saramaInitialOffset, err := toSaramaInitialOffset("") + + require.NoError(t, err) + assert.Equal(t, sarama.OffsetNewest, saramaInitialOffset) +} + +func TestToSaramaInitialOffset_invalid(t *testing.T) { + _, err := toSaramaInitialOffset("other") + + assert.Equal(t, err, errInvalidInitialOffset) +} + type testConsumerGroupClaim struct { messageChan chan *sarama.ConsumerMessage } diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index 3addc02ea78d..f4b99b780391 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -22,6 +22,7 @@ kafka/logs: - "foobar:456" client_id: otel-collector group_id: otel-collector + initial_offset: earliest auth: tls: ca_file: ca.pem