diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/StringExtractOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/StringExtractOperator.java index 21375b72ac6f6..b3b41a542e465 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/StringExtractOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/StringExtractOperator.java @@ -32,7 +32,7 @@ public record StringExtractOperatorFactory( @Override public Operator get(DriverContext driverContext) { - return new StringExtractOperator(fieldNames, expressionEvaluator.get(driverContext), parserSupplier.get()); + return new StringExtractOperator(fieldNames, expressionEvaluator.get(driverContext), parserSupplier.get(), driverContext); } @Override @@ -44,15 +44,18 @@ public String describe() { private final String[] fieldNames; private final EvalOperator.ExpressionEvaluator inputEvaluator; private final Function> parser; // TODO parser should consume ByteRef instead of String + private final DriverContext driverContext; public StringExtractOperator( String[] fieldNames, EvalOperator.ExpressionEvaluator inputEvaluator, - Function> parser + Function> parser, + DriverContext driverContext ) { this.fieldNames = fieldNames; this.inputEvaluator = inputEvaluator; this.parser = parser; + this.driverContext = driverContext; } @Override @@ -60,79 +63,84 @@ protected Page process(Page page) { int rowsCount = page.getPositionCount(); BytesRefBlock.Builder[] blockBuilders = new BytesRefBlock.Builder[fieldNames.length]; - for (int i = 0; i < fieldNames.length; i++) { - blockBuilders[i] = BytesRefBlock.newBlockBuilder(rowsCount); - } - - try (Block.Ref ref = inputEvaluator.eval(page)) { - BytesRefBlock input = (BytesRefBlock) ref.block(); - BytesRef spare = new BytesRef(); - for (int row = 0; row < rowsCount; row++) { - if (input.isNull(row)) { - for (int i = 0; i < fieldNames.length; i++) { - blockBuilders[i].appendNull(); - } - continue; - } + try { + for (int i = 0; i < fieldNames.length; i++) { + blockBuilders[i] = BytesRefBlock.newBlockBuilder(rowsCount, driverContext.blockFactory()); + } - int position = input.getFirstValueIndex(row); - int valueCount = input.getValueCount(row); - if (valueCount == 1) { - Map items = parser.apply(input.getBytesRef(position, spare).utf8ToString()); - if (items == null) { + try (Block.Ref ref = inputEvaluator.eval(page)) { + BytesRefBlock input = (BytesRefBlock) ref.block(); + BytesRef spare = new BytesRef(); + for (int row = 0; row < rowsCount; row++) { + if (input.isNull(row)) { for (int i = 0; i < fieldNames.length; i++) { blockBuilders[i].appendNull(); } continue; } - for (int i = 0; i < fieldNames.length; i++) { - String val = items.get(fieldNames[i]); - BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF); - } - } else { - // multi-valued input - String[] firstValues = new String[fieldNames.length]; - boolean[] positionEntryOpen = new boolean[fieldNames.length]; - for (int c = 0; c < valueCount; c++) { - Map items = parser.apply(input.getBytesRef(position + c, spare).utf8ToString()); + + int position = input.getFirstValueIndex(row); + int valueCount = input.getValueCount(row); + if (valueCount == 1) { + Map items = parser.apply(input.getBytesRef(position, spare).utf8ToString()); if (items == null) { + for (int i = 0; i < fieldNames.length; i++) { + blockBuilders[i].appendNull(); + } continue; } for (int i = 0; i < fieldNames.length; i++) { String val = items.get(fieldNames[i]); - if (val == null) { + BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF); + } + } else { + // multi-valued input + String[] firstValues = new String[fieldNames.length]; + boolean[] positionEntryOpen = new boolean[fieldNames.length]; + for (int c = 0; c < valueCount; c++) { + Map items = parser.apply(input.getBytesRef(position + c, spare).utf8ToString()); + if (items == null) { continue; } - if (firstValues[i] == null) { - firstValues[i] = val; - } else { - if (positionEntryOpen[i] == false) { - positionEntryOpen[i] = true; - blockBuilders[i].beginPositionEntry(); - BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF); + for (int i = 0; i < fieldNames.length; i++) { + String val = items.get(fieldNames[i]); + if (val == null) { + continue; + } + if (firstValues[i] == null) { + firstValues[i] = val; + } else { + if (positionEntryOpen[i] == false) { + positionEntryOpen[i] = true; + blockBuilders[i].beginPositionEntry(); + BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF); + } + BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF); } - BlockUtils.appendValue(blockBuilders[i], val, ElementType.BYTES_REF); } } - } - for (int i = 0; i < fieldNames.length; i++) { - if (positionEntryOpen[i]) { - blockBuilders[i].endPositionEntry(); - } else if (firstValues[i] == null) { - blockBuilders[i].appendNull(); - } else { - BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF); + for (int i = 0; i < fieldNames.length; i++) { + if (positionEntryOpen[i]) { + blockBuilders[i].endPositionEntry(); + } else if (firstValues[i] == null) { + blockBuilders[i].appendNull(); + } else { + BlockUtils.appendValue(blockBuilders[i], firstValues[i], ElementType.BYTES_REF); + } } } } } - } - Block[] blocks = new Block[blockBuilders.length]; - for (int i = 0; i < blockBuilders.length; i++) { - blocks[i] = blockBuilders[i].build(); + Block[] blocks = new Block[blockBuilders.length]; + for (int i = 0; i < blockBuilders.length; i++) { + blocks[i] = blockBuilders[i].build(); + } + return page.appendBlocks(blocks); + } finally { + Releasables.closeExpectNoException(blockBuilders); } - return page.appendBlocks(blocks); + } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java index bad8092b8a737..f76cbf4d72348 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java @@ -84,8 +84,7 @@ protected void assertSimpleOutput(List input, List results) { @Override protected ByteSizeValue smallEnoughToCircuitBreak() { - assumeTrue("doesn't use big arrays so can't break", false); - return null; + return ByteSizeValue.ofBytes(between(1, 32)); } public void testMultivalueDissectInput() { @@ -98,7 +97,7 @@ public Block.Ref eval(Page page) { @Override public void close() {} - }, new FirstWord("test")); + }, new FirstWord("test"), driverContext()); BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(1); builder.beginPositionEntry(); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/dissect.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/dissect.csv-spec index 54bc481c54b48..8d20e48ef1552 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/dissect.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/dissect.csv-spec @@ -99,7 +99,8 @@ Anneke Preusig | Anneke | Preusig ; -dissectStats +# AwaitsFix https://github.com/elastic/elasticsearch/issues/99826 +dissectStats-Ignore from employees | eval x = concat(gender, " foobar") | dissect x "%{a} %{b}" | stats n = max(emp_no) by a | keep a, n | sort a asc; a:keyword | n:integer