Skip to content

Commit

Permalink
ESQL Move serialization of MV_ functions (#109892)
Browse files Browse the repository at this point in the history
This moves the serialization of the `MV_` functions from
`PlanNamedTypes` to their `NamedWriteable` to line up better with the
way the rest of Elasticsearch works.

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
nik9000 and elasticmachine authored Jun 19, 2024
1 parent 3301cef commit c900743
Show file tree
Hide file tree
Showing 35 changed files with 863 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
*/
package org.elasticsearch.xpack.esql.core.expression.function.scalar;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

Expand All @@ -22,6 +27,21 @@ protected BinaryScalarFunction(Source source, Expression left, Expression right)
this.right = right;
}

protected BinaryScalarFunction(StreamInput in) throws IOException {
this(
Source.readFrom((StreamInput & PlanStreamInput) in),
((PlanStreamInput) in).readExpression(),
((PlanStreamInput) in).readExpression()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
source().writeTo(out);
((PlanStreamOutput) out).writeExpression(left());
((PlanStreamOutput) out).writeExpression(right());
}

@Override
public final BinaryScalarFunction replaceChildren(List<Expression> newChildren) {
Expression newLeft = newChildren.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
Expand All @@ -15,7 +18,12 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
import org.elasticsearch.xpack.esql.expression.function.scalar.UnaryScalarFunction;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;

import java.io.IOException;
import java.util.List;

/**
* Base class for functions that reduce multivalued fields into single valued fields.
Expand All @@ -25,10 +33,39 @@
* </p>
*/
public abstract class AbstractMultivalueFunction extends UnaryScalarFunction {
public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
MvAppend.ENTRY,
MvAvg.ENTRY,
MvConcat.ENTRY,
MvCount.ENTRY,
MvDedupe.ENTRY,
MvFirst.ENTRY,
MvLast.ENTRY,
MvMax.ENTRY,
MvMedian.ENTRY,
MvMin.ENTRY,
MvSlice.ENTRY,
MvSort.ENTRY,
MvSum.ENTRY,
MvZip.ENTRY
);
}

protected AbstractMultivalueFunction(Source source, Expression field) {
super(source, field);
}

protected AbstractMultivalueFunction(StreamInput in) throws IOException {
this(Source.readFrom((PlanStreamInput) in), ((PlanStreamInput) in).readExpression());
}

@Override
public final void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
((PlanStreamOutput) out).writeExpression(field);
}

/**
* Build the evaluator given the evaluator a multivalued field.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.ann.Evaluator;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
Expand All @@ -25,9 +28,11 @@
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand All @@ -41,6 +46,8 @@
* Appends values to a multi-value
*/
public class MvAppend extends EsqlScalarFunction implements EvaluatorMapper {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvAppend", MvAppend::new);

private final Expression field1, field2;
private DataType dataType;

Expand Down Expand Up @@ -103,6 +110,22 @@ public MvAppend(
this.field2 = field2;
}

private MvAppend(StreamInput in) throws IOException {
this(Source.readFrom((PlanStreamInput) in), ((PlanStreamInput) in).readExpression(), ((PlanStreamInput) in).readExpression());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
out.writeNamedWriteable(field1);
out.writeNamedWriteable(field2);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected TypeResolution resolveType() {
if (childrenResolved() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.ann.MvEvaluator;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
Expand All @@ -21,6 +23,7 @@
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
Expand All @@ -31,6 +34,8 @@
* Reduce a multivalued field to a single valued field containing the average value.
*/
public class MvAvg extends AbstractMultivalueFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvAvg", MvAvg::new);

@FunctionInfo(
returnType = "double",
description = "Converts a multivalued field into a single valued field containing the average of all of the values.",
Expand All @@ -47,6 +52,15 @@ public MvAvg(
super(source, field);
}

private MvAvg(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected TypeResolution resolveFieldType() {
return isType(field(), t -> t.isNumeric() && isRepresentable(t), sourceText(), null, "numeric");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.Page;
Expand All @@ -25,6 +27,7 @@
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.function.Function;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
Expand All @@ -33,6 +36,8 @@
* Reduce a multivalued string field to a single valued field by concatenating all values.
*/
public class MvConcat extends BinaryScalarFunction implements EvaluatorMapper {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvConcat", MvConcat::new);

@FunctionInfo(
returnType = "keyword",
description = "Converts a multivalued string expression into a single valued column "
Expand All @@ -53,6 +58,15 @@ public MvConcat(
super(source, field, delim);
}

private MvConcat(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected TypeResolution resolveType() {
if (childrenResolved() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
Expand All @@ -20,6 +22,7 @@
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
Expand All @@ -28,6 +31,8 @@
* Reduce a multivalued field to a single valued field containing the count of values.
*/
public class MvCount extends AbstractMultivalueFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvCount", MvCount::new);

@FunctionInfo(
returnType = "integer",
description = "Converts a multivalued expression into a single valued column containing a count of the number of values.",
Expand Down Expand Up @@ -58,6 +63,15 @@ public MvCount(
super(source, v);
}

private MvCount(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected TypeResolution resolveFieldType() {
return isType(field(), EsqlDataTypes::isRepresentable, sourceText(), null, "representable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.compute.operator.mvdedupe.MultivalueDedupe;
import org.elasticsearch.xpack.esql.core.expression.Expression;
Expand All @@ -18,6 +20,7 @@
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
Expand All @@ -26,6 +29,8 @@
* Removes duplicate values from a multivalued field.
*/
public class MvDedupe extends AbstractMultivalueFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvDedupe", MvDedupe::new);

// @TODO: add unsigned_long
@FunctionInfo(
returnType = {
Expand Down Expand Up @@ -70,6 +75,15 @@ public MvDedupe(
super(source, field);
}

private MvDedupe(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected TypeResolution resolveFieldType() {
return isType(field(), EsqlDataTypes::isRepresentable, sourceText(), null, "representable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.ann.MvEvaluator;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
Expand All @@ -26,6 +28,7 @@
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
Expand All @@ -34,6 +37,8 @@
* Reduce a multivalued field to a single valued field containing the minimum value.
*/
public class MvFirst extends AbstractMultivalueFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvFirst", MvFirst::new);

@FunctionInfo(
returnType = {
"boolean",
Expand Down Expand Up @@ -87,6 +92,15 @@ public MvFirst(
super(source, field);
}

private MvFirst(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
protected TypeResolution resolveFieldType() {
return isType(field(), EsqlDataTypes::isRepresentable, sourceText(), null, "representable");
Expand Down
Loading

0 comments on commit c900743

Please sign in to comment.