Skip to content

Commit

Permalink
Merge branch 'main' into topmetricsnames
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Nov 6, 2024
2 parents 37ed526 + a902878 commit ee5f0a6
Show file tree
Hide file tree
Showing 40 changed files with 2,960 additions and 2,286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ percentiles: `[ 1, 5, 25, 50, 75, 95, 99 ]`. The response will look like this:

As you can see, the aggregation will return a calculated value for each percentile
in the default range. If we assume response times are in milliseconds, it is
immediately obvious that the webpage normally loads in 10-725ms, but occasionally
spikes to 945-985ms.
immediately obvious that the webpage normally loads in 10-720ms, but occasionally
spikes to 940-980ms.

Often, administrators are only interested in outliers -- the extreme percentiles.
We can specify just the percents we are interested in (requested percentiles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.packaging.util.ServerUtils;
import org.elasticsearch.packaging.util.Shell;
import org.elasticsearch.packaging.util.Shell.Result;
import org.junit.Assume;
import org.junit.BeforeClass;

import java.nio.file.Files;
Expand Down Expand Up @@ -112,6 +113,10 @@ public void test32SpecialCharactersInJdkPath() throws Exception {
}

public void test40AutoconfigurationNotTriggeredWhenNodeIsMeantToJoinExistingCluster() throws Exception {
Assume.assumeFalse(
"https://github.com/elastic/elasticsearch/issues/116299",
distribution.platform == Distribution.Platform.WINDOWS
);
// auto-config requires that the archive owner and the process user be the same,
Platforms.onWindows(() -> sh.chown(installation.config, installation.getOwner()));
FileUtils.assertPathsDoNotExist(installation.data);
Expand All @@ -124,8 +129,11 @@ public void test40AutoconfigurationNotTriggeredWhenNodeIsMeantToJoinExistingClus
FileUtils.rm(installation.data);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/116299")
public void test41AutoconfigurationNotTriggeredWhenNodeCannotContainData() throws Exception {
Assume.assumeFalse(
"https://github.com/elastic/elasticsearch/issues/116299",
distribution.platform == Distribution.Platform.WINDOWS
);
// auto-config requires that the archive owner and the process user be the same
Platforms.onWindows(() -> sh.chown(installation.config, installation.getOwner()));
ServerUtils.addSettingToExistingConfiguration(installation, "node.roles", "[\"voting_only\", \"master\"]");
Expand All @@ -138,6 +146,10 @@ public void test41AutoconfigurationNotTriggeredWhenNodeCannotContainData() throw
}

public void test42AutoconfigurationNotTriggeredWhenNodeCannotBecomeMaster() throws Exception {
Assume.assumeFalse(
"https://github.com/elastic/elasticsearch/issues/116299",
distribution.platform == Distribution.Platform.WINDOWS
);
// auto-config requires that the archive owner and the process user be the same
Platforms.onWindows(() -> sh.chown(installation.config, installation.getOwner()));
ServerUtils.addSettingToExistingConfiguration(installation, "node.roles", "[\"ingest\"]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,13 @@ private synchronized void onMergeFailure(Exception exc) {
}

private void tryExecuteNext() {
assert Thread.holdsLock(this);
final MergeTask task;
synchronized (this) {
if (hasFailure() || runningTask.get() != null) {
return;
}
task = queue.poll();
runningTask.set(task);
if (hasFailure() || runningTask.get() != null) {
return;
}
task = queue.poll();
runningTask.set(task);
if (task == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public void onNewInput(T input) {
}
}

/**
* enqueues {@code input} if {@code expectedLatestKnownInput} is the latest known input.
* Neither of the parameters can be null.
*/
protected boolean compareAndEnqueue(T expectedLatestKnownInput, T input) {
assert expectedLatestKnownInput != null;
assert input != null;
return enqueuedInput.compareAndSet(Objects.requireNonNull(expectedLatestKnownInput), Objects.requireNonNull(input));
}

/**
* @return {@code false} iff there are no active/enqueued computations
*/
Expand All @@ -67,7 +77,7 @@ protected boolean isFresh(T input) {
/**
* Process the given input.
*
* @param input the value that was last received by {@link #onNewInput} before invocation.
* @param input the value that was last received by {@link #onNewInput} or {@link #compareAndEnqueue} before invocation.
*/
protected abstract void processInput(T input);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@
public record DesiredBalance(
long lastConvergedIndex,
Map<ShardId, ShardAssignment> assignments,
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> weightsPerNode
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> weightsPerNode,
ComputationFinishReason finishReason
) {

enum ComputationFinishReason {
CONVERGED,
YIELD_TO_NEW_INPUT,
STOP_EARLY
}

public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
this(lastConvergedIndex, assignments, Map.of());
this(lastConvergedIndex, assignments, Map.of(), ComputationFinishReason.CONVERGED);
}

public static final DesiredBalance INITIAL = new DesiredBalance(-1, Map.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toUnmodifiableSet;
Expand All @@ -49,8 +50,8 @@ public class DesiredBalanceComputer {

private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);

private final ThreadPool threadPool;
private final ShardsAllocator delegateAllocator;
private final LongSupplier timeSupplierMillis;

// stats
protected final MeanMetric iterations = new MeanMetric();
Expand All @@ -63,12 +64,28 @@ public class DesiredBalanceComputer {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING = Setting.timeSetting(
"cluster.routing.allocation.desired_balance.max_balance_computation_time_during_index_creation",
TimeValue.timeValueSeconds(1),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private TimeValue progressLogInterval;
private long maxBalanceComputationTimeDuringIndexCreationMillis;

public DesiredBalanceComputer(ClusterSettings clusterSettings, ThreadPool threadPool, ShardsAllocator delegateAllocator) {
this.threadPool = threadPool;
this(clusterSettings, delegateAllocator, threadPool::relativeTimeInMillis);
}

DesiredBalanceComputer(ClusterSettings clusterSettings, ShardsAllocator delegateAllocator, LongSupplier timeSupplierMillis) {
this.delegateAllocator = delegateAllocator;
this.timeSupplierMillis = timeSupplierMillis;
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
clusterSettings.initializeAndWatch(
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
value -> this.maxBalanceComputationTimeDuringIndexCreationMillis = value.millis()
);
}

public DesiredBalance compute(
Expand All @@ -77,7 +94,6 @@ public DesiredBalance compute(
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
Predicate<DesiredBalanceInput> isFresh
) {

if (logger.isTraceEnabled()) {
logger.trace(
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
Expand All @@ -97,9 +113,10 @@ public DesiredBalance compute(
final var changes = routingAllocation.changes();
final var ignoredShards = getIgnoredShardsWithDiscardedAllocationStatus(desiredBalanceInput.ignoredShards());
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation);
DesiredBalance.ComputationFinishReason finishReason = DesiredBalance.ComputationFinishReason.CONVERGED;

if (routingNodes.size() == 0) {
return new DesiredBalance(desiredBalanceInput.index(), Map.of());
return new DesiredBalance(desiredBalanceInput.index(), Map.of(), Map.of(), finishReason);
}

// we assume that all ongoing recoveries will complete
Expand Down Expand Up @@ -263,11 +280,12 @@ public DesiredBalance compute(

final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
final long timeWarningInterval = progressLogInterval.millis();
final long computationStartedTime = threadPool.relativeTimeInMillis();
final long computationStartedTime = timeSupplierMillis.getAsLong();
long nextReportTime = computationStartedTime + timeWarningInterval;

int i = 0;
boolean hasChanges = false;
boolean assignedNewlyCreatedPrimaryShards = false;
while (true) {
if (hasChanges) {
// Not the first iteration, so every remaining unassigned shard has been ignored, perhaps due to throttling. We must bring
Expand All @@ -293,6 +311,15 @@ public DesiredBalance compute(
for (final var shardRouting : routingNode) {
if (shardRouting.initializing()) {
hasChanges = true;
if (shardRouting.primary()
&& shardRouting.unassignedInfo() != null
&& shardRouting.unassignedInfo().reason() == UnassignedInfo.Reason.INDEX_CREATED) {
// TODO: we could include more cases that would cause early publishing of desired balance in case of a long
// computation. e.g.:
// - unassigned search replicas in case the shard has no assigned shard replicas
// - other reasons for an unassigned shard such as NEW_INDEX_RESTORED
assignedNewlyCreatedPrimaryShards = true;
}
clusterInfoSimulator.simulateShardStarted(shardRouting);
routingNodes.startShard(shardRouting, changes, 0L);
}
Expand All @@ -301,14 +328,14 @@ public DesiredBalance compute(

i++;
final int iterations = i;
final long currentTime = threadPool.relativeTimeInMillis();
final long currentTime = timeSupplierMillis.getAsLong();
final boolean reportByTime = nextReportTime <= currentTime;
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
if (reportByTime || reportByIterationCount) {
nextReportTime = currentTime + timeWarningInterval;
}

if (hasChanges == false) {
if (hasComputationConverged(hasChanges, i)) {
logger.debug(
"Desired balance computation for [{}] converged after [{}] and [{}] iterations",
desiredBalanceInput.index(),
Expand All @@ -324,9 +351,25 @@ public DesiredBalance compute(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations as newer cluster state received. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i
);
finishReason = DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT;
break;
}

if (assignedNewlyCreatedPrimaryShards
&& currentTime - computationStartedTime >= maxBalanceComputationTimeDuringIndexCreationMillis) {
logger.info(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations "
+ "in order to not delay assignment of newly created index shards for more than [{}]. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
TimeValue.timeValueMillis(maxBalanceComputationTimeDuringIndexCreationMillis).toString()
);
finishReason = DesiredBalance.ComputationFinishReason.STOP_EARLY;
break;
}

Expand Down Expand Up @@ -368,7 +411,12 @@ public DesiredBalance compute(
}

long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode());
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode(), finishReason);
}

// visible for testing
boolean hasComputationConverged(boolean hasRoutingChanges, int currentIteration) {
return hasRoutingChanges == false;
}

private static Map<ShardId, ShardAssignment> collectShardAssignments(RoutingNodes routingNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,16 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
)
);
computationsExecuted.inc();
if (isFresh(desiredBalanceInput)) {

if (currentDesiredBalance.finishReason() == DesiredBalance.ComputationFinishReason.STOP_EARLY) {
logger.debug(
"Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation",
index
);
submitReconcileTask(currentDesiredBalance);
var newInput = DesiredBalanceInput.create(indexGenerator.incrementAndGet(), desiredBalanceInput.routingAllocation());
desiredBalanceComputation.compareAndEnqueue(desiredBalanceInput, newInput);
} else if (isFresh(desiredBalanceInput)) {
logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index);
computationsConverged.inc();
submitReconcileTask(currentDesiredBalance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS,
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS,
DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -73,6 +74,68 @@ protected void processInput(Integer input) {
assertTrue(Arrays.toString(valuePerThread) + " vs " + result.get(), Arrays.stream(valuePerThread).anyMatch(i -> i == result.get()));
}

public void testCompareAndEnqueue() throws Exception {
final var initialInput = new Object();
final var compareAndEnqueueCount = between(1, 10);
final var remaining = new AtomicInteger(compareAndEnqueueCount);
final var computationsExecuted = new AtomicInteger();
final var result = new AtomicReference<>();
final var computation = new ContinuousComputation<>(threadPool.generic()) {
@Override
protected void processInput(Object input) {
result.set(input);
if (remaining.decrementAndGet() >= 0) {
compareAndEnqueue(input, new Object());
}
computationsExecuted.incrementAndGet();
}
};
computation.onNewInput(initialInput);
assertBusy(() -> assertFalse(computation.isActive()));
assertNotEquals(result.get(), initialInput);
assertEquals(computationsExecuted.get(), 1 + compareAndEnqueueCount);
}

public void testCompareAndEnqueueSkipped() throws Exception {
final var barrier = new CyclicBarrier(2);
final var computationsExecuted = new AtomicInteger();
final var initialInput = new Object();
final var conditionalInput = new Object();
final var newInput = new Object();
final var submitConditional = new AtomicBoolean(true);
final var result = new AtomicReference<>();

final var computation = new ContinuousComputation<>(threadPool.generic()) {
@Override
protected void processInput(Object input) {
assertNotEquals(input, conditionalInput);
safeAwait(barrier); // start
safeAwait(barrier); // continue
if (submitConditional.getAndSet(false)) {
compareAndEnqueue(input, conditionalInput);
}
result.set(input);
safeAwait(barrier); // finished
computationsExecuted.incrementAndGet();
}
};
computation.onNewInput(initialInput);

safeAwait(barrier); // start
computation.onNewInput(newInput);
safeAwait(barrier); // continue
safeAwait(barrier); // finished
assertEquals(result.get(), initialInput);

safeAwait(barrier); // start
safeAwait(barrier); // continue
safeAwait(barrier); // finished

assertBusy(() -> assertFalse(computation.isActive()));
assertEquals(result.get(), newInput);
assertEquals(computationsExecuted.get(), 2);
}

public void testSkipsObsoleteValues() throws Exception {
final var barrier = new CyclicBarrier(2);
final Runnable await = () -> safeAwait(barrier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,12 @@ private void checkIterationLogging(int iterations, long eachIterationDuration, M
var currentTime = new AtomicLong(0L);
when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration));

var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), mockThreadPool, new ShardsAllocator() {
// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
// prevents interrupting a long computation.
var clusterSettings = createBuiltInClusterSettings(
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
);
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, mockThreadPool, new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
Expand Down
Loading

0 comments on commit ee5f0a6

Please sign in to comment.