Skip to content

Commit

Permalink
ESQL: Paginate MV_EXPAND output (elastic#100598)
Browse files Browse the repository at this point in the history
  • Loading branch information
luigidellaquila committed Oct 11, 2023
1 parent 40f0d83 commit 0d8755b
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -31,11 +32,12 @@
* 2 | 2 | "foo"
* </pre>
*/
public class MvExpandOperator extends AbstractPageMappingOperator {
public record Factory(int channel) implements OperatorFactory {
public class MvExpandOperator implements Operator {

public record Factory(int channel, int blockSize) implements OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
return new MvExpandOperator(channel);
return new MvExpandOperator(channel, blockSize);
}

@Override
Expand All @@ -46,85 +48,201 @@ public String describe() {

private final int channel;

private final int pageSize;

private int noops;

public MvExpandOperator(int channel) {
private Page prev;
private boolean prevCompleted = false;
private boolean finished = false;

private Block expandingBlock;
private Block expandedBlock;

private int nextPositionToProcess = 0;
private int nextMvToProcess = 0;
private int nextItemOnExpanded = 0;

/**
* Count of pages that have been processed by this operator.
*/
private int pagesIn;
private int pagesOut;

public MvExpandOperator(int channel, int pageSize) {
this.channel = channel;
this.pageSize = pageSize;
assert pageSize > 0;
}

@Override
protected Page process(Page page) {
Block expandingBlock = page.getBlock(channel);
Block expandedBlock = expandingBlock.expand();
public final Page getOutput() {
if (prev == null) {
return null;
}
pagesOut++;
if (prev.getPositionCount() == 0 || expandingBlock.mayHaveMultivaluedFields() == false) {
noops++;
Page result = prev;
prev = null;
return result;
}

try {
return process();
} finally {
if (prevCompleted && prev != null) {
prev.releaseBlocks();
prev = null;
}
}
}

protected Page process() {
if (expandedBlock == expandingBlock) {
noops++;
return page;
prevCompleted = true;
return prev;
}
if (page.getBlockCount() == 1) {
if (prev.getBlockCount() == 1) {
assert channel == 0;
prevCompleted = true;
return new Page(expandedBlock);
}

int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount());
int[] duplicateFilter = nextDuplicateExpandingFilter();

Block[] result = new Block[page.getBlockCount()];
Block[] result = new Block[prev.getBlockCount()];
int[] expandedMask = new int[duplicateFilter.length];
for (int i = 0; i < expandedMask.length; i++) {
expandedMask[i] = i + nextItemOnExpanded;
}
nextItemOnExpanded += expandedMask.length;
for (int b = 0; b < result.length; b++) {
result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter);
result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter);
}
if (nextItemOnExpanded == expandedBlock.getPositionCount()) {
nextItemOnExpanded = 0;
}
return new Page(result);
}

private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) {
int[] duplicateFilter = new int[newPositions];
private int[] nextDuplicateExpandingFilter() {
int[] duplicateFilter = new int[Math.min(pageSize, expandedBlock.getPositionCount() - nextPositionToProcess)];
int n = 0;
for (int p = 0; p < expandingBlock.getPositionCount(); p++) {
int count = expandingBlock.getValueCount(p);
while (true) {
int count = expandingBlock.getValueCount(nextPositionToProcess);
int positions = count == 0 ? 1 : count;
Arrays.fill(duplicateFilter, n, n + positions, p);
n += positions;
int toAdd = Math.min(pageSize - n, positions - nextMvToProcess);
Arrays.fill(duplicateFilter, n, n + toAdd, nextPositionToProcess);
n += toAdd;

if (n == pageSize) {
if (nextMvToProcess + toAdd == positions) {
// finished expanding this position, let's move on to next position (that will be expanded with next call)
nextMvToProcess = 0;
nextPositionToProcess++;
if (nextPositionToProcess == expandingBlock.getPositionCount()) {
nextPositionToProcess = 0;
prevCompleted = true;
}
} else {
// there are still items to expand in current position, but the duplicate filter is full, so we'll deal with them at
// next call
nextMvToProcess = nextMvToProcess + toAdd;
}
return duplicateFilter;
}

nextMvToProcess = 0;
nextPositionToProcess++;
if (nextPositionToProcess == expandingBlock.getPositionCount()) {
nextPositionToProcess = 0;
nextMvToProcess = 0;
prevCompleted = true;
return n < pageSize ? Arrays.copyOfRange(duplicateFilter, 0, n) : duplicateFilter;
}
}
return duplicateFilter;
}

@Override
protected AbstractPageMappingOperator.Status status(int pagesProcessed) {
return new Status(pagesProcessed, noops);
public final boolean needsInput() {
return prev == null && finished == false;
}

@Override
public final void addInput(Page page) {
assert prev == null : "has pending input page";
prev = page;
this.expandingBlock = prev.getBlock(channel);
this.expandedBlock = expandingBlock.expand();
pagesIn++;
prevCompleted = false;
}

@Override
public final void finish() {
finished = true;
}

@Override
public final boolean isFinished() {
return finished && prev == null;
}

@Override
public final Status status() {
return new Status(pagesIn, pagesOut, noops);
}

@Override
public void close() {
if (prev != null) {
Releasables.closeExpectNoException(() -> prev.releaseBlocks());
}
}

@Override
public String toString() {
return "MvExpandOperator[channel=" + channel + "]";
}

public static final class Status extends AbstractPageMappingOperator.Status {
public static final class Status implements Operator.Status {

private final int pagesIn;
private final int pagesOut;
private final int noops;

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Operator.Status.class,
"mv_expand",
Status::new
);

private final int noops;

Status(int pagesProcessed, int noops) {
super(pagesProcessed);
Status(int pagesIn, int pagesOut, int noops) {
this.pagesIn = pagesIn;
this.pagesOut = pagesOut;
this.noops = noops;
}

Status(StreamInput in) throws IOException {
super(in);
pagesIn = in.readVInt();
pagesOut = in.readVInt();
noops = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(pagesIn);
out.writeVInt(pagesOut);
out.writeVInt(noops);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("pages_processed", pagesProcessed());
builder.field("pages_in", pagesIn);
builder.field("pages_out", pagesOut);
builder.field("noops", noops);
return builder.endObject();
}
Expand All @@ -147,12 +265,20 @@ public boolean equals(Object o) {
return false;
}
Status status = (Status) o;
return noops == status.noops && pagesProcessed() == status.pagesProcessed();
return noops == status.noops && pagesIn == status.pagesIn && pagesOut == status.pagesOut;
}

public int pagesIn() {
return pagesIn;
}

public int pagesOut() {
return pagesOut;
}

@Override
public int hashCode() {
return Objects.hash(noops, pagesProcessed());
return Objects.hash(noops, pagesIn, pagesOut);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

public class MvExpandOperatorStatusTests extends AbstractWireSerializingTestCase<MvExpandOperator.Status> {
public static MvExpandOperator.Status simple() {
return new MvExpandOperator.Status(10, 9);
return new MvExpandOperator.Status(10, 15, 9);
}

public static String simpleToJson() {
return """
{"pages_processed":10,"noops":9}""";
{"pages_in":10,"pages_out":15,"noops":9}""";
}

public void testToXContent() {
Expand All @@ -35,20 +35,28 @@ protected Writeable.Reader<MvExpandOperator.Status> instanceReader() {

@Override
public MvExpandOperator.Status createTestInstance() {
return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt());
return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt());
}

@Override
protected MvExpandOperator.Status mutateInstance(MvExpandOperator.Status instance) {
switch (between(0, 1)) {
switch (between(0, 2)) {
case 0:
return new MvExpandOperator.Status(
randomValueOtherThan(instance.pagesProcessed(), ESTestCase::randomNonNegativeInt),
randomValueOtherThan(instance.pagesIn(), ESTestCase::randomNonNegativeInt),
instance.pagesOut(),
instance.noops()
);
case 1:
return new MvExpandOperator.Status(
instance.pagesProcessed(),
instance.pagesIn(),
randomValueOtherThan(instance.pagesOut(), ESTestCase::randomNonNegativeInt),
instance.noops()
);
case 2:
return new MvExpandOperator.Status(
instance.pagesIn(),
instance.pagesOut(),
randomValueOtherThan(instance.noops(), ESTestCase::randomNonNegativeInt)
);
default:
Expand Down
Loading

0 comments on commit 0d8755b

Please sign in to comment.