Skip to content

Commit

Permalink
Backporting fixes for #3609, #3529 (#3616)
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Assuied <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
  • Loading branch information
passuied and yaron2 authored Nov 28, 2024
1 parent c3677e3 commit 042151e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
32 changes: 25 additions & 7 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package kafka

import (
"context"
"errors"
"fmt"
"net/url"
Expand All @@ -32,6 +33,29 @@ type consumer struct {
mutex sync.Mutex
}

func notifyRecover(consumer *consumer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, b backoff.BackOff) error {
for {
if err := retry.NotifyRecover(func() error {
return consumer.doCallback(session, message)
}, b, func(err error, d time.Duration) {
consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}, func() {
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}); err != nil {
// If the retry policy got interrupted, it could mean that either
// the policy has reached its maximum number of attempts or the context has been cancelled.
// There is a weird edge case where the error returned is a 'context canceled' error but the session.Context is not done.
// This is a workaround to handle that edge case and reprocess the current message.
if err == context.Canceled && session.Context().Err() == nil {
consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. The error returned is 'context canceled' but the session context is not done. Retrying...")
continue
}
return err
}
return nil
}
}

func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context())
isBulkSubscribe := consumer.k.checkBulkSubscribe(claim.Topic())
Expand Down Expand Up @@ -83,13 +107,7 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
}

if consumer.k.consumeRetryEnabled {
if err := retry.NotifyRecover(func() error {
return consumer.doCallback(session, message)
}, b, func(err error, d time.Duration) {
consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}, func() {
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}); err != nil {
if err := notifyRecover(consumer, message, session, b); err != nil {
consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
} else {
Expand Down
4 changes: 4 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,17 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
k.consumeRetryInterval = meta.ConsumeRetryInterval

if meta.SchemaRegistryURL != "" {
k.logger.Infof("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL)
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
// Empty password is a possibility
if meta.SchemaRegistryAPIKey != "" {
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
}
k.logger.Infof("Schema caching enabled: %v", meta.SchemaCachingEnabled)
k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
if meta.SchemaCachingEnabled {
k.latestSchemaCache = make(map[string]SchemaCacheEntry)
k.logger.Debugf("Schema cache TTL: %v", meta.SchemaLatestVersionCacheTTL)
k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL
}
}
Expand Down Expand Up @@ -323,6 +326,7 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec,
if ok && cacheEntry.expirationTime.After(time.Now()) {
return cacheEntry.schema, cacheEntry.codec, nil
}
k.logger.Debugf("Cache not found or expired for subject %s. Fetching from registry...", subject)
schema, errSchema := srClient.GetLatestSchema(subject)
if errSchema != nil {
return nil, nil, errSchema
Expand Down
7 changes: 7 additions & 0 deletions common/component/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

mock_srclient "github.com/dapr/components-contrib/common/component/kafka/mocks"
"github.com/dapr/kit/logger"
)

func TestGetValueSchemaType(t *testing.T) {
Expand Down Expand Up @@ -62,6 +63,7 @@ func TestDeserializeValue(t *testing.T) {
k := Kafka{
srClient: registry,
schemaCachingEnabled: true,
logger: logger.NewLogger("kafka_test"),
}

schemaIDBytes := make([]byte, 4)
Expand Down Expand Up @@ -175,6 +177,7 @@ func TestSerializeValueCachingDisabled(t *testing.T) {
k := Kafka{
srClient: registry,
schemaCachingEnabled: false,
logger: logger.NewLogger("kafka_test"),
}

t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
Expand Down Expand Up @@ -250,6 +253,7 @@ func TestSerializeValueCachingEnabled(t *testing.T) {
schemaCachingEnabled: true,
latestSchemaCache: make(map[string]SchemaCacheEntry),
latestSchemaCacheTTL: time.Minute * 5,
logger: logger.NewLogger("kafka_test"),
}

t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
Expand Down Expand Up @@ -280,6 +284,7 @@ func TestLatestSchemaCaching(t *testing.T) {
schemaCachingEnabled: true,
latestSchemaCache: make(map[string]SchemaCacheEntry),
latestSchemaCacheTTL: time.Second * 10,
logger: logger.NewLogger("kafka_test"),
}

m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(1)
Expand All @@ -302,6 +307,7 @@ func TestLatestSchemaCaching(t *testing.T) {
schemaCachingEnabled: true,
latestSchemaCache: make(map[string]SchemaCacheEntry),
latestSchemaCacheTTL: time.Second * 1,
logger: logger.NewLogger("kafka_test"),
}

m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2)
Expand All @@ -326,6 +332,7 @@ func TestLatestSchemaCaching(t *testing.T) {
schemaCachingEnabled: false,
latestSchemaCache: make(map[string]SchemaCacheEntry),
latestSchemaCacheTTL: 0,
logger: logger.NewLogger("kafka_test"),
}

m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2)
Expand Down
2 changes: 2 additions & 0 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval,
HeartbeatInterval: 3 * time.Second,
SessionTimeout: 10 * time.Second,
SchemaCachingEnabled: true,
SchemaLatestVersionCacheTTL: 5 * time.Minute,
EscapeHeaders: false,
}

Expand Down

0 comments on commit 042151e

Please sign in to comment.