diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go
index 7cb923a455..b7ea51240c 100644
--- a/common/component/kafka/consumer.go
+++ b/common/component/kafka/consumer.go
@@ -14,6 +14,7 @@ limitations under the License.
 package kafka
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"net/url"
@@ -32,6 +33,29 @@ type consumer struct {
 	mutex sync.Mutex
 }
 
+func notifyRecover(consumer *consumer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, b backoff.BackOff) error {
+	for {
+		if err := retry.NotifyRecover(func() error {
+			return consumer.doCallback(session, message)
+		}, b, func(err error, d time.Duration) {
+			consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
+		}, func() {
+			consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
+		}); err != nil {
+			// If the retry policy was interrupted, either the policy reached its
+			// maximum number of attempts or the context was cancelled.
+			// There is an edge case where the returned error is 'context canceled'
+			// even though the session context is not done; reprocess the message then.
+			if errors.Is(err, context.Canceled) && session.Context().Err() == nil {
+				consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. The error returned is 'context canceled' but the session context is not done. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
+				continue
+			}
+			return err
+		}
+		return nil
+	}
+}
+
 func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context())
 	isBulkSubscribe := consumer.k.checkBulkSubscribe(claim.Topic())
@@ -83,13 +107,7 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
 		}
 
 		if consumer.k.consumeRetryEnabled {
-			if err := retry.NotifyRecover(func() error {
-				return consumer.doCallback(session, message)
-			}, b, func(err error, d time.Duration) {
-				consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
-			}, func() {
-				consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
-			}); err != nil {
+			if err := notifyRecover(consumer, message, session, b); err != nil {
 				consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
 			}
 		} else {
diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go
index 97e714ef4d..c8a2d818ee 100644
--- a/common/component/kafka/kafka.go
+++ b/common/component/kafka/kafka.go
@@ -208,14 +208,17 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
 	k.consumeRetryInterval = meta.ConsumeRetryInterval
 
 	if meta.SchemaRegistryURL != "" {
+		k.logger.Infof("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL)
 		k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
 		// Empty password is a possibility
 		if meta.SchemaRegistryAPIKey != "" {
 			k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
 		}
+		k.logger.Infof("Schema caching enabled: %v", meta.SchemaCachingEnabled)
 		k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
 		if meta.SchemaCachingEnabled {
 			k.latestSchemaCache = make(map[string]SchemaCacheEntry)
+			k.logger.Debugf("Schema cache TTL: %v", meta.SchemaLatestVersionCacheTTL)
 			k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL
 		}
 	}
@@ -323,6 +326,7 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec,
 	if ok && cacheEntry.expirationTime.After(time.Now()) {
 		return cacheEntry.schema, cacheEntry.codec, nil
 	}
+	k.logger.Debugf("Schema cache miss or expired for subject %s. Fetching from registry...", subject)
 	schema, errSchema := srClient.GetLatestSchema(subject)
 	if errSchema != nil {
 		return nil, nil, errSchema
diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go
index 2e9ac3fbe7..385b12743b 100644
--- a/common/component/kafka/kafka_test.go
+++ b/common/component/kafka/kafka_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	mock_srclient "github.com/dapr/components-contrib/common/component/kafka/mocks"
+	"github.com/dapr/kit/logger"
 )
 
 func TestGetValueSchemaType(t *testing.T) {
@@ -62,6 +63,7 @@ func TestDeserializeValue(t *testing.T) {
 	k := Kafka{
 		srClient:             registry,
 		schemaCachingEnabled: true,
+		logger:               logger.NewLogger("kafka_test"),
 	}
 
 	schemaIDBytes := make([]byte, 4)
@@ -175,6 +177,7 @@ func TestSerializeValueCachingDisabled(t *testing.T) {
 	k := Kafka{
 		srClient:             registry,
 		schemaCachingEnabled: false,
+		logger:               logger.NewLogger("kafka_test"),
 	}
 
 	t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
@@ -250,6 +253,7 @@ func TestSerializeValueCachingEnabled(t *testing.T) {
 		schemaCachingEnabled: true,
 		latestSchemaCache:    make(map[string]SchemaCacheEntry),
 		latestSchemaCacheTTL: time.Minute * 5,
+		logger:               logger.NewLogger("kafka_test"),
 	}
 
 	t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
@@ -280,6 +284,7 @@ func TestLatestSchemaCaching(t *testing.T) {
 		schemaCachingEnabled: true,
 		latestSchemaCache:    make(map[string]SchemaCacheEntry),
 		latestSchemaCacheTTL: time.Second * 10,
+		logger:               logger.NewLogger("kafka_test"),
 	}
 
 	m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(1)
@@ -302,6 +307,7 @@ func TestLatestSchemaCaching(t *testing.T) {
 		schemaCachingEnabled: true,
 		latestSchemaCache:    make(map[string]SchemaCacheEntry),
 		latestSchemaCacheTTL: time.Second * 1,
+		logger:               logger.NewLogger("kafka_test"),
 	}
 
 	m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2)
@@ -326,6 +332,7 @@ func TestLatestSchemaCaching(t *testing.T) {
 		schemaCachingEnabled: false,
 		latestSchemaCache:    make(map[string]SchemaCacheEntry),
 		latestSchemaCacheTTL: 0,
+		logger:               logger.NewLogger("kafka_test"),
 	}
 
 	m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2)
diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go
index 020d61d52b..4108c881b2 100644
--- a/common/component/kafka/metadata.go
+++ b/common/component/kafka/metadata.go
@@ -163,6 +163,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
 		ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval,
 		HeartbeatInterval:                 3 * time.Second,
 		SessionTimeout:                    10 * time.Second,
+		SchemaCachingEnabled:              true,
+		SchemaLatestVersionCacheTTL:       5 * time.Minute,
 		EscapeHeaders:                     false,
 	}
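
Note on the consumer.go change: the new notifyRecover wrapper loops around retry.NotifyRecover and reprocesses the message when the retry reports 'context canceled' even though the session context is still live. Below is a minimal, self-contained sketch of that control flow, assuming nothing from dapr/kit: retryNotify is a hypothetical stand-in for retry.NotifyRecover, and a fixed sleep interval replaces the component's real backoff policy.

// Sketch only: demonstrates the "cancelled error but live session" workaround,
// not the component's actual code.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryNotify retries op up to maxAttempts, notifying before each sleep.
// Like a context-aware backoff, it stops early on context cancellation.
func retryNotify(ctx context.Context, op func() error, maxAttempts int, interval time.Duration, notify func(error)) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if errors.Is(err, context.Canceled) || ctx.Err() != nil {
			return err
		}
		notify(err)
		time.Sleep(interval)
	}
	return err
}

func main() {
	session := context.Background() // stands in for session.Context()

	attempts := 0
	process := func() error { // stands in for consumer.doCallback
		attempts++
		if attempts == 1 {
			// Simulate the edge case: the per-message work is cancelled
			// even though the consumer session itself is still alive.
			return context.Canceled
		}
		return nil
	}

	for {
		err := retryNotify(session, process, 3, 10*time.Millisecond, func(err error) {
			fmt.Println("retrying after error:", err)
		})
		if errors.Is(err, context.Canceled) && session.Err() == nil {
			// Same workaround as the diff: a cancelled retry does not mean
			// the session is over, so reprocess the current message.
			fmt.Println("context canceled but session still live; reprocessing")
			continue
		}
		if err != nil {
			fmt.Println("giving up:", err)
			return
		}
		fmt.Println("message processed")
		return
	}
}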
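On the schema-registry side, the metadata.go change turns caching on by default (SchemaCachingEnabled: true, SchemaLatestVersionCacheTTL: 5 minutes), and getLatestSchema serves a cached entry only while its expirationTime lies in the future. The following small sketch illustrates that TTL check with hypothetical names rather than the component's actual types.

// Sketch only: a TTL cache in the style of the latest-schema cache.
package main

import (
	"fmt"
	"time"
)

type cacheEntry struct {
	value          string
	expirationTime time.Time
}

type ttlCache struct {
	ttl     time.Duration
	entries map[string]cacheEntry
}

// get returns a value only while its entry has not expired.
func (c *ttlCache) get(key string) (string, bool) {
	e, ok := c.entries[key]
	if ok && e.expirationTime.After(time.Now()) {
		return e.value, true
	}
	return "", false // missing or expired: the caller fetches from the registry
}

// put stores a value with an expiration ttl from now.
func (c *ttlCache) put(key, value string) {
	c.entries[key] = cacheEntry{value: value, expirationTime: time.Now().Add(c.ttl)}
}

func main() {
	c := &ttlCache{ttl: 5 * time.Minute, entries: map[string]cacheEntry{}} // the new default TTL
	c.put("my-topic-value", "schema-v1")
	if v, ok := c.get("my-topic-value"); ok {
		fmt.Println("cache hit:", v)
	}
}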