
[Bug] [mongodb] task managers hang forever one by one #2510

Closed · 1 of 2 tasks · Fixed by #2511
vanliu-tx opened this issue Sep 20, 2023 · 4 comments
Labels: bug (Something isn't working)

vanliu-tx (Contributor):

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.17.0

Flink CDC version

2.4.1

Database and its version

MongoDB 4.2.11

Minimal reproduce step

  1. Prepare a big collection with more than 10 billion records.
  2. Start a Flink app that loads the data and sinks it to a DiscardingSink with parallelism 125 (a sketch of the job follows this list).
  3. The pull speed is high for the first hour or two, then slowly decreases to 0 over a few hours.
  4. In the task manager logs, all task managers hang at "c.v.c.c.mongodb.source.reader.fetch.MongoDBScanFetchTask - Snapshot step 2 - Snapshotting data".
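
A minimal sketch of such a reproduction job, assuming the incremental-snapshot MongoDBSource from flink-connector-mongodb-cdc 2.4.1; the hosts, credentials, and job name are placeholders, and the real job's Map stage is omitted:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoSnapshotHangRepro {
    public static void main(String[] args) throws Exception {
        // Incremental snapshot source over the big collection (placeholders).
        MongoDBSource<String> source =
                MongoDBSource.<String>builder()
                        .hosts("mongodb-host:27017")                  // placeholder
                        .username("user")                             // placeholder
                        .password("secret")                           // placeholder
                        .databaseList("db_defect")
                        .collectionList("db_defect.t_lint_defect_v2") // collection from this report
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(125); // parallelism from step 2

        // Discard everything so only the source's read path is exercised.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mongo-cdc")
                .addSink(new DiscardingSink<>());

        env.execute("mongo cdc snapshot repro");
    }
}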

What did you expect to see?

All records in the collection are snapshotted, and then records are copied from the change stream.

What did you see instead?

Tasks hang in the snapshot phase.

Anything else?

Here are thread dumps from two of the task managers:
snapshot_hang.threaddump.txt
snapshot_hang_2.threaddump.txt

Suspect threads:

"cluster-rtt-ClusterId{value='6509694684a9a629144c6209', description='null'}-x.x.x.x:27017" Id=94 TIMED_WAITING
	at java.lang.Thread.sleep(Native Method)
	at com.mongodb.internal.connection.DefaultServerMonitor.waitForNext(DefaultServerMonitor.java:445)
	at com.mongodb.internal.connection.DefaultServerMonitor.access$1500(DefaultServerMonitor.java:60)
	at com.mongodb.internal.connection.DefaultServerMonitor$RoundTripTimeRunnable.run(DefaultServerMonitor.java:417)
	at java.lang.Thread.run(Thread.java:748)

"cluster-ClusterId{value='6509694684a9a629144c6209', description='null'}-x.x.x.x:27017" Id=93 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1259379d
	at sun.misc.Unsafe.park(Native Method)
	-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1259379d
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.waitForSignalOrTimeout(DefaultServerMonitor.java:302)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.waitForNext(DefaultServerMonitor.java:283)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:177)
	at java.lang.Thread.run(Thread.java:748)

"System Time Trigger for Source: 100205_mongocdc_t_lint_defect_v2_prod -> Map -> Sink: discard (42/125)#0" Id=92 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44ac244
	at sun.misc.Unsafe.park(Native Method)
	-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44ac244
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"Source Data Fetcher for Source: 100205_mongocdc_t_lint_defect_v2_prod -> Map -> Sink: discard (42/125)#0" Id=91 TIMED_WAITING on io.debezium.connector.base.ChangeEventQueue@4b418b24
	at java.lang.Object.wait(Native Method)
	-  waiting on io.debezium.connector.base.ChangeEventQueue@4b418b24
	at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:249)
	at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:130)
	at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:73)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

	Number of locked synchronizers = 1
	- java.util.concurrent.ThreadPoolExecutor$Worker@2dc71397
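
The last dump is the interesting one: the fetcher is parked inside ChangeEventQueue.poll. As a schematic only (this is not Debezium's actual implementation), the consumer blocks on the queue's monitor until the snapshot producer enqueues records, so if the producer thread has died or stalled, poll() never returns:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Schematic of the consumer-side wait seen in the dump above.
final class BlockingEventQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>();

    public synchronized List<T> poll() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // matches the "waiting on ...ChangeEventQueue" frame
        }
        List<T> batch = new ArrayList<>(queue);
        queue.clear();
        notifyAll(); // wake any producer blocked on a full queue
        return batch;
    }

    public synchronized void enqueue(T event) {
        queue.add(event);
        notifyAll(); // wake the waiting consumer
    }
}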

Are you willing to submit a PR?

  • I'm willing to submit a PR!
vanliu-tx added the bug label on Sep 20, 2023

vanliu-tx (Contributor, Author) commented:

@Jiabao-Sun please help take a look.

vanliu-tx (Contributor, Author) commented:

[screenshot attached]

vanliu-tx (Contributor, Author) commented on Sep 22, 2023:

I built a new jar with #2511; now I get an exception like this:

2023-09-22 08:45:22.656 [flink-akka.actor.default-dispatcher-21] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: 100205_mongocdc_t_lint_defect_v2_dev -> Map -> Sink: discard (21/60) (75a3cd8b17ccfc5b59346a7e86773d5e_cbc357ccb763df2852fee8c4fc7d55f2_20_248) switched from RUNNING to FAILED on cdc-05b1197b6c2d4b8e8ffcb5f7ada6e6da-taskmanager-1-47 @ x.x.x.x (dataPort=36398).
java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 103 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 common frames omitted
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split SnapshotSplit{tableId=db_defect.t_lint_defect_v2, splitId='db_defect.t_lint_defect_v2:45015', splitKeyType=[`task_id` INT], splitStart=[{"task_id": "hashed"}, {"task_id": 6597861409290023229}], splitEnd=[{"task_id": "hashed"}, {"task_id": 6597871154751884846}], highWatermark=null} error due to GC overhead limit exceeded.
        at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:179)
        at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:126)
        at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:73)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
        ... 6 common frames omitted
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
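
So with the fix, the split read now fails fast with an OutOfMemoryError instead of hanging. As a hedged sketch of the usual mitigations (splitSizeMB is assumed to be the builder knob behind scan.incremental.snapshot.chunk.size.mb, and the value is illustrative, not a confirmed fix for this collection):

// Smaller snapshot chunks mean smaller per-split buffers on the heap.
MongoDBSource<String> source =
        MongoDBSource.<String>builder()
                .hosts("mongodb-host:27017")                  // placeholder
                .databaseList("db_defect")
                .collectionList("db_defect.t_lint_defect_v2")
                .splitSizeMB(32)  // assumed option; default chunk size is 64 MB
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

Raising the per-task-manager heap (taskmanager.memory.process.size or taskmanager.memory.task.heap.size in flink-conf.yaml) attacks the same error from the memory side.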
