Refactory kafka binding to reuse the kafka common code extracting from kafka pubsub component #1696
Conversation
**kafka metadata**

I list the metadata used by the pubsub component and the binding component below:

**Change to pubsub component**

No change to the pubsub component.

**Change to binding component**

**ClientID**: this metadata only exists in the pubsub component. It is used to set the `ClientID` field of the sarama config:

```go
config := sarama.NewConfig()
if meta.ClientID != "" {
	config.ClientID = meta.ClientID
}
```

It should be safe to add it to the binding component as a new feature.

**topics**: for the Kafka topics to publish (output binding) or subscribe (input binding), the pubsub component takes them from the request (`SubscribeRequest.Topic` and `PublishRequest.Topic`), while the binding component takes them from metadata (`metadata["topics"]` and `metadata["publishTopic"]`). In this refactoring, the common Kafka code receives them as parameters:

```go
func (k *Kafka) Publish(topic string, data []byte, metadata map[string]string) error {}
func (k *Kafka) Subscribe(topics []string, _ map[string]string, handler EventHandler) error {}
```

The refactored pubsub and binding components will set these parameters from the request (pubsub component) or from the metadata (binding component). So we can keep this metadata in the binding component, and there is no change to the refactored binding component.

**consumeRetryInterval**: the pubsub component supports retry and adds the new metadata `consumeRetryInterval`. This can be considered a new feature in the refactored binding component.

**authentication**: the Kafka binding supports only password authentication:

```go
k.authRequired = meta.AuthRequired
// ignore SASL properties if authRequired is false
if meta.AuthRequired {
	k.saslUsername = meta.SaslUsername
	k.saslPassword = meta.SaslPassword
}
```

The Kafka pubsub component supports three auth types:

```go
k.authType = meta.AuthType
switch k.authType {
case oidcAuthType:
	err = updateOidcAuthInfo(config, meta)
case passwordAuthType:
	k.saslUsername = meta.SaslUsername
	k.saslPassword = meta.SaslPassword
	updatePasswordAuthInfo(config, k.saslUsername, k.saslPassword)
case mtlsAuthType:
	err = updateMTLSAuthInfo(config, meta)
}
```

And in the pubsub component, `upgradeMetadata()` handles the `authRequired` metadata as deprecated:

```go
// upgradeMetadata updates metadata properties based on deprecated usage.
func (k *Kafka) upgradeMetadata(metadata map[string]string) (map[string]string, error) {
	authTypeVal, authTypePres := metadata["authType"]
	authReqVal, authReqPres := metadata["authRequired"]
	saslPassVal, saslPassPres := metadata["saslPassword"]
	// If authType is not set, derive it from authRequired.
	if (!authTypePres || authTypeVal == "") && authReqPres && authReqVal != "" {
		k.logger.Warn("AuthRequired is deprecated, use AuthType instead.")
		validAuthRequired, err := strconv.ParseBool(authReqVal)
		if err == nil {
			if validAuthRequired {
				// If legacy authRequired was used, either SASL username or mtls is the method.
				if saslPassPres && saslPassVal != "" {
					// User has specified saslPassword, so intend for password auth.
					metadata["authType"] = passwordAuthType
				} else {
					metadata["authType"] = mtlsAuthType
				}
			} else {
				metadata["authType"] = noAuthType
			}
		} else {
			return metadata, errors.New("kafka error: invalid value for 'authRequired' attribute")
		}
	}
	return metadata, nil
}
```

So the authentication metadata of the binding component is compatible with that of the pubsub component, except that the `authRequired` metadata will be marked as deprecated.

**Conclusion**

From the point of view of metadata, there are no breaking changes to the binding component in this refactoring, yet there are some compatible changes:
I will continue to check why the e2e tests failed.
The first breaking change found! In the Kafka binding component, if we don't want to do authentication for Kafka, then we only need to set `authRequired` to `false`:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: binding-topic
  namespace: default
spec:
  type: bindings.kafka
  version: v1
  metadata:
  - name: topics # Input binding topic
    value: binding-topic
  - name: brokers
    value: localhost:9092,localhost:9093
  - name: consumerGroup
    value: group1
  - name: publishTopic # Output binding topic
    value: binding-topic
  - name: authRequired
    value: "false"
  - name: initialOffset
    value: oldest
```

In the Kafka pubsub component, if we don't want to do authentication for Kafka, then we need to set `authRequired` to `false` and also set `disableTls` to `true`:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: localhost:9092
  - name: consumerGroup
    value: pubsubgroup1
  - name: authRequired
    value: "false"
  - name: initialOffset
    value: oldest
  - name: disableTls
    value: true
```

