Skip to content

Commit

Permalink
Simplify merging enrich output (#107018)
Browse files Browse the repository at this point in the history
The merge logic in MergePositionsOperator is excessively complex and 
lacks flexibility. It relies on the source operator emitting pages with
ascending positions. Additionally, this merge logic introduced an
unusual method, appendAllValuesToCurrentPosition, to the Block.Builder.
We should replace this with a simpler and more flexible approach. This
PR uses a mechanism similar to the grouping aggregation. In fact, it is 
very close to the values aggregation. Initially, I considered using the
GroupingState from ValuesAggregator. However, unlike in the values
aggregation, we don't expect many multi-values in enrich. Hence, I
introduced the new EnrichResultBuilders instead.
  • Loading branch information
dnhatn authored Apr 9, 2024
1 parent aba7566 commit 24aed5c
Show file tree
Hide file tree
Showing 30 changed files with 910 additions and 710 deletions.
57 changes: 57 additions & 0 deletions x-pack/plugin/esql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import org.elasticsearch.gradle.internal.info.BuildParams

apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.string-templates'
esplugin {
name 'x-pack-esql'
description 'The plugin that powers ESQL for Elasticsearch'
Expand Down Expand Up @@ -222,3 +223,59 @@ tasks.register("regen") {
}
}
}

tasks.named("spotlessJava") { dependsOn stringTemplates }
tasks.named('checkstyleMain').configure {
excludes = [ "**/*.java.st" ]
}

def prop(Type, type, TYPE, BYTES, Array) {
return [
"Type" : Type,
"type" : type,
"TYPE" : TYPE,
"BYTES" : BYTES,
"Array" : Array,

"int" : type == "int" ? "true" : "",
"long" : type == "long" ? "true" : "",
"double" : type == "double" ? "true" : "",
"BytesRef" : type == "BytesRef" ? "true" : "",
"boolean" : type == "boolean" ? "true" : "",
]
}

