Skip to content

Commit

Permalink
ESQL: Add block factory to GROK command (#100139)
Browse files Browse the repository at this point in the history
Add DriverContext and BlockFactory to GROK command to properly handle
circuit breaking See
#99826
  • Loading branch information
luigidellaquila authored Oct 3, 2023
1 parent 2e86d25 commit bdb1ae5
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public record Factory(

@Override
public Operator get(DriverContext driverContext) {
return new ColumnExtractOperator(types, inputEvalSupplier.get(driverContext), evaluatorSupplier.get());
return new ColumnExtractOperator(types, inputEvalSupplier.get(driverContext), evaluatorSupplier.get(), driverContext);
}

@Override
Expand All @@ -39,44 +39,52 @@ public String describe() {
private final ElementType[] types;
private final EvalOperator.ExpressionEvaluator inputEvaluator;
private final ColumnExtractOperator.Evaluator evaluator;
private final DriverContext driverContext;

public ColumnExtractOperator(
ElementType[] types,
EvalOperator.ExpressionEvaluator inputEvaluator,
ColumnExtractOperator.Evaluator evaluator
ExpressionEvaluator inputEvaluator,
Evaluator evaluator,
DriverContext driverContext
) {
this.types = types;
this.inputEvaluator = inputEvaluator;
this.evaluator = evaluator;
this.driverContext = driverContext;
}

@Override
protected Page process(Page page) {
int rowsCount = page.getPositionCount();

Block.Builder[] blockBuilders = new Block.Builder[types.length];
for (int i = 0; i < types.length; i++) {
blockBuilders[i] = types[i].newBlockBuilder(rowsCount);
}
try {
for (int i = 0; i < types.length; i++) {
blockBuilders[i] = types[i].newBlockBuilder(rowsCount, driverContext.blockFactory());
}

try (Block.Ref ref = inputEvaluator.eval(page)) {
BytesRefBlock input = (BytesRefBlock) ref.block();
BytesRef spare = new BytesRef();
for (int row = 0; row < rowsCount; row++) {
if (input.isNull(row)) {
for (int i = 0; i < blockBuilders.length; i++) {
blockBuilders[i].appendNull();
try (Block.Ref ref = inputEvaluator.eval(page)) {
BytesRefBlock input = (BytesRefBlock) ref.block();
BytesRef spare = new BytesRef();
for (int row = 0; row < rowsCount; row++) {
if (input.isNull(row)) {
for (int i = 0; i < blockBuilders.length; i++) {
blockBuilders[i].appendNull();
}
continue;
}
continue;
evaluator.computeRow(input, row, blockBuilders, spare);
}

Block[] blocks = new Block[blockBuilders.length];
for (int i = 0; i < blockBuilders.length; i++) {
blocks[i] = blockBuilders[i].build();
}
evaluator.computeRow(input, row, blockBuilders, spare);
}

Block[] blocks = new Block[blockBuilders.length];
for (int i = 0; i < blockBuilders.length; i++) {
blocks[i] = blockBuilders[i].build();
return page.appendBlocks(blocks);
}
return page.appendBlocks(blocks);
} finally {
Releasables.closeExpectNoException(blockBuilders);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ protected void assertSimpleOutput(List<Page> input, List<Page> results) {

@Override
protected ByteSizeValue smallEnoughToCircuitBreak() {
assumeTrue("doesn't use big arrays so can't break", false);
return null;
return ByteSizeValue.ofBytes(between(1, 32));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ Anneke Preusig | Anneke | Preusig
;


grokStats
# AwaitsFix https://github.com/elastic/elasticsearch/issues/99826
grokStats-Ignore
from employees | eval x = concat(gender, " foobar") | grok x "%{WORD:a} %{WORD:b}" | stats n = max(emp_no) by a | keep a, n | sort a asc;

a:keyword | n:integer
Expand Down

0 comments on commit bdb1ae5

Please sign in to comment.