diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 2f60c731bc554..c3b5b35c721a4 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -257,6 +257,10 @@ public void apply(Settings value, Settings current, Settings previous) { public static Set> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet( new HashSet<>( Arrays.asList( + // changed: for performComputeIntensiveTask + SearchService.COMPUTE_INTENSIVE_DURATION_SECONDS, + SearchService.MEMORY_OVERHEAD_PER_ITERATION, + AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 93946fa11de13..b5322b0b42a64 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -146,6 +146,8 @@ public final class IndicesRequestCache implements RemovalListener registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); @@ -162,6 +164,7 @@ public final class IndicesRequestCache implements RemovalListener> cacheEntityFunction, @@ -182,6 +185,7 @@ public final class IndicesRequestCache implements RemovalListener().setSettings(settings) .setWeigher(weigher) diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index a53a7198c366f..84573926e7359 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -91,6 +91,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.IndicesRequestCache; import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.opensearch.node.ResponseCollectorService; import org.opensearch.script.FieldScript; @@ -171,6 +172,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SearchService.class); + // changed: for performComputeIntensiveTask + public static final Setting COMPUTE_INTENSIVE_DURATION_SECONDS = + Setting.intSetting("search.service.experimental.compute_intensive.duration_seconds", 0, Setting.Property.Dynamic, Setting.Property.NodeScope); + + public static final Setting MEMORY_OVERHEAD_PER_ITERATION = + Setting.intSetting("search.service.experimental.memory_overhead.per_iteration", 0, Setting.Property.Dynamic, Setting.Property.NodeScope); + + // changed: new setting + private volatile int computeIntensiveDurationSeconds; + private volatile int memoryOverheadPerIteration; + // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes public static final Setting DEFAULT_KEEPALIVE_SETTING = Setting.positiveTimeSetting( "search.default_keep_alive", @@ -374,6 +386,13 @@ public SearchService( TaskResourceTrackingService taskResourceTrackingService ) { Settings settings = clusterService.getSettings(); + // changed: new setting + this.computeIntensiveDurationSeconds = SearchService.COMPUTE_INTENSIVE_DURATION_SECONDS.get(settings); + this.memoryOverheadPerIteration = SearchService.MEMORY_OVERHEAD_PER_ITERATION.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SearchService.COMPUTE_INTENSIVE_DURATION_SECONDS, this::setComputeIntensiveDurationSeconds); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SearchService.MEMORY_OVERHEAD_PER_ITERATION, this::setMemoryOverheadPerIteration); + + this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; @@ -428,6 +447,35 @@ public SearchService( clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField); } + // changed: new setting to line 468 + private void setComputeIntensiveDurationSeconds(int time) { + this.computeIntensiveDurationSeconds = time; + } + + private void setMemoryOverheadPerIteration(int overhead) { + this.memoryOverheadPerIteration = overhead; + } + + public void performComputeIntensiveTask() { + long endTime = System.currentTimeMillis() + computeIntensiveDurationSeconds * 1000; + logger.info("Starting compute-intensive task for {} seconds and {} bytes per iteration", + computeIntensiveDurationSeconds, memoryOverheadPerIteration); + + int iterations = 0; + while (System.currentTimeMillis() < endTime) { + byte[] memoryHog = new byte[memoryOverheadPerIteration]; + for (int j = 0; j < memoryOverheadPerIteration; j++) { + memoryHog[j] = (byte) (j % 256); + } + iterations++; + if (iterations % 1000 == 0) { + logger.info("Performed {} iterations", iterations); + } + } + logger.info("Completed compute-intensive task"); + } + + private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) { if (defaultKeepAlive.millis() > maxKeepAlive.millis()) { throw new IllegalArgumentException( @@ -630,8 +678,12 @@ public void onResponse(ShardSearchRequest orig) { return; } } + performComputeIntensiveTask(); // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener); + runAsync(getExecutor(shard), () -> + // changed: Compute- and memory-intensive logic +// performComputeIntensiveTask(); + executeQueryPhase(orig, task, keepStatesInContext), listener); } @Override @@ -641,6 +693,7 @@ public void onFailure(Exception exc) { }); } + private IndexShard getShard(ShardSearchRequest request) { if (request.readerId() != null) { return findReaderContext(request.readerId(), request).indexShard();