clean up the balancing code around the batched vs deprecated way of sampling segments to balance #11960

Merged
merged 10 commits into from
Dec 7, 2021
@@ -73,6 +73,7 @@ public class BalancerStrategyBenchmark
private int maxSegmentsToMove;

private final List<ServerHolder> serverHolders = new ArrayList<>();
private boolean useBatchedSegmentSampler;
private int reservoirSize = 1;
private double percentOfSegmentsToConsider = 100;
private final BalancerStrategy balancerStrategy = new CostBalancerStrategy(
@@ -85,9 +86,11 @@ public void setup()
switch (mode) {
case "50percentOfSegmentsToConsiderPerMove":
percentOfSegmentsToConsider = 50;
useBatchedSegmentSampler = false;
break;
case "useBatchedSegmentSampler":
reservoirSize = maxSegmentsToMove;
useBatchedSegmentSampler = true;
break;
default:
}
@@ -128,12 +131,21 @@ public void setup()
@Benchmark
public void pickSegmentsToMove(Blackhole blackhole)
{
Iterator<BalancerSegmentHolder> iterator = balancerStrategy.pickSegmentsToMove(
serverHolders,
Collections.emptySet(),
reservoirSize,
percentOfSegmentsToConsider
);
Iterator<BalancerSegmentHolder> iterator;
if (useBatchedSegmentSampler) {
iterator = balancerStrategy.pickSegmentsToMove(
serverHolders,
Collections.emptySet(),
reservoirSize
);
} else {
iterator = balancerStrategy.pickSegmentsToMove(
serverHolders,
Collections.emptySet(),
percentOfSegmentsToConsider
);
}

for (int i = 0; i < maxSegmentsToMove && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
}
2 changes: 1 addition & 1 deletion docs/configuration/index.md
@@ -886,7 +886,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100|
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|
|`useBatchedSegmentSampler`|Boolean flag for whether or not we should use reservoir sampling with a reservoir of size k instead of a fixed size of 1 to pick segments to move. This option can be enabled to speed up the segment balancing process, especially if there is a huge number of segments in the cluster or if there are too many segments to move.|false|
|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100|
|`percentOfSegmentsToConsiderPerMove`|Deprecated. Will eventually be phased out by the batched segment sampler. Only used if `useBatchedSegmentSampler == false`. The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100|
Contributor

Should we add a similar deprecation message in the web console description for this property?

Contributor Author

yep, good call @a2l007

Contributor

Can we document what the preferred alternative is here?

|`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15|
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1|
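
For readers unfamiliar with the technique named in the `useBatchedSegmentSampler` description, the following is a minimal sketch of reservoir sampling with a reservoir of size k. It is not Druid's `ReservoirSegmentSampler`: the class `ReservoirSketch` and its `sample` method are hypothetical names, and plain strings stand in for segments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration only; not the Druid implementation.
class ReservoirSketch
{
  // Picks up to k items uniformly at random from a stream in a single pass.
  static <T> List<T> sample(Iterable<T> items, int k, Random random)
  {
    List<T> reservoir = new ArrayList<>(k);
    int seen = 0;
    for (T item : items) {
      seen++;
      if (reservoir.size() < k) {
        // Fill the reservoir with the first k items.
        reservoir.add(item);
      } else {
        // Keep the new item with probability k / seen by overwriting a random slot.
        int slot = random.nextInt(seen);
        if (slot < k) {
          reservoir.set(slot, item);
        }
      }
    }
    return reservoir;
  }

  public static void main(String[] args)
  {
    List<String> segments = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      segments.add("segment-" + i);
    }
    // One pass over all segments yields a whole batch of candidates to move.
    List<String> picked = sample(segments, 5, new Random(42));
    System.out.println(picked.size());
  }
}
```

With k equal to 1, every pick needs its own pass over the segments. A reservoir of size `maxSegmentsToMove` gathers a whole batch in one pass, which is the speed-up the flag's description refers to.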
@@ -65,52 +65,71 @@ public interface BalancerStrategy
* the interval or period-based broadcast rules. For simplicity of the initial
* implementation, only forever broadcast rules are supported.
* @param reservoirSize the reservoir size maintained by the Reservoir Sampling algorithm.
* @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when
* choosing which segment to move. {@link CoordinatorDynamicConfig} defines a
* config percentOfSegmentsToConsiderPerMove that will be used as an argument
* for implementations of this method.
* @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently
* reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
default Iterator<BalancerSegmentHolder> pickSegmentsToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
int reservoirSize,
double percentOfSegmentsToConsider
int reservoirSize
)
{
if (reservoirSize > 1) {
return new Iterator<BalancerSegmentHolder>()
return new Iterator<BalancerSegmentHolder>()
{
private Iterator<BalancerSegmentHolder> it = sample();
private Iterator<BalancerSegmentHolder> sample()
{
private Iterator<BalancerSegmentHolder> it = sample();

private Iterator<BalancerSegmentHolder> sample()
{
return ReservoirSegmentSampler.getRandomBalancerSegmentHolders(
serverHolders,
broadcastDatasources,
reservoirSize
).iterator();
}
return ReservoirSegmentSampler.getRandomBalancerSegmentHolders(
serverHolders,
broadcastDatasources,
reservoirSize
).iterator();
}

@Override
public boolean hasNext()
{
if (it.hasNext()) {
return true;
}
it = sample();
return it.hasNext();
@Override
public boolean hasNext()
{
if (it.hasNext()) {
return true;
}
it = sample();
return it.hasNext();
}

@Override
public BalancerSegmentHolder next()
{
return it.next();
}
};
}
@Override
public BalancerSegmentHolder next()
{
return it.next();
}
};
}

/**
* Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. This
* is the deprecated way of picking a segment to move. pickSegmentsToMove(List<ServerHolder>, Set<String>, int) uses
* a more performant batched sampling method that will become the default picking mode in the future.
*
* @param serverHolders set of historicals to consider for moving segments
* @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules.
* Balancing strategies should avoid rebalancing segments for such datasources, since
* they should be loaded on all servers anyway.
* NOTE: this should really be handled on a per-segment basis, to properly support
* the interval or period-based broadcast rules. For simplicity of the initial
* implementation, only forever broadcast rules are supported.
* @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when
* choosing which segment to move. {@link CoordinatorDynamicConfig} defines a
* config percentOfSegmentsToConsiderPerMove that will be used as an argument
* for implementations of this method.
* @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently
* reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
@Deprecated
default Iterator<BalancerSegmentHolder> pickSegmentsToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
return new Iterator<BalancerSegmentHolder>()
{
private BalancerSegmentHolder next = sample();
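
The batched default method above wraps the sampler in an iterator that draws a fresh batch whenever the current one is drained. A minimal sketch of that pattern follows. It assumes a generic `Supplier` in place of `ReservoirSegmentSampler.getRandomBalancerSegmentHolders`, and `ResamplingIterator` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of the re-sampling iterator pattern; not Druid code.
class ResamplingIterator<T> implements Iterator<T>
{
  private final Supplier<List<T>> sampler;
  private Iterator<T> current;

  ResamplingIterator(Supplier<List<T>> sampler)
  {
    this.sampler = sampler;
    this.current = sampler.get().iterator();
  }

  @Override
  public boolean hasNext()
  {
    if (current.hasNext()) {
      return true;
    }
    // The current batch is exhausted: draw a new one and report whether it has anything.
    current = sampler.get().iterator();
    return current.hasNext();
  }

  @Override
  public T next()
  {
    // Mirrors the diff: callers are expected to check hasNext() first.
    return current.next();
  }

  public static void main(String[] args)
  {
    int[] calls = {0};
    // Stand-in sampler: returns two batches of two items, then an empty batch.
    Supplier<List<String>> sampler = () -> {
      calls[0]++;
      return calls[0] <= 2
             ? Arrays.asList("a" + calls[0], "b" + calls[0])
             : Collections.<String>emptyList();
    };
    Iterator<String> it = new ResamplingIterator<String>(sampler);
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}
```

Iteration ends only when a fresh sample comes back empty, so a caller such as the `balanceServers` loop has to bound the number of picks itself, as it does with `maxSegmentsToMove`.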
@@ -54,6 +54,7 @@ public class CoordinatorDynamicConfig
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int maxSegmentsToMove;
@Deprecated
private final double percentOfSegmentsToConsiderPerMove;
private final boolean useBatchedSegmentSampler;
private final int replicantLifetime;
@@ -117,7 +118,7 @@ public CoordinatorDynamicConfig(
@JsonProperty("mergeBytesLimit") long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") int replicantLifetime,
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
@@ -274,6 +275,7 @@ public int getMaxSegmentsToMove()
return maxSegmentsToMove;
}

@Deprecated
@JsonProperty
public double getPercentOfSegmentsToConsiderPerMove()
{
@@ -559,7 +561,7 @@ public Builder(
@JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@@ -623,6 +625,7 @@ public Builder withMaxSegmentsToMove(int maxSegmentsToMove)
return this;
}

@Deprecated
public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove)
{
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
@@ -56,8 +56,6 @@ public class BalanceSegments implements CoordinatorDuty
protected final Map<String, ConcurrentHashMap<SegmentId, BalancerSegmentHolder>> currentlyMovingSegments =
new HashMap<>();

private static final int DEFAULT_RESERVOIR_SIZE = 1;

public BalanceSegments(DruidCoordinator coordinator)
{
this.coordinator = coordinator;
@@ -202,12 +200,21 @@ private Pair<Integer, Integer> balanceServers(
final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
int moved = 0, unmoved = 0;

Iterator<BalancerSegmentHolder> segmentsToMove = strategy.pickSegmentsToMove(
toMoveFrom,
params.getBroadcastDatasources(),
params.getCoordinatorDynamicConfig().useBatchedSegmentSampler() ? maxSegmentsToMove : DEFAULT_RESERVOIR_SIZE,
Contributor

I think we can delete this unused constant variable

params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove()
);
Iterator<BalancerSegmentHolder> segmentsToMove;
// The pick method depends on whether the operator has enabled batched segment sampling in the Coordinator dynamic config.
if (params.getCoordinatorDynamicConfig().useBatchedSegmentSampler()) {
segmentsToMove = strategy.pickSegmentsToMove(
toMoveFrom,
params.getBroadcastDatasources(),
maxSegmentsToMove
);
} else {
segmentsToMove = strategy.pickSegmentsToMove(
toMoveFrom,
params.getBroadcastDatasources(),
params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove()
);
}

//noinspection ForLoopThatDoesntUseLoopVariable
for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
@@ -239,8 +239,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
new ServerHolder(druidServer2, peon2, true)
),
broadcastDatasources,
1,
100
100.0
)
).andReturn(
ImmutableList.of(
@@ -249,7 +248,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
).iterator()
);

EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
@@ -318,7 +317,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi

BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(
strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment2),
@@ -369,7 +368,7 @@ public void testMoveToDecommissioningServer()
mockCoordinator(coordinator);

BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator())
.anyTimes();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
@@ -405,7 +404,7 @@ public void testMoveFromDecommissioningServer()

ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator())
.once();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
@@ -576,8 +575,7 @@ public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove()
new ServerHolder(druidServer1, peon1, false)
),
broadcastDatasources,
1,
40
40.0
)
)
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment3)).iterator());
@@ -748,7 +746,16 @@ public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li
public Iterator<BalancerSegmentHolder> pickSegmentsToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
int numberOfSegments,
int numberOfSegments
)
{
return pickOrder.iterator();
}

@Override
public Iterator<BalancerSegmentHolder> pickSegmentsToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
@@ -780,13 +787,12 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM
new ServerHolder(druidServer2, peon2, true)
),
broadcastDatasources,
1,
100
100.0
)
).andReturn(
ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment2)).iterator()
);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyDouble()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator());
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
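
The test expectations above change from `1, 100` to `100.0` and from `anyInt()` to `anyDouble()`. A likely reason is that the two `pickSegmentsToMove` overloads now differ only in the type of their last argument, so an integer literal would select the batched overload. The sketch below illustrates that overload resolution and the flag-based dispatch from `balanceServers`. `OverloadSketch`, `pick` and `dispatch` are hypothetical stand-ins that return a label rather than an iterator and omit the server and datasource arguments.

```java
// Hypothetical illustration only; not the Druid BalancerStrategy interface.
class OverloadSketch
{
  // Stand-in for the batched overload, which takes an int reservoir size.
  static String pick(int reservoirSize)
  {
    return "batched(reservoirSize=" + reservoirSize + ")";
  }

  // Stand-in for the deprecated overload, which takes a double percentage.
  @Deprecated
  static String pick(double percentOfSegmentsToConsider)
  {
    return "deprecated(percent=" + percentOfSegmentsToConsider + ")";
  }

  // Mirrors the if/else in balanceServers: the dynamic-config flag selects the overload.
  static String dispatch(boolean useBatchedSegmentSampler, int maxSegmentsToMove, double percent)
  {
    return useBatchedSegmentSampler ? pick(maxSegmentsToMove) : pick(percent);
  }

  public static void main(String[] args)
  {
    System.out.println(pick(100));                 // an int literal binds to the int overload
    System.out.println(pick(100.0));               // a double literal binds to the deprecated overload
    System.out.println(dispatch(true, 5, 100.0));
    System.out.println(dispatch(false, 5, 40.0));
  }
}
```

Custom `BalancerStrategy` implementations and mocks face the same choice: the argument type alone determines which sampling path is exercised.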
@@ -179,7 +179,7 @@ exports[`coordinator dynamic config matches snapshot 1`] = `
Object {
"defaultValue": 100,
"info": <React.Fragment>
The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.
Deprecated. This will be phased out by the batched segment sampler. Only used if useBatchedSegmentSampler == false. The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.
</React.Fragment>,
"name": "percentOfSegmentsToConsiderPerMove",
"type": "number",