-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add code to wait for segments generated to be loaded on historicals #14322
Add code to wait for segments generated to be loaded on historicals #14322
Conversation
Injector injector = Initialization.makeInjectorWithModules( | ||
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of( | ||
binder -> { | ||
JsonConfigProvider.bindInstance( | ||
binder, | ||
Key.get(DruidNode.class, Self.class), | ||
node | ||
); | ||
binder.bind(Integer.class).annotatedWith(Names.named("port")).toInstance(node.getPlaintextPort()); | ||
binder.bind(JettyServerInitializer.class).to(DruidLeaderClientTest.TestJettyServerInitializer.class).in( | ||
LazySingleton.class); | ||
Jerseys.addResource(binder, DruidLeaderClientTest.SimpleResource.class); | ||
LifecycleModule.register(binder, Server.class); | ||
} | ||
) | ||
); |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation Note test
Initialization.makeInjectorWithModules
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented a few line items on the PR. Also, can you please use DruidException
class for errors that might get surfaced to the user, either due to their fault or ours? (i.e. all the exceptions that aren't caught and rethrown).
server/src/main/java/org/apache/druid/discovery/BrokerClient.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/discovery/BrokerClient.java
Outdated
Show resolved
Hide resolved
{ | ||
StringFullResponseHandler responseHandler = new StringFullResponseHandler(StandardCharsets.UTF_8); | ||
|
||
for (int counter = 0; counter < MAX_RETRIES; counter++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should wait for a while before retrying. This would prevent bombarding the Broker with requests in a short span of time, and also allow any transient failures to auto-resolve before sending another request. We should also have a back-off strategy here.
Consider refactoring it to RetryUtils.retry
which does it for us.
} | ||
catch (IOException | ChannelException ex) { | ||
// can happen if the node is stopped. | ||
log.warn(ex, "Request [%s] failed.", request.getUrl()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would log it after each retry. Since the retries are happening in a short span, there's a high likelihood that we would be posting the same stack over and over. This should log once after all the retries are exhausted. If you refactor it to RetryUtils, I think it also handles that for you.
server/src/main/java/org/apache/druid/discovery/BrokerClient.java
Outdated
Show resolved
Hide resolved
public static String pickOneHost(DruidNodeDiscovery druidNodeDiscovery) | ||
{ | ||
Iterator<DiscoveryDruidNode> iter = druidNodeDiscovery.getAllNodes().iterator(); | ||
if (iter.hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to me that this would have the affinity of picking the first broker node all the time. It would be better if we choose this in a round-robin fashion or at random. Is there any pre-existing code that gives a server at random, seems like this would be a common use case?
server/src/main/java/org/apache/druid/discovery/ClientUtils.java
Outdated
Show resolved
Hide resolved
...nsions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadWaiter.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Left some comments.
@@ -257,7 +257,15 @@ The response shows an example report for a query. | |||
"startTime": "2022-09-14T22:12:09.266Z", | |||
"durationMs": 28227, | |||
"pendingTasks": 0, | |||
"runningTasks": 2 | |||
"runningTasks": 2, | |||
"segmentLoadWaiterStatus": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: segmentLoadStatus?
What is this start time ?
How would segments which match a drop rule get communicated to the console?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
startTime is the time at which we started checking the load status. That with duration would give us a clear idea of when it started and when it ended, and this is the structure for other MSQ stages.
private static final long SLEEP_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(5); | ||
private static final long TIMEOUT_DURATION_MILLIS = TimeUnit.MINUTES.toMillis(10); | ||
private static final String LOAD_QUERY = "SELECT COUNT(*) AS totalSegments,\n" | ||
+ "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS loadingSegments\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the replication factor filter needed here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replication factor is used to filter out cold segments. Even if a cold segment is unavailable, we don't wait for it.
performSegmentPublish( | ||
context.taskActionClient(), | ||
SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones) | ||
); | ||
} | ||
} else if (!segments.isEmpty()) { | ||
Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There would always be one version rite ?
Can we add a check here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this initially as well, but looking into the code, we actually generate the version based on the lock we acquire for the segment. So if there are multiple intervals we are replacing into and therefore multiple locks, we would have more than one version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh i did not know that. Thanks for explaining the rational. We could also document this as well :)
* <br> | ||
* Only expected to be called from the main controller thread. | ||
*/ | ||
public void waitForSegmentsToLoad() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would want the experience on the console to be realtime ie like how counters currently work so that the console can render the waiting segment status progress to the end user.
With this approach the main thread looks to be blocked until the segment loading is complete and the info is only included in the task report once the call returns?
You could add a method which check if the segment loading is complete. If not get the status of loading and write it in the task report.
We would want to do this until the segment loading is completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline, the controller thread updates the status periodically without blocking. This status is included from the liveReports() in the controller. I have tested this out as well and the realtime information is available to the console
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments. Overall LGTM!
"totalSegments": 1, | ||
"usedSegments": 1, | ||
"precachedSegments": 0, | ||
"asyncOnlySegments": 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"asyncOnlySegments": 0, | |
"onDemandSegments": 0, |
Async only seems weird to me since its ties the execution mode to segment.
// If successful and there are segments created, segmentLoadWaiter should wait for them to become available. | ||
segmentLoadWaiter.waitForSegmentsToLoad(); | ||
try { | ||
final List<TaskLock> locks = context.taskActionClient().submit(new LockListAction()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this can go in a separate method with Exception Handling clearly mentioning unable to release locks.
private static final long SLEEP_DURATION_MILLIS = TimeUnit.SECONDS.toMillis(5); | ||
private static final long TIMEOUT_DURATION_MILLIS = TimeUnit.MINUTES.toMillis(10); | ||
private static final String LOAD_QUERY = "SELECT COUNT(*) AS totalSegments,\n" | ||
+ "COUNT(*) FILTER (WHERE is_available = 0 AND is_published = 1 AND replication_factor != 0) AS loadingSegments\n" | ||
private static final String LOAD_QUERY = "SELECT COUNT(*) AS usedSegments,\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably a writeup here of what each replication_factor means and link to : #14403 would be helpful.
That PR might needs an updated description though .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
|
||
public enum State | ||
{ | ||
INIT, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add more dev notes to this enum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
HttpResponseStatus responseStatus = fullResponseHolder.getResponse().getStatus(); | ||
if (HttpResponseStatus.SERVICE_UNAVAILABLE.equals(responseStatus) | ||
|| HttpResponseStatus.GATEWAY_TIMEOUT.equals(responseStatus)) { | ||
throw new IOE(StringUtils.format("Request to broker failed due to failed response status: [%s]", responseStatus)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DruidException defensive exceptions here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be defensive? These would be thrown if the broker is not reachable which is a condition that can happen without any bugs in the code, right?
...nsions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadWaiter.java
Outdated
Show resolved
Hide resolved
…id/msq/exec/SegmentLoadWaiter.java
…oad (#15000) With PR #14322 , MSQ insert/Replace q's will wait for segment to be loaded on the historical's before finishing. The patch introduces a bug where in the main thread had a thread.sleep() which could not be interrupted via the cancel calls from the overlord. This new patch addressed that problem by moving the thread.sleep inside a thread of its own. Thus the main thread is now waiting on the future object of this execution. The cancel call can now shutdown the executor service via another method thus unblocking the main thread to proceed.
This relies on the work done in apache#14322 and apache#15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
This relies on the work done in #14322 and #15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI : Co-authored-by: Sébastien <[email protected]>
This relies on the work done in apache#14322 and apache#15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
This relies on the work done in apache#14322 and apache#15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
Currently, after an MSQ query, the web console is responsible for waiting for the segments to load. It does so by checking if there are any segments loading into the datasource ingested into, which can cause some issues, like in cases where the segments would never be loaded, or would end up waiting for other ingests as well.
This PR shifts this responsibility from the web console to the controller, which would have the list of segments created.
This PR also introduces a new field in the reports, that provides a realtime update of the current progress of waiting for the task.
This PR has: