Skip to content

Commit

Permalink
ESQL: Add support for multivalue fields in Arrow output (elastic#114774)
Browse files Browse the repository at this point in the history
  • Loading branch information
swallez authored Oct 21, 2024
1 parent f5ceaff commit 6b6c367
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 115 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/114774.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114774
summary: "ESQL: Add support for multivalue fields in Arrow output"
area: ES|QL
type: enhancement
issues: []
1 change: 1 addition & 0 deletions x-pack/plugin/esql/arrow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {

testImplementation project(':test:framework')
testImplementation('org.apache.arrow:arrow-memory-unsafe:16.1.0')
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}")
}

tasks.named("dependencyLicenses").configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class ArrowResponse implements ChunkedRestResponseBodyPart, Releasable {
public static class Column {
private final BlockConverter converter;
private final String name;
private boolean multivalued;

public Column(String esqlType, String name) {
this.converter = ESQL_CONVERTERS.get(esqlType);
Expand All @@ -61,20 +63,24 @@ public Column(String esqlType, String name) {
public ArrowResponse(List<Column> columns, List<Page> pages) {
this.columns = columns;

// Find multivalued columns
int colSize = columns.size();
for (int col = 0; col < colSize; col++) {
for (Page page : pages) {
if (page.getBlock(col).mayHaveMultivaluedFields()) {
columns.get(col).multivalued = true;
break;
}
}
}

currentSegment = new SchemaResponse(this);
List<ResponseSegment> rest = new ArrayList<>(pages.size());
for (int p = 0; p < pages.size(); p++) {
var page = pages.get(p);

for (Page page : pages) {
rest.add(new PageResponse(this, page));
// Multivalued fields are not supported yet.
for (int b = 0; b < page.getBlockCount(); b++) {
if (page.getBlock(b).mayHaveMultivaluedFields()) {
throw new IllegalArgumentException(
"ES|QL response field [" + columns.get(b).name + "] is multi-valued. This isn't supported yet by the Arrow format"
);
}
}
}

rest.add(new EndResponse(this));
segments = rest.iterator();
}
Expand Down Expand Up @@ -185,6 +191,9 @@ public void close() {}
* @see <a href="https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format">IPC Streaming Format</a>
*/
private static class SchemaResponse extends ResponseSegment {

private static final FieldType LIST_FIELD_TYPE = FieldType.nullable(MinorType.LIST.getType());

private boolean done = false;

SchemaResponse(ArrowResponse response) {
Expand All @@ -204,7 +213,20 @@ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws I
}

private Schema arrowSchema() {
return new Schema(response.columns.stream().map(c -> new Field(c.name, c.converter.arrowFieldType(), List.of())).toList());
return new Schema(response.columns.stream().map(c -> {
var fieldType = c.converter.arrowFieldType();
if (c.multivalued) {
// A variable-sized list is a vector of offsets and a child vector of values
// See https://arrow.apache.org/docs/format/Columnar.html#variable-size-list-layout
var listType = new FieldType(true, LIST_FIELD_TYPE.getType(), null, fieldType.getMetadata());
// Value vector is non-nullable (ES|QL multivalues cannot contain nulls).
var valueType = new FieldType(false, fieldType.getType(), fieldType.getDictionary(), null);
// The nested vector is named "$data$", following what the Arrow/Java library does.
return new Field(c.name, listType, List.of(new Field("$data$", valueType, null)));
} else {
return new Field(c.name, fieldType, null);
}
}).toList());
}
}

Expand Down Expand Up @@ -257,7 +279,14 @@ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws I

@Override
public void write(ArrowBuf buffer) throws IOException {
extraPosition += bufWriters.get(bufIdx++).write(out);
var len = bufWriters.get(bufIdx++).write(out);
// Consistency check
if (len != buffer.writerIndex()) {
throw new IllegalStateException(
"Buffer [" + (bufIdx - 1) + "]: wrote [" + len + "] bytes, but expected [" + buffer.writerIndex() + "]"
);
}
extraPosition += len;
}

@Override
Expand All @@ -277,11 +306,26 @@ public long align() throws IOException {

// Create Arrow buffers for each of the blocks in this page
for (int b = 0; b < page.getBlockCount(); b++) {
var converter = response.columns.get(b).converter;
var column = response.columns.get(b);
var converter = column.converter;

Block block = page.getBlock(b);
nodes.add(new ArrowFieldNode(block.getPositionCount(), converter.nullValuesCount(block)));
converter.convert(block, bufs, bufWriters);
if (column.multivalued) {
// List node.
nodes.add(new ArrowFieldNode(block.getPositionCount(), converter.nullValuesCount(block)));
// Value vector, does not contain nulls.
nodes.add(new ArrowFieldNode(BlockConverter.valueCount(block), 0));
} else {
nodes.add(new ArrowFieldNode(block.getPositionCount(), converter.nullValuesCount(block)));
}
converter.convert(block, column.multivalued, bufs, bufWriters);
}

// Consistency check
if (bufs.size() != bufWriters.size()) {
throw new IllegalStateException(
"Inconsistent Arrow buffers: [" + bufs.size() + "] buffers and [" + bufWriters.size() + "] writers"
);
}

// Create the batch and serialize it
Expand Down
Loading

0 comments on commit 6b6c367

Please sign in to comment.