
Fix ERROR Graceful stop of task failed #677

Merged: 1 commit merged into confluentinc:3.3.x on Aug 6, 2019

Conversation

dmitryikh

I've run into the following problem:

[2019-07-08 20:05:06,314] ERROR Graceful stop of task deals-0 failed. (org.apache.kafka.connect.runtime.Worker)

This leads to duplicate records in Kafka topics during rebalancing.

The reason is that the JDBC source task sleeps too long in poll() (exactly POLL_INTERVAL_MS_CONFIG, which is 60000 ms in my case), so the worker is unable to stop all tasks within task.shutdown.graceful.timeout.ms, which defaults to 5000 ms.
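
In other words, poll() sleeps for the whole remaining interval in a single blocking call. Roughly (a simplified sketch with names borrowed from the discussion below, not the connector's exact code):

    // Sketch of the problematic pattern. stop() has no way to wake this
    // sleep, so a shutdown request can go unnoticed for up to
    // poll.interval.ms (60000 ms in my case).
    final long untilNext = nextUpdate - time.milliseconds();
    if (untilNext > 0) {
      time.sleep(untilNext); // blocks for the entire remaining interval
    }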

This can be seen in the log below (see the *** markers):

[2019-07-08 20:04:59,929] DEBUG WorkerSourceTask{id=deals-0} About to send 100 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-07-08 20:04:59,935] TRACE {} Polling for new data (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2019-07-08 20:04:59,935] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{table=null, query='SELECT ...', topicPrefix='mt5real.deals', incrementingColumn='Timestamp', timestampColumns=[]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2019-07-08 20:04:59,944] DEBUG Resetting querier TimestampIncrementingTableQuerier{table=null, query='SELECT ...', topicPrefix='deals', incrementingColumn='Timestamp', timestampColumns=[]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2019-07-08 20:04:59,944] TRACE No updates for TimestampIncrementingTableQuerier{table=null, query='SELECT ...', topicPrefix='deals', incrementingColumn='Timestamp', timestampColumns=[]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
*** [2019-07-08 20:04:59,945] TRACE Waiting 59999 ms to poll TimestampIncrementingTableQuerier{table=null, query='SELECT ...', topicPrefix='deals', incrementingColumn='Timestamp', timestampColumns=[]} next (io.confluent.connect.jdbc.source.JdbcSourceTask)


[2019-07-08 20:05:01,312] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-07-08 20:05:01,312] INFO Stopping connector deals (org.apache.kafka.connect.runtime.Worker)
[2019-07-08 20:05:01,312] DEBUG Getting plugin class loader for connector: 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2019-07-08 20:05:01,312] INFO Stopping table monitoring thread (io.confluent.connect.jdbc.JdbcSourceConnector)
[2019-07-08 20:05:01,312] INFO Shutting down thread monitoring tables. (io.confluent.connect.jdbc.source.TableMonitorThread)
*** [2019-07-08 20:05:01,312] INFO Stopping task deals-0 (org.apache.kafka.connect.runtime.Worker)
[2019-07-08 20:05:01,313] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2019-07-08 20:05:01,313] INFO Closing connection #1 to PostgreSql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
[2019-07-08 20:05:01,327] INFO Stopped connector deals (org.apache.kafka.connect.runtime.Worker)
*** [2019-07-08 20:05:06,314] ERROR Graceful stop of task deals-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-07-08 20:05:06,314] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

I propose sleeping no more than 100 ms at a time so the task can react promptly to shutdown.
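
Roughly, the idea is to replace the single long sleep with short slices (a simplified sketch, not the exact diff):

    // Sleep in slices of at most 100 ms, re-checking the running flag,
    // so stop() is noticed within ~100 ms instead of after the full
    // poll interval.
    long untilNext = nextUpdate - time.milliseconds();
    while (untilNext > 0 && running.get()) {
      time.sleep(Math.min(untilNext, 100));
      untilNext = nextUpdate - time.milliseconds();
    }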

@ghost commented Jul 9, 2019

It looks like @dmitryikh hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@dmitryikh (Author)

[clabot:check]

@ghost commented Jul 9, 2019

It looks like @dmitryikh hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@dmitryikh (Author)

[clabot:check]

@ghost commented Jul 9, 2019

@confluentinc It looks like @dmitryikh just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@dmitryikh (Author)

@rhauch @C0urante Could one of you have a look and provide some feedback, if possible?

@C0urante (Contributor)

Hi @dmitryikh, it looks like your diagnosis of the issue is spot on, and the changes in your PR would help prevent the error from occurring. However, they would also cause the source database to be queried 10 times a second, which may be undesirable when, for example, a large number of tasks are reading from a large number of tables in the same database concurrently.

One possible alternative approach I can think of is to add some kind of interrupt semantics to the poll() loop so that the call to time.sleep(...) can be exited prematurely in the event that stop() is called on the connector. Do you agree that this would also solve the problem you're encountering with the connector, and if so, would you be willing to look into implementing that approach?
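
For illustration, such interrupt semantics could look something like this (just a sketch; the latch and its wiring are hypothetical, not existing connector code):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical: a latch that stop() counts down to end the wait early.
    private final CountDownLatch stopLatch = new CountDownLatch(1);

    // In poll(): wait up to the remaining interval, returning early if
    // stop() trips the latch; await(...) returns true in that case.
    boolean stopRequested = stopLatch.await(untilNext, TimeUnit.MILLISECONDS);

    // In stop():
    stopLatch.countDown();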

Thanks for your PR, looking forward to working with you on this!

@dmitryikh (Author)

Hi @C0urante, I don't agree that the database would be queried 10 times per second.

Only this part of the code would run 10 times per second:

    while (running.get()) {
      final TableQuerier querier = tableQueue.peek();
      if (!querier.querying()) {
    ...

That doesn't perform any actual queries against the database. We just peek at the first table in tableQueue and check whether it is in the querying state; if not, we wait.

Am I wrong?

@C0urante (Contributor) commented Aug 5, 2019

@dmitryikh apologies, you are correct! This approach seems fine; one request I have is that we alter the log message on line 301 (log.trace("Waiting {} ms to poll {} next", untilNext, querier.toString());) to be more accurate, since it no longer logs the amount of time that will elapse before the next poll of the database. Also, our users may be accustomed to that log message, and changing the behavior that generates it could cause some confusion.

I think either logging nextUpdate - time.milliseconds() instead of untilNext might be sufficient, but let me know if you have other thoughts.
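
Concretely, something like this (a sketch of the suggested tweak, not a final diff):

    // Recompute the remaining time so the logged value reflects how long
    // until the next database poll, not the 100 ms slice about to be slept.
    log.trace("Waiting {} ms to poll {} next", nextUpdate - time.milliseconds(), querier.toString());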

Again, apologies for misreading your PR and thank you for correcting me. Looking forward to merging this!

dmitryikh force-pushed the reduce_time_in_poll branch from 2a14535 to a66412f on August 6, 2019 at 12:31
@dmitryikh (Author)

@C0urante , please have a look.

Thanks for spending your time on this!

@C0urante (Contributor) left a comment


@dmitryikh looks good to me!

Since this is a bug fix, we'd like to backport these changes to the earliest relevant feature branch, which in this case is 3.3.x. Could you please retarget the PR to that branch, and then we can merge?

dmitryikh changed the base branch from master to 3.3.x on August 6, 2019 at 18:37
dmitryikh force-pushed the reduce_time_in_poll branch from a66412f to d9ed197 on August 6, 2019 at 18:42
@dmitryikh (Author)

@C0urante , done.

@C0urante (Contributor) commented Aug 6, 2019

Thanks @dmitryikh!

C0urante merged commit 4094d2f into confluentinc:3.3.x on Aug 6, 2019
@C0urante (Contributor) commented Aug 6, 2019

This fix will be available in all future bug fix releases for CP 3.3 through 5.3, and will also be included in all future releases for CP 5.4 onward.
