Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ERROR Graceful stop of task failed #1360

Open
robsonhermes opened this issue Aug 21, 2023 · 9 comments
Open

Fix ERROR Graceful stop of task failed #1360

robsonhermes opened this issue Aug 21, 2023 · 9 comments

Comments

@robsonhermes
Copy link

robsonhermes commented Aug 21, 2023

This is a new issue and not exactly the same as #677

I'm facing this error whenever stoping my JDBC connectors. Got in touch with Kafka connect users and they explained is most likely caused by: https://issues.apache.org/jira/browse/KAFKA-15090

Since this change, the stop signal in invoked in the same thread as the dedicated thread for the task. This results in the JdbcSourceTask to only process the stop() signal when it returns from the current poll().

Basically it means this change in kafka connect made the change of #677 stop reaching the goal of that change.

On my case, we are using kafka connect and JDBC source/sink to move large amounts of data around, once per day. Waiting for the next poll()returning to be able to cleanly stop tasks is not really an option.

I'd guess the required change would be to implement the poll() in some sort of non blocking way, but I admit not quite sure how to implement it.

@gharris1727
Copy link
Contributor

I see that an unbounded sleep was introduced in #1348, I wonder if that has made the change in #677 ineffective.

@robsonhermes
Copy link
Author

robsonhermes commented Aug 21, 2023

Hello @gharris1727

Thanks for the reply. IMHO, #1348 did not cause the "Graceful stop of task failed". I did some tracing today and could clearly see the task logging every 100ms that was waiting for the next poll() interval (even when a DEL was requested in the connector via API). So the stop() caused by the DEL got only processed when the current poll() iteration happened, as it was the only chance stop() got to be finally processed.

On a side note, all the merit goes to you, as you pointed the issue in the kafka mailing list when I submitted this same question.

Finally, what is puzzling me is that there is a test which, apparently, checks if a task can be started/stopped by the same thread. But I'm still trying to understand in details if the test is missing something.

@robsonhermes
Copy link
Author

robsonhermes commented Aug 21, 2023

The testStartStopSameThread doesn't catch the scenario of this issue because it tests only starting and stoping. It doesn't cover the scenario where the poll() is already invoked "nearly" after the start(), which is fairly what happens in a real scenario. See here.

I think a more complete test would be: start, poll, stop. Which would re-create the issue (stop will be processed after current poll returns). However, kafka source code tells expected behavior is to stop any in progress poll:

https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L132

Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
trying to poll for new data and interrupt any outstanding poll() requests.

@C0urante 's original suggestion in #677 would do the trick. However, I don't know how to implement this other than spawning a separate thread in JdbcSourceTask and make the poll() run in this separate thread, freeing up the dedicated task thread to be able to receive stop() call.

@robsonhermes
Copy link
Author

robsonhermes commented Aug 27, 2023

Maybe it would be possible to simply to replace this if with:

        if (sleepMs > 0) {
          log.trace("{}ms to poll {}. Will return control.", nextUpdate - now, querier);
          return Collections.emptyList();
        }

Idea is to, if this task needs to sleep, we actually return an empty list (don't return null cause I guess that's to signal a pause?).
Checking here, which is where I believe the connector task is called from, believe returning the empty list would be a no-op, and then the poll() would be called again, until the time for the next execution is finally achieved. But with this way, returning control, if there's a pending stop() it would be execute before the next poll() .

Thoughts?

@papaya-daniel
Copy link

@robsonhermes : was that resolved since? if not, how did you workaround it?

@robsonhermes
Copy link
Author

Hello @papaya-daniel
Thanks for reaching out. Fortunately, the issue did not caused a real impact on my use case.
I ended up filtering out from our logging the undesirable error log Graceful stop of task failed.

jakubmalek added a commit to jakubmalek/kafka-connect-jdbc that referenced this issue Aug 6, 2024
…aximum duration time for the poll operation
@jakubmalek
Copy link

Hi, I've opened a PR with the proposed solution for this problem.

jakubmalek added a commit to jakubmalek/kafka-connect-jdbc that referenced this issue Aug 6, 2024
jakubmalek added a commit to jakubmalek/kafka-connect-jdbc that referenced this issue Aug 6, 2024
jakubmalek added a commit to jakubmalek/kafka-connect-jdbc that referenced this issue Aug 6, 2024
jakubmalek added a commit to jakubmalek/kafka-connect-jdbc that referenced this issue Aug 8, 2024
@jakubmalek
Copy link

Can someone have a look at my PR ?
Thanks in advance :)

@sangeet259
Copy link
Member

@jakubmalek , will take a look next week, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants