Skip to content

Commit

Permalink
Improve handling of more complex cases such as several enriches
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Oct 22, 2024
1 parent 38ef243 commit cd1c981
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,45 @@ public void testEnrichRemoteWithVendor() {
}
}

/**
 * Chains a hosts enrich (ANY or REMOTE mode) with a REMOTE vendors enrich after a LIMIT and without a SORT in the query,
 * then checks the per-vendor counts and the cross-cluster execution info.
 */
public void testEnrichRemoteWithVendorNoSort() {
    Tuple<Boolean, Boolean> ccsMetadataFlags = randomIncludeCCSMetadata();
    Boolean includeMetaInRequest = ccsMetadataFlags.v1();
    boolean expectMetaInResponse = ccsMetadataFlags.v2();

    // The two placeholders receive the hosts enrich command and the vendors enrich command, in that order.
    String queryTemplate = """
        FROM *:events,events
        | LIMIT 100
        | eval ip= TO_STR(host)
        | %s
        | %s
        | stats c = COUNT(*) by vendor
        """;

    for (Enrich.Mode hostsEnrichMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) {
        String esql = String.format(Locale.ROOT, queryTemplate, enrichHosts(hostsEnrichMode), enrichVendors(Enrich.Mode.REMOTE));
        try (EsqlQueryResponse response = runQuery(esql, includeMetaInRequest)) {
            var rows = getValuesList(response);
            // The query has no SORT, so order the rows by vendor (column 1, nulls last) before comparing.
            rows.sort(Comparator.comparing(row -> (String) row.get(1), Comparator.nullsLast(Comparator.naturalOrder())));

            // Each expected row is (count, vendor).
            assertThat(
                rows,
                equalTo(
                    List.of(
                        List.of(6L, "Apple"),
                        List.of(7L, "Microsoft"),
                        List.of(1L, "Redhat"),
                        List.of(2L, "Samsung"),
                        List.of(1L, "Sony"),
                        List.of(2L, "Suse"),
                        Arrays.asList(3L, (String) null)
                    )
                )
            );

            EsqlExecutionInfo info = response.getExecutionInfo();
            assertThat(info.includeCCSMetadata(), equalTo(expectMetaInResponse));
            assertThat(info.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
            assertCCSExecutionInfoDetails(info);
        }
    }
}

public void testTopNThenEnrichRemote() {
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
Boolean requestIncludeMeta = includeCCSMetadata.v1();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -106,24 +107,44 @@ public PhysicalPlan map(LogicalPlan p) {
// Unary Plan
//
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
// When we have a remote enrich, we want to put it under FragmentExec so that it is executed remotely.
// We only do this on the coordinator node.
// The way we're going to do it is as follows:
// 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
// 2. Put this Enrich under it, removing everything that was below it previously.
// 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops are already supposed to go under
// FragmentExec.
// 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).

var child = map(enrich.child());
AtomicBoolean hasFragment = new AtomicBoolean(false);
var childWithEnrich = child.transformDown(FragmentExec.class, (f) -> {
// var enrich2 = new Enrich(
// enrich.source(),
// f.fragment(),
// enrich.mode(),
// enrich.policyName(),
// enrich.matchField(),
// enrich.policy(),
// enrich.concreteIndices(),
// enrich.enrichFields()
// );
hasFragment.set(true);
return new FragmentExec(p);

var childTransformed = child.transformUp((f) -> {
// Once we reached FragmentExec, we stuff our Enrich under it
if (f instanceof FragmentExec) {
hasFragment.set(true);
return new FragmentExec(p);
}
if (f instanceof EnrichExec enrichExec) {
// It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
return enrichExec.child();
}
// TODO: does this handle HashJoin correctly? Are we allowed to push hash joins inside FragmentExec?
if (f instanceof UnaryExec unaryExec) {
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
return f;
} else {
return unaryExec.child();
}
}
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
return f;
});

if (hasFragment.get()) {
return childWithEnrich;
return childTransformed;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
// @TestLogging(value = "org.elasticsearch.xpack.esql:DEBUG", reason = "debug")
public class PhysicalPlanOptimizerTests extends ESTestCase {

private static final String PARAM_FORMATTING = "%1$s";
Expand Down Expand Up @@ -5851,8 +5851,7 @@ public void testEnrichBeforeLimit() {
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
| LIMIT 10""");
var eval = as(plan, EvalExec.class);
var finalLimit = as(eval.child(), LimitExec.class);
var finalLimit = as(plan, LimitExec.class);
var exchange = as(finalLimit.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var enrich = as(fragment.fragment(), Enrich.class);
Expand Down Expand Up @@ -5908,8 +5907,7 @@ public void testLimitThenEnrichRemote() {
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
""");
var eval = as(plan, EvalExec.class);
var finalLimit = as(eval.child(), LimitExec.class);
var finalLimit = as(plan, LimitExec.class);
var exchange = as(finalLimit.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var enrich = as(fragment.fragment(), Enrich.class);
Expand Down Expand Up @@ -6035,8 +6033,7 @@ public void testEnrichAfterTopN() {
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
""");
var eval = as(plan, EvalExec.class);
var topN = as(eval.child(), TopNExec.class);
var topN = as(plan, TopNExec.class);
var exchange = as(topN.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var enrich = as(fragment.fragment(), Enrich.class);
Expand Down

0 comments on commit cd1c981

Please sign in to comment.