Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(cdc): kafka producer use default configuration. (#4359) #4544

Conversation

ti-chi-bot
Copy link
Member

This is an automated cherry-pick of #4359

What problem does this PR solve?

Issue Number: close #4383, close #4499

If the network condition between the TiCDC and Kafka is not good enough, let producer get responses from Kafka as soon as possible, to prevent waste too much time.

What is changed and how it works?

change kafka producer to use default configurations, to prevent wast too much time on network communication with a Kafka cluster in bad network conditions.

  • set Net.DialTimeout, Net.WriteTimeout, Net.ReadTimeout to 10s
  • set Metadata.Retry.Max to 1, to make RefreshMatadata return fast

Check List

Tests

  • Manual test (add detailed scripts or steps below)
./bin/cdc cli changefeed create --pd=http://127.0.0.1:2379 --sink-uri="mysql://[email protected]:3306/"

./bin/cdc cli changefeed create --pd=http://127.0.0.1:2379 --sink-uri="kafka://127.0.0.1:9092/kafka-test?protocol=open-protocol"

./bin/go-tpc tpcc -H 127.0.0.1 -P 4000 -D workload --warehouses 50 -T 4 prepare

wait a few moment ... then kill the kafka by `kill -9`
[2022/01/17 18:54:33.796 +08:00] [ERROR] [changefeed.go:118] ["an error occurred in Owner"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 3 messages."] [errorVerbose="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 3 messages.\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/[email protected]/errors.go:174\ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/[email protected]/normalize.go:164\ngithub.com/pingcap/tiflow/pkg/errors.WrapError\n\tgithub.com/pingcap/tiflow/pkg/errors/helper.go:30\ngithub.com/pingcap/tiflow/cdc/sink/producer/kafka.(*kafkaSaramaProducer).SyncBroadcastMessage\n\tgithub.com/pingcap/tiflow/cdc/sink/producer/kafka/kafka.go:135\ngithub.com/pingcap/tiflow/cdc/sink.(*mqSink).writeToProducer\n\tgithub.com/pingcap/tiflow/cdc/sink/mq.go:382\ngithub.com/pingcap/tiflow/cdc/sink.(*mqSink).EmitCheckpointTs\n\tgithub.com/pingcap/tiflow/cdc/sink/mq.go:219\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).run.func1\n\tgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:149\nruntime.goexit\n\truntime/asm_arm64.s:1133"]
[2022/01/17 18:54:33.796 +08:00] [INFO] [changefeed.go:315] ["close changefeed"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [info="{\"sink-uri\":\"kafka://***/asd123?protocol=open-protocol\",\"opts\":{\"max-message-bytes\":\"1048588\"},\"create-time\":\"2022-01-17T18:51:53.105199+08:00\",\"start-ts\":430549686829711362,\"target-ts\":0,\"admin-job-type\":0,\"sort-engine\":\"unified\",\"sort-dir\":\"\",\"config\":{\"case-sensitive\":true,\"enable-old-value\":true,\"force-replicate\":false,\"check-gc-safe-point\":true,\"filter\":{\"rules\":[\"*.*\"],\"ignore-txn-start-ts\":null},\"mounter\":{\"worker-num\":16},\"sink\":{\"dispatchers\":null,\"protocol\":\"open-protocol\",\"column-selectors\":null},\"cyclic-replication\":{\"enable\":false,\"replica-id\":0,\"filter-replica-ids\":null,\"id-buckets\":0,\"sync-ddl\":false},\"scheduler\":{\"type\":\"table-number\",\"polling-time\":-1},\"consistent\":{\"level\":\"none\",\"max-log-size\":64,\"flush-interval\":1000,\"storage\":\"\"}},\"state\":\"normal\",\"error\":null,\"sync-point-enabled\":false,\"sync-point-interval\":600000000000,\"creator-version\":\"v5.4.0-master-dirty\"}"] [isRemoved=false]
[2022/01/17 18:54:33.796 +08:00] [INFO] [ddl_puller.go:195] ["Close the ddl puller"]
[2022/01/17 18:54:33.796 +08:00] [INFO] [kafka.go:196] ["kafka producer closing..."]
[2022/01/17 18:54:33.797 +08:00] [INFO] [kafka.go:221] ["async client closed"] [duration=255.625µs]
[2022/01/17 18:54:33.864 +08:00] [INFO] [region_range_lock.go:370] ["unlocked range"] [lockID=1] [regionID=4] [startKey=6d44444c4a6f624cff69ff737400000000ff0000f90000000000ff00006c0000000000fa] [endKey=6d44444c4a6f624cff69ff737400000000ff0000f90000000000ff00006d0000000000fa] [checkpointTs=430549728786382855]
[2022/01/17 18:54:33.865 +08:00] [INFO] [client.go:1087] ["stream to store closed"] [addr=127.0.0.1:20160] [storeID=1]
[2022/01/17 18:54:33.888 +08:00] [INFO] [region_range_lock.go:370] ["unlocked range"] [lockID=2] [regionID=4] [startKey=6d44444c4a6f6241ff64ff644964784c69ff7374ff0000000000ff000000f700000000ff0000006c00000000fb] [endKey=6d44444c4a6f6241ff64ff644964784c69ff7374ff0000000000ff000000f700000000ff0000006d00000000fb] [checkpointTs=430549728786382855]
[2022/01/17 18:54:33.889 +08:00] [INFO] [client.go:1087] ["stream to store closed"] [addr=127.0.0.1:20160] [storeID=1]
[2022/01/17 18:54:34.568 +08:00] [INFO] [kafka.go:228] ["sync client closed"] [duration=771.709333ms]

as shown in the log above, the owner can be closed in around 1 second.

10s later, the changefeed restart, and failed in around 1 second again.
18s later, the changefeed restart again....

the restart logic is handled by feed_state_manager.

When the Kafka cluster is in a bad network condition, this configuration will detect it very fast, to prevent wasting too much time. But this would also have a false-negative case, such as the user's network is just not fast enough...

[2022/01/17 18:54:33.674 +08:00] [INFO] [processor.go:1074] ["processor try to close the sinkManager"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373]
[2022/01/17 18:54:33.677 +08:00] [INFO] [manager.go:89] ["sinkManager try close bufSink"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373]
[2022/01/17 18:54:55.254 +08:00] [ERROR] [kafka.go:219] ["close async client with error"] [error="kafka: Failed to deliver 383 messages."] [duration=21.576894875s]
[2022/01/17 18:54:55.254 +08:00] [INFO] [kafka.go:228] ["sync client closed"] [duration=41.084µs]
[2022/01/17 18:54:55.254 +08:00] [INFO] [manager.go:98] ["close bufSink success"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [duration=21.577101583s]
[2022/01/17 18:54:55.254 +08:00] [INFO] [processor.go:1083] ["processor close sinkManager success"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [duration=21.577510917s]
18:54:33 ~ 18:54:55, processor is closing the first time...

18:54:33 ~ 18:54:34, owner closed the first time
18:54:44 ~ 18:54:55, owner retry to start the first time
18:55:04 ~ 18:55:05, owner retry to start the second time.

It looks that the processor should be fully closed before the owner tries to initialize the changefeed again. Or, the owner shouldn't try to initialize the changefeed before the processor is not fully closed.

Code changes

Side effects

Related changes

  • Need to cherry-pick to the release branch

Release note

None

@ti-chi-bot
Copy link
Member Author

[REVIEW NOTIFICATION]

This pull request has not been approved.

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot
Copy link
Member Author

@ti-chi-bot: This cherry pick PR is for a release branch and has not yet been approved by release team.
Adding the do-not-merge/cherry-pick-not-approved label.

To merge this cherry pick, it must first be approved by the collaborators.

AFTER it has been approved by collaborators, please ping the release team in a comment to request a cherry pick review.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@ti-chi-bot ti-chi-bot added component/sink Sink component. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. status/LGT2 Indicates that a PR has LGTM 2. type/cherry-pick-for-release-5.0 This PR is cherry-picked to release-5.0 from a source PR. type/enhancement The issue or PR belongs to an enhancement. labels Feb 9, 2022
@ti-chi-bot ti-chi-bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Feb 9, 2022
@ti-chi-bot
Copy link
Member Author

@ti-chi-bot: PR needs rebase.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@ti-chi-bot ti-chi-bot added the needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. label Feb 19, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component/sink Sink component. do-not-merge/cherry-pick-not-approved needs-rebase Indicates a PR cannot be merged because it has merge conflicts with HEAD. release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. status/LGT2 Indicates that a PR has LGTM 2. type/cherry-pick-for-release-5.0 This PR is cherry-picked to release-5.0 from a source PR. type/enhancement The issue or PR belongs to an enhancement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants