Skip to content

Commit

Permalink
Fixing remote ENRICH by pushing the Enrich inside FragmentExec
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Oct 11, 2024
1 parent 693fb95 commit a478b6a
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -470,26 +471,72 @@ public void testEnrichRemoteWithVendor() {
}

/**
 * Cross-cluster test for a remote-mode ENRICH that is placed after SORT and LIMIT.
 * <p>
 * The query reads from the {@code events} index locally and on the remote clusters, derives {@code ip} from
 * {@code host}, sorts, keeps 5 rows and then applies the command built by {@code enrichHosts(Enrich.Mode.REMOTE)}.
 * The success path asserts five exact rows (host, timestamp, user, os; the os of 192.168.1.25 is null) and then
 * checks the execution info: the include-CCS-metadata flag and the cluster aliases "", "c1" and "c2".
 * <p>
 * NOTE(review): this listing comes from a rendered commit diff without +/- markers, so it appears to interleave
 * the previous expectation (a VerificationException saying remote ENRICH can't run after LIMIT) with the new one
 * (the query succeeds), including both variants of the SORT and ENRICH query lines -- confirm against the real
 * file which lines are current.
 */
public void testTopNThenEnrichRemote() {
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
Boolean requestIncludeMeta = includeCCSMetadata.v1();
boolean responseExpectMeta = includeCCSMetadata.v2();

String query = String.format(Locale.ROOT, """
FROM *:events,events
| eval ip= TO_STR(host)
| SORT ip
| SORT timestamp, user, ip
| LIMIT 5
| %s
| %s | KEEP host, timestamp, user, os
""", enrichHosts(Enrich.Mode.REMOTE));
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
assertThat(
getValuesList(resp),
equalTo(
List.of(
List.of("192.168.1.2", 1L, "andres", "Windows"),
List.of("192.168.1.3", 1L, "matthew", "MacOS"),
Arrays.asList("192.168.1.25", 1L, "park", (String) null),
List.of("192.168.1.5", 2L, "akio", "Android"),
List.of("192.168.1.6", 2L, "sergio", "iOS")
)
)
);
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
assertCCSExecutionInfoDetails(executionInfo);
}
}

/**
 * Cross-cluster test for a remote-mode ENRICH that is placed after a bare LIMIT (no SORT).
 * <p>
 * The query limits the rows read from the {@code events} index (local and remote), derives {@code ip} from
 * {@code host} and applies the command built by {@code enrichHosts(Enrich.Mode.REMOTE)}, keeping
 * host, timestamp, user, ip and os. The success path sorts the returned rows in the test itself -- by timestamp
 * (column 1), then host (column 0), then user (column 2) -- and compares only the first five of them with the
 * expected rows; presumably this is because LIMIT without SORT gives no ordering guarantee (TODO confirm).
 * It then checks the include-CCS-metadata flag and the cluster aliases "", "c1" and "c2".
 * <p>
 * NOTE(review): this listing comes from a rendered commit diff without +/- markers, so it appears to interleave
 * the previous expectation (a VerificationException saying remote ENRICH can't run after LIMIT) with the new one
 * (the query succeeds), including both variants of the LIMIT and ENRICH query lines -- confirm against the real
 * file which lines are current.
 */
public void testLimitThenEnrichRemote() {
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
Boolean requestIncludeMeta = includeCCSMetadata.v1();
boolean responseExpectMeta = includeCCSMetadata.v2();

String query = String.format(Locale.ROOT, """
FROM *:events,events
| LIMIT 10
| LIMIT 25
| eval ip= TO_STR(host)
| %s
| %s | KEEP host, timestamp, user, ip, os
""", enrichHosts(Enrich.Mode.REMOTE));
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
var values = getValuesList(resp);
values.sort(
Comparator.comparingLong((List<Object> o) -> (Long) o.get(1))
.thenComparing(o -> (String) o.get(0))
.thenComparing(o -> (String) o.get(2))
);
assertThat(
values.subList(0, 5),
equalTo(
List.of(
List.of("192.168.1.2", 1L, "andres", "192.168.1.2", "Windows"),
Arrays.asList("192.168.1.25", 1L, "park", (String) null, (String) null),
List.of("192.168.1.3", 1L, "matthew", "192.168.1.3", "MacOS"),
List.of("192.168.1.5", 2L, "akio", "192.168.1.5", "Android"),
List.of("192.168.1.5", 2L, "simon", "192.168.1.5", "Android")
)
)
);
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
assertCCSExecutionInfoDetails(executionInfo);
}
}

public void testAggThenEnrichRemote() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
Expand Down Expand Up @@ -574,22 +573,15 @@ private static void checkForSortOnSpatialTypes(LogicalPlan p, Set<Failure> local
*/
private static void checkRemoteEnrich(LogicalPlan plan, Set<Failure> failures) {
boolean[] agg = { false };
boolean[] limit = { false };
boolean[] enrichCoord = { false };

plan.forEachUp(UnaryPlan.class, u -> {
if (u instanceof Limit) {
limit[0] = true; // TODO: Make Limit then enrich_remote work
}
if (u instanceof Aggregate) {
agg[0] = true;
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
enrichCoord[0] = true;
}
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
if (limit[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LIMIT"));
}
if (agg[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* <p>This class is part of the planner</p>
Expand Down Expand Up @@ -104,6 +105,27 @@ public PhysicalPlan map(LogicalPlan p) {
//
// Unary Plan
//
// Special case: an ENRICH in REMOTE mode that is being mapped outside of local mode.
// Rather than emitting a physical node for the Enrich above its child, the Enrich is placed inside the
// FragmentExec found in the mapped child plan.
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
// Map the Enrich's child first so that any FragmentExec it produces can be located below.
var child = map(enrich.child());
// A mutable holder is needed because the flag is set from inside the lambda.
AtomicBoolean hasFragment = new AtomicBoolean(false);
var childWithEnrich = child.transformDown(FragmentExec.class, (f) -> {
// NOTE(review): the commented-out code below is an alternative that would build a new Enrich on top of
// f.fragment(); the live code ignores f and wraps the whole logical plan p instead.
// var enrich2 = new Enrich(
// enrich.source(),
// f.fragment(),
// enrich.mode(),
// enrich.policyName(),
// enrich.matchField(),
// enrich.policy(),
// enrich.concreteIndices(),
// enrich.enrichFields()
// );
hasFragment.set(true);
// Replace the visited fragment with one whose logical plan is p, i.e. the Enrich together with its
// original logical child.
return new FragmentExec(p);
});
// Only use the rewritten plan if a FragmentExec was actually replaced; otherwise fall through to the
// mapping code that follows this block.
if (hasFragment.get()) {
return childWithEnrich;
}
}

if (p instanceof UnaryPlan ua) {
var child = map(ua.child());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5841,14 +5841,15 @@ public void testEnrichBeforeLimit() {
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
| LIMIT 10""");
var enrich = as(plan, EnrichExec.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var eval = as(enrich.child(), EvalExec.class);
var eval = as(plan, EvalExec.class);
var finalLimit = as(eval.child(), LimitExec.class);
var exchange = as(finalLimit.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var partialLimit = as(fragment.fragment(), Limit.class);
var enrich = as(fragment.fragment(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var evalFragment = as(enrich.child(), Eval.class);
var partialLimit = as(evalFragment.child(), Limit.class);
as(partialLimit.child(), EsRelation.class);
}
}
Expand Down Expand Up @@ -5891,13 +5892,22 @@ public void testLimitThenEnrich() {
}

/**
 * Planner test for {@code FROM test | LIMIT 10 | EVAL ... | ENRICH _remote:departments}.
 * <p>
 * The success path asserts the physical plan shape from the top down: EvalExec, LimitExec, ExchangeExec, FragmentExec.
 * The logical plan held by the fragment is expected to be an Enrich in REMOTE mode whose concrete indices map
 * {@code cluster_1} to {@code .enrich-departments-2}, followed by Eval, Limit and EsRelation.
 * <p>
 * NOTE(review): this listing comes from a rendered commit diff without +/- markers, so it appears to interleave
 * the previous version (expecting a VerificationException "line 4:3: ... can't be executed after LIMIT") with the
 * new one (the plan is built and inspected); as rendered, the two text-block openers and closers overlap --
 * confirm against the real file which lines are current.
 */
public void testLimitThenEnrichRemote() {
var error = expectThrows(VerificationException.class, () -> physicalPlan("""
var plan = physicalPlan("""
FROM test
| LIMIT 10
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
"""));
assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after LIMIT"));
""");
var eval = as(plan, EvalExec.class);
var finalLimit = as(eval.child(), LimitExec.class);
var exchange = as(finalLimit.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var enrich = as(fragment.fragment(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var evalFragment = as(enrich.child(), Eval.class);
var partialLimit = as(evalFragment.child(), Limit.class);
as(partialLimit.child(), EsRelation.class);
}

public void testEnrichBeforeTopN() {
Expand Down Expand Up @@ -5951,6 +5961,23 @@ public void testEnrichBeforeTopN() {
var eval = as(enrich.child(), Eval.class);
as(eval.child(), EsRelation.class);
}
{
var plan = physicalPlan("""
FROM test
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
| SORT department
| LIMIT 10""");
var topN = as(plan, TopNExec.class);
var exchange = as(topN.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var partialTopN = as(fragment.fragment(), TopN.class);
var enrich = as(partialTopN.child(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var eval = as(enrich.child(), Eval.class);
as(eval.child(), EsRelation.class);
}
}

public void testEnrichAfterTopN() {
Expand Down Expand Up @@ -5990,6 +6017,25 @@ public void testEnrichAfterTopN() {
var partialTopN = as(fragment.fragment(), TopN.class);
as(partialTopN.child(), EsRelation.class);
}
{
var plan = physicalPlan("""
FROM test
| SORT emp_no
| LIMIT 10
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
""");
var eval = as(plan, EvalExec.class);
var topN = as(eval.child(), TopNExec.class);
var exchange = as(topN.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var enrich = as(fragment.fragment(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var evalFragment = as(enrich.child(), Eval.class);
var partialTopN = as(evalFragment.child(), TopN.class);
as(partialTopN.child(), EsRelation.class);
}
}

public void testManyEnrich() {
Expand Down

0 comments on commit a478b6a

Please sign in to comment.