tasks.named('stringTemplates').configure {
var intProperties = prop("Int", "int", "INT", "Integer.BYTES", "IntArray")
var longProperties = prop("Long", "long", "LONG", "Long.BYTES", "LongArray")
var doubleProperties = prop("Double", "double", "DOUBLE", "Double.BYTES", "DoubleArray")
var bytesRefProperties = prop("BytesRef", "BytesRef", "BYTES_REF", "org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF", "")
var booleanProperties = prop("Boolean", "boolean", "BOOLEAN", "Byte.BYTES", "BitArray")
// enrich
File enrichResultBuilderInput = file("src/main/java/org/elasticsearch/xpack/esql/enrich/X-EnrichResultBuilder.java.st")
template {
it.properties = intProperties
it.inputFile = enrichResultBuilderInput
it.outputFile = "org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForInt.java"
}
template {
it.properties = longProperties
it.inputFile = enrichResultBuilderInput
it.outputFile = "org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForLong.java"
}
template {
it.properties = doubleProperties
it.inputFile = enrichResultBuilderInput
it.outputFile = "org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForDouble.java"
}
template {
it.properties = bytesRefProperties
it.inputFile = enrichResultBuilderInput
it.outputFile = "org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForBytesRef.java"
}
template {
it.properties = booleanProperties
it.inputFile = enrichResultBuilderInput
it.outputFile = "org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForBoolean.java"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,6 @@ sealed interface Builder extends Block.Builder, BlockLoader.BooleanBuilder permi
@Override
Builder mvOrdering(Block.MvOrdering mvOrdering);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
Builder appendAllValuesToCurrentPosition(Block block);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
Builder appendAllValuesToCurrentPosition(BooleanBlock block);

@Override
BooleanBlock build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,55 +71,6 @@ public BooleanBlockBuilder endPositionEntry() {
return this;
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public BooleanBlockBuilder appendAllValuesToCurrentPosition(Block block) {
if (block.areAllValuesNull()) {
return appendNull();
}
return appendAllValuesToCurrentPosition((BooleanBlock) block);
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public BooleanBlockBuilder appendAllValuesToCurrentPosition(BooleanBlock block) {
final int positionCount = block.getPositionCount();
if (positionCount == 0) {
return appendNull();
}
final int totalValueCount = block.getTotalValueCount();
if (totalValueCount == 0) {
return appendNull();
}
if (totalValueCount > 1) {
beginPositionEntry();
}
final BooleanVector vector = block.asVector();
if (vector != null) {
for (int p = 0; p < positionCount; p++) {
appendBoolean(vector.getBoolean(p));
}
} else {
for (int p = 0; p < positionCount; p++) {
int count = block.getValueCount(p);
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendBoolean(block.getBoolean(i++));
}
}
}
if (totalValueCount > 1) {
endPositionEntry();
}
return this;
}

@Override
public BooleanBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusive) {
if (block.areAllValuesNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,19 +229,6 @@ sealed interface Builder extends Block.Builder, BlockLoader.BytesRefBuilder perm
@Override
Builder mvOrdering(Block.MvOrdering mvOrdering);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
Builder appendAllValuesToCurrentPosition(Block block);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
Builder appendAllValuesToCurrentPosition(BytesRefBlock block);

@Override
BytesRefBlock build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,56 +78,6 @@ protected void writeNullValue() {
values.append(BytesRefBlock.NULL_VALUE);
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public BytesRefBlockBuilder appendAllValuesToCurrentPosition(Block block) {
if (block.areAllValuesNull()) {
return appendNull();
}
return appendAllValuesToCurrentPosition((BytesRefBlock) block);
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public BytesRefBlockBuilder appendAllValuesToCurrentPosition(BytesRefBlock block) {
final int positionCount = block.getPositionCount();
if (positionCount == 0) {
return appendNull();
}
final int totalValueCount = block.getTotalValueCount();
if (totalValueCount == 0) {
return appendNull();
}
if (totalValueCount > 1) {
beginPositionEntry();
}
BytesRef scratch = new BytesRef();
final BytesRefVector vector = block.asVector();
if (vector != null) {
for (int p = 0; p < positionCount; p++) {
appendBytesRef(vector.getBytesRef(p, scratch));
}
} else {
for (int p = 0; p < positionCount; p++) {
int count = block.getValueCount(p);
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendBytesRef(block.getBytesRef(i++, scratch));
}
}
}
if (totalValueCount > 1) {
endPositionEntry();
}
return this;
}

@Override
public BytesRefBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusive) {
if (block.areAllValuesNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,6 @@ sealed interface Builder extends Block.Builder, BlockLoader.DoubleBuilder permit
@Override
Builder mvOrdering(Block.MvOrdering mvOrdering);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
Builder appendAllValuesToCurrentPosition(Block block);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
Builder appendAllValuesToCurrentPosition(DoubleBlock block);

@Override
DoubleBlock build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,55 +71,6 @@ public DoubleBlockBuilder endPositionEntry() {
return this;
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public DoubleBlockBuilder appendAllValuesToCurrentPosition(Block block) {
if (block.areAllValuesNull()) {
return appendNull();
}
return appendAllValuesToCurrentPosition((DoubleBlock) block);
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public DoubleBlockBuilder appendAllValuesToCurrentPosition(DoubleBlock block) {
final int positionCount = block.getPositionCount();
if (positionCount == 0) {
return appendNull();
}
final int totalValueCount = block.getTotalValueCount();
if (totalValueCount == 0) {
return appendNull();
}
if (totalValueCount > 1) {
beginPositionEntry();
}
final DoubleVector vector = block.asVector();
if (vector != null) {
for (int p = 0; p < positionCount; p++) {
appendDouble(vector.getDouble(p));
}
} else {
for (int p = 0; p < positionCount; p++) {
int count = block.getValueCount(p);
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendDouble(block.getDouble(i++));
}
}
}
if (totalValueCount > 1) {
endPositionEntry();
}
return this;
}

@Override
public DoubleBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusive) {
if (block.areAllValuesNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,6 @@ sealed interface Builder extends Block.Builder, BlockLoader.IntBuilder permits I
@Override
Builder mvOrdering(Block.MvOrdering mvOrdering);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
Builder appendAllValuesToCurrentPosition(Block block);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
Builder appendAllValuesToCurrentPosition(IntBlock block);

@Override
IntBlock build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,55 +71,6 @@ public IntBlockBuilder endPositionEntry() {
return this;
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public IntBlockBuilder appendAllValuesToCurrentPosition(Block block) {
if (block.areAllValuesNull()) {
return appendNull();
}
return appendAllValuesToCurrentPosition((IntBlock) block);
}

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
public IntBlockBuilder appendAllValuesToCurrentPosition(IntBlock block) {
final int positionCount = block.getPositionCount();
if (positionCount == 0) {
return appendNull();
}
final int totalValueCount = block.getTotalValueCount();
if (totalValueCount == 0) {
return appendNull();
}
if (totalValueCount > 1) {
beginPositionEntry();
}
final IntVector vector = block.asVector();
if (vector != null) {
for (int p = 0; p < positionCount; p++) {
appendInt(vector.getInt(p));
}
} else {
for (int p = 0; p < positionCount; p++) {
int count = block.getValueCount(p);
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendInt(block.getInt(i++));
}
}
}
if (totalValueCount > 1) {
endPositionEntry();
}
return this;
}

@Override
public IntBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusive) {
if (block.areAllValuesNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,6 @@ sealed interface Builder extends Block.Builder, BlockLoader.LongBuilder permits
@Override
Builder mvOrdering(Block.MvOrdering mvOrdering);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
@Override
Builder appendAllValuesToCurrentPosition(Block block);

/**
* Appends the all values of the given block into a the current position
* in this builder.
*/
Builder appendAllValuesToCurrentPosition(LongBlock block);

@Override
LongBlock build();
}
Expand Down
Loading

0 comments on commit 24aed5c

Please sign in to comment.