Skip to content

Commit

Permalink
update BlockHashRandomizedTests
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisHegarty committed Oct 2, 2023
1 parent 5179a22 commit ef147c8
Showing 1 changed file with 78 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.BasicBlockTests;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.MockBlockFactory;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.HashAggregationOperator;
import org.elasticsearch.compute.operator.MultivalueDedupeTests;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ListMatcher;
import org.junit.After;

import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -38,6 +39,7 @@
import static org.elasticsearch.test.ListMatcher.matchesList;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -47,7 +49,7 @@ public class BlockHashRandomizedTests extends ESTestCase {

final CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("esql-test-breaker", ByteSizeValue.ofGb(1));
final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, mockBreakerService(breaker));
final BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays);
final MockBlockFactory blockFactory = new MockBlockFactory(breaker, bigArrays);

@ParametersFactory
public static List<Object[]> params() {
Expand Down Expand Up @@ -100,87 +102,90 @@ public BlockHashRandomizedTests(
this.allowedTypes = allowedTypes;
}

@After
public void checkBreaker() {
assertThat(breaker.getUsed(), is(0L));
}

public void test() {
try {
List<ElementType> types = randomList(groups, groups, () -> randomFrom(allowedTypes));
BasicBlockTests.RandomBlock[] randomBlocks = new BasicBlockTests.RandomBlock[types.size()];
Block[] blocks = new Block[types.size()];
int pageCount = between(1, 10);
int positionCount = 100;
int emitBatchSize = 100;
try (BlockHash blockHash = newBlockHash(emitBatchSize, types)) {
/*
* Only the long/long, long/bytes_ref, and bytes_ref/long implementations don't collect nulls.
*/
Oracle oracle = new Oracle(
forcePackedHash
|| false == (types.equals(List.of(ElementType.LONG, ElementType.LONG))
|| types.equals(List.of(ElementType.LONG, ElementType.BYTES_REF))
|| types.equals(List.of(ElementType.BYTES_REF, ElementType.LONG)))
);
List<ElementType> types = randomList(groups, groups, () -> randomFrom(allowedTypes));
BasicBlockTests.RandomBlock[] randomBlocks = new BasicBlockTests.RandomBlock[types.size()];
Block[] blocks = new Block[types.size()];
int pageCount = between(1, 10);
int positionCount = 100;
int emitBatchSize = 100;
List<Releasable> releasables = new ArrayList<>();
try (BlockHash blockHash = newBlockHash(emitBatchSize, types)) {
/*
* Only the long/long, long/bytes_ref, and bytes_ref/long implementations don't collect nulls.
*/
Oracle oracle = new Oracle(
forcePackedHash
|| false == (types.equals(List.of(ElementType.LONG, ElementType.LONG))
|| types.equals(List.of(ElementType.LONG, ElementType.BYTES_REF))
|| types.equals(List.of(ElementType.BYTES_REF, ElementType.LONG)))
);

for (int p = 0; p < pageCount; p++) {
for (int g = 0; g < blocks.length; g++) {
randomBlocks[g] = BasicBlockTests.randomBlock(
types.get(g),
positionCount,
randomBoolean(),
1,
maxValuesPerPosition,
0,
dups
);
blocks[g] = randomBlocks[g].block();
}
oracle.add(randomBlocks);
int[] batchCount = new int[1];
// PackedValuesBlockHash always chunks but the normal single value ones don't
boolean usingSingle = forcePackedHash == false && types.size() == 1;
BlockHashTests.hash(false, blockHash, ordsAndKeys -> {
if (usingSingle == false) {
assertThat(ordsAndKeys.ords().getTotalValueCount(), lessThanOrEqualTo(emitBatchSize));
}
batchCount[0]++;
}, blocks);
if (usingSingle) {
assertThat(batchCount[0], equalTo(1));
for (int p = 0; p < pageCount; p++) {
for (int g = 0; g < blocks.length; g++) {
randomBlocks[g] = BasicBlockTests.randomBlock(
types.get(g),
positionCount,
randomBoolean(),
1,
maxValuesPerPosition,
0,
dups
);
blocks[g] = randomBlocks[g].block();
}
oracle.add(randomBlocks);
int[] batchCount = new int[1];
// PackedValuesBlockHash always chunks but the normal single value ones don't
boolean usingSingle = forcePackedHash == false && types.size() == 1;
BlockHashTests.hash(false, blockHash, ordsAndKeys -> {
if (usingSingle == false) {
assertThat(ordsAndKeys.ords().getTotalValueCount(), lessThanOrEqualTo(emitBatchSize));
}
batchCount[0]++;
releasables.add(ordsAndKeys.nonEmpty().asBlock());
}, blocks);
if (usingSingle) {
assertThat(batchCount[0], equalTo(1));
}
}

Block[] keyBlocks = blockHash.getKeys();
try {
Set<List<Object>> keys = new TreeSet<>(new KeyComparator());
for (int p = 0; p < keyBlocks[0].getPositionCount(); p++) {
List<Object> key = new ArrayList<>(keyBlocks.length);
for (Block keyBlock : keyBlocks) {
if (keyBlock.isNull(p)) {
key.add(null);
} else {
key.add(BasicBlockTests.valuesAtPositions(keyBlock, p, p + 1).get(0).get(0));
assertThat(keyBlock.getValueCount(p), equalTo(1));
}
Block[] keyBlocks = blockHash.getKeys();
try {
Set<List<Object>> keys = new TreeSet<>(new KeyComparator());
for (int p = 0; p < keyBlocks[0].getPositionCount(); p++) {
List<Object> key = new ArrayList<>(keyBlocks.length);
for (Block keyBlock : keyBlocks) {
if (keyBlock.isNull(p)) {
key.add(null);
} else {
key.add(BasicBlockTests.valuesAtPositions(keyBlock, p, p + 1).get(0).get(0));
assertThat(keyBlock.getValueCount(p), equalTo(1));
}
boolean contained = keys.add(key);
assertTrue(contained);
}
boolean contained = keys.add(key);
assertTrue(contained);
}

if (false == keys.equals(oracle.keys)) {
List<List<Object>> keyList = new ArrayList<>();
keyList.addAll(keys);
ListMatcher keyMatcher = matchesList();
for (List<Object> k : oracle.keys) {
keyMatcher = keyMatcher.item(k);
}
assertMap(keyList, keyMatcher);
if (false == keys.equals(oracle.keys)) {
List<List<Object>> keyList = new ArrayList<>();
keyList.addAll(keys);
ListMatcher keyMatcher = matchesList();
for (List<Object> k : oracle.keys) {
keyMatcher = keyMatcher.item(k);
}
} finally {
Releasables.closeExpectNoException(keyBlocks);
assertMap(keyList, keyMatcher);
}
} finally {
Releasables.closeExpectNoException(keyBlocks);
releasables.stream().forEach(Releasables::closeExpectNoException);
blockFactory.ensureAllBlocksAreReleased();
}
} catch (CircuitBreakingException cbe) {
Exception ex = new RuntimeException("byteLimit=" + cbe.getByteLimit() + ", bytesWanted=" + cbe.getBytesWanted());
cbe.addSuppressed(ex);
throw cbe;
}
}

Expand Down

0 comments on commit ef147c8

Please sign in to comment.