Skip to content

Commit

Permalink
ESQL: Add block factory to DISSECT command (elastic#100140)
Browse files Browse the repository at this point in the history
  • Loading branch information
luigidellaquila authored Oct 3, 2023
1 parent ff51a6d commit 8d50534
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public record StringExtractOperatorFactory(

@Override
public Operator get(DriverContext driverContext) {
return new StringExtractOperator(fieldNames, expressionEvaluator.get(driverContext), parserSupplier.get());
return new StringExtractOperator(fieldNames, expressionEvaluator.get(driverContext), parserSupplier.get(), driverContext);
}

@Override
Expand All @@ -44,95 +44,103 @@ public String describe() {
private final String[] fieldNames;
private final EvalOperator.ExpressionEvaluator inputEvaluator;
private final Function<String, Map<String, String>> parser; // TODO parser should consume ByteRef instead of String
private final DriverContext driverContext;

public StringExtractOperator(
String[] fieldNames,
EvalOperator.ExpressionEvaluator inputEvaluator,
Function<String, Map<String, String>> parser
Function<String, Map<String, String>> parser,
DriverContext driverContext
) {
this.fieldNames = fieldNames;
this.inputEvaluator = inputEvaluator;
this.parser = parser;
this.driverContext = driverContext;
}

@Override
protected Page process(Page page) {
int rowsCount = page.getPositionCount();

BytesRefBlock.Builder[] blockBuilders = new BytesRefBlock.Builder[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
blockBuilders[i] = BytesRefBlock.newBlockBuilder(rowsCount);
}

try (Block.Ref ref = inputEvaluator.eval(page)) {
BytesRefBlock input = (BytesRefBlock) ref.block();
BytesRef spare = new BytesRef();
for (int row = 0; row < rowsCount; row++) {
if (input.isNull(row)) {
for (int i = 0; i < fieldNames.length; i++) {
blockBuilders[i].appendNull();
}
continue;
}
try {
for (int i = 0; i < fieldNames.length; i++) {
blockBuilders[i] = BytesRefBlock.newBlockBuilder(rowsCount, driverContext.blockFactory());
}

int position = input.getFirstValueIndex(row);
int valueCount = input.getValueCount(row);
if (valueCount == 1) {
Map<String, String> items = parser.apply(input.getBytesRef(position, spare).utf8ToString());
if (items == null) {
try (Block.Ref ref = inputEvaluator.eval(page)) {
BytesRefBlock input = (BytesRefBlock) ref.block();
BytesRef spare = new BytesRef();
for (int row = 0; row < rowsCount; row++) {
if (input.isNull(row)) {
for (int i = 0; i < fieldNames.length; i++) {
blockBuilders[i].appendNull();
}
continue;
}
for (int i = 0; i < fieldNames.length; i++) {
String val = items.get(fieldNames[i]);
BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF);
}
} else {
// multi-valued input
String[] firstValues = new String[fieldNames.length];
boolean[] positionEntryOpen = new boolean[fieldNames.length];
for (int c = 0; c < valueCount; c++) {
Map<String, String> items = parser.apply(input.getBytesRef(position + c, spare).utf8ToString());

int position = input.getFirstValueIndex(row);
int valueCount = input.getValueCount(row);
if (valueCount == 1) {
Map<String, String> items = parser.apply(input.getBytesRef(position, spare).utf8ToString());
if (items == null) {
for (int i = 0; i < fieldNames.length; i++) {
blockBuilders[i].appendNull();
}
continue;
}
for (int i = 0; i < fieldNames.length; i++) {
String val = items.get(fieldNames[i]);
if (val == null) {
BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF);
}
} else {
// multi-valued input
String[] firstValues = new String[fieldNames.length];
boolean[] positionEntryOpen = new boolean[fieldNames.length];
for (int c = 0; c < valueCount; c++) {
Map<String, String> items = parser.apply(input.getBytesRef(position + c, spare).utf8ToString());
if (items == null) {
continue;
}
if (firstValues[i] == null) {
firstValues[i] = val;
} else {
if (positionEntryOpen[i] == false) {
positionEntryOpen[i] = true;
blockBuilders[i].beginPositionEntry();
BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF);
for (int i = 0; i < fieldNames.length; i++) {
String val = items.get(fieldNames[i]);
if (val == null) {
continue;
}
if (firstValues[i] == null) {
firstValues[i] = val;
} else {
if (positionEntryOpen[i] == false) {
positionEntryOpen[i] = true;
blockBuilders[i].beginPositionEntry();
BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF);
}
BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF);
}
BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF);
}
}
}
for (int i = 0; i < fieldNames.length; i++) {
if (positionEntryOpen[i]) {
blockBuilders[i].endPositionEntry();
} else if (firstValues[i] == null) {
blockBuilders[i].appendNull();
} else {
BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF);
for (int i = 0; i < fieldNames.length; i++) {
if (positionEntryOpen[i]) {
blockBuilders[i].endPositionEntry();
} else if (firstValues[i] == null) {
blockBuilders[i].appendNull();
} else {
BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF);
}
}
}
}
}
}

Block[] blocks = new Block[blockBuilders.length];
for (int i = 0; i < blockBuilders.length; i++) {
blocks[i] = blockBuilders[i].build();
Block[] blocks = new Block[blockBuilders.length];
for (int i = 0; i < blockBuilders.length; i++) {
blocks[i] = blockBuilders[i].build();
}
return page.appendBlocks(blocks);
} finally {
Releasables.closeExpectNoException(blockBuilders);
}
return page.appendBlocks(blocks);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ protected void assertSimpleOutput(List<Page> input, List<Page> results) {

@Override
protected ByteSizeValue smallEnoughToCircuitBreak() {
assumeTrue("doesn't use big arrays so can't break", false);
return null;
return ByteSizeValue.ofBytes(between(1, 32));
}

public void testMultivalueDissectInput() {
Expand All @@ -98,7 +97,7 @@ public Block.Ref eval(Page page) {

@Override
public void close() {}
}, new FirstWord("test"));
}, new FirstWord("test"), driverContext());

BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(1);
builder.beginPositionEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ Anneke Preusig | Anneke | Preusig
;


dissectStats
# AwaitsFix https://github.com/elastic/elasticsearch/issues/99826
dissectStats-Ignore
from employees | eval x = concat(gender, " foobar") | dissect x "%{a} %{b}" | stats n = max(emp_no) by a | keep a, n | sort a asc;

a:keyword | n:integer
Expand Down

0 comments on commit 8d50534

Please sign in to comment.