-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: some robustness improvements for Connect integration #3227
Conversation
696dec3
to
8766a6e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @agavra -- having trouble understanding some of the changes. Questions inline. Thanks!
connectors.add(connector); | ||
} | ||
|
||
drainQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like I'm missing something, but what's the point of the logic above to poll
one connector from the queue at a time and add them to connectors
if we also drain the queue on each iteration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm using poll
as a blocking/notification mechanism, and there's no blocking drainTo
method
ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java
Outdated
Show resolved
Hide resolved
.swallow("connectors", methodParams(), ConnectResponse.of(ImmutableList.of())) | ||
.swallow("status", methodParams(String.class), ConnectResponse.of("sandbox")) | ||
.swallow("create", methodParams(String.class, Map.class), | ||
ConnectResponse.of("sandbox", HttpStatus.SC_INTERNAL_SERVER_ERROR)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the point of white-listing these methods if the sandboxed client has them return errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's well handled instead of throwing exceptions - i.e. the code doesn't expect "UnsupportedOperationException" it expects some kind of meaningful error response. I don't think it's necessary, but just a little nicer down the line. If you feel strongly that we shouldn't be doing this I can revisit it in another PR.
ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patient explanations, @agavra ! Very cool changes :)
Description
DefaultConnectClient
ConnectConfigService
andConnectPollingService
(one now triggers the other)Testing done
Unit testing
Reviewer checklist