ESQL: Track blocks emitted from lucene (#101396)
This enables memory tracking for blocks emitted by all of the
`LuceneOperator` subclasses. They don't use a ton of memory, but we'd
like to get everything tracked.
nik9000 authored Oct 26, 2023
1 parent 3b0b484 commit 81ace01
Showing 14 changed files with 261 additions and 107 deletions.
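
The pattern repeated across the operator changes below: blocks are built through the DriverContext's BlockFactory, which tracks their memory, and ownership passes to the Page on success; if page construction fails part way, the operator releases the blocks itself. A minimal sketch of that shape, reusing the calls from the LuceneCountOperator hunk (totalHits, PAGE_SIZE, and blockFactory stand in for operator state):

    Page page = null;
    LongBlock count = null;
    BooleanBlock seen = null;
    try {
        // Allocations now go through the tracked factory rather than static builders.
        count = LongBlock.newConstantBlockWith(totalHits, PAGE_SIZE, blockFactory);
        seen = BooleanBlock.newConstantBlockWith(true, PAGE_SIZE, blockFactory);
        page = new Page(PAGE_SIZE, count, seen); // the Page now owns both blocks
    } finally {
        if (page == null) {
            // Construction failed part way; release whatever was allocated.
            Releasables.closeExpectNoException(count, seen);
        }
    }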
5 changes: 5 additions & 0 deletions docs/changelog/101396.yaml
@@ -0,0 +1,5 @@
pr: 101396
summary: "ESQL: Track blocks emitted from lucene"
area: ES|QL
type: enhancement
issues: []
@@ -13,11 +13,13 @@
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
@@ -62,7 +64,7 @@ public Factory(

@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneCountOperator(sliceQueue, limit);
return new LuceneCountOperator(driverContext.blockFactory(), sliceQueue, limit);
}

@Override
@@ -80,8 +82,8 @@ public String describe() {
}
}

public LuceneCountOperator(LuceneSliceQueue sliceQueue, int limit) {
super(PAGE_SIZE, sliceQueue);
public LuceneCountOperator(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int limit) {
super(blockFactory, PAGE_SIZE, sliceQueue);
this.remainingDocs = limit;
this.leafCollector = new LeafCollector() {
@Override
@@ -155,11 +157,17 @@ public Page getOutput() {
// emit only one page
if (remainingDocs <= 0 && pagesEmitted == 0) {
pagesEmitted++;
page = new Page(
PAGE_SIZE,
LongBlock.newConstantBlockWith(totalHits, PAGE_SIZE),
BooleanBlock.newConstantBlockWith(true, PAGE_SIZE)
);
LongBlock count = null;
BooleanBlock seen = null;
try {
count = LongBlock.newConstantBlockWith(totalHits, PAGE_SIZE, blockFactory);
seen = BooleanBlock.newConstantBlockWith(true, PAGE_SIZE, blockFactory);
page = new Page(PAGE_SIZE, count, seen);
} finally {
if (page == null) {
Releasables.closeExpectNoException(count, seen);
}
}
}
return page;
} catch (IOException e) {
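
A note on the finally block above: Releasables.close and closeExpectNoException skip null arguments, so the cleanup is safe even when the first allocation throws and seen was never assigned. A standalone illustration:

    LongBlock count = null;
    BooleanBlock seen = null;
    Releasables.closeExpectNoException(count, seen); // nulls are skipped; nothing is thrown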
@@ -20,6 +20,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.logging.LogManager;
@@ -37,6 +38,8 @@ public abstract class LuceneOperator extends SourceOperator {

public static final int NO_LIMIT = Integer.MAX_VALUE;

protected final BlockFactory blockFactory;

private int processSlices;
final int maxPageSize;
private final LuceneSliceQueue sliceQueue;
@@ -49,7 +52,8 @@ public abstract class LuceneOperator extends SourceOperator {
int pagesEmitted;
boolean doneCollecting;

public LuceneOperator(int maxPageSize, LuceneSliceQueue sliceQueue) {
public LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.sliceQueue = sliceQueue;
}
@@ -11,12 +11,14 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
@@ -61,7 +63,7 @@ public Factory(

@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneSourceOperator(maxPageSize, sliceQueue, limit);
return new LuceneSourceOperator(driverContext.blockFactory(), maxPageSize, sliceQueue, limit);
}

@Override
@@ -89,11 +91,11 @@ public String describe() {
}
}

public LuceneSourceOperator(int maxPageSize, LuceneSliceQueue sliceQueue, int limit) {
super(maxPageSize, sliceQueue);
public LuceneSourceOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue, int limit) {
super(blockFactory, maxPageSize, sliceQueue);
this.minPageSize = Math.max(1, maxPageSize / 2);
this.remainingDocs = limit;
this.docsBuilder = IntVector.newVectorBuilder(Math.min(limit, maxPageSize));
this.docsBuilder = IntVector.newVectorBuilder(Math.min(limit, maxPageSize), blockFactory);
this.leafCollector = new LeafCollector() {
@Override
public void setScorer(Scorable scorer) {
@@ -143,16 +145,20 @@ public Page getOutput() {
Page page = null;
if (currentPagePos >= minPageSize || remainingDocs <= 0 || scorer.isDone()) {
pagesEmitted++;
page = new Page(
currentPagePos,
new DocVector(
IntBlock.newConstantBlockWith(scorer.shardIndex(), currentPagePos).asVector(),
IntBlock.newConstantBlockWith(scorer.leafReaderContext().ord, currentPagePos).asVector(),
docsBuilder.build(),
true
).asBlock()
);
docsBuilder = IntVector.newVectorBuilder(Math.min(remainingDocs, maxPageSize));
IntBlock shard = null;
IntBlock leaf = null;
IntVector docs = null;
try {
shard = IntBlock.newConstantBlockWith(scorer.shardIndex(), currentPagePos, blockFactory);
leaf = IntBlock.newConstantBlockWith(scorer.leafReaderContext().ord, currentPagePos, blockFactory);
docs = docsBuilder.build();
docsBuilder = IntVector.newVectorBuilder(Math.min(remainingDocs, maxPageSize), blockFactory);
page = new Page(currentPagePos, new DocVector(shard.asVector(), leaf.asVector(), docs, true).asBlock());
} finally {
if (page == null) {
Releasables.closeExpectNoException(shard, leaf, docs);
}
}
currentPagePos = 0;
}
return page;
@@ -161,6 +167,11 @@ public Page getOutput() {
}
}

@Override
public void close() {
docsBuilder.close();
}

@Override
protected void describe(StringBuilder sb) {
sb.append(", remainingDocs=").append(remainingDocs);
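
The new close() override is needed because the operator holds a live docsBuilder between getOutput() calls, and that builder holds tracked memory. A sketch of the builder lifecycle assumed here (consistent with the try-with-resources usage in the TopN operator below, where closing a builder after build() is a safe no-op):

    try (IntVector.Builder docs = IntVector.newVectorBuilder(10, blockFactory)) {
        docs.appendInt(0);
        IntVector built = docs.build(); // tracked bytes now owned by `built`
        built.close();                  // ... and released here
    } // closing the builder only releases memory if build() never ran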
@@ -16,12 +16,14 @@
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TopFieldCollector;
import org.elasticsearch.common.Strings;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
@@ -38,25 +40,6 @@
* Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN)
*/
public final class LuceneTopNSourceOperator extends LuceneOperator {
/**
* Collected docs. {@code null} until we're {@link #emit(boolean)}.
*/
private ScoreDoc[] scoreDocs;
/**
* The offset in {@link #scoreDocs} of the next page.
*/
private int offset = 0;

private PerShardCollector perShardCollector;
private final List<SortBuilder<?>> sorts;
private final int limit;

public LuceneTopNSourceOperator(int maxPageSize, List<SortBuilder<?>> sorts, int limit, LuceneSliceQueue sliceQueue) {
super(maxPageSize, sliceQueue);
this.sorts = sorts;
this.limit = limit;
}

public static final class Factory implements LuceneOperator.Factory {
private final int taskConcurrency;
private final int maxPageSize;
@@ -85,7 +68,7 @@ public Factory(

@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneTopNSourceOperator(maxPageSize, sorts, limit, sliceQueue);
return new LuceneTopNSourceOperator(driverContext.blockFactory(), maxPageSize, sorts, limit, sliceQueue);
}

@Override
@@ -116,6 +99,31 @@ public String describe() {
}
}

/**
* Collected docs. {@code null} until we're {@link #emit(boolean)}.
*/
private ScoreDoc[] scoreDocs;
/**
* The offset in {@link #scoreDocs} of the next page.
*/
private int offset = 0;

private PerShardCollector perShardCollector;
private final List<SortBuilder<?>> sorts;
private final int limit;

public LuceneTopNSourceOperator(
BlockFactory blockFactory,
int maxPageSize,
List<SortBuilder<?>> sorts,
int limit,
LuceneSliceQueue sliceQueue
) {
super(blockFactory, maxPageSize, sliceQueue);
this.sorts = sorts;
this.limit = limit;
}

@Override
public boolean isFinished() {
return doneCollecting && isEmitting() == false;
@@ -187,29 +195,35 @@ private Page emit(boolean startEmitting) {
return null;
}
int size = Math.min(maxPageSize, scoreDocs.length - offset);
IntVector.Builder currentSegmentBuilder = IntVector.newVectorBuilder(size);
IntVector.Builder currentDocsBuilder = IntVector.newVectorBuilder(size);
IntBlock shard = null;
IntVector segments = null;
IntVector docs = null;
Page page = null;
try (
IntVector.Builder currentSegmentBuilder = IntVector.newVectorBuilder(size, blockFactory);
IntVector.Builder currentDocsBuilder = IntVector.newVectorBuilder(size, blockFactory)
) {
int start = offset;
offset += size;
List<LeafReaderContext> leafContexts = perShardCollector.searchContext.searcher().getLeafContexts();
for (int i = start; i < offset; i++) {
int doc = scoreDocs[i].doc;
int segment = ReaderUtil.subIndex(doc, leafContexts);
currentSegmentBuilder.appendInt(segment);
currentDocsBuilder.appendInt(doc - leafContexts.get(segment).docBase); // the offset inside the segment
}

int start = offset;
offset += size;
List<LeafReaderContext> leafContexts = perShardCollector.searchContext.searcher().getLeafContexts();
for (int i = start; i < offset; i++) {
int doc = scoreDocs[i].doc;
int segment = ReaderUtil.subIndex(doc, leafContexts);
currentSegmentBuilder.appendInt(segment);
currentDocsBuilder.appendInt(doc - leafContexts.get(segment).docBase); // the offset inside the segment
shard = IntBlock.newConstantBlockWith(perShardCollector.shardIndex, size, blockFactory);
segments = currentSegmentBuilder.build();
docs = currentDocsBuilder.build();
page = new Page(size, new DocVector(shard.asVector(), segments, docs, null).asBlock());
} finally {
if (page == null) {
Releasables.close(shard, segments, docs);
}
}

pagesEmitted++;
return new Page(
size,
new DocVector(
IntBlock.newConstantBlockWith(perShardCollector.shardIndex, size).asVector(),
currentSegmentBuilder.build(),
currentDocsBuilder.build(),
null
).asBlock()
);
return page;
}

@Override
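
emit() now separates two ownership phases: builders are scoped with try-with-resources, while the built blocks are released by hand only if page construction fails. Condensed to its shape (stand-in names and sizes; the real code also builds shard and segment blocks the same way):

    int size = 10;
    Page page = null;
    IntVector docs = null;
    try (IntVector.Builder builder = IntVector.newVectorBuilder(size, blockFactory)) {
        for (int i = 0; i < size; i++) {
            builder.appendInt(i);
        }
        docs = builder.build();                // ownership moves from builder to vector
        page = new Page(size, docs.asBlock()); // ... and from vector to page on success
    } finally {
        if (page == null) {
            Releasables.close(docs);           // failure path: vector never reached a page
        }
    }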
@@ -29,7 +29,7 @@
import org.elasticsearch.compute.operator.NullInsertingSourceOperator;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.PositionMergingSourceOperator;
import org.elasticsearch.compute.operator.ResultPageSinkOperator;
import org.elasticsearch.compute.operator.TestResultPageSinkOperator;

import java.util.ArrayList;
import java.util.List;
@@ -110,7 +110,7 @@ public final void testIgnoresNulls() {
driverContext,
new NullInsertingSourceOperator(new CannedSourceOperator(input.iterator()), blockFactory),
List.of(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext)),
new ResultPageSinkOperator(results::add),
new TestResultPageSinkOperator(results::add),
() -> {}
)
) {
@@ -14,9 +14,9 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.ResultPageSinkOperator;
import org.elasticsearch.compute.operator.SequenceDoubleBlockSourceOperator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TestResultPageSinkOperator;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
@@ -57,7 +57,7 @@ public void testOverflowSucceeds() {
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(Double.MAX_VALUE - 1, 2)),
List.of(simple(nonBreakingBigArrays()).get(driverContext)),
new ResultPageSinkOperator(results::add),
new TestResultPageSinkOperator(results::add),
() -> {}
)
) {
@@ -78,7 +78,7 @@ public void testSummationAccuracy() {
DoubleStream.of(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7)
),
List.of(simple(nonBreakingBigArrays()).get(driverContext)),
new ResultPageSinkOperator(results::add),
new TestResultPageSinkOperator(results::add),
() -> {}
)
) {
@@ -104,7 +104,7 @@ public void testSummationAccuracy() {
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(values)),
List.of(simple(nonBreakingBigArrays()).get(driverContext)),
new ResultPageSinkOperator(results::add),
new TestResultPageSinkOperator(results::add),
() -> {}
)
) {
@@ -126,7 +126,7 @@ public void testSummationAccuracy() {
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
List.of(simple(nonBreakingBigArrays()).get(driverContext)),
new ResultPageSinkOperator(results::add),
new TestResultPageSinkOperator(results::add),
() -> {}
)
) {
@@ -145,7 +145,7 @@ public void testSummationAccuracy() {
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
List.of(simple(nonBreakingBigArrays()).get(driverContext)),
new ResultPageSinkOperator(results::add),
new TestResultPageSinkOperator(results::add),
() -> {}
)
) {