Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Track memory from values loaded from lucene #101383

Merged
merged 8 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.DocVector;
Expand Down Expand Up @@ -132,6 +133,7 @@ private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.Num
@OperationsPerInvocation(INDEX_SIZE)
public void benchmark() {
ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
BlockFactory.getNonBreakingInstance(),
List.of(BlockReaderFactories.loaderToFactory(reader, blockLoader(name))),
0,
name
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.Releasable;

import java.io.IOException;

Expand Down Expand Up @@ -124,13 +125,24 @@ interface BuilderFactory {
// TODO support non-singleton ords
}

/**
* Marker interface for block results. The compute engine has a fleshed
* out implementation.
*/
interface Block {}

/**
* A builder for typed values. For each document you may either call
* {@link #appendNull}, {@code append<Type>}, or
* {@link #beginPositionEntry} followed by two or more {@code append<Type>}
* calls, and then {@link #endPositionEntry}.
*/
interface Builder {
interface Builder extends Releasable {
/**
* Build the actual block.
*/
Block build();

/**
* Insert a null value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,17 @@ public String toString() {
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
BlockLoader.Builder blockBuilder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
readValuesFromSingleDoc(doc, blockBuilder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
try (BlockLoader.Builder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
readValuesFromSingleDoc(doc, builder);
}
return builder.build();
}
return blockBuilder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ protected BlockStoredFieldsReader(LeafStoredFieldLoader loader) {
}

@Override
public final BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
var builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
readValuesFromSingleDoc(docs.get(i), builder);
public final BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
try (BlockLoader.Builder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
readValuesFromSingleDoc(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ public BlockLoader.BooleanBuilder builder(BlockLoader.BuilderFactory factory, in
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BooleanBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BooleanBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.LongBuilder builder(BlockLoader.BuilderFactory factory, int e
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.LongBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.DoubleBuilder builder(BlockLoader.BuilderFactory factory, int
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.DoubleBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.DoubleBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BytesRefBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
builder.appendBytesRef(bytes);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
builder.appendBytesRef(bytes);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BytesRefBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BytesRefBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.LongBuilder builder(BlockLoader.BuilderFactory factory, int e
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.LongBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void assertFitsIn(ByteSizeValue max, Function<BigArrays, Releasabl
* Tracking allocations is useful when debugging a leak but shouldn't be enabled by default as this would also be very costly
* since it creates a new Exception every time a new array is created.
*/
private static final boolean TRACK_ALLOCATIONS = false;
private static final boolean TRACK_ALLOCATIONS = true;

private static final ConcurrentMap<Object, Object> ACQUIRED_ARRAYS = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class TestBlock
BlockLoader.DoubleBuilder,
BlockLoader.IntBuilder,
BlockLoader.LongBuilder,
BlockLoader.SingletonOrdinalsBuilder {
BlockLoader.SingletonOrdinalsBuilder,
BlockLoader.Block {
public static BlockLoader.BuilderFactory FACTORY = new BlockLoader.BuilderFactory() {
@Override
public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
Expand Down Expand Up @@ -192,8 +193,18 @@ public TestBlock appendOrd(int value) {
}
}

@Override
public TestBlock build() {
return this;
}

private TestBlock add(Object value) {
(currentPosition == null ? values : currentPosition).add(value);
return this;
}

@Override
public void close() {
// TODO assert that we close the test blocks
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look at this one now.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ protected final void ensureCapacity() {
return;
}
int newSize = calculateNewArraySize(valuesLength);
adjustBreaker((long) (newSize - valuesLength) * elementSize());
adjustBreaker(newSize * elementSize());
growValuesArray(newSize);
adjustBreaker(-valuesLength * elementSize());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change while trying to reenable one of the HeapAttack tests - specifically the too many mv fields one. It didn't get it passing, which is genuinely odd to me. Either way, I think it is more correct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ this is more correct as we create a new array of newSize before copying.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,9 @@
* or dense data. A Block can represent either single or multi valued data. A Block that represents
* dense single-valued data can be viewed as a {@link Vector}.
*
* TODO: update comment
* <p> All Blocks share the same set of data retrieval methods, but actual concrete implementations
* effectively support a subset of these, throwing {@code UnsupportedOperationException} where a
* particular data retrieval method is not supported. For example, a Block of primitive longs may
* not support retrieval as an integer, {code getInt}. This greatly simplifies Block usage and
* avoids cumbersome use-site casting.
*
* <p> Block are immutable and can be passed between threads.
*/
public interface Block extends Accountable, NamedWriteable, Releasable {
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, Releasable {

/**
* {@return an efficient dense single-value view of this block}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public static Block[] fromList(BlockFactory blockFactory, List<List<Object>> lis
public static Block deepCopyOf(Block block, BlockFactory blockFactory) {
try (Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount(), blockFactory)) {
builder.copyFrom(block, 0, block.getPositionCount());
builder.mvOrdering(block.mvOrdering());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was required to get some tests working. It meant I had to make the method a noop on unsupported block types but that seems ok.

return builder.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) {

@Override
public Block.Builder mvOrdering(MvOrdering mvOrdering) {
throw new UnsupportedOperationException();
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) {

@Override
public Block.Builder mvOrdering(MvOrdering mvOrdering) {
throw new UnsupportedOperationException("doc blocks only contain one value per position");
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public record ValuesSourceReaderOperatorFactory(List<BlockDocValuesReader.Factor
OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
return new ValuesSourceReaderOperator(sources, docChannel, field);
return new ValuesSourceReaderOperator(driverContext.blockFactory(), sources, docChannel, field);
}

@Override
Expand Down Expand Up @@ -83,11 +83,16 @@ public String describe() {
* @param docChannel the channel containing the shard, leaf/segment and doc id
* @param field the lucene field being loaded
*/
public ValuesSourceReaderOperator(List<BlockDocValuesReader.Factory> factories, int docChannel, String field) {
public ValuesSourceReaderOperator(
BlockFactory blockFactory,
List<BlockDocValuesReader.Factory> factories,
int docChannel,
String field
) {
this.factories = factories;
this.docChannel = docChannel;
this.field = field;
this.blockFactory = new ComputeBlockLoaderFactory(BlockFactory.getNonBreakingInstance()); // TODO breaking!
this.blockFactory = new ComputeBlockLoaderFactory(blockFactory);
}

@Override
Expand All @@ -106,7 +111,7 @@ protected Page process(Page page) {

private Block loadFromSingleLeaf(DocVector docVector) throws IOException {
setupReader(docVector.shards().getInt(0), docVector.segments().getInt(0), docVector.docs().getInt(0));
return ((Block.Builder) lastReader.readValues(blockFactory, new BlockLoader.Docs() {
return ((Block) lastReader.readValues(blockFactory, new BlockLoader.Docs() {
private final IntVector docs = docVector.docs();

@Override
Expand All @@ -118,26 +123,28 @@ public int count() {
public int get(int i) {
return docs.getInt(i);
}
})).build();
}));
}

private Block loadFromManyLeaves(DocVector docVector) throws IOException {
int[] forwards = docVector.shardSegmentDocMapForwards();
int doc = docVector.docs().getInt(forwards[0]);
setupReader(docVector.shards().getInt(forwards[0]), docVector.segments().getInt(forwards[0]), doc);
BlockLoader.Builder builder = lastReader.builder(blockFactory, forwards.length);
lastReader.readValuesFromSingleDoc(doc, builder);
for (int i = 1; i < forwards.length; i++) {
int shard = docVector.shards().getInt(forwards[i]);
int segment = docVector.segments().getInt(forwards[i]);
doc = docVector.docs().getInt(forwards[i]);
if (segment != lastSegment || shard != lastShard) {
setupReader(shard, segment, doc);
}
try (BlockLoader.Builder builder = lastReader.builder(blockFactory, forwards.length)) {
lastReader.readValuesFromSingleDoc(doc, builder);
for (int i = 1; i < forwards.length; i++) {
int shard = docVector.shards().getInt(forwards[i]);
int segment = docVector.segments().getInt(forwards[i]);
doc = docVector.docs().getInt(forwards[i]);
if (segment != lastSegment || shard != lastShard) {
setupReader(shard, segment, doc);
}
lastReader.readValuesFromSingleDoc(doc, builder);
}
try (Block orig = ((Block.Builder) builder).build()) {
return orig.filter(docVector.shardSegmentDocMapBackwards());
}
}
// TODO maybe it's better for downstream consumers if we perform a copy here.
return ((Block.Builder) builder).build().filter(docVector.shardSegmentDocMapBackwards());
}

private void setupReader(int shard, int segment, int doc) throws IOException {
Expand Down
Loading