In the Kafka pubsub component, there is an `updateTLSConfig()` method which checks the `disableTls` metadata:

```go
func updateTLSConfig(config *sarama.Config, metadata *kafkaMetadata) error {
	if metadata.TLSDisable {
		config.Net.TLS.Enable = false
		return nil
	}
	config.Net.TLS.Enable = true
	......
```

If we don't set `disableTls` to `true`, then `config.Net.TLS.Enable` is set to `true` in the above method, and it causes a Kafka connection failure:

```go
if conf.Net.TLS.Enable {
	// in fact, because conf.Net.TLS.Enable is set to true, we go here and fail
	b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
} else if conf.Net.Proxy.Enable {
	b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
} else {
	// we should go here since we don't want to do authentication
	b.conn, b.connErr = dialer.Dial("tcp", b.addr)
}
if b.connErr != nil {
	Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
	b.conn = nil
	atomic.StoreInt32(&b.opened, 0)
	return
}
```

dapr will fail to connect to Kafka and report a Kafka connection error. @yaron2 @artursouza I will try to add the metadata `disableTls: true` to the yaml configuration file of the Kafka binding component to verify whether there are other breaking changes.
@skyao that's a great find! I think what we need to do is actually remove the need for pub/sub to specify both `authRequired` and `disableTls`: if authentication is not required, the user shouldn't have to disable TLS separately. What do you think? Also cc @pkedy to keep my Kafka observations honest.
Update: today I tried to fix the failing e2e test cases. Now 226 tests pass, with 3 failures. It seems there is no big problem with this refactoring.
@yaron2 @artursouza While debugging the e2e test failures, I found a risk in the retry mechanism and I think I should show it to you two. The e2e test case "tests/e2e/bindings/bindings_test.go" sends 10 messages to a topic, and the app "tests/app/binding_input" subscribes to this topic and receives the 10 messages. In this test case, to simulate failure, "tests/app/binding_input" fails only for the first message:

```go
func (m *messageBuffer) fail(failedMessage string) bool {
	log.Printf("fail(failedMessage string): %s", failedMessage)
	m.lock.Lock()
	defer m.lock.Unlock()
	// fail only for the first time. return false all other times.
	if !m.errorOnce {
		m.failedMessage = failedMessage
		m.errorOnce = true
		return m.errorOnce
	}
	return false
}
```

So the e2e test case expects to receive the success messages {"2", "3", "4", "5", "6", "7", "8", "9", "10"}. This worked well before this refactoring, without the retry mechanism. After this refactoring, the Kafka binding component reuses the code of the Kafka pubsub component, including the retry mechanism. So the message "This message fails" is retried and eventually succeeds, since only the first delivery fails. The e2e test case then finds that the success messages are []string{"This message fails", "2", "3", "4", "5", "6", "7", "8", "9", "10"}, not []string{"2", "3", "4", "5", "6", "7", "8", "9", "10"}.
I can fix this e2e test case by removing the "simulate failure for first message" logic, since it won't work after we introduced the retry mechanism. Before removing that code, I tried to verify what happens if the app keeps returning failure for this message (and returns success for the next message):

```go
func (m *messageBuffer) fail(failedMessage string) bool {
	log.Printf("fail(failedMessage string): %s", failedMessage)
	m.lock.Lock()
	defer m.lock.Unlock()
	// fail only for the first time. return false all other times.
	if !m.errorOnce {
		m.failedMessage = failedMessage
		m.errorOnce = true
		return m.errorOnce
	}
	// if the message should fail, then keep failing for retry
	if failedMessage == m.failedMessage {
		return true
	}
	return false
}
```

The log showed that all the retried messages are the first message:

