
kafka_producer: deadlock when closing after failure #2978

Closed
Tracked by #2971
liuzix opened this issue Oct 7, 2021 · 1 comment
Labels
affects-4.0 affects-5.0 affects-5.1 affects-5.2 affects-5.3 area/ticdc Issues or PRs related to TiCDC. priority/P1 The issue has P1 priority. severity/major type/bug The issue is confirmed as a bug.


liuzix commented Oct 7, 2021

What did you do?

The Kafka producer received two messages, close together in time, that each lead to an error such as `ErrMessageSizeTooLarge`.

What did you expect to see?

The changefeed should close as expected.

What did you see instead?

The sink deadlocks through the following chain of events:

  1. `(*kafkaSaramaProducer).run` is exiting and no longer selects from the `k.asyncClient.Errors()` channel.
  2. Because nothing receives from `k.asyncClient.Errors()` any more, the asyncClient blocks on its next send to that channel. It can no longer process messages, which means it stops consuming from `k.asyncClient.Input()`.
  3. Before it has been notified of the error, the sink tries to write one more message. That write to `k.asyncClient.Input()` blocks, because the asyncClient is itself stuck writing to `k.asyncClient.Errors()`.
  4. `SendMessage` took `k.clientLock.RLock()` before the write and keeps holding it while blocked.
  5. `(*kafkaSaramaProducer).run` then tries to take `k.clientLock.Lock()` (in `(*kafkaSaramaProducer).stop`), which is blocked by `SendMessage`'s read lock.

```
goroutine 384 [semacquire, 8647 minutes]:
sync.runtime_SemacquireMutex(0xc3c6f225a4, 0x0, 0x1)
	runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc3c6f225a0)
	sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
	sync/mutex.go:81
sync.(*RWMutex).Lock(0xc3c6f225a0)
	sync/rwmutex.go:98 +0x97
github.com/pingcap/ticdc/cdc/sink/producer/kafka.(*kafkaSaramaProducer).stop(0xc3c6f225a0)
	github.com/pingcap/ticdc/cdc/sink/producer/kafka/kafka.go:199 +0x3e
github.com/pingcap/ticdc/cdc/sink/producer/kafka.(*kafkaSaramaProducer).Close(0xc3c6f225a0, 0x0, 0x0)
	github.com/pingcap/ticdc/cdc/sink/producer/kafka/kafka.go:211 +0x5e
github.com/pingcap/ticdc/cdc/sink.(*mqSink).Close(0xc3c6f22a20, 0xc2f9052de0, 0x2c35b18)
	github.com/pingcap/ticdc/cdc/sink/mq.go:269 +0x33
github.com/pingcap/ticdc/cdc/sink.(*Manager).Close(...)
	github.com/pingcap/ticdc/cdc/sink/manager.go:78
github.com/pingcap/ticdc/cdc.(*oldProcessor).stop(0xc002822000, 0x31cbea0, 0xc000b1c200, 0x17, 0xc0072abcf8)
	github.com/pingcap/ticdc/cdc/processor.go:1190 +0x5a9
github.com/pingcap/ticdc/cdc.(*Capture).handleTaskEvent(0xc000da6960, 0x31cbea0, 0xc000b1c200, 0xc423b5e450, 0x1, 0x0)
	github.com/pingcap/ticdc/cdc/capture.go:283 +0xd7
github.com/pingcap/ticdc/cdc.(*Capture).Run(0xc000da6960, 0x31cbea0, 0xc000b1c200, 0x0, 0x0)
	github.com/pingcap/ticdc/cdc/capture.go:211 +0x48a
github.com/pingcap/ticdc/cdc.(*Server).run.func4(0x0, 0x0)
	github.com/pingcap/ticdc/cdc/server.go:267 +0x3f
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000c60690, 0xc000c64260)
	golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
	golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66

goroutine 3087918 [chan receive, 8665 minutes]:
github.com/Shopify/sarama.(*asyncProducer).dispatcher(0xc2990febd0)
	github.com/Shopify/[email protected]/async_producer.go:330 +0xd3
github.com/Shopify/sarama.withRecover(0xc397169df0)
	github.com/Shopify/[email protected]/utils.go:43 +0x49
created by github.com/Shopify/sarama.newAsyncProducer
	github.com/Shopify/[email protected]/async_producer.go:166 +0x249

goroutine 3148846 [semacquire, 8647 minutes]:
sync.runtime_SemacquireMutex(0xc3c6f225ac, 0xe000, 0x0)
	runtime/sema.go:71 +0x47
sync.(*RWMutex).RLock(...)
	sync/rwmutex.go:50
github.com/pingcap/ticdc/cdc/sink/producer/kafka.(*kafkaSaramaProducer).SendMessage(0xc3c6f225a0, 0x31cbea0, 0xc1c084e800, 0xc2d84ab0e0, 0x0, 0x0, 0x0)
	github.com/pingcap/ticdc/cdc/sink/producer/kafka/kafka.go:89 +0x3e0
github.com/pingcap/ticdc/cdc/sink.(*mqSink).writeToProducer(0xc3c6f22a20, 0x31cbea0, 0xc1c084e800, 0xc2d84ab0e0, 0xf0c04501, 0x8, 0x2500)
	github.com/pingcap/ticdc/cdc/sink/mq.go:370 +0x91
github.com/pingcap/ticdc/cdc/sink.(*mqSink).runWorker.func1.1(0xc04bb619548958b9, 0xcc8c23f097f, 0x4fb9800)
	github.com/pingcap/ticdc/cdc/sink/mq.go:302 +0xe4
github.com/pingcap/ticdc/cdc/sink.(*Statistics).RecordBatchExecution(0xc3c6f226c0, 0xc3dc3ffda0, 0x0, 0xc3dc3ffef0)
	github.com/pingcap/ticdc/cdc/sink/statistics.go:99 +0x50
github.com/pingcap/ticdc/cdc/sink.(*mqSink).runWorker.func1(0xc27c57fc00, 0x5f068677c6c002a, 0x0)
	github.com/pingcap/ticdc/cdc/sink/mq.go:294 +0x9f
github.com/pingcap/ticdc/cdc/sink.(*mqSink).runWorker(0xc3c6f22a20, 0x31cbea0, 0xc1c084e800, 0xc200000000, 0x0, 0x0)
	github.com/pingcap/ticdc/cdc/sink/mq.go:340 +0x2c0
github.com/pingcap/ticdc/cdc/sink.(*mqSink).run.func1(0x43e556, 0x2c35840)
	github.com/pingcap/ticdc/cdc/sink/mq.go:279 +0x46
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc259382060, 0xc3aa5aecc0)
	golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
	golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66
```

Versions of the cluster

v5.0.1

@amyangfei

closed by #3003
