Remove global block factory (#100108)
The global BlockFactory should work fine in production, where each 
Elasticsearch node runs in its own JVM process. However, this approach
can lead to issues during testing, especially in IT tests. The same JVM
process might get reused across multiple tests, resulting in situations
where multiple IT tests inadvertently use the same instance of the
global BlockFactory.

For instance, EsqlDisruptionIT fails because it accidentally uses the 
global BlockFactory initialized by EsqlActionBreakerIT, which has a 
limit set to 1KB. Another issue in IT tests is that multiple
Elasticsearch nodes can share the same (single) global instance of the
BlockFactory.

Closes #100105
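
The fix, visible in the diff below, drops the process-wide singleton (and the ServiceLoader plumbing that built it) and instead has each TransportEsqlQueryAction construct a factory from its own node's request circuit breaker. Condensed sketch, taken from the TransportEsqlQueryAction change in this commit:

    // Per-node construction replacing the former BlockFactory.getGlobalInstance() lookup.
    static BlockFactory createBlockFactory(BigArrays bigArrays) {
        CircuitBreaker circuitBreaker = bigArrays.breakerService().getBreaker("request");
        Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set");
        return new BlockFactory(circuitBreaker, bigArrays);
    }

Test-only call sites that used the global instance switch either to BlockFactory.getNonBreakingInstance() or to a BlockFactory built around the test's own mock circuit breaker.
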
dnhatn authored Oct 1, 2023
1 parent c6f4616 commit dbb8b7d
Showing 14 changed files with 16 additions and 166 deletions.
1 change: 0 additions & 1 deletion x-pack/plugin/esql/compute/src/main/java/module-info.java
@@ -6,7 +6,6 @@
  */
 
 module org.elasticsearch.compute {
-    uses org.elasticsearch.compute.data.BlockFactoryParameters;
 
     requires org.apache.lucene.core;
     requires org.elasticsearch.base;
@@ -16,8 +16,6 @@
 import org.elasticsearch.compute.data.Block.MvOrdering;
 
 import java.util.BitSet;
-import java.util.List;
-import java.util.ServiceLoader;
 
 public class BlockFactory {
 
@@ -26,22 +24,6 @@ public class BlockFactory {
         BigArrays.NON_RECYCLING_INSTANCE
     );
 
-    private static final BlockFactory GLOBAL = loadGlobalFactory();
-    // new BlockFactory(new NoopCircuitBreaker("esql_noop_breaker"), BigArrays.NON_RECYCLING_INSTANCE);
-
-    private static BlockFactory loadGlobalFactory() {
-        ServiceLoader<BlockFactoryParameters> loader = ServiceLoader.load(
-            BlockFactoryParameters.class,
-            BlockFactory.class.getClassLoader()
-        );
-        List<ServiceLoader.Provider<BlockFactoryParameters>> impls = loader.stream().toList();
-        if (impls.size() != 1) {
-            throw new AssertionError("expected exactly one impl, but got:" + impls);
-        }
-        BlockFactoryParameters params = impls.get(0).get();
-        return new BlockFactory(params.breaker(), params.bigArrays());
-    }
-
     private final CircuitBreaker breaker;
 
     private final BigArrays bigArrays;
@@ -51,13 +33,6 @@ public BlockFactory(CircuitBreaker breaker, BigArrays bigArrays) {
         this.bigArrays = bigArrays;
     }
 
-    /**
-     * Returns the global ESQL block factory.
-     */
-    public static BlockFactory getGlobalInstance() {
-        return GLOBAL;
-    }
-
     /**
      * Returns the Non-Breaking block factory.
      */

This file was deleted.

This file was deleted.

@@ -61,16 +61,6 @@ public BlockFactory get() {
             public String toString() {
                 return "1gb";
             }
-        }, new Supplier<>() {
-            @Override
-            public BlockFactory get() {
-                return BlockFactory.getGlobalInstance();
-            }
-
-            @Override
-            public String toString() {
-                return "global";
-            }
         });
         return l.stream().map(s -> new Object[] { s }).toList();
     }
@@ -7,7 +7,6 @@
 
 package org.elasticsearch.xpack.esql.action;
 
-import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.coordination.FollowersChecker;
@@ -19,7 +18,6 @@
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportSettings;
 
@@ -31,9 +29,7 @@
 import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 
-@TestLogging(value = "org.elasticsearch.indices.breaker:TRACE", reason = "failing")
 @ESIntegTestCase.ClusterScope(scope = TEST, minNumDataNodes = 2, maxNumDataNodes = 4)
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99173")
 public class EsqlDisruptionIT extends EsqlActionIT {
 
     // copied from AbstractDisruptionTestCase
@@ -232,7 +232,7 @@ public void testMultipleMatches() {
     static DriverContext driverContext() {
         return new DriverContext(
             new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(),
-            BlockFactory.getGlobalInstance()
+            BlockFactory.getNonBreakingInstance()
         );
     }
 }

This file was deleted.

@@ -12,6 +12,7 @@
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
@@ -37,6 +38,7 @@
 import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Locale;
+import java.util.Objects;
 import java.util.concurrent.Executor;
 
 public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRequest, EsqlQueryResponse> {
@@ -69,8 +71,7 @@ public TransportEsqlQueryAction(
         this.requestExecutor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME);
         exchangeService.registerTransportHandler(transportService);
         this.exchangeService = exchangeService;
-        EsqlBlockFactoryParams.init(bigArrays);
-        var blockFactory = BlockFactory.getGlobalInstance();
+        var blockFactory = createBlockFactory(bigArrays);
         this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
         this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService, bigArrays, blockFactory);
         this.computeService = new ComputeService(
@@ -85,6 +86,12 @@ public TransportEsqlQueryAction(
         this.settings = settings;
     }
 
+    static BlockFactory createBlockFactory(BigArrays bigArrays) {
+        CircuitBreaker circuitBreaker = bigArrays.breakerService().getBreaker("request");
+        Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set");
+        return new BlockFactory(circuitBreaker, bigArrays);
+    }
+
     @Override
     protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
         // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
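
As a rough illustration of the isolation this buys, consider a hypothetical snippet (not part of the commit; it assumes same-package access to the package-private helper and two per-node BigArrays instances named bigArraysA and bigArraysB):

    // Two nodes running in the same JVM no longer share breaker state: each call
    // wraps that node's own "request" breaker instead of returning a static field.
    BlockFactory factoryA = TransportEsqlQueryAction.createBlockFactory(bigArraysA);
    BlockFactory factoryB = TransportEsqlQueryAction.createBlockFactory(bigArraysB);
    assert factoryA != factoryB; // previously both would have been the single GLOBAL instance
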

This file was deleted.

@@ -327,7 +327,7 @@ private ActualResults executePlan() throws Exception {
             sessionId,
             new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()),
             bigArrays,
-            BlockFactory.getGlobalInstance(),
+            BlockFactory.getNonBreakingInstance(),
             configuration,
             exchangeSource,
             exchangeSink,
@@ -637,8 +637,9 @@ private static void writeToTempDir(String subdir, String str, String extension)
      */
     protected DriverContext driverContext() {
         MockBigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1));
-        breakers.add(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST));
-        return new DriverContext(bigArrays.withCircuitBreaking(), BlockFactory.getGlobalInstance());
+        CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
+        breakers.add(breaker);
+        return new DriverContext(bigArrays.withCircuitBreaking(), new BlockFactory(breaker, bigArrays));
     }
 
     @After
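
The base test class now registers each per-test breaker it hands out, which makes an end-of-test leak check straightforward. A hypothetical assertion in that spirit (the real @After hook is not shown in this diff):

    // Sketch only: verifies every BlockFactory built by driverContext() released
    // the memory it reserved on its circuit breaker.
    @After
    public void assertAllBlockMemoryReleased() {
        for (CircuitBreaker breaker : breakers) {
            assertEquals("unreleased block memory", 0, breaker.getUsed());
        }
    }
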
@@ -159,7 +159,7 @@ private static FieldAttribute field(String name, DataType type) {
     static DriverContext driverContext() {
         return new DriverContext(
             new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(),
-            BlockFactory.getGlobalInstance()
+            BlockFactory.getNonBreakingInstance()
         );
     }
 }
@@ -119,7 +119,7 @@ private LocalExecutionPlanner planner() throws IOException {
             "test",
             null,
             BigArrays.NON_RECYCLING_INSTANCE,
-            BlockFactory.getGlobalInstance(),
+            BlockFactory.getNonBreakingInstance(),
             config(),
             null,
             null,
