Skip to content

Commit

Permalink
Remove default driver context
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisHegarty committed Sep 14, 2023
1 parent b2df331 commit 54611b5
Show file tree
Hide file tree
Showing 42 changed files with 270 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,13 @@
*/
public class DriverContext {

/** A default driver context. The returned bigArrays is non recycling. */
public static DriverContext DEFAULT = new DriverContext(BigArrays.NON_RECYCLING_INSTANCE);

// Working set. Only the thread executing the driver will update this set.
Set<Releasable> workingSet = Collections.newSetFromMap(new IdentityHashMap<>());

private final AtomicReference<Snapshot> snapshot = new AtomicReference<>();

private final BigArrays bigArrays;

// For testing
public DriverContext() {
this(BigArrays.NON_RECYCLING_INSTANCE);
}

public DriverContext(BigArrays bigArrays) {
Objects.requireNonNull(bigArrays);
this.bigArrays = bigArrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public Operator get(DriverContext driverContext) {

@Override
public String describe() {
return "EvalOperator[evaluator=" + evaluator.get(DriverContext.DEFAULT) + "]";
return "EvalOperator[evaluator=" + evaluator.get(new ThrowingDriverContext()) + "]";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Operator get(DriverContext driverContext) {

@Override
public String describe() {
return "FilterOperator[evaluator=" + evaluatorSupplier.get(DriverContext.DEFAULT) + "]";
return "FilterOperator[evaluator=" + evaluatorSupplier.get(new ThrowingDriverContext()) + "]";
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.FloatArray;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasable;

public class ThrowingDriverContext extends DriverContext {
public ThrowingDriverContext() {
super(new ThrowingBigArrays());
}

@Override
public BigArrays bigArrays() {
throw new AssertionError("should not reach here");
}

@Override
public boolean addReleasable(Releasable releasable) {
throw new AssertionError("should not reach here");
}

static class ThrowingBigArrays extends BigArrays {

ThrowingBigArrays() {
super(null, null, "fake");
}

@Override
public ByteArray newByteArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public IntArray newIntArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public LongArray newLongArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public FloatArray newFloatArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}

@Override
public DoubleArray newDoubleArray(long size, boolean clearOnResize) {
throw new AssertionError("should not reach here");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
Expand Down Expand Up @@ -112,7 +113,7 @@ public void testQueryOperator() throws IOException {
assertTrue("duplicated docId=" + docId, actualDocIds.add(docId));
}
});
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
drivers.add(new Driver(driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
}
OperatorTestCase.runDriver(drivers);
Expand Down Expand Up @@ -144,9 +145,10 @@ public void testQueryOperator() throws IOException {
}
}

// @Repeat(iterations = 1)
public void testGroupingWithOrdinals() throws Exception {
final String gField = "g";
final int numDocs = between(100, 10000);
final int numDocs = 2856; // between(100, 10000);
final Map<BytesRef, Long> expectedCounts = new HashMap<>();
int keyLength = randomIntBetween(1, 10);
try (BaseDirectoryWrapper dir = newDirectory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
Expand Down Expand Up @@ -210,7 +212,7 @@ public String toString() {
};

try (DirectoryReader reader = writer.getReader()) {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();

Driver driver = new Driver(
driverContext,
Expand Down Expand Up @@ -258,14 +260,18 @@ public String toString() {
LongBlock counts = page.getBlock(1);
for (int i = 0; i < keys.getPositionCount(); i++) {
BytesRef spare = new BytesRef();
actualCounts.put(keys.getBytesRef(i, spare), counts.getLong(i));
keys.getBytesRef(i, spare);
actualCounts.put(BytesRef.deepCopyOf(spare), counts.getLong(i));
}
// System.out.println("HEGO: keys.getPositionCount=" + keys.getPositionCount());
// Releasables.close(keys);
}),
() -> {}
);
OperatorTestCase.runDriver(driver);
assertThat(actualCounts, equalTo(expectedCounts));
assertDriverContext(driverContext);
org.elasticsearch.common.util.MockBigArrays.ensureAllArraysAreReleased();
}
}
}
Expand All @@ -276,7 +282,7 @@ public void testLimitOperator() {
var values = randomList(positions, positions, ESTestCase::randomLong);

var results = new ArrayList<Long>();
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
try (
var driver = new Driver(
driverContext,
Expand Down Expand Up @@ -388,6 +394,13 @@ private BigArrays bigArrays() {
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}

/**
* A {@link DriverContext} that won't throw {@link CircuitBreakingException}.
*/
protected final DriverContext driverContext() {
return new DriverContext(bigArrays());
}

public static void assertDriverContext(DriverContext driverContext) {
assertTrue(driverContext.isFinished());
assertThat(driverContext.getSnapshot().releasables(), empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public final void testIgnoresNulls() {
int end = between(1_000, 100_000);
List<Page> results = new ArrayList<>();
List<Page> input = CannedSourceOperator.collectPages(simpleInput(end));
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();

try (
Driver d = new Driver(
Expand All @@ -110,30 +110,30 @@ public final void testIgnoresNulls() {

public final void testMultivalued() {
int end = between(1_000, 100_000);
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> input = CannedSourceOperator.collectPages(new PositionMergingSourceOperator(simpleInput(end)));
assertSimpleOutput(input, drive(simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext), input.iterator()));
}

public final void testMultivaluedWithNulls() {
int end = between(1_000, 100_000);
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> input = CannedSourceOperator.collectPages(
new NullInsertingSourceOperator(new PositionMergingSourceOperator(simpleInput(end)))
);
assertSimpleOutput(input, drive(simple(BigArrays.NON_RECYCLING_INSTANCE).get(driverContext), input.iterator()));
}

public final void testEmptyInput() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), List.<Page>of().iterator());

assertThat(results, hasSize(1));
assertOutputFromEmpty(results.get(0).getBlock(0));
}

public final void testEmptyInputInitialFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> results = drive(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand All @@ -147,7 +147,7 @@ public final void testEmptyInputInitialFinal() {
}

public final void testEmptyInputInitialIntermediateFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
List<Page> results = drive(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected void assertOutputFromEmpty(Block b) {
}

public void testRejectsDouble() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
try (
Driver d = new Driver(
driverContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected void assertOutputFromEmpty(Block b) {
}

public void testRejectsDouble() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
try (
Driver d = new Driver(
driverContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ protected ByteSizeValue smallEnoughToCircuitBreak() {
}

public final void testNullGroupsAndValues() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(50, 60);
List<Page> input = CannedSourceOperator.collectPages(new NullInsertingSourceOperator(simpleInput(end)));
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator());
assertSimpleOutput(input, results);
}

public final void testNullGroups() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(50, 60);
List<Page> input = CannedSourceOperator.collectPages(nullGroups(simpleInput(end)));
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator());
Expand Down Expand Up @@ -182,15 +182,15 @@ protected void appendNull(ElementType elementType, Block.Builder builder, int bl
}

public final void testNullValues() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(50, 60);
List<Page> input = CannedSourceOperator.collectPages(nullValues(simpleInput(end)));
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator());
assertSimpleOutput(input, results);
}

public final void testNullValuesInitialIntermediateFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(50, 60);
List<Page> input = CannedSourceOperator.collectPages(nullValues(simpleInput(end)));
List<Page> results = drive(
Expand Down Expand Up @@ -218,44 +218,44 @@ protected void appendNull(ElementType elementType, Block.Builder builder, int bl
}

public final void testMultivalued() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(1_000, 100_000);
List<Page> input = CannedSourceOperator.collectPages(mergeValues(simpleInput(end)));
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator());
assertSimpleOutput(input, results);
}

public final void testMulitvaluedNullGroupsAndValues() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(50, 60);
List<Page> input = CannedSourceOperator.collectPages(new NullInsertingSourceOperator(mergeValues(simpleInput(end))));
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator());
assertSimpleOutput(input, results);
}

public final void testMulitvaluedNullGroup() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(50, 60);
List<Page> input = CannedSourceOperator.collectPages(nullGroups(mergeValues(simpleInput(end))));
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator());
assertSimpleOutput(input, results);
}

public final void testMulitvaluedNullValues() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
int end = between(50, 60);
List<Page> input = CannedSourceOperator.collectPages(nullValues(mergeValues(simpleInput(end))));
List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), input.iterator());
assertSimpleOutput(input, results);
}

public final void testNullOnly() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
assertNullOnly(List.of(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext)));
}

public final void testNullOnlyInputInitialFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
assertNullOnly(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand All @@ -265,7 +265,7 @@ public final void testNullOnlyInputInitialFinal() {
}

public final void testNullOnlyInputInitialIntermediateFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
assertNullOnly(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand Down Expand Up @@ -294,12 +294,12 @@ private void assertNullOnly(List<Operator> operators) {
}

public final void testNullSome() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
assertNullSome(List.of(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext)));
}

public final void testNullSomeInitialFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
assertNullSome(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand All @@ -309,7 +309,7 @@ public final void testNullSomeInitialFinal() {
}

public final void testNullSomeInitialIntermediateFinal() {
DriverContext driverContext = new DriverContext();
DriverContext driverContext = driverContext();
assertNullSome(
List.of(
simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
Expand Down
Loading

0 comments on commit 54611b5

Please sign in to comment.