From 4d7f0e6f4b5798b19a7f8a3045cf7f9615704266 Mon Sep 17 00:00:00 2001 From: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Date: Sat, 21 Dec 2024 20:30:35 +0100 Subject: [PATCH 1/9] Expose BwC enrich cache setting in plugin (#119131) This actually allows users to set the BwC setting. --- docs/changelog/119131.yaml | 5 +++++ .../java/org/elasticsearch/xpack/enrich/EnrichPlugin.java | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 docs/changelog/119131.yaml diff --git a/docs/changelog/119131.yaml b/docs/changelog/119131.yaml new file mode 100644 index 000000000000..2628b6184f90 --- /dev/null +++ b/docs/changelog/119131.yaml @@ -0,0 +1,5 @@ +pr: 119131 +summary: Expose BwC enrich cache setting in plugin +area: Ingest Node +type: bug +issues: [] diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index ef455acb645d..16bc81785f53 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -308,7 +308,8 @@ public List> getSettings() { COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST, COORDINATOR_PROXY_QUEUE_CAPACITY, ENRICH_MAX_FORCE_MERGE_ATTEMPTS, - CACHE_SIZE + CACHE_SIZE, + CACHE_SIZE_BWC ); } From c662aecac3bbef1abcfb801a6a0b8326c3dfba0d Mon Sep 17 00:00:00 2001 From: elasticsearchmachine <58790826+elasticsearchmachine@users.noreply.github.com> Date: Mon, 23 Dec 2024 00:59:21 +1100 Subject: [PATCH 2/9] Mute org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT #119191 --- muted-tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/muted-tests.yml b/muted-tests.yml index 1114f927bb24..a265b166026c 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -305,6 +305,8 @@ tests: - class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT method: testFailureLoadingFields issue: https://github.com/elastic/elasticsearch/issues/118000 +- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT + issue: https://github.com/elastic/elasticsearch/issues/119191 # Examples: # From 22990df7f2924871edaf3337779dc3870ea6653e Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Mon, 23 Dec 2024 09:19:38 +0000 Subject: [PATCH 3/9] Propagate status codes from shard failures appropriately (#118016) * Return 502 if the underlying error is `NodeNotConnectedException` * Traverse through the cause stack trace and check for `NodeNotConnectedException` and undo `status()` override in `NodeDisconnectedException` * Rewrite `while` condition * Fix: precommit * Let status codes propagate rather than walking the stacktrace explicitly Walking the stacktrace explicitly and looking for a specific error (node connection-related errors in this case) is a workaround rather than a proper fix. Instead, let the status codes propagate all the way to the top so that they can be reported as-is. * Fix: unused import * Fix null deref * Do not map descendants of `ConnectTransportException` to `502` * Fix: precommit * Do not account for 4xx status codes 4xx status codes are not likely to appear along with 5xx status codes. As a result, we do not need to account for them when looking at shard failures' status codes. * Remove unnecessary `switch` case In reference to the previous commit: this case is no longer needed. 
* Rewrite code comment * Address review comments * [CI] Auto commit changes from spotless * Update docs/changelog/118016.yaml --------- Co-authored-by: elasticsearchmachine --- docs/changelog/118016.yaml | 6 ++ .../search/SearchPhaseExecutionException.java | 20 ++++--- .../SearchPhaseExecutionExceptionTests.java | 55 +++++++++++++++++++ 3 files changed, 74 insertions(+), 7 deletions(-) create mode 100644 docs/changelog/118016.yaml diff --git a/docs/changelog/118016.yaml b/docs/changelog/118016.yaml new file mode 100644 index 000000000000..7ee78b901b19 --- /dev/null +++ b/docs/changelog/118016.yaml @@ -0,0 +1,6 @@ +pr: 118016 +summary: Propagate status codes from shard failures appropriately +area: Search +type: enhancement +issues: + - 118482 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java index 2d798f2da3a4..fc79e85f9326 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java @@ -73,15 +73,21 @@ public RestStatus status() { // on coordinator node. so get the status from cause instead of returning SERVICE_UNAVAILABLE blindly return getCause() == null ? RestStatus.SERVICE_UNAVAILABLE : ExceptionsHelper.status(getCause()); } - RestStatus status = shardFailures[0].status(); - if (shardFailures.length > 1) { - for (int i = 1; i < shardFailures.length; i++) { - if (shardFailures[i].status().getStatus() >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) { - status = shardFailures[i].status(); - } + RestStatus status = null; + for (ShardSearchFailure shardFailure : shardFailures) { + RestStatus shardStatus = shardFailure.status(); + int statusCode = shardStatus.getStatus(); + + // Return if it's an error that can be retried. + // These currently take precedence over other status code(s). + if (statusCode >= 502 && statusCode <= 504) { + return shardStatus; + } else if (statusCode >= 500) { + status = shardStatus; } } - return status; + + return status == null ? shardFailures[0].status() : status; } public ShardSearchFailure[] shardFailures() { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseExecutionExceptionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseExecutionExceptionTests.java index 50ce97a2b73a..3e483b1f2cae 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseExecutionExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseExecutionExceptionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContent; import org.elasticsearch.xcontent.XContentParser; @@ -31,6 +32,7 @@ import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; public class SearchPhaseExecutionExceptionTests extends ESTestCase { @@ -168,4 +170,57 @@ public void testPhaseFailureWithSearchShardFailure() { assertEquals(actual.status(), RestStatus.BAD_REQUEST); } + + public void testOnlyWithCodesThatDoNotRequirePrecedence() { + int pickedIndex = randomIntBetween(0, 1); + + // Pick one of these exceptions randomly. 
+ var searchExceptions = new ElasticsearchException[] { + new ElasticsearchException("simulated"), + new NodeDisconnectedException(null, "unused message", "unused action", null) }; + + // Status codes that map to searchExceptions. + var expectedStatusCodes = new RestStatus[] { RestStatus.INTERNAL_SERVER_ERROR, RestStatus.BAD_GATEWAY }; + + ShardSearchFailure shardFailure1 = new ShardSearchFailure( + searchExceptions[pickedIndex], + new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 1), null) + ); + + ShardSearchFailure shardFailure2 = new ShardSearchFailure( + searchExceptions[pickedIndex], + new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 2), null) + ); + + SearchPhaseExecutionException ex = new SearchPhaseExecutionException( + "search", + "all shards failed", + new ShardSearchFailure[] { shardFailure1, shardFailure2 } + ); + + assertThat(ex.status(), is(expectedStatusCodes[pickedIndex])); + } + + public void testWithRetriableCodesThatTakePrecedence() { + // Maps to a 500. + ShardSearchFailure shardFailure1 = new ShardSearchFailure( + new ElasticsearchException("simulated"), + new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 1), null) + ); + + // Maps to a 502. + ShardSearchFailure shardFailure2 = new ShardSearchFailure( + new NodeDisconnectedException(null, "unused message", "unused action", null), + new SearchShardTarget("nodeID", new ShardId("someIndex", "someUUID", 2), null) + ); + + SearchPhaseExecutionException ex = new SearchPhaseExecutionException( + "search", + "all shards failed", + new ShardSearchFailure[] { shardFailure1, shardFailure2 } + ); + + // The 502 takes precedence over 500. + assertThat(ex.status(), is(RestStatus.BAD_GATEWAY)); + } } From d521f89ee758203951d9fd7fb88e69d23168229d Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Mon, 23 Dec 2024 11:07:46 +0100 Subject: [PATCH 4/9] ESQL: Rewrite TO_UPPER/TO_LOWER comparisons (#118870) This adds an optimization rule to rewrite TO_UPPER/TO_LOWER comparisons against a string into an InsensitiveEquals comparison. The rewrite can also result right away into a TRUE/FALSE, in case the string doesn't match the caseness of the function. This also allows later pushing down the predicate to lucene as a case-insensitive term-query. Fixes #118304. 
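
To illustrate the two outcomes described above — both are exercised by the csv-spec tests added further down in this patch; the queries here are only an illustrative sketch, not new functionality:

```esql
// Literal is already fully upper-cased: the comparison is rewritten to a
// case-insensitive equality on first_name, which can then be pushed down to
// Lucene as a case-insensitive term query.
FROM employees
| WHERE TO_UPPER(first_name) == "GEORGI"

// Literal contains lower-case characters, so TO_UPPER(first_name) can never
// equal it: the predicate folds to FALSE and the query matches no rows.
FROM employees
| WHERE TO_UPPER(first_name) == "Georgi"
```
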
--- docs/changelog/118870.yaml | 6 + .../xpack/esql/core/util/TestUtils.java | 9 + .../src/main/resources/string.csv-spec | 183 +++++++++++++ ...valuator.java => ChangeCaseEvaluator.java} | 29 +- .../scalar/string/ToUpperEvaluator.java | 132 --------- .../function/scalar/string/ChangeCase.java | 112 ++++++++ .../function/scalar/string/ToLower.java | 61 +---- .../function/scalar/string/ToUpper.java | 63 +---- .../esql/optimizer/LogicalPlanOptimizer.java | 2 + ...laceStringCasingWithInsensitiveEquals.java | 68 +++++ .../function/scalar/string/ToLowerTests.java | 2 +- .../function/scalar/string/ToUpperTests.java | 2 +- .../optimizer/LogicalPlanOptimizerTests.java | 75 ++++++ .../optimizer/PhysicalPlanOptimizerTests.java | 255 +++++++++++++++++- 14 files changed, 735 insertions(+), 264 deletions(-) create mode 100644 docs/changelog/118870.yaml rename x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/{ToLowerEvaluator.java => ChangeCaseEvaluator.java} (76%) delete mode 100644 x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperEvaluator.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCase.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStringCasingWithInsensitiveEquals.java diff --git a/docs/changelog/118870.yaml b/docs/changelog/118870.yaml new file mode 100644 index 000000000000..ce3692d5454a --- /dev/null +++ b/docs/changelog/118870.yaml @@ -0,0 +1,6 @@ +pr: 118870 +summary: Rewrite TO_UPPER/TO_LOWER comparisons +area: ES|QL +type: enhancement +issues: + - 118304 diff --git a/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/util/TestUtils.java b/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/util/TestUtils.java index b37ca0431ec2..40321fddebdf 100644 --- a/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/util/TestUtils.java +++ b/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/util/TestUtils.java @@ -15,6 +15,8 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; +import java.util.regex.Pattern; + import static java.util.Collections.emptyMap; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static org.elasticsearch.test.ESTestCase.randomBoolean; @@ -26,6 +28,8 @@ public final class TestUtils { private TestUtils() {} + private static final Pattern WS_PATTERN = Pattern.compile("\\s"); + public static Literal of(Object value) { return of(Source.EMPTY, value); } @@ -59,4 +63,9 @@ public static FieldAttribute getFieldAttribute(String name) { public static FieldAttribute getFieldAttribute(String name, DataType dataType) { return new FieldAttribute(EMPTY, name, new EsField(name + "f", dataType, emptyMap(), true)); } + + /** Similar to {@link String#strip()}, but removes the WS throughout the entire string. 
*/ + public static String stripThrough(String input) { + return WS_PATTERN.matcher(input).replaceAll(StringUtils.EMPTY); + } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec index e103168d2e58..5b0cccc1ed43 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec @@ -1231,6 +1231,189 @@ a:keyword | upper:keyword | lower:keyword π/2 + a + B + Λ ºC | Π/2 + A + B + Λ ºC | π/2 + a + b + λ ºc ; +equalsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +from employees +| where to_upper(first_name) == "GEORGI" +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +10001 | Georgi +; + +equalsToUpperNestedPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +from employees +| where to_upper(to_upper(to_lower(first_name))) == "GEORGI" +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +10001 | Georgi +; + +negatedEqualsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +from employees +| sort emp_no +| where not(to_upper(first_name) == "GEORGI") +| keep emp_no, first_name +| limit 1 +; + +emp_no:integer | first_name:keyword +10002 | Bezalel +; + +notEqualsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +from employees +| sort emp_no +| where to_upper(first_name) != "GEORGI" +| keep emp_no, first_name +| limit 1 +; + +emp_no:integer | first_name:keyword +10002 | Bezalel +; + +negatedNotEqualsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +from employees +| sort emp_no +| where not(to_upper(first_name) != "GEORGI") +| keep emp_no, first_name +| limit 1 +; + +emp_no:integer | first_name:keyword +10001 | Georgi +; + +equalsToUpperFolded +from employees +| where to_upper(first_name) == "Georgi" +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +; + +negatedEqualsToUpperFolded +from employees +| where not(to_upper(first_name) == "Georgi") +| stats c = count() +; + +c:long +90 +; + +equalsToUpperNullFolded +from employees +| where to_upper(null) == "Georgi" +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +; + +equalsNullToUpperFolded +from employees +| where to_upper(first_name) == null::keyword +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +; + +notEqualsToUpperNullFolded +from employees +| where to_upper(null) != "Georgi" +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +; + +notEqualsNullToUpperFolded +from employees +| where to_upper(first_name) != null::keyword +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +; + +notEqualsToUpperFolded +from employees +| where to_upper(first_name) != "Georgi" +| stats c = count() +; + +c:long +90 +; + +negatedNotEqualsToUpperFolded +from employees +| where not(to_upper(first_name) != "Georgi") +| stats c = count() +; + +c:long +0 +; + +equalsToLowerPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +from employees +| where to_lower(first_name) == "georgi" +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +10001 | Georgi +; + +notEqualsToLowerPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +from employees +| sort emp_no +| where to_lower(first_name) != 
"georgi" +| keep emp_no, first_name +| limit 1 +; + +emp_no:integer | first_name:keyword +10002 | Bezalel +; + +equalsToLowerFolded +from employees +| where to_lower(first_name) == "Georgi" +| keep emp_no, first_name +; + +emp_no:integer | first_name:keyword +; + +notEqualsToLowerFolded +from employees +| where to_lower(first_name) != "Georgi" +| stats c = count() +; + +c:long +90 +; + +equalsToLowerWithUnico(rn|d)s +from employees +| where to_lower(concat(first_name, "🦄🦄")) != "georgi🦄🦄" +| stats c = count() +; + +// 10 null first names +c:long +89 +; + reverse required_capability: fn_reverse from employees | sort emp_no | eval name_reversed = REVERSE(first_name) | keep emp_no, first_name, name_reversed | limit 1; diff --git a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerEvaluator.java b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCaseEvaluator.java similarity index 76% rename from x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerEvaluator.java rename to x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCaseEvaluator.java index 61e7c5c3042d..02d1b1c86ea3 100644 --- a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerEvaluator.java +++ b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCaseEvaluator.java @@ -20,25 +20,28 @@ import org.elasticsearch.xpack.esql.core.tree.Source; /** - * {@link EvalOperator.ExpressionEvaluator} implementation for {@link ToLower}. + * {@link EvalOperator.ExpressionEvaluator} implementation for {@link ChangeCase}. * This class is generated. Do not edit it. 
*/ -public final class ToLowerEvaluator implements EvalOperator.ExpressionEvaluator { +public final class ChangeCaseEvaluator implements EvalOperator.ExpressionEvaluator { private final Source source; private final EvalOperator.ExpressionEvaluator val; private final Locale locale; + private final ChangeCase.Case caseType; + private final DriverContext driverContext; private Warnings warnings; - public ToLowerEvaluator(Source source, EvalOperator.ExpressionEvaluator val, Locale locale, - DriverContext driverContext) { + public ChangeCaseEvaluator(Source source, EvalOperator.ExpressionEvaluator val, Locale locale, + ChangeCase.Case caseType, DriverContext driverContext) { this.source = source; this.val = val; this.locale = locale; + this.caseType = caseType; this.driverContext = driverContext; } @@ -68,7 +71,7 @@ public BytesRefBlock eval(int positionCount, BytesRefBlock valBlock) { result.appendNull(); continue position; } - result.appendBytesRef(ToLower.process(valBlock.getBytesRef(valBlock.getFirstValueIndex(p), valScratch), this.locale)); + result.appendBytesRef(ChangeCase.process(valBlock.getBytesRef(valBlock.getFirstValueIndex(p), valScratch), this.locale, this.caseType)); } return result.build(); } @@ -78,7 +81,7 @@ public BytesRefVector eval(int positionCount, BytesRefVector valVector) { try(BytesRefVector.Builder result = driverContext.blockFactory().newBytesRefVectorBuilder(positionCount)) { BytesRef valScratch = new BytesRef(); position: for (int p = 0; p < positionCount; p++) { - result.appendBytesRef(ToLower.process(valVector.getBytesRef(p, valScratch), this.locale)); + result.appendBytesRef(ChangeCase.process(valVector.getBytesRef(p, valScratch), this.locale, this.caseType)); } return result.build(); } @@ -86,7 +89,7 @@ public BytesRefVector eval(int positionCount, BytesRefVector valVector) { @Override public String toString() { - return "ToLowerEvaluator[" + "val=" + val + ", locale=" + locale + "]"; + return "ChangeCaseEvaluator[" + "val=" + val + ", locale=" + locale + ", caseType=" + caseType + "]"; } @Override @@ -113,20 +116,24 @@ static class Factory implements EvalOperator.ExpressionEvaluator.Factory { private final Locale locale; - public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory val, Locale locale) { + private final ChangeCase.Case caseType; + + public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory val, Locale locale, + ChangeCase.Case caseType) { this.source = source; this.val = val; this.locale = locale; + this.caseType = caseType; } @Override - public ToLowerEvaluator get(DriverContext context) { - return new ToLowerEvaluator(source, val.get(context), locale, context); + public ChangeCaseEvaluator get(DriverContext context) { + return new ChangeCaseEvaluator(source, val.get(context), locale, caseType, context); } @Override public String toString() { - return "ToLowerEvaluator[" + "val=" + val + ", locale=" + locale + "]"; + return "ChangeCaseEvaluator[" + "val=" + val + ", locale=" + locale + ", caseType=" + caseType + "]"; } } } diff --git a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperEvaluator.java b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperEvaluator.java deleted file mode 100644 index 82412dde5394..000000000000 --- a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperEvaluator.java +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License -// 2.0; you may not use this file except in compliance with the Elastic License -// 2.0. -package org.elasticsearch.xpack.esql.expression.function.scalar.string; - -import java.lang.IllegalArgumentException; -import java.lang.Override; -import java.lang.String; -import java.util.Locale; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.BytesRefVector; -import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.compute.operator.EvalOperator; -import org.elasticsearch.compute.operator.Warnings; -import org.elasticsearch.core.Releasables; -import org.elasticsearch.xpack.esql.core.tree.Source; - -/** - * {@link EvalOperator.ExpressionEvaluator} implementation for {@link ToUpper}. - * This class is generated. Do not edit it. - */ -public final class ToUpperEvaluator implements EvalOperator.ExpressionEvaluator { - private final Source source; - - private final EvalOperator.ExpressionEvaluator val; - - private final Locale locale; - - private final DriverContext driverContext; - - private Warnings warnings; - - public ToUpperEvaluator(Source source, EvalOperator.ExpressionEvaluator val, Locale locale, - DriverContext driverContext) { - this.source = source; - this.val = val; - this.locale = locale; - this.driverContext = driverContext; - } - - @Override - public Block eval(Page page) { - try (BytesRefBlock valBlock = (BytesRefBlock) val.eval(page)) { - BytesRefVector valVector = valBlock.asVector(); - if (valVector == null) { - return eval(page.getPositionCount(), valBlock); - } - return eval(page.getPositionCount(), valVector).asBlock(); - } - } - - public BytesRefBlock eval(int positionCount, BytesRefBlock valBlock) { - try(BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) { - BytesRef valScratch = new BytesRef(); - position: for (int p = 0; p < positionCount; p++) { - if (valBlock.isNull(p)) { - result.appendNull(); - continue position; - } - if (valBlock.getValueCount(p) != 1) { - if (valBlock.getValueCount(p) > 1) { - warnings().registerException(new IllegalArgumentException("single-value function encountered multi-value")); - } - result.appendNull(); - continue position; - } - result.appendBytesRef(ToUpper.process(valBlock.getBytesRef(valBlock.getFirstValueIndex(p), valScratch), this.locale)); - } - return result.build(); - } - } - - public BytesRefVector eval(int positionCount, BytesRefVector valVector) { - try(BytesRefVector.Builder result = driverContext.blockFactory().newBytesRefVectorBuilder(positionCount)) { - BytesRef valScratch = new BytesRef(); - position: for (int p = 0; p < positionCount; p++) { - result.appendBytesRef(ToUpper.process(valVector.getBytesRef(p, valScratch), this.locale)); - } - return result.build(); - } - } - - @Override - public String toString() { - return "ToUpperEvaluator[" + "val=" + val + ", locale=" + locale + "]"; - } - - @Override - public void close() { - Releasables.closeExpectNoException(val); - } - - private Warnings warnings() { - if (warnings == null) { - this.warnings = Warnings.createWarnings( - driverContext.warningsMode(), - source.source().getLineNumber(), - source.source().getColumnNumber(), - source.text() - ); - } - return warnings; - } - - static class 
Factory implements EvalOperator.ExpressionEvaluator.Factory { - private final Source source; - - private final EvalOperator.ExpressionEvaluator.Factory val; - - private final Locale locale; - - public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory val, Locale locale) { - this.source = source; - this.val = val; - this.locale = locale; - } - - @Override - public ToUpperEvaluator get(DriverContext context) { - return new ToUpperEvaluator(source, val.get(context), locale, context); - } - - @Override - public String toString() { - return "ToUpperEvaluator[" + "val=" + val + ", locale=" + locale + "]"; - } - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCase.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCase.java new file mode 100644 index 000000000000..fe9a2d5beb3c --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ChangeCase.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.scalar.string; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.lucene.BytesRefs; +import org.elasticsearch.compute.ann.Evaluator; +import org.elasticsearch.compute.ann.Fixed; +import org.elasticsearch.compute.operator.EvalOperator; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlConfigurationFunction; +import org.elasticsearch.xpack.esql.session.Configuration; + +import java.util.List; +import java.util.Locale; + +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString; + +public abstract class ChangeCase extends EsqlConfigurationFunction { + + public enum Case { + UPPER { + @Override + String process(String value, Locale locale) { + return value.toUpperCase(locale); + } + + @Override + public boolean matchesCase(String value) { + return value.codePoints().allMatch(cp -> Character.getType(cp) != Character.LOWERCASE_LETTER); + } + }, + LOWER { + @Override + String process(String value, Locale locale) { + return value.toLowerCase(locale); + } + + @Override + public boolean matchesCase(String value) { + return value.codePoints().allMatch(cp -> Character.getType(cp) != Character.UPPERCASE_LETTER); + } + }; + + abstract String process(String value, Locale locale); + + public abstract boolean matchesCase(String value); + } + + private final Expression field; + private final Case caseType; + + protected ChangeCase(Source source, Expression field, Configuration configuration, Case caseType) { + super(source, List.of(field), configuration); + this.field = field; + this.caseType = caseType; + } + + @Override + public DataType dataType() { + return DataType.KEYWORD; + } + + @Override + protected TypeResolution resolveType() { + if (childrenResolved() == false) { + return new TypeResolution("Unresolved children"); + } + + return isString(field, sourceText(), DEFAULT); + } + + @Override + 
public boolean foldable() { + return field.foldable(); + } + + public Expression field() { + return field; + } + + public Case caseType() { + return caseType; + } + + public abstract Expression replaceChild(Expression child); + + @Override + public Expression replaceChildren(List newChildren) { + assert newChildren.size() == 1; + return replaceChild(newChildren.get(0)); + } + + @Evaluator + static BytesRef process(BytesRef val, @Fixed Locale locale, @Fixed Case caseType) { + return BytesRefs.toBytesRef(caseType.process(val.utf8ToString(), locale)); + } + + @Override + public EvalOperator.ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { + var fieldEvaluator = toEvaluator.apply(field); + return new ChangeCaseEvaluator.Factory(source(), fieldEvaluator, configuration().locale(), caseType); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLower.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLower.java index 5f2bbcde5216..084afb1b6999 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLower.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLower.java @@ -7,37 +7,23 @@ package org.elasticsearch.xpack.esql.expression.function.scalar.string; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.lucene.BytesRefs; -import org.elasticsearch.compute.ann.Evaluator; -import org.elasticsearch.compute.ann.Fixed; -import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; -import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlConfigurationFunction; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.session.Configuration; import java.io.IOException; -import java.util.List; -import java.util.Locale; -import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; -import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString; - -public class ToLower extends EsqlConfigurationFunction { +public class ToLower extends ChangeCase { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "ToLower", ToLower::new); - private final Expression field; - @FunctionInfo( returnType = { "keyword" }, description = "Returns a new string representing the input string converted to lower case.", @@ -52,8 +38,7 @@ public ToLower( ) Expression field, Configuration configuration ) { - super(source, List.of(field), configuration); - this.field = field; + super(source, field, configuration, Case.LOWER); } private ToLower(StreamInput in) throws IOException { @@ -70,52 +55,12 @@ public String getWriteableName() { return ENTRY.name; } - @Override - public DataType dataType() { - 
return DataType.KEYWORD; - } - - @Override - protected TypeResolution resolveType() { - if (childrenResolved() == false) { - return new TypeResolution("Unresolved children"); - } - - return isString(field, sourceText(), DEFAULT); - } - - @Override - public boolean foldable() { - return field.foldable(); - } - - @Evaluator - static BytesRef process(BytesRef val, @Fixed Locale locale) { - return BytesRefs.toBytesRef(val.utf8ToString().toLowerCase(locale)); - } - - @Override - public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { - var fieldEvaluator = toEvaluator.apply(field); - return new ToLowerEvaluator.Factory(source(), fieldEvaluator, configuration().locale()); - } - - public Expression field() { - return field; - } - public ToLower replaceChild(Expression child) { return new ToLower(source(), child, configuration()); } - @Override - public Expression replaceChildren(List newChildren) { - assert newChildren.size() == 1; - return replaceChild(newChildren.get(0)); - } - @Override protected NodeInfo info() { - return NodeInfo.create(this, ToLower::new, field, configuration()); + return NodeInfo.create(this, ToLower::new, field(), configuration()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpper.java index 7fdd5e39f96f..4509404754f3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpper.java @@ -7,37 +7,23 @@ package org.elasticsearch.xpack.esql.expression.function.scalar.string; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.lucene.BytesRefs; -import org.elasticsearch.compute.ann.Evaluator; -import org.elasticsearch.compute.ann.Fixed; -import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; -import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlConfigurationFunction; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.session.Configuration; import java.io.IOException; -import java.util.List; -import java.util.Locale; -import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; -import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString; - -public class ToUpper extends EsqlConfigurationFunction { +public class ToUpper extends ChangeCase { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "ToUpper", ToUpper::new); - private final Expression field; - @FunctionInfo( returnType = { "keyword" }, description = "Returns a new string representing the input string converted to upper case.", @@ -52,8 +38,7 @@ 
public ToUpper( ) Expression field, Configuration configuration ) { - super(source, List.of(field), configuration); - this.field = field; + super(source, field, configuration, Case.UPPER); } private ToUpper(StreamInput in) throws IOException { @@ -62,7 +47,7 @@ private ToUpper(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(field); + out.writeNamedWriteable(field()); } @Override @@ -70,52 +55,12 @@ public String getWriteableName() { return ENTRY.name; } - @Override - public DataType dataType() { - return DataType.KEYWORD; - } - - @Override - protected TypeResolution resolveType() { - if (childrenResolved() == false) { - return new TypeResolution("Unresolved children"); - } - - return isString(field, sourceText(), DEFAULT); - } - - @Override - public boolean foldable() { - return field.foldable(); - } - - @Evaluator - static BytesRef process(BytesRef val, @Fixed Locale locale) { - return BytesRefs.toBytesRef(val.utf8ToString().toUpperCase(locale)); - } - - @Override - public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { - var fieldEvaluator = toEvaluator.apply(field); - return new ToUpperEvaluator.Factory(source(), fieldEvaluator, configuration().locale()); - } - - public Expression field() { - return field; - } - public ToUpper replaceChild(Expression child) { return new ToUpper(source(), child, configuration()); } - @Override - public Expression replaceChildren(List newChildren) { - assert newChildren.size() == 1; - return replaceChild(newChildren.get(0)); - } - @Override protected NodeInfo info() { - return NodeInfo.create(this, ToUpper::new, field, configuration()); + return NodeInfo.create(this, ToUpper::new, field(), configuration()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index a5f97cf96137..36150083daec 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRegexMatch; import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRowAsLocalRelation; import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveEquals; import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceTrivialTypeConversions; import org.elasticsearch.xpack.esql.optimizer.rules.logical.SetAsOptimized; import org.elasticsearch.xpack.esql.optimizer.rules.logical.SimplifyComparisonsArithmetics; @@ -175,6 +176,7 @@ protected static Batch operators() { new CombineDisjunctions(), // TODO: bifunction can now (since we now have just one data types set) be pushed into the rule new SimplifyComparisonsArithmetics(DataType::areCompatible), + new ReplaceStringCasingWithInsensitiveEquals(), new ReplaceStatsFilteredAggWithEval(), new ExtractAggregateCommonFilter(), // prune/elimination diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStringCasingWithInsensitiveEquals.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStringCasingWithInsensitiveEquals.java new file mode 
100644 index 000000000000..0fea7cf8ddc1 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceStringCasingWithInsensitiveEquals.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.common.lucene.BytesRefs; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.function.scalar.ScalarFunction; +import org.elasticsearch.xpack.esql.core.expression.predicate.logical.Not; +import org.elasticsearch.xpack.esql.core.expression.predicate.nulls.IsNotNull; +import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison; +import org.elasticsearch.xpack.esql.expression.function.scalar.string.ChangeCase; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.InsensitiveEquals; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.NotEquals; + +public class ReplaceStringCasingWithInsensitiveEquals extends OptimizerRules.OptimizerExpressionRule { + + public ReplaceStringCasingWithInsensitiveEquals() { + super(OptimizerRules.TransformDirection.DOWN); + } + + @Override + protected Expression rule(ScalarFunction sf) { + Expression e = sf; + if (sf instanceof BinaryComparison bc) { + e = rewriteBinaryComparison(sf, bc, false); + } else if (sf instanceof Not not && not.field() instanceof BinaryComparison bc) { + e = rewriteBinaryComparison(sf, bc, true); + } + return e; + } + + private static Expression rewriteBinaryComparison(ScalarFunction sf, BinaryComparison bc, boolean negated) { + Expression e = sf; + if (bc.left() instanceof ChangeCase changeCase && bc.right().foldable()) { + if (bc instanceof Equals) { + e = replaceChangeCase(bc, changeCase, negated); + } else if (bc instanceof NotEquals) { // not actually used currently, `!=` is built as `NOT(==)` already + e = replaceChangeCase(bc, changeCase, negated == false); + } + } + return e; + } + + private static Expression replaceChangeCase(BinaryComparison bc, ChangeCase changeCase, boolean negated) { + var foldedRight = BytesRefs.toString(bc.right().fold()); + var field = unwrapCase(changeCase.field()); + var e = changeCase.caseType().matchesCase(foldedRight) + ? new InsensitiveEquals(bc.source(), field, bc.right()) + : Literal.of(bc, Boolean.FALSE); + if (negated) { + e = e instanceof Literal ? 
new IsNotNull(e.source(), field) : new Not(e.source(), e); + } + return e; + } + + private static Expression unwrapCase(Expression e) { + for (; e instanceof ChangeCase cc; e = cc.field()) { + } + return e; + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java index 69dbe023bde6..026d190c06e3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java @@ -83,7 +83,7 @@ protected Expression buildWithConfiguration(Source source, List args private static TestCaseSupplier supplier(String name, DataType type, Supplier valueSupplier) { return new TestCaseSupplier(name, List.of(type), () -> { List values = new ArrayList<>(); - String expectedToString = "ToLowerEvaluator[val=Attribute[channel=0], locale=en_US]"; + String expectedToString = "ChangeCaseEvaluator[val=Attribute[channel=0], locale=en_US, caseType=LOWER]"; String value = valueSupplier.get(); values.add(new TestCaseSupplier.TypedData(new BytesRef(value), type, "0")); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java index 33d6f929503b..027ac54d1587 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java @@ -83,7 +83,7 @@ protected Expression buildWithConfiguration(Source source, List args private static TestCaseSupplier supplier(String name, DataType type, Supplier valueSupplier) { return new TestCaseSupplier(name, List.of(type), () -> { List values = new ArrayList<>(); - String expectedToString = "ToUpperEvaluator[val=Attribute[channel=0], locale=en_US]"; + String expectedToString = "ChangeCaseEvaluator[val=Attribute[channel=0], locale=en_US, caseType=UPPER]"; String value = valueSupplier.get(); values.add(new TestCaseSupplier.TypedData(new BytesRef(value), type, "0")); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 6ee035aa8bd8..7e65cb045b26 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; import org.elasticsearch.xpack.esql.core.expression.predicate.logical.And; +import org.elasticsearch.xpack.esql.core.expression.predicate.logical.Not; import org.elasticsearch.xpack.esql.core.expression.predicate.logical.Or; import org.elasticsearch.xpack.esql.core.expression.predicate.nulls.IsNotNull; import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison; @@ -85,6 +86,7 @@ import 
org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.InsensitiveEquals; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; @@ -126,6 +128,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.function.BiFunction; import java.util.function.Function; @@ -5735,6 +5738,78 @@ public void testSimplifyComparisonArithmeticSkippedOnFloats() { } } + public void testReplaceStringCasingWithInsensitiveEqualsUpperFalse() { + var plan = optimizedPlan("FROM test | WHERE TO_UPPER(first_name) == \"VALÜe\""); + var local = as(plan, LocalRelation.class); + assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + } + + public void testReplaceStringCasingWithInsensitiveEqualsUpperTrue() { + var plan = optimizedPlan("FROM test | WHERE TO_UPPER(first_name) != \"VALÜe\""); + var limit = as(plan, Limit.class); + var filter = as(limit.child(), Filter.class); + var isNotNull = as(filter.condition(), IsNotNull.class); + assertThat(Expressions.name(isNotNull.field()), is("first_name")); + as(filter.child(), EsRelation.class); + } + + public void testReplaceStringCasingWithInsensitiveEqualsLowerFalse() { + var plan = optimizedPlan("FROM test | WHERE TO_LOWER(first_name) == \"VALÜe\""); + var local = as(plan, LocalRelation.class); + assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY)); + } + + public void testReplaceStringCasingWithInsensitiveEqualsLowerTrue() { + var plan = optimizedPlan("FROM test | WHERE TO_LOWER(first_name) != \"VALÜe\""); + var limit = as(plan, Limit.class); + var filter = as(limit.child(), Filter.class); + assertThat(filter.condition(), instanceOf(IsNotNull.class)); + as(filter.child(), EsRelation.class); + } + + public void testReplaceStringCasingWithInsensitiveEqualsEquals() { + for (var fn : List.of("TO_LOWER", "TO_UPPER")) { + var value = fn.equals("TO_LOWER") ? fn.toLowerCase(Locale.ROOT) : fn.toUpperCase(Locale.ROOT); + value += "🐔✈🔥🎉"; // these should not cause folding, they're not in the upper/lower char class + var plan = optimizedPlan("FROM test | WHERE " + fn + "(first_name) == \"" + value + "\""); + var limit = as(plan, Limit.class); + var filter = as(limit.child(), Filter.class); + var insensitive = as(filter.condition(), InsensitiveEquals.class); + as(insensitive.left(), FieldAttribute.class); + var bRef = as(insensitive.right().fold(), BytesRef.class); + assertThat(bRef.utf8ToString(), is(value)); + as(filter.child(), EsRelation.class); + } + } + + public void testReplaceStringCasingWithInsensitiveEqualsNotEquals() { + for (var fn : List.of("TO_LOWER", "TO_UPPER")) { + var value = fn.equals("TO_LOWER") ? 
fn.toLowerCase(Locale.ROOT) : fn.toUpperCase(Locale.ROOT); + value += "🐔✈🔥🎉"; // these should not cause folding, they're not in the upper/lower char class + var plan = optimizedPlan("FROM test | WHERE " + fn + "(first_name) != \"" + value + "\""); + var limit = as(plan, Limit.class); + var filter = as(limit.child(), Filter.class); + var not = as(filter.condition(), Not.class); + var insensitive = as(not.field(), InsensitiveEquals.class); + as(insensitive.left(), FieldAttribute.class); + var bRef = as(insensitive.right().fold(), BytesRef.class); + assertThat(bRef.utf8ToString(), is(value)); + as(filter.child(), EsRelation.class); + } + } + + public void testReplaceStringCasingWithInsensitiveEqualsUnwrap() { + var plan = optimizedPlan("FROM test | WHERE TO_UPPER(TO_LOWER(TO_UPPER(first_name))) == \"VALÜ\""); + var limit = as(plan, Limit.class); + var filter = as(limit.child(), Filter.class); + var insensitive = as(filter.condition(), InsensitiveEquals.class); + var field = as(insensitive.left(), FieldAttribute.class); + assertThat(field.fieldName(), is("first_name")); + var bRef = as(insensitive.right().fold(), BytesRef.class); + assertThat(bRef.utf8ToString(), is("VALÜ")); + as(filter.child(), EsRelation.class); + } + @Override protected List filteredWarnings() { return withDefaultLimitWarning(super.filteredWarnings()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 5ab45c8c5f38..ac56d13f870f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -174,6 +174,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_SHAPE; import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_POINT; import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_SHAPE; +import static org.elasticsearch.xpack.esql.core.util.TestUtils.stripThrough; import static org.elasticsearch.xpack.esql.parser.ExpressionBuilder.MAX_EXPRESSION_DEPTH; import static org.elasticsearch.xpack.esql.parser.LogicalPlanBuilder.MAX_QUERY_DEPTH; import static org.hamcrest.Matchers.closeTo; @@ -1803,7 +1804,7 @@ public void testPushDownEqualsIgnoreCase() { assertThat(source.estimatedRowSize(), equalTo(allFieldRowSize + Integer.BYTES)); QueryBuilder query = source.query(); - assertNotNull(query); + assertNotNull(query); // TODO: verify query } /** @@ -1838,7 +1839,257 @@ public void testNoPushDownEvalEqualsIgnoreCase() { var source = source(extract.child()); QueryBuilder query = source.query(); - assertNull(query); + assertNull(query); // TODO: verify query + } + + public void testPushDownEqualsToUpper() { + doTestPushDownChangeCase(""" + from test + | where to_upper(first_name) == "FOO" + """, """ + { + "esql_single_value" : { + "field" : "first_name", + "next" : { + "term" : { + "first_name" : { + "value" : "FOO", + "case_insensitive" : true + } + } + }, + "source" : "to_upper(first_name) == \\"FOO\\"@2:9" + } + }"""); + } + + /* + * LimitExec[1000[INTEGER]] + * \_ExchangeExec[[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gender{f}#5, hire_date{f}#10, job{f}#11, job.raw{f}#12, + * languages{f}#6, last_name{f}#7, long_noidx{f}#13, salary{f}#8],false] + * \_ProjectExec[[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gender{f}#5, hire_date{f}#10, 
job{f}#11, job.raw{f}#12, + * languages{f}#6, last_name{f}#7, long_noidx{f}#13, salary{f}#8]] + * \_FieldExtractExec[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gen..]<[]> + * \_EsQueryExec[test], indexMode[standard], query[{...}}][_doc{f}#25], limit[1000], sort[] estimatedRowSize[332] + */ + private void doTestPushDownChangeCase(String esql, String expected) { + var plan = physicalPlan(esql); + var optimized = optimizedPlan(plan); + var topLimit = as(optimized, LimitExec.class); + var exchange = asRemoteExchange(topLimit.child()); + var project = as(exchange.child(), ProjectExec.class); + var extractRest = as(project.child(), FieldExtractExec.class); + var source = source(extractRest.child()); + assertThat(source.estimatedRowSize(), equalTo(allFieldRowSize + Integer.BYTES)); + + QueryBuilder query = source.query(); + assertThat(stripThrough(query.toString()), is(stripThrough(expected))); + } + + public void testPushDownEqualsToLower() { + doTestPushDownChangeCase(""" + from test + | where to_lower(first_name) == "foo" + """, """ + { + "esql_single_value" : { + "field" : "first_name", + "next" : { + "term" : { + "first_name" : { + "value" : "foo", + "case_insensitive" : true + } + } + }, + "source" : "to_lower(first_name) == \\"foo\\"@2:9" + } + }"""); + } + + public void testPushDownNotEqualsToUpper() { + doTestPushDownChangeCase(""" + from test + | where to_upper(first_name) != "FOO" + """, """ + { + "esql_single_value" : { + "field" : "first_name", + "next" : { + "bool" : { + "must_not" : [ + { + "term" : { + "first_name" : { + "value" : "FOO", + "case_insensitive" : true + } + } + } + ], + "boost" : 1.0 + } + }, + "source" : "to_upper(first_name) != \\"FOO\\"@2:9" + } + }"""); + } + + public void testPushDownNotEqualsToLower() { + doTestPushDownChangeCase(""" + from test + | where to_lower(first_name) != "foo" + """, """ + { + "esql_single_value" : { + "field" : "first_name", + "next" : { + "bool" : { + "must_not" : [ + { + "term" : { + "first_name" : { + "value" : "foo", + "case_insensitive" : true + } + } + } + ], + "boost" : 1.0 + } + }, + "source" : "to_lower(first_name) != \\"foo\\"@2:9" + } + }"""); + } + + public void testPushDownChangeCaseMultiplePredicates() { + doTestPushDownChangeCase(""" + from test + | where to_lower(first_name) != "foo" or to_upper(first_name) == "FOO" or emp_no > 10 + """, """ + { + "bool" : { + "should" : [ + { + "esql_single_value" : { + "field" : "first_name", + "next" : { + "bool" : { + "must_not" : [ + { + "term" : { + "first_name" : { + "value" : "foo", + "case_insensitive" : true + } + } + } + ], + "boost" : 1.0 + } + }, + "source" : "to_lower(first_name) != \\"foo\\"@2:9" + } + }, + { + "esql_single_value" : { + "field" : "first_name", + "next" : { + "term" : { + "first_name" : { + "value" : "FOO", + "case_insensitive" : true + } + } + }, + "source" : "to_upper(first_name) == \\"FOO\\"@2:42" + } + }, + { + "esql_single_value" : { + "field" : "emp_no", + "next" : { + "range" : { + "emp_no" : { + "gt" : 10, + "boost" : 1.0 + } + } + }, + "source" : "emp_no > 10@2:75" + } + } + ], + "boost" : 1.0 + } + } + """); + } + + // same tree as with doTestPushDownChangeCase(), but with a topping EvalExec (for `x`) + public void testPushDownChangeCaseThroughEval() { + var esql = """ + from test + | eval x = first_name + | where to_lower(x) == "foo" + """; + var plan = physicalPlan(esql); + var optimized = optimizedPlan(plan); + var eval = as(optimized, EvalExec.class); + var topLimit = as(eval.child(), LimitExec.class); + var exchange = 
asRemoteExchange(topLimit.child()); + var project = as(exchange.child(), ProjectExec.class); + var extractRest = as(project.child(), FieldExtractExec.class); + var source = source(extractRest.child()); + + var expected = """ + { + "esql_single_value" : { + "field" : "first_name", + "next" : { + "term" : { + "first_name" : { + "value" : "foo", + "case_insensitive" : true + } + } + }, + "source" : "to_lower(x) == \\"foo\\"@3:9" + } + }"""; + QueryBuilder query = source.query(); + assertThat(stripThrough(query.toString()), is(stripThrough(expected))); + } + + /* + * LimitExec[1000[INTEGER]] + * \_ExchangeExec[[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gender{f}#5, hire_date{f}#10, job{f}#11, job.raw{f}#12, + * languages{f}#6, last_name{f}#7, long_noidx{f}#13, salary{f}#8],false] + * \_ProjectExec[[_meta_field{f}#9, emp_no{f}#3, first_name{f}#4, gender{f}#5, hire_date{f}#10, job{f}#11, job.raw{f}#12, + * languages{f}#6, last_name{f}#7, long_noidx{f}#13, salary{f}#8]] + * \_FieldExtractExec[_meta_field{f}#9, emp_no{f}#3, gender{f}#5, hire_da..]<[]> + * \_LimitExec[1000[INTEGER]] + * \_FilterExec[NOT(INSENSITIVEEQUALS(CONCAT(first_name{f}#4,[66 6f 6f][KEYWORD]),[66 6f 6f][KEYWORD]))] + * \_FieldExtractExec[first_name{f}#4]<[]> + * \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#25], limit[], sort[] estimatedRowSize[332] + */ + public void testNoPushDownChangeCase() { + var plan = physicalPlan(""" + from test + | where to_lower(concat(first_name, "foo")) != "foo" + """); + + var optimized = optimizedPlan(plan); + var topLimit = as(optimized, LimitExec.class); + var exchange = asRemoteExchange(topLimit.child()); + var project = as(exchange.child(), ProjectExec.class); + var fieldExtract = as(project.child(), FieldExtractExec.class); + var limit = as(fieldExtract.child(), LimitExec.class); + var filter = as(limit.child(), FilterExec.class); + var fieldExtract2 = as(filter.child(), FieldExtractExec.class); + var source = source(fieldExtract2.child()); + assertThat(source.query(), nullValue()); } public void testPushDownNotRLike() { From 2f9cd0793c76c93d6f02edb86364a5ce204e56bb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Dec 2024 11:42:24 +0100 Subject: [PATCH 5/9] Adjust test mute #119191 (#119206) --- muted-tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/muted-tests.yml b/muted-tests.yml index a265b166026c..b86aad0d091d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -306,6 +306,7 @@ tests: method: testFailureLoadingFields issue: https://github.com/elastic/elasticsearch/issues/118000 - class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT + method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source} issue: https://github.com/elastic/elasticsearch/issues/119191 # Examples: From 2786c58562d374640dd1f89ef01696b101c71580 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 23 Dec 2024 12:26:10 +0100 Subject: [PATCH 6/9] Fixed ValuesBytesRefGroupingAggregator CB leak (#119121) - Fixed a leak if the breaker breaks on `ValuesBytesRefGroupingAggregator` - Improved aggregators Cranky test to show a better error with the failing stacktrace --- .../aggregation/ValuesBytesRefAggregator.java | 16 ++++- .../aggregation/X-ValuesAggregator.java.st | 16 ++++- ...ValuesBytesRefAggregatorFunctionTests.java | 55 ++++++++++++++++ ...tesRefGroupingAggregatorFunctionTests.java | 63 +++++++++++++++++++ .../compute/operator/OperatorTestCase.java | 12 +++- 5 files changed, 156 
insertions(+), 6 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java index 602fd2943319..bd77bd7ff1e4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java @@ -130,8 +130,20 @@ public static class GroupingState implements Releasable { private final BytesRefHash bytes; private GroupingState(BigArrays bigArrays) { - values = new LongLongHash(1, bigArrays); - bytes = new BytesRefHash(1, bigArrays); + LongLongHash _values = null; + BytesRefHash _bytes = null; + try { + _values = new LongLongHash(1, bigArrays); + _bytes = new BytesRefHash(1, bigArrays); + + values = _values; + bytes = _bytes; + + _values = null; + _bytes = null; + } finally { + Releasables.closeExpectNoException(_values, _bytes); + } } void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st index a8884c58116f..1cef234b2238 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st @@ -244,8 +244,20 @@ $endif$ $if(long||double)$ values = new LongLongHash(1, bigArrays); $elseif(BytesRef)$ - values = new LongLongHash(1, bigArrays); - bytes = new BytesRefHash(1, bigArrays); + LongLongHash _values = null; + BytesRefHash _bytes = null; + try { + _values = new LongLongHash(1, bigArrays); + _bytes = new BytesRefHash(1, bigArrays); + + values = _values; + bytes = _bytes; + + _values = null; + _bytes = null; + } finally { + Releasables.closeExpectNoException(_values, _bytes); + } $elseif(int||float)$ values = new LongHash(1, bigArrays); $endif$ diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java new file mode 100644 index 000000000000..c0a91fe22b87 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockUtils; +import org.elasticsearch.compute.operator.SequenceBytesRefBlockSourceOperator; +import org.elasticsearch.compute.operator.SourceOperator; + +import java.util.Arrays; +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; + +public class ValuesBytesRefAggregatorFunctionTests extends AggregatorFunctionTestCase { + @Override + protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + return new SequenceBytesRefBlockSourceOperator( + blockFactory, + IntStream.range(0, size).mapToObj(l -> new BytesRef(randomAlphaOfLengthBetween(0, 100))) + ); + } + + @Override + protected AggregatorFunctionSupplier aggregatorFunction(List inputChannels) { + return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels); + } + + @Override + protected String expectedDescriptionOfAggregator() { + return "values of bytes"; + } + + @Override + public void assertSimpleOutput(List input, Block result) { + TreeSet set = new TreeSet<>((List) BlockUtils.toJavaObject(result, 0)); + Object[] values = input.stream() + .flatMap(AggregatorFunctionTestCase::allBytesRefs) + .collect(Collectors.toSet()) + .toArray(Object[]::new); + if (false == set.containsAll(Arrays.asList(values))) { + assertThat(set, containsInAnyOrder(values)); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java new file mode 100644 index 000000000000..fc9bc90828df --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockUtils; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.LongBytesRefTupleBlockSourceOperator; +import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.core.Tuple; + +import java.util.Arrays; +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class ValuesBytesRefGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { + @Override + protected AggregatorFunctionSupplier aggregatorFunction(List inputChannels) { + return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels); + } + + @Override + protected String expectedDescriptionOfAggregator() { + return "values of bytes"; + } + + @Override + protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + return new LongBytesRefTupleBlockSourceOperator( + blockFactory, + IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), new BytesRef(randomAlphaOfLengthBetween(0, 100)))) + ); + } + + @Override + public void assertSimpleGroup(List input, Block result, int position, Long group) { + Object[] values = input.stream().flatMap(p -> allBytesRefs(p, group)).collect(Collectors.toSet()).toArray(Object[]::new); + Object resultValue = BlockUtils.toJavaObject(result, position); + switch (values.length) { + case 0 -> assertThat(resultValue, nullValue()); + case 1 -> assertThat(resultValue, equalTo(values[0])); + default -> { + TreeSet set = new TreeSet<>((List) resultValue); + if (false == set.containsAll(Arrays.asList(values))) { + assertThat(set, containsInAnyOrder(values)); + } + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index be792a0ef261..54db0453530b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -143,6 +143,7 @@ public final void testSimpleWithCranky() { DriverContext driverContext = crankyDriverContext(); + Exception exception = null; boolean driverStarted = false; try { Operator operator = simple().get(driverContext); @@ -150,8 +151,8 @@ public final void testSimpleWithCranky() { drive(operator, input.iterator(), driverContext); // Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws } catch (CircuitBreakingException e) { - logger.info("broken", e); assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); + exception = e; } if (driverStarted == false) { // if drive hasn't even started then we need to release the input pages @@ -159,7 +160,14 @@ public final void testSimpleWithCranky() { } // Note the lack of try/finally here - we're asserting that when the driver throws an exception we clear the breakers. 
- assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L)); + long inputUsedBytes = inputFactoryContext.breaker().getUsed(); + if (inputUsedBytes != 0L) { + fail(exception, "Expected no used bytes for input, found: " + inputUsedBytes); + } + long driverUsedBytes = driverContext.breaker().getUsed(); + if (driverUsedBytes != 0L) { + fail(exception, "Expected no used bytes for driver, found: " + driverUsedBytes); + } } /** From ed102f8c76fd52bd432580340b4467e1f4e14e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 23 Dec 2024 13:37:59 +0100 Subject: [PATCH 7/9] ESQL: Lookup join with aliases (#118956) --- .../esql/qa/mixed/MixedClusterEsqlSpecIT.java | 4 +- .../xpack/esql/ccq/MultiClusterSpecIT.java | 8 +- .../rest/RequestIndexFilteringTestCase.java | 2 +- .../src/main/resources/lookup-join.csv-spec | 141 +++++++++--------- .../xpack/esql/action/EsqlCapabilities.java | 7 +- .../esql/planner/LocalExecutionPlanner.java | 11 +- .../elasticsearch/xpack/esql/CsvTests.java | 2 +- .../xpack/esql/analysis/AnalyzerTests.java | 8 +- .../xpack/esql/analysis/ParsingTests.java | 8 +- .../xpack/esql/analysis/VerifierTests.java | 2 +- .../optimizer/LogicalPlanOptimizerTests.java | 12 +- .../optimizer/PhysicalPlanOptimizerTests.java | 10 +- .../session/IndexResolverFieldNamesTests.java | 24 +-- .../test/esql/190_lookup_join.yml | 51 ++++++- 14 files changed, 171 insertions(+), 119 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java b/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java index d4b087277df5..9a09401785df 100644 --- a/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/MixedClusterEsqlSpecIT.java @@ -21,7 +21,7 @@ import java.util.List; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V8; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V9; import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.ASYNC; public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase { @@ -96,7 +96,7 @@ protected boolean supportsInferenceTestService() { @Override protected boolean supportsIndexModeLookup() throws IOException { - return hasCapabilities(List.of(JOIN_LOOKUP_V8.capabilityName())); + return hasCapabilities(List.of(JOIN_LOOKUP_V9.capabilityName())); } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index d7c57e23b714..a809216d3beb 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -48,7 +48,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static 
org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V8; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V9; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST; import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC; @@ -124,7 +124,7 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); - assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V8.capabilityName())); + assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V9.capabilityName())); } private TestFeatureService remoteFeaturesService() throws IOException { @@ -283,8 +283,8 @@ protected boolean supportsInferenceTestService() { @Override protected boolean supportsIndexModeLookup() throws IOException { - // CCS does not yet support JOIN_LOOKUP_V8 and clusters falsely report they have this capability - // return hasCapabilities(List.of(JOIN_LOOKUP_V8.capabilityName())); + // CCS does not yet support JOIN_LOOKUP_V9 and clusters falsely report they have this capability + // return hasCapabilities(List.of(JOIN_LOOKUP_V9.capabilityName())); return false; } } diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index 355c403ce2a8..a83b6cf2e906 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -221,7 +221,7 @@ public void testIndicesDontExist() throws IOException { assertThat(e.getMessage(), containsString("index_not_found_exception")); assertThat(e.getMessage(), containsString("no such index [foo]")); - if (EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()) { + if (EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()) { e = expectThrows( ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test1 | LOOKUP JOIN foo ON id1")) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 39638899cf6b..309386228b1c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -8,7 +8,7 @@ ############################################### basicOnTheDataNode -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | EVAL language_code = languages @@ -25,7 +25,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; basicRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW 
language_code = 1 | LOOKUP JOIN languages_lookup ON language_code @@ -36,7 +36,7 @@ language_code:integer | language_name:keyword ; basicOnTheCoordinator -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -53,7 +53,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; subsequentEvalOnTheDataNode -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | EVAL language_code = languages @@ -71,7 +71,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x ; subsequentEvalOnTheCoordinator -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -89,7 +89,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x ; sortEvalBeforeLookup -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -106,7 +106,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; nonUniqueLeftKeyOnTheDataNode -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | WHERE emp_no <= 10030 @@ -130,7 +130,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; nonUniqueRightKeyOnTheDataNode -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | EVAL language_code = emp_no % 10 @@ -150,7 +150,7 @@ emp_no:integer | language_code:integer | language_name:keyword | country:k ; nonUniqueRightKeyOnTheCoordinator -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -170,7 +170,7 @@ emp_no:integer | language_code:integer | language_name:keyword | country:k ; nonUniqueRightKeyFromRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW language_code = 2 | LOOKUP JOIN languages_lookup_non_unique_key ON language_code @@ -183,8 +183,7 @@ language_code:integer | language_name:keyword | country:keyword ; repeatedIndexOnFrom -required_capability: join_lookup_v8 -required_capability: join_lookup_repeated_index_from +required_capability: join_lookup_v9 FROM languages_lookup | LOOKUP JOIN languages_lookup ON language_code @@ -202,7 +201,7 @@ dropAllLookedUpFieldsOnTheDataNode-Ignore // Depends on // https://github.com/elastic/elasticsearch/issues/118778 // https://github.com/elastic/elasticsearch/issues/118781 -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | EVAL language_code = emp_no % 10 @@ -223,7 +222,7 @@ dropAllLookedUpFieldsOnTheCoordinator-Ignore // Depends on // https://github.com/elastic/elasticsearch/issues/118778 // https://github.com/elastic/elasticsearch/issues/118781 -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -248,7 +247,7 @@ emp_no:integer ############################################### filterOnLeftSide -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | EVAL language_code = languages @@ -265,7 +264,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnRightSide -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -281,7 +280,7 @@ FROM sample_data ; filterOnRightSideAfterStats -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -294,7 +293,7 @@ count:long | 
type:keyword ; filterOnJoinKey -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | EVAL language_code = languages @@ -309,7 +308,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnJoinKeyAndRightSide -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | WHERE emp_no < 10006 @@ -326,7 +325,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnRightSideOnTheCoordinator -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -342,7 +341,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnJoinKeyOnTheCoordinator -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -358,7 +357,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnJoinKeyAndRightSideOnTheCoordinator -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | SORT emp_no @@ -375,7 +374,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; filterOnTheDataNodeThenFilterOnTheCoordinator -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | EVAL language_code = languages @@ -396,7 +395,7 @@ emp_no:integer | language_code:integer | language_name:keyword ########################################################################### nullJoinKeyOnTheDataNode -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | WHERE emp_no < 10004 @@ -413,7 +412,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; mvJoinKeyOnTheDataNode -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | WHERE 10003 < emp_no AND emp_no < 10008 @@ -431,7 +430,7 @@ emp_no:integer | language_code:integer | language_name:keyword ; mvJoinKeyFromRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW language_code = [4, 5, 6, 7] | LOOKUP JOIN languages_lookup_non_unique_key ON language_code @@ -444,7 +443,7 @@ language_code:integer | language_name:keyword | country:keyword ; mvJoinKeyFromRowExpanded -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW language_code = [4, 5, 6, 7, 8] | MV_EXPAND language_code @@ -466,7 +465,7 @@ language_code:integer | language_name:keyword | country:keyword ########################################################################### joinOnNestedField -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM employees | WHERE 10000 < emp_no AND emp_no < 10006 @@ -486,7 +485,7 @@ emp_no:integer | language.id:integer | language.name:text joinOnNestedFieldRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW language.code = "EN" | LOOKUP JOIN languages_nested_fields ON language.code @@ -499,7 +498,7 @@ language.id:integer | language.code:keyword | language.name.keyword:keyword joinOnNestedNestedFieldRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW language.name.keyword = "English" | LOOKUP JOIN languages_nested_fields ON language.name.keyword @@ -515,7 +514,7 @@ language.id:integer | language.name:text | language.name.keyword:keyword ############################################### lookupIPFromRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip 
@@ -526,7 +525,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromKeepRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", right = "right" | KEEP left, client_ip, right @@ -538,7 +537,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowing -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -549,7 +548,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowingKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -562,7 +561,7 @@ left | 172.21.0.5 | right | Development ; lookupIPFromRowWithShadowingKeepReordered -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -575,7 +574,7 @@ right | Development | 172.21.0.5 ; lookupIPFromIndex -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -594,7 +593,7 @@ ignoreOrder:true ; lookupIPFromIndexKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -614,7 +613,7 @@ ignoreOrder:true ; lookupIPFromIndexKeepKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | KEEP client_ip, event_duration, @timestamp, message @@ -636,7 +635,7 @@ timestamp:date | client_ip:keyword | event_duration:long | msg:keyword ; lookupIPFromIndexStats -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -652,7 +651,7 @@ count:long | env:keyword ; lookupIPFromIndexStatsKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -669,7 +668,7 @@ count:long | env:keyword ; statsAndLookupIPFromIndex -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -690,7 +689,7 @@ count:long | client_ip:keyword | env:keyword ############################################### lookupMessageFromRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN message_types_lookup ON message @@ -701,7 +700,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromKeepRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", message = "Connected to 10.1.0.1", right = "right" | KEEP left, message, right @@ -713,7 +712,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromRowWithShadowing -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right" | LOOKUP JOIN message_types_lookup ON message @@ -724,7 +723,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromRowWithShadowingKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right" | LOOKUP JOIN 
message_types_lookup ON message @@ -736,7 +735,7 @@ left | Connected to 10.1.0.1 | right | Success ; lookupMessageFromIndex -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -754,7 +753,7 @@ ignoreOrder:true ; lookupMessageFromIndexKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -773,7 +772,7 @@ ignoreOrder:true ; lookupMessageFromIndexKeepKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | KEEP client_ip, event_duration, @timestamp, message @@ -793,7 +792,7 @@ ignoreOrder:true ; lookupMessageFromIndexKeepReordered -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -812,7 +811,7 @@ Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 ; lookupMessageFromIndexStats -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -827,7 +826,7 @@ count:long | type:keyword ; lookupMessageFromIndexStatsKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -843,7 +842,7 @@ count:long | type:keyword ; statsAndLookupMessageFromIndex -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | STATS count = count(message) BY message @@ -861,7 +860,7 @@ count:long | type:keyword | message:keyword ; lookupMessageFromIndexTwice -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -883,7 +882,7 @@ ignoreOrder:true ; lookupMessageFromIndexTwiceKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -906,7 +905,7 @@ ignoreOrder:true ; lookupMessageFromIndexTwiceFullyShadowing -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | LOOKUP JOIN message_types_lookup ON message @@ -930,7 +929,7 @@ ignoreOrder:true ############################################### lookupIPAndMessageFromRow -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -942,7 +941,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowKeepBefore -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | KEEP left, client_ip, message, right @@ -955,7 +954,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowKeepBetween -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -968,7 +967,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowKeepAfter -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -981,7 +980,7 @@ 
left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowing -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", type = "type", right = "right" | LOOKUP JOIN clientips_lookup ON client_ip @@ -993,7 +992,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1007,7 +1006,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeepKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1022,7 +1021,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeepKeepKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1038,7 +1037,7 @@ left | 172.21.0.5 | Connected to 10.1.0.1 | right | Devel ; lookupIPAndMessageFromRowWithShadowingKeepReordered -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 ROW left = "left", client_ip = "172.21.0.5", message = "Connected to 10.1.0.1", env = "env", right = "right" | EVAL client_ip = client_ip::keyword @@ -1052,7 +1051,7 @@ right | Development | Success | 172.21.0.5 ; lookupIPAndMessageFromIndex -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1072,7 +1071,7 @@ ignoreOrder:true ; lookupIPAndMessageFromIndexKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1093,7 +1092,7 @@ ignoreOrder:true ; lookupIPAndMessageFromIndexStats -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1111,7 +1110,7 @@ count:long | env:keyword | type:keyword ; lookupIPAndMessageFromIndexStatsKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1130,7 +1129,7 @@ count:long | env:keyword | type:keyword ; statsAndLookupIPAndMessageFromIndex -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1149,7 +1148,7 @@ count:long | client_ip:keyword | message:keyword | env:keyword | type:keyw ; lookupIPAndMessageFromIndexChainedEvalKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword @@ -1171,7 +1170,7 @@ ignoreOrder:true ; lookupIPAndMessageFromIndexChainedRenameKeep -required_capability: join_lookup_v8 +required_capability: join_lookup_v9 FROM sample_data | EVAL client_ip = client_ip::keyword diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index b1b11ccb09c8..22f7937ccf4f 100644 
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -560,12 +560,7 @@ public enum Cap { /** * LOOKUP JOIN */ - JOIN_LOOKUP_V8(Build.current().isSnapshot()), - - /** - * LOOKUP JOIN with the same index as the FROM - */ - JOIN_LOOKUP_REPEATED_INDEX_FROM(JOIN_LOOKUP_V8.isEnabled()), + JOIN_LOOKUP_V9(Build.current().isSnapshot()), /** * Fix for https://github.com/elastic/elasticsearch/issues/117054 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index dedc61207143..c40263baa656 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -575,6 +575,15 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan if (localSourceExec.indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); } + Map indicesWithModes = localSourceExec.index().indexNameWithModes(); + if (indicesWithModes.size() != 1) { + throw new IllegalArgumentException("can't plan [" + join + "], found more than 1 index"); + } + var entry = indicesWithModes.entrySet().iterator().next(); + if (entry.getValue() != IndexMode.LOOKUP) { + throw new IllegalArgumentException("can't plan [" + join + "], found index with mode [" + entry.getValue() + "]"); + } + String indexName = entry.getKey(); List matchFields = new ArrayList<>(join.leftFields().size()); for (Attribute m : join.leftFields()) { Layout.ChannelAndType t = source.layout.get(m.id()); @@ -595,7 +604,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan matchFields.getFirst().channel(), lookupFromIndexService, matchFields.getFirst().type(), - localSourceExec.index().name(), + indexName, join.leftFields().getFirst().name(), join.addedFields().stream().map(f -> (NamedExpression) f).toList(), join.source() diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 1e0374c64857..76744957ff5f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -263,7 +263,7 @@ public final void test() throws Throwable { ); assumeFalse( "lookup join disabled for csv tests", - testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V8.capabilityName()) + testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V9.capabilityName()) ); assumeFalse( "can't use TERM function in csv tests", diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 674eda8916c5..be15bb7de8b4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2140,7 +2140,7 @@ public void testLookupMatchTypeWrong() { } public void testLookupJoinUnknownIndex() { - assumeTrue("requires LOOKUP JOIN capability", 
EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); String errorMessage = "Unknown index [foobar]"; IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage); @@ -2169,7 +2169,7 @@ public void testLookupJoinUnknownIndex() { } public void testLookupJoinUnknownField() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name"; String errorMessage = "1:45: Unknown column [last_name] in right side of join"; @@ -2192,7 +2192,7 @@ public void testLookupJoinUnknownField() { } public void testMultipleLookupJoinsGiveDifferentAttributes() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); // The field attributes that get contributed by different LOOKUP JOIN commands must have different name ids, // even if they have the same names. Otherwise, things like dependency analysis - like in PruneColumns - cannot work based on @@ -2222,7 +2222,7 @@ public void testMultipleLookupJoinsGiveDifferentAttributes() { } public void testLookupJoinIndexMode() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); var indexResolution = AnalyzerTestUtils.expandedDefaultIndexResolution(); var lookupResolution = AnalyzerTestUtils.defaultLookupResolution(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java index 549ddce03c20..2f6cf46f2e2b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java @@ -113,7 +113,7 @@ public void testTooBigQuery() { } public void testJoinOnConstant() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertEquals( "1:55: JOIN ON clause only supports fields at the moment, found [123]", error("row languages = 1, gender = \"f\" | lookup join test on 123") @@ -129,7 +129,7 @@ public void testJoinOnConstant() { } public void testJoinOnMultipleFields() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertEquals( "1:35: JOIN ON clause only supports one field at the moment, found [2]", error("row languages = 1, gender = \"f\" | lookup join test on gender, languages") @@ -137,7 +137,7 @@ public void testJoinOnMultipleFields() { } public void testJoinTwiceOnTheSameField() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertEquals( "1:35: JOIN ON clause only supports one field at the moment, found [2]", error("row languages = 1, gender = 
\"f\" | lookup join test on languages, languages") @@ -145,7 +145,7 @@ public void testJoinTwiceOnTheSameField() { } public void testJoinTwiceOnTheSameField_TwoLookups() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertEquals( "1:80: JOIN ON clause only supports one field at the moment, found [2]", error("row languages = 1, gender = \"f\" | lookup join test on languages | eval x = 1 | lookup join test on gender, gender") diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index 43d764ab2007..533cc59b824c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -1974,7 +1974,7 @@ public void testSortByAggregate() { } public void testLookupJoinDataTypeMismatch() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); query("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 7e65cb045b26..672eef7076c6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -4927,7 +4927,7 @@ public void testPlanSanityCheck() throws Exception { } public void testPlanSanityCheckWithBinaryPlans() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); var plan = optimizedPlan(""" FROM test @@ -6003,7 +6003,7 @@ public void testLookupStats() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); String query = """ FROM test @@ -6045,7 +6045,7 @@ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownFilterOnLeftSideField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); String query = """ FROM test @@ -6088,7 +6088,7 @@ public void testLookupJoinPushDownFilterOnLeftSideField() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#18, language_name{f}#19] */ public void testLookupJoinPushDownDisabledForLookupField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); String query = """ FROM test @@ -6132,7 +6132,7 
@@ public void testLookupJoinPushDownDisabledForLookupField() { * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20] */ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); String query = """ FROM test @@ -6183,7 +6183,7 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel * \_EsRelation[languages_lookup][LOOKUP][language_code{f}#19, language_name{f}#20] */ public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField() { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); String query = """ FROM test diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index ac56d13f870f..80f2772945e9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -2615,7 +2615,7 @@ public void testVerifierOnMissingReferences() { } public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); // Do not assert serialization: // This will have a LookupJoinExec, which is not serializable because it doesn't leave the coordinator. 
@@ -7298,7 +7298,7 @@ public void testLookupThenTopN() { } public void testLookupJoinFieldLoading() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); TestDataSource data = dataSetWithLookupIndices(Map.of("lookup_index", List.of("first_name", "foo", "bar", "baz"))); @@ -7375,7 +7375,7 @@ public void testLookupJoinFieldLoading() throws Exception { } public void testLookupJoinFieldLoadingTwoLookups() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); TestDataSource data = dataSetWithLookupIndices( Map.of( @@ -7429,7 +7429,7 @@ public void testLookupJoinFieldLoadingTwoLookups() throws Exception { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/119082") public void testLookupJoinFieldLoadingTwoLookupsProjectInBetween() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); TestDataSource data = dataSetWithLookupIndices( Map.of( @@ -7470,7 +7470,7 @@ public void testLookupJoinFieldLoadingTwoLookupsProjectInBetween() throws Except @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/118778") public void testLookupJoinFieldLoadingDropAllFields() throws Exception { - assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("Requires LOOKUP JOIN", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); TestDataSource data = dataSetWithLookupIndices(Map.of("lookup_index", List.of("first_name", "foo", "bar", "baz"))); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java index 60bdf4e7f73d..b344bd6b6325 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java @@ -1365,7 +1365,7 @@ public void testMetrics() { } public void testLookupJoin() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( "FROM employees | KEEP languages | RENAME languages AS language_code | LOOKUP JOIN languages_lookup ON language_code", Set.of("languages", "languages.*", "language_code", "language_code.*"), @@ -1374,7 +1374,7 @@ public void testLookupJoin() { } public void testLookupJoinKeep() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM employees @@ -1388,7 +1388,7 @@ public void testLookupJoinKeep() { } public void testLookupJoinKeepWildcard() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM employees @@ -1402,7 +1402,7 @@ public void 
testLookupJoinKeepWildcard() { } public void testMultiLookupJoin() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1415,7 +1415,7 @@ public void testMultiLookupJoin() { } public void testMultiLookupJoinKeepBefore() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1429,7 +1429,7 @@ public void testMultiLookupJoinKeepBefore() { } public void testMultiLookupJoinKeepBetween() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1454,7 +1454,7 @@ public void testMultiLookupJoinKeepBetween() { } public void testMultiLookupJoinKeepAfter() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1481,7 +1481,7 @@ public void testMultiLookupJoinKeepAfter() { } public void testMultiLookupJoinKeepAfterWildcard() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1495,7 +1495,7 @@ public void testMultiLookupJoinKeepAfterWildcard() { } public void testMultiLookupJoinSameIndex() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1509,7 +1509,7 @@ public void testMultiLookupJoinSameIndex() { } public void testMultiLookupJoinSameIndexKeepBefore() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1524,7 +1524,7 @@ public void testMultiLookupJoinSameIndexKeepBefore() { } public void testMultiLookupJoinSameIndexKeepBetween() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data @@ -1550,7 +1550,7 @@ public void testMultiLookupJoinSameIndexKeepBetween() { } public void testMultiLookupJoinSameIndexKeepAfter() { - assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V8.isEnabled()); + assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V9.isEnabled()); assertFieldNames( """ FROM sample_data diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index 5b39f74de1b9..57d2dac23026 100644 --- 
a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -6,7 +6,7 @@ setup: - method: POST path: /_query parameters: [] - capabilities: [join_lookup_v8] + capabilities: [join_lookup_v9] reason: "uses LOOKUP JOIN" - do: indices.create: @@ -32,6 +32,13 @@ setup: type: long color: type: keyword + - do: + indices.update_aliases: + body: + actions: + - add: + index: test-lookup + alias: test-lookup-alias - do: bulk: index: "test" @@ -75,3 +82,45 @@ non-lookup index: - match: { error.type: "verification_exception" } - contains: { error.reason: "Found 1 problem\nline 1:43: invalid [test] resolution in lookup mode to an index in [standard] mode" } + +--- +alias: + - do: + esql.query: + body: + query: 'FROM test | SORT key | LOOKUP JOIN `test-lookup-alias` ON key | LIMIT 3' + + - match: {columns.0.name: "key"} + - match: {columns.0.type: "long"} + - match: {columns.1.name: "color"} + - match: {columns.1.type: "keyword"} + - match: {values.0: [1, "cyan"]} + - match: {values.1: [2, "yellow"]} + +--- +alias-repeated-alias: + - do: + esql.query: + body: + query: 'FROM test-lookup-alias | SORT key | LOOKUP JOIN `test-lookup-alias` ON key | LIMIT 3' + + - match: {columns.0.name: "key"} + - match: {columns.0.type: "long"} + - match: {columns.1.name: "color"} + - match: {columns.1.type: "keyword"} + - match: {values.0: [1, "cyan"]} + - match: {values.1: [2, "yellow"]} + +--- +alias-repeated-index: + - do: + esql.query: + body: + query: 'FROM test-lookup | SORT key | LOOKUP JOIN `test-lookup-alias` ON key | LIMIT 3' + + - match: {columns.0.name: "key"} + - match: {columns.0.type: "long"} + - match: {columns.1.name: "color"} + - match: {columns.1.type: "keyword"} + - match: {values.0: [1, "cyan"]} + - match: {values.1: [2, "yellow"]} From 7ba3cb9d0dc624f273f0cc8d58440992523de3cf Mon Sep 17 00:00:00 2001 From: Jonathan Buttner <56361221+jonathan-buttner@users.noreply.github.com> Date: Mon, 23 Dec 2024 07:57:52 -0500 Subject: [PATCH 8/9] [ML] Fix loss of context in the inference API for streaming APIs (#118999) * Adding context preserving fix * Update docs/changelog/118999.yaml * Update docs/changelog/118999.yaml * Using a setonce and adding a test * Updating the changelog --- docs/changelog/118999.yaml | 6 +++ .../inference/InferenceBaseRestTest.java | 39 ++++++++++++++----- .../xpack/inference/InferenceCrudIT.java | 14 +++++-- ...rverSentEventsRestActionListenerTests.java | 17 ++++++-- .../xpack/inference/InferencePlugin.java | 11 +++++- .../rest/RestStreamInferenceAction.java | 12 +++++- .../RestUnifiedCompletionInferenceAction.java | 16 +++++++- .../ServerSentEventsRestActionListener.java | 20 ++++++++-- .../rest/RestStreamInferenceActionTests.java | 13 ++++++- ...UnifiedCompletionInferenceActionTests.java | 12 +++++- 10 files changed, 134 insertions(+), 26 deletions(-) create mode 100644 docs/changelog/118999.yaml diff --git a/docs/changelog/118999.yaml b/docs/changelog/118999.yaml new file mode 100644 index 000000000000..0188cebbd768 --- /dev/null +++ b/docs/changelog/118999.yaml @@ -0,0 +1,6 @@ +pr: 118999 +summary: Fix loss of context in the inference API for streaming APIs +area: Machine Learning +type: bug +issues: + - 119000 diff --git a/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceBaseRestTest.java 
b/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceBaseRestTest.java index 5e6c4d53f4c5..cdc6d9b2dff5 100644 --- a/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceBaseRestTest.java +++ b/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceBaseRestTest.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; @@ -341,31 +342,44 @@ protected Map infer(String modelId, List input) throws I return inferInternal(endpoint, input, null, Map.of()); } - protected Deque streamInferOnMockService(String modelId, TaskType taskType, List input) throws Exception { + protected Deque streamInferOnMockService( + String modelId, + TaskType taskType, + List input, + @Nullable Consumer responseConsumerCallback + ) throws Exception { var endpoint = Strings.format("_inference/%s/%s/_stream", taskType, modelId); - return callAsync(endpoint, input); + return callAsync(endpoint, input, responseConsumerCallback); } - protected Deque unifiedCompletionInferOnMockService(String modelId, TaskType taskType, List input) - throws Exception { + protected Deque unifiedCompletionInferOnMockService( + String modelId, + TaskType taskType, + List input, + @Nullable Consumer responseConsumerCallback + ) throws Exception { var endpoint = Strings.format("_inference/%s/%s/_unified", taskType, modelId); - return callAsyncUnified(endpoint, input, "user"); + return callAsyncUnified(endpoint, input, "user", responseConsumerCallback); } - private Deque callAsync(String endpoint, List input) throws Exception { + private Deque callAsync(String endpoint, List input, @Nullable Consumer responseConsumerCallback) + throws Exception { var request = new Request("POST", endpoint); request.setJsonEntity(jsonBody(input, null)); - return execAsyncCall(request); + return execAsyncCall(request, responseConsumerCallback); } - private Deque execAsyncCall(Request request) throws Exception { + private Deque execAsyncCall(Request request, @Nullable Consumer responseConsumerCallback) throws Exception { var responseConsumer = new AsyncInferenceResponseConsumer(); request.setOptions(RequestOptions.DEFAULT.toBuilder().setHttpAsyncResponseConsumerFactory(() -> responseConsumer).build()); var latch = new CountDownLatch(1); client().performRequestAsync(request, new ResponseListener() { @Override public void onSuccess(Response response) { + if (responseConsumerCallback != null) { + responseConsumerCallback.accept(response); + } latch.countDown(); } @@ -378,11 +392,16 @@ public void onFailure(Exception exception) { return responseConsumer.events(); } - private Deque callAsyncUnified(String endpoint, List input, String role) throws Exception { + private Deque callAsyncUnified( + String endpoint, + List input, + String role, + @Nullable Consumer responseConsumerCallback + ) throws Exception { var request = new Request("POST", endpoint); request.setJsonEntity(createUnifiedJsonBody(input, role)); - return execAsyncCall(request); + return execAsyncCall(request, responseConsumerCallback); } private String createUnifiedJsonBody(List input, String role) throws IOException { diff --git 
a/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java b/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java index fc593a6a8b0f..49fce930cd72 100644 --- a/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java +++ b/x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java @@ -10,6 +10,7 @@ package org.elasticsearch.xpack.inference; import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -37,9 +39,15 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalToIgnoringCase; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; public class InferenceCrudIT extends InferenceBaseRestTest { + private static final Consumer VALIDATE_ELASTIC_PRODUCT_HEADER_CONSUMER = (r) -> assertThat( + r.getHeader("X-elastic-product"), + is("Elasticsearch") + ); + @SuppressWarnings("unchecked") public void testCRUD() throws IOException { for (int i = 0; i < 5; i++) { @@ -442,7 +450,7 @@ public void testUnsupportedStream() throws Exception { assertEquals(TaskType.SPARSE_EMBEDDING.toString(), singleModel.get("task_type")); try { - var events = streamInferOnMockService(modelId, TaskType.SPARSE_EMBEDDING, List.of(randomUUID())); + var events = streamInferOnMockService(modelId, TaskType.SPARSE_EMBEDDING, List.of(randomUUID()), null); assertThat(events.size(), equalTo(2)); events.forEach(event -> { switch (event.name()) { @@ -469,7 +477,7 @@ public void testSupportedStream() throws Exception { var input = IntStream.range(1, 2 + randomInt(8)).mapToObj(i -> randomAlphanumericOfLength(5)).toList(); try { - var events = streamInferOnMockService(modelId, TaskType.COMPLETION, input); + var events = streamInferOnMockService(modelId, TaskType.COMPLETION, input, VALIDATE_ELASTIC_PRODUCT_HEADER_CONSUMER); var expectedResponses = Stream.concat( input.stream().map(s -> s.toUpperCase(Locale.ROOT)).map(str -> "{\"completion\":[{\"delta\":\"" + str + "\"}]}"), @@ -496,7 +504,7 @@ public void testUnifiedCompletionInference() throws Exception { var input = IntStream.range(1, 2 + randomInt(8)).mapToObj(i -> randomAlphanumericOfLength(5)).toList(); try { - var events = unifiedCompletionInferOnMockService(modelId, TaskType.COMPLETION, input); + var events = unifiedCompletionInferOnMockService(modelId, TaskType.COMPLETION, input, VALIDATE_ELASTIC_PRODUCT_HEADER_CONSUMER); var expectedResponses = expectedResultsIterator(input); assertThat(events.size(), equalTo((input.size() + 1) * 2)); events.forEach(event -> { diff --git a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java index ab3f466f3c11..b993cf36cb87 100644 --- 
a/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java +++ b/x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java @@ -17,6 +17,7 @@ import org.apache.http.nio.util.SimpleInputBuffer; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; @@ -43,6 +44,7 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xpack.core.inference.action.InferenceAction; import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent; @@ -52,6 +54,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Collections; import java.util.Deque; import java.util.Iterator; import java.util.List; @@ -96,6 +99,14 @@ protected Collection> nodePlugins() { } public static class StreamingPlugin extends Plugin implements ActionPlugin { + private final SetOnce threadPool = new SetOnce<>(); + + @Override + public Collection createComponents(PluginServices services) { + threadPool.set(services.threadPool()); + return Collections.emptyList(); + } + @Override public Collection getRestHandlers( Settings settings, @@ -122,7 +133,7 @@ public void handleRequest(RestRequest request, RestChannel channel, NodeClient c var publisher = new RandomPublisher(requestCount, withError); var inferenceServiceResults = new StreamingInferenceServiceResults(publisher); var inferenceResponse = new InferenceAction.Response(inferenceServiceResults, inferenceServiceResults.publisher()); - new ServerSentEventsRestActionListener(channel).onResponse(inferenceResponse); + new ServerSentEventsRestActionListener(channel, threadPool).onResponse(inferenceResponse); } }, new RestHandler() { @Override @@ -132,7 +143,7 @@ public List routes() { @Override public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) { - new ServerSentEventsRestActionListener(channel).onFailure(expectedException); + new ServerSentEventsRestActionListener(channel, threadPool).onFailure(expectedException); } }, new RestHandler() { @Override @@ -143,7 +154,7 @@ public List routes() { @Override public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) { var inferenceResponse = new InferenceAction.Response(new SingleInferenceServiceResults()); - new ServerSentEventsRestActionListener(channel).onResponse(inferenceResponse); + new ServerSentEventsRestActionListener(channel, threadPool).onResponse(inferenceResponse); } }); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java index 72fa840ad19b..f98a7ebdee34 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java @@ -43,6 +43,7 @@ import org.elasticsearch.search.rank.RankDoc; import org.elasticsearch.threadpool.ExecutorBuilder; import 
org.elasticsearch.threadpool.ScalingExecutorBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -154,6 +155,9 @@ public class InferencePlugin extends Plugin implements ActionPlugin, ExtensibleP private final SetOnce httpFactory = new SetOnce<>(); private final SetOnce amazonBedrockFactory = new SetOnce<>(); private final SetOnce serviceComponents = new SetOnce<>(); + // This is mainly so that the rest handlers can access the ThreadPool in a way that avoids potential null pointers from it + // not being initialized yet + private final SetOnce threadPoolSetOnce = new SetOnce<>(); private final SetOnce elasticInferenceServiceComponents = new SetOnce<>(); private final SetOnce inferenceServiceRegistry = new SetOnce<>(); private final SetOnce shardBulkInferenceActionFilter = new SetOnce<>(); @@ -197,9 +201,11 @@ public List getRestHandlers( Supplier nodesInCluster, Predicate clusterSupportsFeature ) { + assert serviceComponents.get() != null : "serviceComponents must be set before retrieving the rest handlers"; + var availableRestActions = List.of( new RestInferenceAction(), - new RestStreamInferenceAction(), + new RestStreamInferenceAction(threadPoolSetOnce), new RestGetInferenceModelAction(), new RestPutInferenceModelAction(), new RestUpdateInferenceModelAction(), @@ -208,7 +214,7 @@ public List getRestHandlers( new RestGetInferenceServicesAction() ); List conditionalRestActions = UnifiedCompletionFeature.UNIFIED_COMPLETION_FEATURE_FLAG.isEnabled() - ? List.of(new RestUnifiedCompletionInferenceAction()) + ? List.of(new RestUnifiedCompletionInferenceAction(threadPoolSetOnce)) : List.of(); return Stream.concat(availableRestActions.stream(), conditionalRestActions.stream()).toList(); @@ -219,6 +225,7 @@ public Collection createComponents(PluginServices services) { var throttlerManager = new ThrottlerManager(settings, services.threadPool(), services.clusterService()); var truncator = new Truncator(settings, services.clusterService()); serviceComponents.set(new ServiceComponents(services.threadPool(), throttlerManager, settings, truncator)); + threadPoolSetOnce.set(services.threadPool()); var httpClientManager = HttpClientManager.create(settings, services.threadPool(), services.clusterService(), throttlerManager); var httpRequestSenderFactory = new HttpRequestSender.Factory(serviceComponents.get(), httpClientManager, services.clusterService()); diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceAction.java index 875c288da52b..881af435b29b 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceAction.java @@ -7,13 +7,16 @@ package org.elasticsearch.xpack.inference.rest; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.inference.action.InferenceAction; import java.util.List; +import java.util.Objects; import static 
org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.xpack.inference.rest.Paths.STREAM_INFERENCE_ID_PATH; @@ -21,6 +24,13 @@ @ServerlessScope(Scope.PUBLIC) public class RestStreamInferenceAction extends BaseInferenceAction { + private final SetOnce threadPool; + + public RestStreamInferenceAction(SetOnce threadPool) { + super(); + this.threadPool = Objects.requireNonNull(threadPool); + } + @Override public String getName() { return "stream_inference_action"; @@ -38,6 +48,6 @@ protected InferenceAction.Request prepareInferenceRequest(InferenceAction.Reques @Override protected ActionListener listener(RestChannel channel) { - return new ServerSentEventsRestActionListener(channel); + return new ServerSentEventsRestActionListener(channel, threadPool); } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceAction.java index 5c71b560a6b9..51f1bc48c830 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceAction.java @@ -7,15 +7,18 @@ package org.elasticsearch.xpack.inference.rest; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.inference.action.UnifiedCompletionAction; import java.io.IOException; import java.util.List; +import java.util.Objects; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.xpack.inference.rest.Paths.UNIFIED_INFERENCE_ID_PATH; @@ -23,6 +26,13 @@ @ServerlessScope(Scope.PUBLIC) public class RestUnifiedCompletionInferenceAction extends BaseRestHandler { + private final SetOnce threadPool; + + public RestUnifiedCompletionInferenceAction(SetOnce threadPool) { + super(); + this.threadPool = Objects.requireNonNull(threadPool); + } + @Override public String getName() { return "unified_inference_action"; @@ -44,6 +54,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient request = UnifiedCompletionAction.Request.parseRequest(params.inferenceEntityId(), params.taskType(), inferTimeout, parser); } - return channel -> client.execute(UnifiedCompletionAction.INSTANCE, request, new ServerSentEventsRestActionListener(channel)); + return channel -> client.execute( + UnifiedCompletionAction.INSTANCE, + request, + new ServerSentEventsRestActionListener(channel, threadPool) + ); } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java index bf94f072b6e0..042c8b8a8346 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java @@ -10,9 +10,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import 
org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.BytesStream; @@ -29,6 +31,7 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.inference.action.InferenceAction; @@ -38,6 +41,7 @@ import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,6 +59,7 @@ public class ServerSentEventsRestActionListener implements ActionListener threadPool; /** * A listener for the first part of the next entry to become available for transmission. @@ -66,13 +71,14 @@ public class ServerSentEventsRestActionListener implements ActionListener nextBodyPartListener; - public ServerSentEventsRestActionListener(RestChannel channel) { - this(channel, channel.request()); + public ServerSentEventsRestActionListener(RestChannel channel, SetOnce threadPool) { + this(channel, channel.request(), threadPool); } - public ServerSentEventsRestActionListener(RestChannel channel, ToXContent.Params params) { + public ServerSentEventsRestActionListener(RestChannel channel, ToXContent.Params params, SetOnce threadPool) { this.channel = channel; this.params = params; + this.threadPool = Objects.requireNonNull(threadPool); } @Override @@ -99,7 +105,7 @@ protected void ensureOpen() { } private void initializeStream(InferenceAction.Response response) { - nextBodyPartListener = ActionListener.wrap(bodyPart -> { + ActionListener chunkedResponseBodyActionListener = ActionListener.wrap(bodyPart -> { // this is the first response, so we need to send the RestResponse to open the stream // all subsequent bytes will be delivered through the nextBodyPartListener channel.sendResponse(RestResponse.chunked(RestStatus.OK, bodyPart, this::release)); @@ -115,6 +121,12 @@ private void initializeStream(InferenceAction.Response response) { ) ); }); + + nextBodyPartListener = ContextPreservingActionListener.wrapPreservingContext( + chunkedResponseBodyActionListener, + threadPool.get().getThreadContext() + ); + // subscribe will call onSubscribe, which requests the first chunk response.publisher().subscribe(subscriber); } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceActionTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceActionTests.java index b999e2c9b72f..f67680ef6b62 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceActionTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestStreamInferenceActionTests.java @@ -12,8 +12,11 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.RestActionTestCase; +import org.elasticsearch.threadpool.TestThreadPool; 
+import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.inference.action.InferenceAction; +import org.junit.After; import org.junit.Before; import static org.elasticsearch.xpack.inference.rest.BaseInferenceActionTests.createResponse; @@ -22,10 +25,18 @@ import static org.hamcrest.Matchers.instanceOf; public class RestStreamInferenceActionTests extends RestActionTestCase { + private final SetOnce threadPool = new SetOnce<>(); @Before public void setUpAction() { - controller().registerHandler(new RestStreamInferenceAction()); + threadPool.set(new TestThreadPool(getTestName())); + controller().registerHandler(new RestStreamInferenceAction(threadPool)); + } + + @After + public void tearDownAction() { + terminate(threadPool.get()); + } public void testStreamIsTrue() { diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceActionTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceActionTests.java index 5acfe67b175d..9dc23c890c14 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceActionTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceActionTests.java @@ -17,8 +17,11 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.RestActionTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.inference.action.UnifiedCompletionAction; +import org.junit.After; import org.junit.Before; import static org.elasticsearch.xpack.inference.rest.BaseInferenceActionTests.createResponse; @@ -27,10 +30,17 @@ import static org.hamcrest.Matchers.instanceOf; public class RestUnifiedCompletionInferenceActionTests extends RestActionTestCase { + private final SetOnce threadPool = new SetOnce<>(); @Before public void setUpAction() { - controller().registerHandler(new RestUnifiedCompletionInferenceAction()); + threadPool.set(new TestThreadPool(getTestName())); + controller().registerHandler(new RestUnifiedCompletionInferenceAction(threadPool)); + } + + @After + public void tearDownAction() { + terminate(threadPool.get()); } public void testStreamIsTrue() { From edb3818ecc0ff0c34a63dcac533f51cfee4c4443 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Mon, 23 Dec 2024 15:16:35 +0100 Subject: [PATCH 9/9] Disable `TO_UPPER(null)`-tests prior to 8.17 (#119213) TO_UPPER/TO_LOWER resolution incorrectly returned the child's type (which could itself be `NULL` for a `null` argument) instead of KEYWORD/TEXT. So a test like `TO_UPPER(null) == "..."` fails with a type mismatch. This was fixed collaterally by #114334 (8.17.0). Also, correct the skip directives of some existing tests (this had no impact in practice, given the tested version ranges).
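For illustration, the affected query shape is the one exercised by the `equalsToUpperNullFolded` spec touched below:

    // rejected with a type mismatch before 8.17.0; returns no rows on 8.17.0+
    from employees
    | where to_upper(null) == "Georgi"
    | keep emp_no, first_name

Before the #114334 fix, `to_upper(null)` resolved to the `NULL` type of its argument rather than `KEYWORD`, so the comparison against a keyword literal was rejected with a type-mismatch error on pre-8.17.0 versions; from 8.17.0 onward the function resolves to `KEYWORD`, the predicate folds to `null`, and the query simply returns no rows, which is what the spec asserts.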
--- .../src/main/resources/string.csv-spec | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec index 5b0cccc1ed43..0ecff0e229ef 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec @@ -1231,7 +1231,7 @@ a:keyword | upper:keyword | lower:keyword π/2 + a + B + Λ ºC | Π/2 + A + B + Λ ºC | π/2 + a + b + λ ºc ; -equalsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +equalsToUpperPushedDown#[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] from employees | where to_upper(first_name) == "GEORGI" | keep emp_no, first_name @@ -1241,7 +1241,7 @@ emp_no:integer | first_name:keyword 10001 | Georgi ; -equalsToUpperNestedPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +equalsToUpperNestedPushedDown#[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] from employees | where to_upper(to_upper(to_lower(first_name))) == "GEORGI" | keep emp_no, first_name @@ -1251,7 +1251,7 @@ emp_no:integer | first_name:keyword 10001 | Georgi ; -negatedEqualsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +negatedEqualsToUpperPushedDown#[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] from employees | sort emp_no | where not(to_upper(first_name) == "GEORGI") @@ -1263,7 +1263,7 @@ emp_no:integer | first_name:keyword 10002 | Bezalel ; -notEqualsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +notEqualsToUpperPushedDown#[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] from employees | sort emp_no | where to_upper(first_name) != "GEORGI" @@ -1275,7 +1275,7 @@ emp_no:integer | first_name:keyword 10002 | Bezalel ; -negatedNotEqualsToUpperPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +negatedNotEqualsToUpperPushedDown#[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] from employees | sort emp_no | where not(to_upper(first_name) != "GEORGI") @@ -1306,7 +1306,7 @@ c:long 90 ; -equalsToUpperNullFolded +equalsToUpperNullFolded#[skip:-8.16.99, reason:function's type corrected in #114334] from employees | where to_upper(null) == "Georgi" | keep emp_no, first_name @@ -1324,7 +1324,7 @@ from employees emp_no:integer | first_name:keyword ; -notEqualsToUpperNullFolded +notEqualsToUpperNullFolded#[skip:-8.16.99, reason:function's type corrected in #114334] from employees | where to_upper(null) != "Georgi" | keep emp_no, first_name @@ -1362,7 +1362,7 @@ c:long 0 ; -equalsToLowerPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +equalsToLowerPushedDown#[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] from employees | where to_lower(first_name) == "georgi" | keep emp_no, first_name @@ -1372,7 +1372,7 @@ emp_no:integer | first_name:keyword 10001 | Georgi ; -notEqualsToLowerPushedDown[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] +notEqualsToLowerPushedDown#[skip:-8.12.99, reason:case insensitive operators implemented in v 8.13] from employees | sort emp_no | where to_lower(first_name) != "georgi"