Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove global block factory #100108

Merged
merged 2 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion x-pack/plugin/esql/compute/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

module org.elasticsearch.compute {
uses org.elasticsearch.compute.data.BlockFactoryParameters;

requires org.apache.lucene.core;
requires org.elasticsearch.base;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.elasticsearch.compute.data.Block.MvOrdering;

import java.util.BitSet;
import java.util.List;
import java.util.ServiceLoader;

public class BlockFactory {

Expand All @@ -26,22 +24,6 @@ public class BlockFactory {
BigArrays.NON_RECYCLING_INSTANCE
);

private static final BlockFactory GLOBAL = loadGlobalFactory();
// new BlockFactory(new NoopCircuitBreaker("esql_noop_breaker"), BigArrays.NON_RECYCLING_INSTANCE);

private static BlockFactory loadGlobalFactory() {
ServiceLoader<BlockFactoryParameters> loader = ServiceLoader.load(
BlockFactoryParameters.class,
BlockFactory.class.getClassLoader()
);
List<ServiceLoader.Provider<BlockFactoryParameters>> impls = loader.stream().toList();
if (impls.size() != 1) {
throw new AssertionError("expected exactly one impl, but got:" + impls);
}
BlockFactoryParameters params = impls.get(0).get();
return new BlockFactory(params.breaker(), params.bigArrays());
}

private final CircuitBreaker breaker;

private final BigArrays bigArrays;
Expand All @@ -51,13 +33,6 @@ public BlockFactory(CircuitBreaker breaker, BigArrays bigArrays) {
this.bigArrays = bigArrays;
}

/**
* Returns the global ESQL block factory.
*/
public static BlockFactory getGlobalInstance() {
return GLOBAL;
}

/**
* Returns the Non-Breaking block factory.
*/
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,6 @@ public BlockFactory get() {
public String toString() {
return "1gb";
}
}, new Supplier<>() {
@Override
public BlockFactory get() {
return BlockFactory.getGlobalInstance();
}

@Override
public String toString() {
return "global";
}
});
return l.stream().map(s -> new Object[] { s }).toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.FollowersChecker;
Expand All @@ -19,7 +18,6 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportSettings;

Expand All @@ -31,9 +29,7 @@
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@TestLogging(value = "org.elasticsearch.indices.breaker:TRACE", reason = "failing")
@ESIntegTestCase.ClusterScope(scope = TEST, minNumDataNodes = 2, maxNumDataNodes = 4)
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99173")
public class EsqlDisruptionIT extends EsqlActionIT {

// copied from AbstractDisruptionTestCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testMultipleMatches() {
static DriverContext driverContext() {
return new DriverContext(
new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(),
BlockFactory.getGlobalInstance()
BlockFactory.getNonBreakingInstance()
);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
Expand All @@ -37,6 +38,7 @@
import java.time.ZoneOffset;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Executor;

public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRequest, EsqlQueryResponse> {
Expand Down Expand Up @@ -69,8 +71,7 @@ public TransportEsqlQueryAction(
this.requestExecutor = threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME);
exchangeService.registerTransportHandler(transportService);
this.exchangeService = exchangeService;
EsqlBlockFactoryParams.init(bigArrays);
var blockFactory = BlockFactory.getGlobalInstance();
var blockFactory = createBlockFactory(bigArrays);
this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService, bigArrays, blockFactory);
this.computeService = new ComputeService(
Expand All @@ -85,6 +86,12 @@ public TransportEsqlQueryAction(
this.settings = settings;
}

static BlockFactory createBlockFactory(BigArrays bigArrays) {
CircuitBreaker circuitBreaker = bigArrays.breakerService().getBreaker("request");
Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set");
return new BlockFactory(circuitBreaker, bigArrays);
}

@Override
protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private ActualResults executePlan() throws Exception {
sessionId,
new CancellableTask(1, "transport", "esql", null, TaskId.EMPTY_TASK_ID, Map.of()),
bigArrays,
BlockFactory.getGlobalInstance(),
BlockFactory.getNonBreakingInstance(),
configuration,
exchangeSource,
exchangeSink,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,9 @@ private static void writeToTempDir(String subdir, String str, String extension)
*/
protected DriverContext driverContext() {
MockBigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1));
breakers.add(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST));
return new DriverContext(bigArrays.withCircuitBreaking(), BlockFactory.getGlobalInstance());
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
breakers.add(breaker);
return new DriverContext(bigArrays.withCircuitBreaking(), new BlockFactory(breaker, bigArrays));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private static FieldAttribute field(String name, DataType type) {
static DriverContext driverContext() {
return new DriverContext(
new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(),
BlockFactory.getGlobalInstance()
BlockFactory.getNonBreakingInstance()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private LocalExecutionPlanner planner() throws IOException {
"test",
null,
BigArrays.NON_RECYCLING_INSTANCE,
BlockFactory.getGlobalInstance(),
BlockFactory.getNonBreakingInstance(),
config(),
null,
null,
Expand Down