```
2022/05/06 09:53:30 testTopicHandler called
2022/05/06 09:53:30 Got message: This message fails
2022/05/06 09:53:30 fail(failedMessage string): This message fails
2022/05/06 09:53:30 failing message
2022/05/06 09:53:35 testTopicHandler called
2022/05/06 09:53:35 Got message: This message fails
2022/05/06 09:53:35 fail(failedMessage string): This message fails
2022/05/06 09:53:35 failing message
2022/05/06 09:53:39 failed message This message fails
2022/05/06 09:53:40 testTopicHandler called
2022/05/06 09:53:40 Got message: This message fails
2022/05/06 09:53:40 fail(failedMessage string): This message fails
2022/05/06 09:53:40 failing message
2022/05/06 09:53:45 testTopicHandler called
2022/05/06 09:53:45 Got message: This message fails
2022/05/06 09:53:45 fail(failedMessage string): This message fails
2022/05/06 09:53:45 failing message
2022/05/06 09:53:50 testTopicHandler called
2022/05/06 09:53:50 Got message: This message fails
2022/05/06 09:53:50 fail(failedMessage string): This message fails
2022/05/06 09:53:50 failing message
2022/05/06 09:53:55 testTopicHandler called
2022/05/06 09:53:55 Got message: This message fails
2022/05/06 09:53:55 fail(failedMessage string): This message fails
2022/05/06 09:53:55 failing message
2022/05/06 09:54:00 testTopicHandler called
2022/05/06 09:54:00 Got message: This message fails
2022/05/06 09:54:00 fail(failedMessage string): This message fails
2022/05/06 09:54:00 failing message
2022/05/06 09:54:05 testTopicHandler called
2022/05/06 09:54:05 Got message: This message fails
2022/05/06 09:54:05 fail(failedMessage string): This message fails
2022/05/06 09:54:05 failing message
2022/05/06 09:54:10 testTopicHandler called
2022/05/06 09:54:10 Got message: This message fails
2022/05/06 09:54:10 fail(failedMessage string): This message fails
2022/05/06 09:54:10 failing message
2022/05/06 09:54:15 testTopicHandler called
2022/05/06 09:54:15 Got message: This message fails
2022/05/06 09:54:15 fail(failedMessage string): This message fails
2022/05/06 09:54:15 failing message
```

So I think there is a risk: if an application that uses an input binding to subscribe to topics keeps failing for a specific message content and returns failure to dapr, then dapr will keep retrying this message again and again. This causes all other messages to be blocked, and the application won't receive any more messages. A user who wants to enable the retry mechanism should be careful to handle the event and SHOULD NOT return failure based only on the input message content itself (the "This message fails" message shown above). Currently dapr can't tell the difference between "the app can't handle this event" and "the app can't handle any more events". In other words, if the app wants to receive events beyond the current one, it should never return failure. I don't know if we have documented this risk of the retry mechanism clearly. I'm afraid that if app developers don't know this and return failure for some specific message content, the app will work well without the retry mechanism but run into the endless loop above with it. And this risk applies not only to the binding component but also to pubsub.
All e2e tests passed; now I can summarize this refactoring:

Compatible changes for the Kafka binding component
Breaking changes for the Kafka binding component:
Risks to be assessed

@yaron2 @artursouza need your review to make a decision.
See my comment here. Re: the retry issue, I believe we should remove the component-specific retries in the pub/sub component, since:

The removal of retries should be configurable and keep the current behavior. This way, we don't break people who actually use this feature. Regarding the other breaking changes, I propose the following:

Let's summarize then:
cc @skyao
Codecov Report

```
@@            Coverage Diff             @@
##           master    #1696      +/-   ##
==========================================
+ Coverage   36.37%   36.54%   +0.17%
==========================================
  Files         166      173       +7
  Lines       15488    15503      +15
==========================================
+ Hits         5633     5666      +33
+ Misses       9228     9207      -21
- Partials      627      630       +3
==========================================
```

Continue to review the full report at Codecov.
As suggested by @yaron2 and @artursouza, I have updated the code of this refactoring:

Consume retry defaults to enabled for the Kafka pubsub component and disabled for the Kafka binding component, so that we keep the same behavior after this refactoring: retry enabled in the Kafka pubsub component and no retry in the Kafka binding component. I introduced a new metadata named "consumeRetryEnabled" to override this default behavior: the user can set "consumeRetryEnabled" to false to disable retry in the pubsub component, or set it to true to enable retry in the binding component.

All e2e tests passed; now I can summarize this refactoring again:

Compatible changes for the Kafka pubsub component
Compatible changes for the Kafka binding component

The good news is that after this adjustment there are no breaking changes in the refactoring, so we don't need to introduce a v2 of the Kafka binding component.
Great, please create an issue in dapr/docs to document these changes in the components.
Description
As discussed in #1305, the Kafka binding should reuse the Kafka common code extracted from the Kafka pubsub component, which is updated in PR #1695.
This PR replaces the implementation of the Kafka binding component with one based on the code base of the Kafka pubsub component, so we need to carefully handle the differences between the legacy Kafka binding component and the Kafka pubsub component.
Issue reference
#1305
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: