From f5de641b053a8da86b1f37d1646ccd90bdef7634 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 6 Dec 2023 15:56:21 +0100 Subject: [PATCH 1/2] [Profiling] Query in parallel only if beneficial With this commit we check index allocation before we do key-value lookups. To reduce latency, key-value lookups are done in parallel for multiple slices of data. However, on nodes with spinning disks, parallel accesses are harmful. Therefore, we check whether any index is allocated either to the warm or cold tier (which are usually on spinning disks) and disable parallel key-value lookups. This has improved latency on the warm tier by about 10% in our experiments. --- .../xpack/profiling/IndexAllocation.java | 60 +++++++++ .../TransportGetStackTracesAction.java | 22 +++- .../xpack/profiling/IndexAllocationTests.java | 122 ++++++++++++++++++ .../TransportGetStackTracesActionTests.java | 7 + 4 files changed, 206 insertions(+), 5 deletions(-) create mode 100644 x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/IndexAllocation.java create mode 100644 x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/IndexAllocationTests.java diff --git a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/IndexAllocation.java b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/IndexAllocation.java new file mode 100644 index 0000000000000..701b2d8d8728d --- /dev/null +++ b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/IndexAllocation.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.profiling; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.DataTier; +import org.elasticsearch.index.Index; + +import java.util.List; +import java.util.function.Predicate; + +final class IndexAllocation { + private IndexAllocation() { + // no instances intended + } + + static boolean isAnyAssignedToNode(ClusterState state, List indices, Predicate nodePredicate) { + for (Index index : indices) { + IndexMetadata metadata = state.getMetadata().index(index); + if (metadata == null) { + continue; + } + IndexRoutingTable routingTable = state.routingTable().index(index); + if (routingTable == null) { + continue; + } + for (ShardRouting shardRouting : routingTable.randomAllActiveShardsIt()) { + if (shardRouting.assignedToNode() == false) { + continue; + } + DiscoveryNode assignedNode = state.nodes().get(shardRouting.currentNodeId()); + if (nodePredicate.test(assignedNode)) { + return true; + } + } + } + return false; + } + + /** + * Determines whether any of the provided indices is allocated to the warm or cold tier. Machines on these + * tiers usually use spinning disks. + * + * @param state Current cluster state. + * @param indices A list of indices to check. + * @return true iff at least one index is allocated to either a warm or cold data node. + */ + static boolean isAnyOnWarmOrColdTier(ClusterState state, List indices) { + return isAnyAssignedToNode(state, indices, n -> DataTier.isWarmNode(n) || DataTier.isColdNode(n)); + } +} diff --git a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesAction.java b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesAction.java index a51d8b509003a..cf602845ca035 100644 --- a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesAction.java +++ b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesAction.java @@ -408,9 +408,12 @@ private void retrieveStackTraces( return; } List eventIds = new ArrayList<>(responseBuilder.getStackTraceEvents().keySet()); - List> slicedEventIds = sliced(eventIds, desiredSlices); ClusterState clusterState = clusterService.state(); List indices = resolver.resolve(clusterState, "profiling-stacktraces", responseBuilder.getStart(), responseBuilder.getEnd()); + // Avoid parallelism if there is potential we are on spinning disks (frozen tier uses searchable snapshots) + int sliceCount = IndexAllocation.isAnyOnWarmOrColdTier(clusterState, indices) ? 1 : desiredSlices; + log.trace("Using [{}] slice(s) to lookup stacktraces.", sliceCount); + List> slicedEventIds = sliced(eventIds, sliceCount); // Build a set of unique host IDs. Set uniqueHostIDs = new HashSet<>(responseBuilder.hostEventCounts.size()); @@ -464,7 +467,7 @@ private void retrieveStackTraces( // package private for testing static List> sliced(List c, int slices) { - if (c.size() <= slices) { + if (c.size() <= slices || slices == 1) { return List.of(c); } List> slicedList = new ArrayList<>(); @@ -628,9 +631,6 @@ private void retrieveStackTraceDetails( if (mayNotifyOfCancellation(submitTask, submitListener)) { return; } - - List> slicedStackFrameIds = sliced(stackFrameIds, desiredDetailSlices); - List> slicedExecutableIds = sliced(executableIds, desiredDetailSlices); List stackFrameIndices = resolver.resolve( clusterState, "profiling-stackframes", @@ -643,6 +643,18 @@ private void retrieveStackTraceDetails( responseBuilder.getStart(), responseBuilder.getEnd() ); + // Avoid parallelism if there is potential we are on spinning disks (frozen tier uses searchable snapshots) + int stackFrameSliceCount = IndexAllocation.isAnyOnWarmOrColdTier(clusterState, stackFrameIndices) ? 1 : desiredDetailSlices; + int executableSliceCount = IndexAllocation.isAnyOnWarmOrColdTier(clusterState, executableIndices) ? 1 : desiredDetailSlices; + log.trace( + "Using [{}] slice(s) to lookup stack frames and [{}] slice(s) to lookup executables.", + stackFrameSliceCount, + executableSliceCount + ); + + List> slicedStackFrameIds = sliced(stackFrameIds, stackFrameSliceCount); + List> slicedExecutableIds = sliced(executableIds, executableSliceCount); + DetailsHandler handler = new DetailsHandler( responseBuilder, submitListener, diff --git a/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/IndexAllocationTests.java b/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/IndexAllocationTests.java new file mode 100644 index 0000000000000..852790e219a2d --- /dev/null +++ b/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/IndexAllocationTests.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.profiling; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class IndexAllocationTests extends ESTestCase { + private final Index hot = idx("hot"); + private final Index warm = idx("warm"); + private final Index cold = idx("cold"); + private final Index frozen = idx("frozen"); + + public void testEmptyIndicesNotOnWarmColdTier() { + assertFalse(IndexAllocation.isAnyOnWarmOrColdTier(clusterState(), Collections.emptyList())); + } + + public void testOtherIndicesNotOnWarmColdTier() { + assertFalse(IndexAllocation.isAnyOnWarmOrColdTier(clusterState(), List.of(hot, frozen))); + } + + public void testIndicesOnWarmColdTier() { + assertTrue(IndexAllocation.isAnyOnWarmOrColdTier(clusterState(), List.of(warm))); + assertTrue(IndexAllocation.isAnyOnWarmOrColdTier(clusterState(), List.of(cold))); + } + + public void testMixedIndicesOnWarmColdTier() { + assertTrue(IndexAllocation.isAnyOnWarmOrColdTier(clusterState(), List.of(hot, warm))); + assertTrue(IndexAllocation.isAnyOnWarmOrColdTier(clusterState(), List.of(frozen, cold))); + } + + /** + * Creates a cluster state that represents several indices: + * + *
    + *
  • hot assigned to a hot-tier node named n-hot
  • + *
  • warm assigned to a warm-tier node named n-warm
  • + *
  • cold assigned to a cold-tier node named n-cold
  • + *
  • frozen assigned to a frozen-tier node named n-frozen
  • + *
+ */ + private ClusterState clusterState() { + DiscoveryNode node = DiscoveryNodeUtils.create("node"); + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node); + + nodesBuilder.add(DiscoveryNodeUtils.builder("n-" + hot.getName()).roles(Set.of(DiscoveryNodeRole.DATA_HOT_NODE_ROLE)).build()); + nodesBuilder.add(DiscoveryNodeUtils.builder("n-" + warm.getName()).roles(Set.of(DiscoveryNodeRole.DATA_WARM_NODE_ROLE)).build()); + nodesBuilder.add(DiscoveryNodeUtils.builder("n-" + cold.getName()).roles(Set.of(DiscoveryNodeRole.DATA_COLD_NODE_ROLE)).build()); + nodesBuilder.add( + DiscoveryNodeUtils.builder("n-" + frozen.getName()).roles(Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE)).build() + ); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Map indices = new HashMap<>(); + for (Index index : List.of(hot, warm, cold, frozen)) { + indices.put(index.getName(), metadata(index)); + ShardRouting shardRouting = ShardRouting.newUnassigned( + new ShardId(index, 0), + true, + RecoverySource.ExistingStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""), + ShardRouting.Role.DEFAULT + ); + + shardRouting = shardRouting.initialize("n-" + index.getName(), null, 0).moveToStarted(0); + routingTableBuilder.add( + IndexRoutingTable.builder(index) + .addIndexShard(IndexShardRoutingTable.builder(shardRouting.shardId()).addShard(shardRouting)) + ); + } + + return ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().indices(indices).build()) + .blocks(new ClusterBlocks.Builder().build()) + .nodes(nodesBuilder) + .routingTable(routingTableBuilder) + .build(); + } + + private IndexMetadata metadata(Index index) { + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + return IndexMetadata.builder(index.getName()).settings(settings).numberOfShards(1).numberOfReplicas(0).build(); + } + + private Index idx(String name) { + return new Index(name, UUID.randomUUID().toString()); + } + +} diff --git a/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesActionTests.java b/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesActionTests.java index bf4c15d8d1ed5..2eccfb45f5958 100644 --- a/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesActionTests.java +++ b/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/TransportGetStackTracesActionTests.java @@ -17,6 +17,13 @@ public void testSliceEmptyList() { assertEquals(List.of(List.of()), TransportGetStackTracesAction.sliced(Collections.emptyList(), 4)); } + public void testSingleSlice() { + List input = randomList(2, 5, () -> randomAlphaOfLength(3)); + List> sliced = TransportGetStackTracesAction.sliced(input, 1); + assertEquals(1, sliced.size()); + assertEquals(input, sliced.get(0)); + } + public void testSliceListSmallerOrEqualToSliceCount() { int slices = 7; List input = randomList(0, slices, () -> randomAlphaOfLength(3)); From 7aef899578c74ef5ca3df412b52b22d0c4820318 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 6 Dec 2023 16:00:39 +0100 Subject: [PATCH 2/2] Update docs/changelog/103061.yaml --- docs/changelog/103061.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/103061.yaml diff --git a/docs/changelog/103061.yaml b/docs/changelog/103061.yaml new file mode 100644 index 0000000000000..558429493ac6f --- /dev/null +++ b/docs/changelog/103061.yaml @@ -0,0 +1,5 @@ +pr: 103061 +summary: "[Profiling] Query in parallel only if beneficial" +area: Application +type: bug +issues: []