Move node-level reduction plan to data node (elastic#117422) (elastic#117534)

This change moves the logic for extracting the node-level reduction plan
from the coordinator to the data node (see the sketch after the file
summary below). Deriving the plan on the data node has several benefits:

1. Minimizes serialization, especially for inter-cluster communication.

2. Clears the way to fix the row-size estimation issue by generating this
plan on the data nodes; that fix will land in a follow-up.

3. Lets each cluster decide whether to run node-level reduction based on
its own topology.
dnhatn authored Nov 26, 2024
1 parent 8734387 commit 35d2873
Showing 7 changed files with 56 additions and 57 deletions.
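
The gist of the move: the coordinator used to derive the node-level reduce plan and attach it to every data-node request; now each data node derives that plan from the fragment it already receives. Below is a minimal toy sketch of that split (not Elasticsearch code): Fragment, DataNodeRequest, and deriveReducer are hypothetical stand-ins for the plan fragment, the real request class, and PlannerUtils.dataNodeReductionPlan.

    import java.util.Optional;

    class NodeLevelReductionSketch {
        record Fragment(String logicalPlan) {}
        record DataNodeRequest(Fragment fragment, Optional<String> reducer) {}

        // Before this change: the coordinator derived the reducer and shipped it
        // inside every data-node request, including cross-cluster ones.
        static DataNodeRequest buildRequestBefore(Fragment fragment, boolean nodeLevelReduction) {
            Optional<String> reducer = nodeLevelReduction ? Optional.of(deriveReducer(fragment)) : Optional.empty();
            return new DataNodeRequest(fragment, reducer);
        }

        // After: the request carries only the fragment, so less goes over the wire.
        static DataNodeRequest buildRequestAfter(Fragment fragment) {
            return new DataNodeRequest(fragment, Optional.empty());
        }

        // The receiving data node derives the reducer itself, so each cluster
        // can apply its own topology-based policy.
        static Optional<String> planReduceOnDataNode(DataNodeRequest request, boolean nodeLevelReduction) {
            return nodeLevelReduction ? Optional.of(deriveReducer(request.fragment())) : Optional.empty();
        }

        // Stand-in for PlannerUtils.dataNodeReductionPlan(fragment, plan).
        static String deriveReducer(Fragment fragment) {
            return "reduce(" + fragment.logicalPlan() + ")";
        }

        public static void main(String[] args) {
            Fragment fragment = new Fragment("FROM logs | STATS count(*) BY host");
            System.out.println(buildRequestBefore(fragment, true));              // reducer travels on the wire
            System.out.println(planReduceOnDataNode(buildRequestAfter(fragment), true)); // derived locally
        }
    }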
@@ -206,7 +206,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0);
     public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0);
     public static final TransportVersion REINDEX_DATA_STREAMS = def(8_799_00_0);
-
+    public static final TransportVersion ESQL_REMOVE_NODE_LEVEL_PLAN = def(8_800_00_0);
     /*
      * STOP! READ THIS FIRST! No, really,
      * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
@@ -73,8 +73,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
                     Source.EMPTY,
                     new Project(logicalFragment.source(), logicalFragment, output),
                     fragmentExec.esFilter(),
-                    fragmentExec.estimatedRowSize(),
-                    fragmentExec.reducer()
+                    fragmentExec.estimatedRowSize()
                 );
                 return new ExchangeExec(exec.source(), output, exec.inBetweenAggs(), newChild);
             }
@@ -31,7 +31,6 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {

     private final LogicalPlan fragment;
     private final QueryBuilder esFilter;
-    private final PhysicalPlan reducer; // datanode-level physical plan node that performs an intermediate (not partial) reduce

     /**
      * Estimate of the number of bytes that'll be loaded per position before
@@ -40,35 +39,42 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
     private final int estimatedRowSize;

     public FragmentExec(LogicalPlan fragment) {
-        this(fragment.source(), fragment, null, 0, null);
+        this(fragment.source(), fragment, null, 0);
     }

-    public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize, PhysicalPlan reducer) {
+    public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize) {
         super(source);
         this.fragment = fragment;
         this.esFilter = esFilter;
         this.estimatedRowSize = estimatedRowSize;
-        this.reducer = reducer;
     }

     private FragmentExec(StreamInput in) throws IOException {
-        this(
-            Source.readFrom((PlanStreamInput) in),
-            in.readNamedWriteable(LogicalPlan.class),
-            in.readOptionalNamedWriteable(QueryBuilder.class),
-            in.readOptionalVInt(),
-            in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readOptionalNamedWriteable(PhysicalPlan.class) : null
-        );
+        super(Source.readFrom((PlanStreamInput) in));
+        this.fragment = in.readNamedWriteable(LogicalPlan.class);
+        this.esFilter = in.readOptionalNamedWriteable(QueryBuilder.class);
+        if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REMOVE_NODE_LEVEL_PLAN)) {
+            this.estimatedRowSize = in.readVInt();
+        } else {
+            this.estimatedRowSize = Objects.requireNonNull(in.readOptionalVInt());
+            if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                in.readOptionalNamedWriteable(PhysicalPlan.class); // for old reducer
+            }
+        }
     }

     @Override
     public void writeTo(StreamOutput out) throws IOException {
         Source.EMPTY.writeTo(out);
         out.writeNamedWriteable(fragment());
         out.writeOptionalNamedWriteable(esFilter());
-        out.writeOptionalVInt(estimatedRowSize());
-        if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
-            out.writeOptionalNamedWriteable(reducer);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REMOVE_NODE_LEVEL_PLAN)) {
+            out.writeVInt(estimatedRowSize);
+        } else {
+            out.writeOptionalVInt(estimatedRowSize());
+            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
+                out.writeOptionalNamedWriteable(null); // for old reducer
+            }
         }
     }

@@ -89,13 +95,9 @@ public Integer estimatedRowSize() {
         return estimatedRowSize;
     }

-    public PhysicalPlan reducer() {
-        return reducer;
-    }
-
     @Override
     protected NodeInfo<FragmentExec> info() {
-        return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize, reducer);
+        return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize);
     }

     @Override
@@ -108,24 +110,20 @@ public PhysicalPlan estimateRowSize(State state) {
         int estimatedRowSize = state.consumeAllFields(false);
         return Objects.equals(estimatedRowSize, this.estimatedRowSize)
             ? this
-            : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+            : new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
     }

     public FragmentExec withFragment(LogicalPlan fragment) {
-        return Objects.equals(fragment, this.fragment) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+        return Objects.equals(fragment, this.fragment) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
     }

     public FragmentExec withFilter(QueryBuilder filter) {
-        return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize, reducer);
-    }
-
-    public FragmentExec withReducer(PhysicalPlan reducer) {
-        return Objects.equals(reducer, this.reducer) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
+        return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(fragment, esFilter, estimatedRowSize, reducer);
+        return Objects.hash(fragment, esFilter, estimatedRowSize);
     }

     @Override
@@ -141,8 +139,7 @@ public boolean equals(Object obj) {
         FragmentExec other = (FragmentExec) obj;
         return Objects.equals(fragment, other.fragment)
             && Objects.equals(esFilter, other.esFilter)
-            && Objects.equals(estimatedRowSize, other.estimatedRowSize)
-            && Objects.equals(reducer, other.reducer);
+            && Objects.equals(estimatedRowSize, other.estimatedRowSize);
     }

     @Override
@@ -154,7 +151,6 @@ public String nodeString() {
         sb.append(", estimatedRowSize=");
         sb.append(estimatedRowSize);
         sb.append(", reducer=[");
-        sb.append(reducer == null ? "" : reducer.toString());
         sb.append("], fragment=[<>\n");
         sb.append(fragment.toString());
         sb.append("<>]]");
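
The new readFrom/writeTo pair above is the usual transport-version dance: peers on or after ESQL_REMOVE_NODE_LEVEL_PLAN exchange the row size as a plain required vint, while older peers still get the optional vint followed (since V_8_14_0) by an always-null placeholder where the reducer used to travel. A minimal sketch of that pattern, assuming java.io streams in place of StreamInput/StreamOutput, a presence flag plus int in place of optional vints, and made-up version ids:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class FragmentWireSketch {
        static final int V_8_14_0 = 8_14_00;                     // made-up id
        static final int ESQL_REMOVE_NODE_LEVEL_PLAN = 8_800_00; // made-up id

        static void write(DataOutputStream out, int peerVersion, int estimatedRowSize) throws IOException {
            if (peerVersion >= ESQL_REMOVE_NODE_LEVEL_PLAN) {
                out.writeInt(estimatedRowSize);  // new peers: plain required value
            } else {
                out.writeBoolean(true);          // old peers: "optional" value, always present
                out.writeInt(estimatedRowSize);
                if (peerVersion >= V_8_14_0) {
                    out.writeBoolean(false);     // slot where the reducer used to go: absent
                }
            }
        }

        static int read(DataInputStream in, int peerVersion) throws IOException {
            if (peerVersion >= ESQL_REMOVE_NODE_LEVEL_PLAN) {
                return in.readInt();
            }
            if (in.readBoolean() == false) {
                throw new IOException("estimatedRowSize must be present");
            }
            int estimatedRowSize = in.readInt();
            if (peerVersion >= V_8_14_0 && in.readBoolean()) {
                // an old peer may still send a reducer plan; the real reader
                // deserializes and discards it, which this sketch does not model
                throw new IOException("legacy reducer payload not modeled in this sketch");
            }
            return estimatedRowSize;
        }
    }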
@@ -60,6 +60,7 @@
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.util.Holder;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
@@ -314,14 +315,7 @@ private void startComputeOnDataNodes(
         EsqlExecutionInfo executionInfo,
         ComputeListener computeListener
     ) {
-        var planWithReducer = configuration.pragmas().nodeLevelReduction() == false
-            ? dataNodePlan
-            : dataNodePlan.transformUp(FragmentExec.class, f -> {
-                PhysicalPlan reductionNode = PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan);
-                return reductionNode == null ? f : f.withReducer(reductionNode);
-            });
-
-        QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(planWithReducer);
+        QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(dataNodePlan);
         var lookupListener = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
         // SearchShards API can_match is done in lookupDataNodes
         lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> {
@@ -361,7 +355,7 @@ private void startComputeOnDataNodes(
                         clusterAlias,
                         node.shardIds,
                         node.aliasFilters,
-                        planWithReducer,
+                        dataNodePlan,
                         originalIndices.indices(),
                         originalIndices.indicesOptions()
                     ),
@@ -450,12 +444,12 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
         );

         LOGGER.debug("Received physical plan:\n{}", plan);
+
         plan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration, plan);
         // the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
         // it's doing this in the planning of EsQueryExec (the source of the data)
         // see also EsPhysicalOperationProviders.sourcePhysicalOperation
         LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(plan);
-
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
         }
@@ -785,14 +779,23 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
                 listener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + request.plan()));
                 return;
             }
-
             var localExchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
-            FragmentExec fragment = (FragmentExec) fragments.get(0);
+            Holder<PhysicalPlan> reducePlanHolder = new Holder<>();
+            if (request.pragmas().nodeLevelReduction()) {
+                PhysicalPlan dataNodePlan = request.plan();
+                request.plan()
+                    .forEachUp(
+                        FragmentExec.class,
+                        f -> { reducePlanHolder.set(PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan)); }
+                    );
+            }
             reducePlan = new ExchangeSinkExec(
                 plan.source(),
                 plan.output(),
                 plan.isIntermediateAgg(),
-                fragment.reducer() != null ? fragment.reducer().replaceChildren(List.of(localExchangeSource)) : localExchangeSource
+                reducePlanHolder.get() != null
+                    ? reducePlanHolder.get().replaceChildren(List.of(localExchangeSource))
+                    : localExchangeSource
             );
         } else {
             listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));
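
And the receiving side, as a toy model of the new messageReceived logic above: walk the received plan bottom-up, derive a reduce node from the FragmentExec when the nodeLevelReduction pragma is set, and splice it between the local exchange source and the exchange sink. Plan, Fragment, Reduce, and the "every fragment yields a reduce" rule are simplified stand-ins for the real ES|QL planner classes and PlannerUtils.dataNodeReductionPlan.

    import java.util.List;
    import java.util.function.Consumer;

    class ReducePlanSketch {
        interface Plan { List<Plan> children(); }
        record Fragment(List<Plan> children) implements Plan {}
        record ExchangeSource(List<Plan> children) implements Plan {}
        record Reduce(Plan child) implements Plan {
            public List<Plan> children() { return List.of(child); }
        }
        record ExchangeSink(Plan child) implements Plan {
            public List<Plan> children() { return List.of(child); }
        }

        // Visit children before the node itself, like forEachUp on ES|QL plan trees.
        static void forEachUp(Plan plan, Class<? extends Plan> type, Consumer<Plan> action) {
            for (Plan child : plan.children()) {
                forEachUp(child, type, action);
            }
            if (type.isInstance(plan)) {
                action.accept(plan);
            }
        }

        // One-slot mutable box, standing in for esql.core.util.Holder.
        static class Holder<T> { T value; }

        static Plan buildReducePlan(Plan received, boolean nodeLevelReduction) {
            Plan localExchangeSource = new ExchangeSource(List.of());
            Holder<Plan> reducer = new Holder<>();
            if (nodeLevelReduction) {
                // Stand-in for PlannerUtils.dataNodeReductionPlan(f.fragment(), plan):
                // pretend every fragment yields an intermediate reduce node.
                forEachUp(received, Fragment.class, f -> reducer.value = new Reduce(localExchangeSource));
            }
            return new ExchangeSink(reducer.value != null ? reducer.value : localExchangeSource);
        }

        public static void main(String[] args) {
            Plan received = new ExchangeSink(new Fragment(List.of()));
            System.out.println(buildReducePlan(received, true));  // sink -> reduce -> local source
            System.out.println(buildReducePlan(received, false)); // sink -> local source
        }
    }

The one-slot holder mirrors the real code's use of Holder, since forEachUp only exposes matching nodes through a callback.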
@@ -66,12 +66,13 @@ protected boolean alwaysEmptySource() {
      * See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more.
      */
     public void testManyTypeConflicts() throws IOException {
-        testManyTypeConflicts(false, ByteSizeValue.ofBytes(1424048));
+        testManyTypeConflicts(false, ByteSizeValue.ofBytes(1424046L));
         /*
          * History:
          * 2.3mb - shorten error messages for UnsupportedAttributes #111973
          * 1.8mb - cache EsFields #112008
          * 1.4mb - string serialization #112929
+         * 1424046b - remove node-level plan #117422
          */
     }

@@ -80,7 +81,7 @@ public void testManyTypeConflicts() throws IOException {
      * See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more.
      */
     public void testManyTypeConflictsWithParent() throws IOException {
-        testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774192));
+        testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774190));
         /*
          * History:
          * 2 gb+ - start
@@ -89,6 +90,7 @@ public void testManyTypeConflictsWithParent() throws IOException {
          * 3.1mb - cache EsFields #112008
          * 2774214b - string serialization #112929
          * 2774192b - remove field attribute #112881
+         * 2774190b - remove node-level plan #117422
          */
     }

@@ -103,11 +105,12 @@ private void testManyTypeConflicts(boolean withParent, ByteSizeValue expected) t
      * with a single root field that has many children, grandchildren etc.
      */
     public void testDeeplyNestedFields() throws IOException {
-        ByteSizeValue expected = ByteSizeValue.ofBytes(47252411);
+        ByteSizeValue expected = ByteSizeValue.ofBytes(47252409);
         /*
          * History:
          * 48223371b - string serialization #112929
          * 47252411b - remove field attribute #112881
+         * 47252409b - remove node-level plan
          */

         int depth = 6;
@@ -123,11 +126,12 @@ public void testDeeplyNestedFields() throws IOException {
      * with a single root field that has many children, grandchildren etc.
      */
     public void testDeeplyNestedFieldsKeepOnlyOne() throws IOException {
-        ByteSizeValue expected = ByteSizeValue.ofBytes(9425806);
+        ByteSizeValue expected = ByteSizeValue.ofBytes(9425804);
         /*
          * History:
          * 9426058b - string serialization #112929
          * 9425806b - remove field attribute #112881
+         * 9425804b - remove node-level plan #117422
          */

         int depth = 6;
@@ -22,8 +22,7 @@ public static FragmentExec randomFragmentExec(int depth) {
         LogicalPlan fragment = AbstractLogicalPlanSerializationTests.randomChild(depth);
         QueryBuilder esFilter = EsqlQueryRequestTests.randomQueryBuilder();
         int estimatedRowSize = between(0, Integer.MAX_VALUE);
-        PhysicalPlan reducer = randomChild(depth);
-        return new FragmentExec(source, fragment, esFilter, estimatedRowSize, reducer);
+        return new FragmentExec(source, fragment, esFilter, estimatedRowSize);
     }

     @Override
@@ -36,15 +35,13 @@ protected FragmentExec mutateInstance(FragmentExec instance) throws IOException
         LogicalPlan fragment = instance.fragment();
         QueryBuilder esFilter = instance.esFilter();
         int estimatedRowSize = instance.estimatedRowSize();
-        PhysicalPlan reducer = instance.reducer();
-        switch (between(0, 3)) {
+        switch (between(0, 2)) {
             case 0 -> fragment = randomValueOtherThan(fragment, () -> AbstractLogicalPlanSerializationTests.randomChild(0));
             case 1 -> esFilter = randomValueOtherThan(esFilter, EsqlQueryRequestTests::randomQueryBuilder);
             case 2 -> estimatedRowSize = randomValueOtherThan(estimatedRowSize, () -> between(0, Integer.MAX_VALUE));
-            case 3 -> reducer = randomValueOtherThan(reducer, () -> randomChild(0));
             default -> throw new UnsupportedEncodingException();
         }
-        return new FragmentExec(instance.source(), fragment, esFilter, estimatedRowSize, reducer);
+        return new FragmentExec(instance.source(), fragment, esFilter, estimatedRowSize);
     }

     @Override
@@ -305,7 +305,7 @@ private PhysicalPlan plan(String query, QueryBuilder restFilter) {
         // System.out.println("physical\n" + physical);
         physical = physical.transformUp(
             FragmentExec.class,
-            f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize(), f.reducer())
+            f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize())
         );
         physical = physicalPlanOptimizer.optimize(physical);
         // System.out.println("optimized\n" + physical);
