Enable smartSegmentLoading on the Coordinator #13197
Conversation
Added change summary to important classes.
The tests have been updated to use the new flow, but the same behaviour is still being verified.
```diff
@@ -63,7 +56,7 @@
  * of the same or different methods.
  */
 @Deprecated
-public class CuratorLoadQueuePeon extends LoadQueuePeon
+public class CuratorLoadQueuePeon implements LoadQueuePeon
```
Change summary:
Factored out class SegmentHolder as QueuedSegment to represent an item in a load queue, to be used by both HttpLoadQueuePeon and CuratorLoadQueuePeon. Implemented new methods in LoadQueuePeon.
I think the change is trying to simplify the management of the queues. In which case, I think the abstraction you want is a LoadQueue that is given segment load/drop requests and can then be read from "in order". That could then be used by either Peon to do what it needs to do.
Additionally, the CuratorLoadQueuePeon is effectively broken at this point anyway because it puts all of the znodes on ZK as quickly as possible, and it's probably too expensive to really fix (we should just use http and ignore zk for this), so I don't see a reason to fix it. So, another approach is to consider the ZK-based stuff dead and only improve on the http stuff.
We should likely queue up the death of the zk-based stuff too.
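To make the suggested shape concrete, here is a minimal sketch of such a LoadQueue: it accepts load/drop requests and hands them back in a defined order (drops before loads, FIFO within each kind). All names and the ordering policy are illustrative assumptions, not code from this PR.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical request type; the PR's QueuedSegment carries a segment and an action.
final class QueueEntry
{
  final String segmentId;
  final boolean isDrop;   // drops should be served before loads
  final long sequence;    // preserves FIFO order within each kind

  QueueEntry(String segmentId, boolean isDrop, long sequence)
  {
    this.segmentId = segmentId;
    this.isDrop = isDrop;
    this.sequence = sequence;
  }
}

// Accepts load/drop requests and hands them back "in order".
final class LoadQueue
{
  private long counter = 0;
  private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
      Comparator.comparing((QueueEntry e) -> !e.isDrop)  // drops sort first
                .thenComparingLong(e -> e.sequence)      // then insertion order
  );

  public synchronized void add(String segmentId, boolean isDrop)
  {
    queue.offer(new QueueEntry(segmentId, isDrop, counter++));
  }

  public synchronized QueueEntry pollNext()
  {
    return queue.poll();  // null when empty
  }
}
```

Either peon could then drain pollNext() at whatever rate its transport (HTTP or ZK) allows.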
```java
 */
private static final Comparator<Pair<Double, ServerHolder>> CHEAPEST_SERVERS_FIRST
    = Comparator.<Pair<Double, ServerHolder>, Double>comparing(pair -> pair.lhs)
                .thenComparing(pair -> ThreadLocalRandom.current().nextInt());
```
Would it help to use ServerHolder.getSizeUsed instead of a random integer for the second comparison?
Sure, that would work too (though it would make more sense to use free size or free percentage rather than size used).
This PR does not make any modifications to the strategies, so that change has not been included here. The only modification made here is a reduction in the number of calls to the strategy.
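For illustration, a sketch of the tiebreaker discussed above, preferring the server with the most free space over a random integer. The stand-in types below replace Pair and ServerHolder so the snippet is self-contained; the field and accessor names are assumptions.

```java
import java.util.Comparator;

// Illustrative stand-in for ServerHolder with the accessor discussed above.
final class Server
{
  private final long maxSize;
  private final long sizeUsed;

  Server(long maxSize, long sizeUsed)
  {
    this.maxSize = maxSize;
    this.sizeUsed = sizeUsed;
  }

  long getAvailableSize()
  {
    return maxSize - sizeUsed;
  }
}

// Illustrative stand-in for Pair<Double, ServerHolder>.
final class CostedServer
{
  final double cost;
  final Server server;

  CostedServer(double cost, Server server)
  {
    this.cost = cost;
    this.server = server;
  }
}

final class ServerComparators
{
  // Cheapest server first; ties broken by most free space instead of a random int.
  static final Comparator<CostedServer> CHEAPEST_THEN_MOST_FREE =
      Comparator.comparingDouble((CostedServer p) -> p.cost)
                .thenComparingLong(p -> -p.server.getAvailableSize());
}
```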
I'm not done, but I need to do something else and don't want the comments to be stuck in draft, so I'm submitting for now to make the comments visible.
```java
    && stateManager.loadSegment(segment, server, true)) {
  return true;
} else {
  log.makeAlert("Failed to broadcast segment for [%s]", segment.getDataSource())
```
Be careful with messages in this code. "Failed to broadcast segment" is ambiguous about whether there was an issue with the server actually downloading the segment (definitely not the case given the code here) versus an issue with the coordinator believing that it is safe to assign the segment (much more likely). We should be very explicit about what it is that is happening and, also, what we might expect the end user to do about it.
Yes, I am going through all the log/alert messages to make sure they capture the right information.
@imply-cheddar, thanks a lot for your review! I have incorporated your feedback.
```diff
-segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback));
-queuedSize.addAndGet(segment.getSize());
+holder = new SegmentHolder(segment, action, callback);
+segmentsToLoad.put(segment, holder);
```
The old code separated segmentsToLoad and segmentsToDrop so that it could prioritize drops over loads. If I'm understanding correctly, we are doing that prioritization through the queuedSegments prioritization now, which makes me wonder if we need to keep the old segmentsToLoad and segmentsToDrop around anymore? Are those data structures still used for some meaningful purpose?
We might still need the segmentsToLoad and segmentsToDrop at least for now, because the balancer strategies use these to compute cost. This could be done with queuedSegments itself, but we would have to filter out the relevant entries on every cost computation.
I do have a follow-up PR which deals with the fixes in the strategy. I will try to clean up this part there.
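As a rough illustration of the filtering mentioned above (not the PR's actual code; the Action enum and its isLoad() grouping are assumptions), deriving the load-only view from a combined queue would look something like this:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class QueueViews
{
  enum Action
  {
    LOAD, MOVE_TO, MOVE_FROM, DROP;

    boolean isLoad()
    {
      return this == LOAD || this == MOVE_TO;
    }
  }

  // Derives the "segments to load" view from a combined queuedSegments map.
  // Running this filter on every cost computation is the overhead mentioned
  // above as the reason for keeping the separate maps for now.
  static Set<String> segmentsToLoad(Map<String, Action> queuedSegments)
  {
    return queuedSegments.entrySet()
                         .stream()
                         .filter(entry -> entry.getValue().isLoad())
                         .map(Map.Entry::getKey)
                         .collect(Collectors.toSet());
  }
}
```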
```java
  }
}

// Drop as many replicas as possible from decommissioning servers
```
Why drop things from a decommissioning server? As long as the server is up, the data is available and it can be used. If you don't want the server to be used for anything, just kill -9 the process. If it's up and working, keep using it until it's kill -9'd. If we are going to support decommissioning, it shouldn't be a case of "I need to remove things from this server" but rather "I'm going to pretend as if that server doesn't exist anymore".
That said, decommissioning for historicals is not a really good model. Instead, we need the ability to start up as a replica.
Thanks for the suggestion. Will include these changes along with the changes for full-node replication in a follow-up PR.
Overall, I don't like the introduction of so many magical boolean parameters on methods. It makes it more difficult to understand what is happening.
Additionally, there is commentary in here, but I'm not sure any of it necessarily needs to block this PR; it's all mostly just hygiene stuff that could be done later too (especially given that this PR is already so large and has existed for so long).
Given that we've run this in some legitimate production clusters as well as some performance environments and it's done what we expect, I'm approving this.
```diff
 }
-if (useBatchedSegmentSampler != that.useBatchedSegmentSampler) {
-  return false;
-}
-if (replicantLifetime != that.replicantLifetime) {
-  return false;
-}
-if (replicationThrottleLimit != that.replicationThrottleLimit) {
-  return false;
-}
-if (balancerComputeThreads != that.balancerComputeThreads) {
-  return false;
-}
-if (emitBalancingStats != that.emitBalancingStats) {
-  return false;
-}
-if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) {
-  return false;
-}
-if (!Objects.equals(specificDataSourcesToKillUnusedSegmentsIn, that.specificDataSourcesToKillUnusedSegmentsIn)) {
-  return false;
-}
-if (!Objects.equals(dataSourcesToNotKillStalePendingSegmentsIn, that.dataSourcesToNotKillStalePendingSegmentsIn)) {
-  return false;
-}
-if (!Objects.equals(decommissioningNodes, that.decommissioningNodes)) {
-  return false;
-}
-if (pauseCoordination != that.pauseCoordination) {
-  return false;
-}
-if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) {
-  return false;
-}
-if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) {
-  return false;
-}
-return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
+return markSegmentAsUnusedDelayMillis == that.markSegmentAsUnusedDelayMillis
+       && mergeBytesLimit == that.mergeBytesLimit
+       && mergeSegmentsLimit == that.mergeSegmentsLimit
+       && maxSegmentsToMove == that.maxSegmentsToMove
+       && percentOfSegmentsToConsiderPerMove == that.percentOfSegmentsToConsiderPerMove
+       && decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove
+       && useBatchedSegmentSampler == that.useBatchedSegmentSampler
+       && balancerComputeThreads == that.balancerComputeThreads
+       && emitBalancingStats == that.emitBalancingStats
+       && replicantLifetime == that.replicantLifetime
+       && replicationThrottleLimit == that.replicationThrottleLimit
+       && replicateAfterLoadTimeout == that.replicateAfterLoadTimeout
+       && maxSegmentsInNodeLoadingQueue == that.maxSegmentsInNodeLoadingQueue
+       && maxNonPrimaryReplicantsToLoad == that.maxNonPrimaryReplicantsToLoad
+       && useRoundRobinSegmentAssignment == that.useRoundRobinSegmentAssignment
+       && pauseCoordination == that.pauseCoordination
+       && Objects.equals(
+           specificDataSourcesToKillUnusedSegmentsIn,
+           that.specificDataSourcesToKillUnusedSegmentsIn)
+       && Objects.equals(
+           dataSourcesToNotKillStalePendingSegmentsIn,
+           that.dataSourcesToNotKillStalePendingSegmentsIn)
+       && Objects.equals(decommissioningNodes, that.decommissioningNodes);
```
This (and hashCode) appear to be ignoring the various debugDimensions things, is that intentional?
Thanks for catching this, must have missed adding it.
I was even thinking of getting rid of the equals and hashCode in this class. I don't see them serving any purpose (unless the JsonConfigProvider does something with them, need to double check). Even the tests do an item-by-item comparison. But will do it later.
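For the record, a minimal sketch of the immediate fix agreed on here: folding the missed debugDimensions field into both equals and hashCode. The surrounding class is heavily simplified for illustration and is not the actual CoordinatorDynamicConfig.

```java
import java.util.Map;
import java.util.Objects;

final class ConfigExample
{
  private final int replicantLifetime;
  private final Map<String, String> debugDimensions;

  ConfigExample(int replicantLifetime, Map<String, String> debugDimensions)
  {
    this.replicantLifetime = replicantLifetime;
    this.debugDimensions = debugDimensions;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ConfigExample that = (ConfigExample) o;
    // debugDimensions must participate here, or two configs differing only
    // in debug dimensions would wrongly compare equal.
    return replicantLifetime == that.replicantLifetime
           && Objects.equals(debugDimensions, that.debugDimensions);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(replicantLifetime, debugDimensions);
  }
}
```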
```java
private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
{
  final AtomicInteger cancelledCount = new AtomicInteger(0);
  final List<ServerHolder> decommissioningServers
      = cluster.getAllServers().stream()
               .filter(ServerHolder::isDecommissioning)
               .collect(Collectors.toList());

  for (ServerHolder server : decommissioningServers) {
    server.getQueuedSegments().forEach(
        (segment, action) -> {
          // Cancel the operation if it is a type of load
          if (action.isLoad() && server.cancelOperation(action, segment)) {
            cancelledCount.incrementAndGet();
          }
        }
    );
  }

  if (cancelledCount.get() > 0) {
    log.info(
        "Cancelled [%d] load/move operations on [%d] decommissioning servers.",
        cancelledCount.get(), decommissioningServers.size()
    );
  }
}
```
I would've expected that the act of decommissioning would put the queue into a state where it will only accept DROP requests, and then drop the load queue immediately. It seems weird to me that this logic is handled in this class instead. The only thing I'd expect a CoordinatorDuty to do about it is to look for decommissioning servers and move the segments away.
Cancelling all loads on decommissioning servers is faster than moving the segments away.
Balancing moves are subject to the cost computation performance in BalancerStrategy and thus are typically limited (maxSegmentsToMove <= 1000). With unlimited load queues, there can potentially be many more segments in the load queue of decommissioning servers.
With cancellation of loads on decommissioning servers, these segments would immediately be (round-robin) assigned to active servers in this run itself.
We are doing this cancellation in this duty (as opposed to RunRules or BalanceSegments) since the SegmentReplicantLookup is constructed right after this method, and thus the Coordinator knows in this run itself that some segments are under-replicated and it needs to queue up some loads on active servers.
```java
CoordinatorDynamicConfig.builder()
                        .withSmartSegmentLoading(false)
                        .withMaxSegmentsToMove(1)
                        .withUseBatchedSegmentSampler(true)
                        .withPercentOfSegmentsToConsiderPerMove(40)
```
CodeQL notice (Code scanning): Deprecated method or constructor invocation.
```java
CoordinatorDynamicConfig.builder()
                        .withSmartSegmentLoading(false)
                        .withMaxSegmentsToMove(1)
                        .withUseBatchedSegmentSampler(true)
```
CodeQL notice (Code scanning): Deprecated method or constructor invocation.
```java
CoordinatorDynamicConfig.builder()
                        .withSmartSegmentLoading(false)
                        .withMaxSegmentsToMove(2)
                        .withUseBatchedSegmentSampler(true)
```
CodeQL notice (Code scanning): Deprecated method or constructor invocation.
```java
ServerTestHelper.MAPPER
    .writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
```
CodeQL notice (Code scanning): Deprecated method or constructor invocation.
…mentReplicaCountMap
```diff
 if (action.isLoad()) {
   projectedSegments.add(segment);
 } else {
-  queuedSegments.remove(segment);
   projectedSegments.remove(segment);
 }

-final long sizeDelta = addToQueue ? segment.getSize() : -segment.getSize();
 if (action.isLoad()) {
-  sizeOfLoadingSegments += sizeDelta;
+  sizeOfLoadingSegments += segment.getSize();
 } else if (action == SegmentAction.DROP) {
-  sizeOfDroppingSegments += sizeDelta;
+  sizeOfDroppingSegments += segment.getSize();
 } else {
   // MOVE_FROM actions graduate to DROP after the corresponding MOVE_TO has finished
   // Do not consider size delta until then, otherwise we might over-assign the server
 }
 }
```
After the change to remove the addToQueue boolean, you now have the exact same if clause at the beginning of two subsequent if statements. You should be able to make it:
```java
if (action.isLoad()) {
  projectedSegments.add(segment);
  sizeOfLoadingSegments += segment.getSize();
} else {
  projectedSegments.remove(segment);
  // The current else block could be from a DROP or a MOVE_FROM. The MOVE_FROM will eventually graduate to
  // DROP after ...
  if (action == SegmentAction.DROP) {
    sizeOfDroppingSegments += segment.getSize();
  }
}
```
```diff
 if (action.isLoad()) {
   projectedSegments.remove(segment);
 } else {
   projectedSegments.add(segment);
 }

-return true;
 if (action.isLoad()) {
   sizeOfLoadingSegments -= segment.getSize();
 } else if (action == SegmentAction.DROP) {
   sizeOfDroppingSegments -= segment.getSize();
 }
 }
```
Same comment here: you basically have the same if statement twice.
```java
 * @return Iterator over {@link BalancerSegmentHolder}s, each of which contains
 *         a segment picked for moving and the server currently loading it.
 */
public static List<BalancerSegmentHolder> pickMovableLoadingSegmentsFrom(
```
nit: the difference in name is so minute it took me a long time to notice. One ends in "ing" and the other in "ed".
Is there a reason not to expose the 4-argument method as public and have the call sites pass in one of the two lambdas? It looks like there's only a single call site for each of them except for tests.
That, or maybe try to make the names a bit more different from each other.
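A sketch of the shape being suggested: a single public method that takes the segment filter, with each call site supplying its own lambda. Method, type, and parameter names below are illustrative assumptions, not the PR's actual API.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class SegmentPicker
{
  // Single public entry point; callers choose which segments qualify.
  static List<String> pickMovableSegments(
      List<String> segments,
      int limit,
      Predicate<String> isMovable
  )
  {
    return segments.stream()
                   .filter(isMovable)
                   .limit(limit)
                   .collect(Collectors.toList());
  }
}

// Call sites would then differ only in the lambda they pass, e.g.:
//   SegmentPicker.pickMovableSegments(segments, 100, loadingSegments::contains);
//   SegmentPicker.pickMovableSegments(segments, 100, loadedSegments::contains);
```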
```java
private final boolean useRoundRobinSegmentAssignment;
private final boolean emitBalancingStats;

public SegmentLoadingConfig(CoordinatorDynamicConfig dynamicConfig, int numUsedSegments)
```
design nit: a constructor should generally define the "dependency relationship" of a class, i.e. the things passed in on the constructor are the things that the current class is dependent upon. This constructor is doing a bunch of work; that work is dependent on the CoordinatorDynamicConfig object, but SegmentLoadingConfig is not dependent on the object.
In this case, it would be preferable for this to be a static method and the constructor to just take all of the various values.
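A minimal sketch of that refactor, with heavily simplified stand-in fields (the real SegmentLoadingConfig derives many more values, and the derivation shown is invented for illustration): the constructor takes plain values, while a static factory does the work of deriving them from the dynamic config.

```java
// Simplified stand-in for the dynamic config; only the fields used below.
final class DynamicConfig
{
  final int maxSegmentsInNodeLoadingQueue;
  final int replicationThrottleLimit;

  DynamicConfig(int queueSize, int throttle)
  {
    this.maxSegmentsInNodeLoadingQueue = queueSize;
    this.replicationThrottleLimit = throttle;
  }
}

final class LoadingConfig
{
  private final int maxSegmentsInQueue;
  private final int replicationThrottleLimit;

  // Constructor only states what this class depends on: plain values.
  private LoadingConfig(int maxSegmentsInQueue, int replicationThrottleLimit)
  {
    this.maxSegmentsInQueue = maxSegmentsInQueue;
    this.replicationThrottleLimit = replicationThrottleLimit;
  }

  // The derivation work lives in a factory, keeping the dependency on the
  // dynamic config out of the constructor.
  static LoadingConfig create(DynamicConfig dynamicConfig, int numUsedSegments)
  {
    // Hypothetical derivation, just to show work moving into the factory.
    int throttle = Math.max(dynamicConfig.replicationThrottleLimit, numUsedSegments / 100);
    return new LoadingConfig(dynamicConfig.maxSegmentsInNodeLoadingQueue, throttle);
  }
}
```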
making my gray check green.
```java
{
  this.replicaCountsInTier = ImmutableMap.copyOf(replicaCountsInTier);

  final Map<SegmentId, SegmentReplicaCount> totalReplicaCounts = new HashMap<>();
```
CodeQL notice (Code scanning): Possible confusion of local and field.
```java
return new SegmentLoadingConfig(
    dynamicConfig.getMaxSegmentsInNodeLoadingQueue(),
    dynamicConfig.getReplicationThrottleLimit(),
    dynamicConfig.getMaxNonPrimaryReplicantsToLoad(),
```
CodeQL notice (Code scanning): Deprecated method or constructor invocation.
```java
    dynamicConfig.getMaxNonPrimaryReplicantsToLoad(),
    dynamicConfig.getReplicantLifetime(),
    dynamicConfig.getMaxSegmentsToMove(),
    dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove(),
```
CodeQL notice (Code scanning): Deprecated method or constructor invocation.
After #13197, several coordinator configs are now redundant as they are not being used anymore, neither with `smartSegmentLoading` nor otherwise. Changes:
- Remove dynamic configs:
  - `emitBalancingStats`: balancer error stats are always emitted, debug stats can be logged by using `debugDimensions`
  - `useBatchedSegmentSampler`, `percentOfSegmentsToConsiderPerMove`: batched segment sampling is always used
- Add test to verify deserialization with unknown properties
- Update `CoordinatorRunStats` to always track stats; this can be optimized later
Fixes #12881
Description
This PR lays the groundwork to allow the load queue to safely have an unlimited number of items, and thus eventually phase out `maxSegmentsInNodeLoadingQueue` and `replicationThrottleLimit`. The load queue is already allowed to have unlimited items (by setting `maxSegmentsInNodeLoadingQueue = 0`), but this leads to
Changes
Classes to review:
- `StrategicSegmentAssigner`
- `LoadRule`, `BroadcastDistributionRule`, `DropRule`, `Rule.SegmentActionHandler`
- `SegmentLoadQueueManager`
- `LoadQueuePeon`: http and curator
- `SegmentHolder`
- `ServerHolder`
Behavioral changes:
- Allow the coordinator to take corrective actions quickly.
- Always maintain the target level of replication, thus ensuring that segment read concurrency does not suffer.
- Avoid considering balancing items in the load queue as over-replicated.
- `replicationThrottleLimit` does not act on a tier if the segment is not loaded on that tier at all (see the sketch after this list).
- `maxNonPrimaryReplicantsToLoad` does not act on the first replica in any tier.
- `replicationThrottleLimit`
- `maxSegmentsInNodeLoadingQueue`
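A rough sketch of the two throttling exceptions described above, written only to illustrate the stated behavior; all method and parameter names are assumptions, not the PR's actual code.

```java
final class ThrottleRules
{
  // replicationThrottleLimit does not apply when the tier has no copy of the
  // segment yet: the first replica on a tier must always be assignable.
  static boolean isThrottled(int loadedReplicasInTier, int assignedThisRun, int replicationThrottleLimit)
  {
    if (loadedReplicasInTier == 0) {
      return false;  // first replica on this tier is never throttled
    }
    return assignedThisRun >= replicationThrottleLimit;
  }

  // maxNonPrimaryReplicantsToLoad similarly never blocks the first replica
  // in any tier, only subsequent (non-primary) replicas.
  static boolean exceedsNonPrimaryLimit(int replicasAlreadyInTier, int nonPrimaryAssigned, int maxNonPrimary)
  {
    if (replicasAlreadyInTier == 0) {
      return false;
    }
    return nonPrimaryAssigned >= maxNonPrimary;
  }
}
```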
Structural changes:
- Added `StrategicSegmentAssigner`, which handles all segment assignments. The lifecycle of the assigner is tied to a single coordinator run.
  - Single place to maintain the state of a single run, thus allowing better metrics and logging.
- Added `SegmentLoadQueueManager`, which interacts with the load queues.
  - Allows reporting of metrics from queue callbacks.
  - Prevents callbacks from holding references to items from the previous coordinator run.
New metrics:
- `segment/loadQueue/assigned`
- `segment/loadQueue/success`
- `segment/loadQueue/cancelled`
- `segment/loadQueue/failed`
- Dimension `"datasource"` added to most segment-level metrics

Release notes
The Druid coordinator has been completely revamped to make it much more stable and user-friendly. This is accompanied by several bug fixes, logging and metric improvements, and a whole new range of capabilities.
Features:
- New `smartSegmentLoading` mode, which is enabled by default. When enabled, users need not specify any of the following dynamic configs, as they would be ignored by the coordinator. Instead, the coordinator computes the optimal values of these configs at run time to best utilize coordinator runs (see the usage sketch below).
  - `maxSegmentsInNodeLoadingQueue`
  - `maxSegmentsToMove`
  - `replicationThrottleLimit`
  - `useRoundRobinSegmentAssignment`
  - `useBatchedSegmentSampler`
  - `emitBalancingStats`
- These configs are now deprecated and will be removed in subsequent releases.
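For reference, a hedged usage sketch of toggling the mode through the dynamic-config builder exercised in this PR's tests (`builder()` and `withSmartSegmentLoading` appear above; the `build()` call and import path are assumed from the file layout shown in this PR):

```java
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

public class SmartLoadingExample
{
  public static void main(String[] args)
  {
    // With smartSegmentLoading enabled (the default), the deprecated configs
    // listed above are ignored and computed by the coordinator at run time.
    CoordinatorDynamicConfig config = CoordinatorDynamicConfig
        .builder()
        .withSmartSegmentLoading(true)
        .build();
    System.out.println(config);
  }
}
```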
Monitoring:
This PR has: