From 515f84bf92ad06f2178d4d5269c38a78a6bf215b Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Thu, 20 Oct 2022 11:50:33 -0500 Subject: [PATCH] [Remove] Deprecated serialization logic from pipeline aggs (#4847) Removes the deprecated readTo/writeFrom serialization logic from piepline aggs that is no longer used as of Legacy version 7.8. Signed-off-by: Nicholas Walter Knize --- CHANGELOG.md | 1 + .../org/opensearch/plugins/SearchPlugin.java | 74 +----------- .../org/opensearch/search/SearchModule.java | 36 ------ .../search/aggregations/AggregationPhase.java | 3 +- .../aggregations/InternalAggregation.java | 40 ------- .../aggregations/InternalAggregations.java | 107 +----------------- .../pipeline/AvgBucketPipelineAggregator.java | 14 --- .../BucketMetricsPipelineAggregator.java | 21 ---- .../BucketScriptPipelineAggregator.java | 28 ----- .../BucketSelectorPipelineAggregator.java | 26 ----- .../BucketSortPipelineAggregator.java | 27 ----- .../CumulativeSumPipelineAggregator.java | 21 ---- .../DerivativePipelineAggregator.java | 25 ---- ...ExtendedStatsBucketPipelineAggregator.java | 21 ---- .../pipeline/MaxBucketPipelineAggregator.java | 14 --- .../pipeline/MinBucketPipelineAggregator.java | 14 --- .../pipeline/MovAvgPipelineAggregator.java | 31 ----- .../pipeline/MovFnPipelineAggregator.java | 28 ----- .../PercentilesBucketPipelineAggregator.java | 23 ---- .../pipeline/PipelineAggregator.java | 55 +-------- .../SerialDiffPipelineAggregator.java | 25 ---- .../pipeline/SiblingPipelineAggregator.java | 9 -- .../StatsBucketPipelineAggregator.java | 11 -- .../pipeline/SumBucketPipelineAggregator.java | 14 --- .../search/query/QuerySearchResult.java | 40 ++----- .../opensearch/search/SearchModuleTests.java | 24 +--- .../InternalAggregationsTests.java | 27 +---- .../terms/SignificanceHeuristicTests.java | 5 - .../test/InternalAggregationTestCase.java | 58 ---------- 29 files changed, 22 insertions(+), 800 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e5d7a3b57862..c7b736205b87b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -102,6 +102,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Always auto release the flood stage block ([#4703](https://github.com/opensearch-project/OpenSearch/pull/4703)) - Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704)) - Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728)) +- Remove deprecated serialization logic from pipeline aggs ([#4847](https://github.com/opensearch-project/OpenSearch/pull/4847)) ### Fixed - `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289)) diff --git a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java index af7d4fc2e9fe5..a61a73255aa61 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java @@ -621,52 +621,6 @@ class PipelineAggregationSpec extends SearchExtensionSpec< PipelineAggregationBuilder, ContextParser> { private final Map> resultReaders = new TreeMap<>(); - /** - * Read the aggregator from a stream. - * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire - */ - @Deprecated - private final Writeable.Reader aggregatorReader; - - /** - * Specification of a {@link PipelineAggregator}. - * - * @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it - * is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and - * {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. It is an error if - * {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins. - * @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a - * {@link StreamInput} - * @param parser reads the aggregation builder from XContent - */ - public PipelineAggregationSpec( - ParseField name, - Writeable.Reader builderReader, - ContextParser parser - ) { - super(name, builderReader, parser); - this.aggregatorReader = null; - } - - /** - * Specification of a {@link PipelineAggregator}. - * - * @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the - * {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from - * {@link NamedWriteable#getWriteableName()}. It is an error if this name conflicts with another registered name, including - * names from other plugins. - * @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a - * {@link StreamInput} - * @param parser reads the aggregation builder from XContent - */ - public PipelineAggregationSpec( - String name, - Writeable.Reader builderReader, - ContextParser parser - ) { - super(name, builderReader, parser); - this.aggregatorReader = null; - } /** * Specification of a {@link PipelineAggregator}. @@ -677,20 +631,14 @@ public PipelineAggregationSpec( * {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins. * @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent - * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for - * pipelines implemented after 7.8.0 */ - @Deprecated public PipelineAggregationSpec( ParseField name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, ContextParser parser ) { super(name, builderReader, parser); - this.aggregatorReader = aggregatorReader; } /** @@ -702,20 +650,15 @@ public PipelineAggregationSpec( * names from other plugins. * @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent - * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines - * implemented after 7.8.0 + */ - @Deprecated public PipelineAggregationSpec( String name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, ContextParser parser ) { super(name, builderReader, parser); - this.aggregatorReader = aggregatorReader; } /** @@ -727,7 +670,6 @@ public PipelineAggregationSpec( * {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins. * @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent * @deprecated prefer the ctor that takes a {@link ContextParser} */ @@ -735,11 +677,9 @@ public PipelineAggregationSpec( public PipelineAggregationSpec( ParseField name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, PipelineAggregator.Parser parser ) { super(name, builderReader, (p, n) -> parser.parse(n, p)); - this.aggregatorReader = aggregatorReader; } /** @@ -751,18 +691,15 @@ public PipelineAggregationSpec( * names from other plugins. * @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a * {@link StreamInput} - * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @deprecated prefer the ctor that takes a {@link ContextParser} */ @Deprecated public PipelineAggregationSpec( String name, Writeable.Reader builderReader, - Writeable.Reader aggregatorReader, PipelineAggregator.Parser parser ) { super(name, builderReader, (p, n) -> parser.parse(n, p)); - this.aggregatorReader = aggregatorReader; } /** @@ -781,15 +718,6 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R return this; } - /** - * Read the aggregator from a stream. - * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire - */ - @Deprecated - public Writeable.Reader getAggregatorReader() { - return aggregatorReader; - } - /** * Get the readers that must be registered for this aggregation's results. */ diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index 0149f9a025bcd..4bf7dca9ef444 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -211,21 +211,14 @@ import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.opensearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregator; import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregator; import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregator; import org.opensearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.BucketSortPipelineAggregator; import org.opensearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.CumulativeSumPipelineAggregator; import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregator; import org.opensearch.search.aggregations.pipeline.EwmaModel; import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketParser; import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregator; import org.opensearch.search.aggregations.pipeline.HoltLinearModel; import org.opensearch.search.aggregations.pipeline.HoltWintersModel; import org.opensearch.search.aggregations.pipeline.InternalBucketMetricValue; @@ -236,24 +229,15 @@ import org.opensearch.search.aggregations.pipeline.InternalStatsBucket; import org.opensearch.search.aggregations.pipeline.LinearModel; import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregator; import org.opensearch.search.aggregations.pipeline.MinBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.MinBucketPipelineAggregator; import org.opensearch.search.aggregations.pipeline.MovAvgModel; import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator; import org.opensearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.MovFnPipelineAggregator; import org.opensearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregator; -import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.pipeline.SerialDiffPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.SerialDiffPipelineAggregator; import org.opensearch.search.aggregations.pipeline.SimpleModel; import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregator; import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregator; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.fetch.FetchSubPhase; @@ -710,7 +694,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( DerivativePipelineAggregationBuilder.NAME, DerivativePipelineAggregationBuilder::new, - DerivativePipelineAggregator::new, DerivativePipelineAggregationBuilder::parse ).addResultReader(InternalDerivative::new) ); @@ -718,7 +701,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( MaxBucketPipelineAggregationBuilder.NAME, MaxBucketPipelineAggregationBuilder::new, - MaxBucketPipelineAggregator::new, MaxBucketPipelineAggregationBuilder.PARSER ) // This bucket is used by many pipeline aggreations. @@ -728,7 +710,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( MinBucketPipelineAggregationBuilder.NAME, MinBucketPipelineAggregationBuilder::new, - MinBucketPipelineAggregator::new, MinBucketPipelineAggregationBuilder.PARSER ) /* Uses InternalBucketMetricValue */ @@ -737,7 +718,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( AvgBucketPipelineAggregationBuilder.NAME, AvgBucketPipelineAggregationBuilder::new, - AvgBucketPipelineAggregator::new, AvgBucketPipelineAggregationBuilder.PARSER ) // This bucket is used by many pipeline aggreations. @@ -747,7 +727,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( SumBucketPipelineAggregationBuilder.NAME, SumBucketPipelineAggregationBuilder::new, - SumBucketPipelineAggregator::new, SumBucketPipelineAggregationBuilder.PARSER ) /* Uses InternalSimpleValue */ @@ -756,7 +735,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( StatsBucketPipelineAggregationBuilder.NAME, StatsBucketPipelineAggregationBuilder::new, - StatsBucketPipelineAggregator::new, StatsBucketPipelineAggregationBuilder.PARSER ).addResultReader(InternalStatsBucket::new) ); @@ -764,7 +742,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( ExtendedStatsBucketPipelineAggregationBuilder.NAME, ExtendedStatsBucketPipelineAggregationBuilder::new, - ExtendedStatsBucketPipelineAggregator::new, new ExtendedStatsBucketParser() ).addResultReader(InternalExtendedStatsBucket::new) ); @@ -772,7 +749,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( PercentilesBucketPipelineAggregationBuilder.NAME, PercentilesBucketPipelineAggregationBuilder::new, - PercentilesBucketPipelineAggregator::new, PercentilesBucketPipelineAggregationBuilder.PARSER ).addResultReader(InternalPercentilesBucket::new) ); @@ -780,7 +756,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( MovAvgPipelineAggregationBuilder.NAME, MovAvgPipelineAggregationBuilder::new, - MovAvgPipelineAggregator::new, (XContentParser parser, String name) -> MovAvgPipelineAggregationBuilder.parse( movingAverageModelParserRegistry, name, @@ -792,7 +767,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( CumulativeSumPipelineAggregationBuilder.NAME, CumulativeSumPipelineAggregationBuilder::new, - CumulativeSumPipelineAggregator::new, CumulativeSumPipelineAggregationBuilder.PARSER ) ); @@ -800,7 +774,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( BucketScriptPipelineAggregationBuilder.NAME, BucketScriptPipelineAggregationBuilder::new, - BucketScriptPipelineAggregator::new, BucketScriptPipelineAggregationBuilder.PARSER ) ); @@ -808,7 +781,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( BucketSelectorPipelineAggregationBuilder.NAME, BucketSelectorPipelineAggregationBuilder::new, - BucketSelectorPipelineAggregator::new, BucketSelectorPipelineAggregationBuilder::parse ) ); @@ -816,7 +788,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( BucketSortPipelineAggregationBuilder.NAME, BucketSortPipelineAggregationBuilder::new, - BucketSortPipelineAggregator::new, BucketSortPipelineAggregationBuilder::parse ) ); @@ -824,7 +795,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( SerialDiffPipelineAggregationBuilder.NAME, SerialDiffPipelineAggregationBuilder::new, - SerialDiffPipelineAggregator::new, SerialDiffPipelineAggregationBuilder::parse ) ); @@ -832,7 +802,6 @@ private void registerPipelineAggregations(List plugins) { new PipelineAggregationSpec( MovFnPipelineAggregationBuilder.NAME, MovFnPipelineAggregationBuilder::new, - MovFnPipelineAggregator::new, MovFnPipelineAggregationBuilder.PARSER ) ); @@ -847,11 +816,6 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) { namedWriteables.add( new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader()) ); - if (spec.getAggregatorReader() != null) { - namedWriteables.add( - new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()) - ); - } for (Map.Entry> resultReader : spec.getResultReaders().entrySet()) { namedWriteables.add( new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue()) diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationPhase.java index c091a1bc47c05..946da6e9c3369 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationPhase.java @@ -148,8 +148,7 @@ public void execute(SearchContext context) { throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); } } - context.queryResult() - .aggregations(new InternalAggregations(aggregations, context.request().source().aggregations()::buildPipelineTree)); + context.queryResult().aggregations(new InternalAggregations(aggregations)); // disable aggregations so that they don't run on next pages in case of scrolling context.aggregations(null); diff --git a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java index 6ccd429388873..714de06fef1e0 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java @@ -31,7 +31,6 @@ package org.opensearch.search.aggregations; -import org.opensearch.LegacyESVersion; import org.opensearch.common.Strings; import org.opensearch.common.io.stream.NamedWriteable; import org.opensearch.common.io.stream.StreamInput; @@ -187,8 +186,6 @@ public void consumeBucketsAndMaybeBreak(int size) { protected final Map metadata; - private List pipelineAggregatorsForBwcSerialization; - /** * Constructs an aggregation result with a given name. * @@ -199,46 +196,18 @@ protected InternalAggregation(String name, Map metadata) { this.metadata = metadata; } - /** - * Merge a {@linkplain PipelineAggregator.PipelineTree} into this - * aggregation result tree before serializing to a node older than - * 7.8.0. - */ - public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) { - if (pipelineAggregatorsForBwcSerialization != null) { - /* - * This method is called once per level on the results but only - * has useful pipeline aggregations on the top level. Every level - * below the top will always be empty. So if we've already been - * called we should bail. This is pretty messy but it is the kind - * of weird thing we have to do to deal with bwc serialization.... - */ - return; - } - pipelineAggregatorsForBwcSerialization = pipelineTree.aggregators(); - forEachBucket(bucketAggs -> bucketAggs.mergePipelineTreeForBWCSerialization(pipelineTree)); - } - /** * Read from a stream. */ protected InternalAggregation(StreamInput in) throws IOException { name = in.readString(); metadata = in.readMap(); - if (in.getVersion().before(LegacyESVersion.V_7_8_0)) { - in.readNamedWriteableList(PipelineAggregator.class); - } } @Override public final void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeGenericValue(metadata); - if (out.getVersion().before(LegacyESVersion.V_7_8_0)) { - assert pipelineAggregatorsForBwcSerialization != null - : "serializing to pre-7.8.0 versions should have called mergePipelineTreeForBWCSerialization"; - out.writeNamedWriteableList(pipelineAggregatorsForBwcSerialization); - } doWriteTo(out); } @@ -355,15 +324,6 @@ public Map getMetadata() { return metadata; } - /** - * The {@linkplain PipelineAggregator}s sent to older versions of OpenSearch. - * @deprecated only use these for serializing to older OpenSearch versions - */ - @Deprecated - public List pipelineAggregatorsForBwcSerialization() { - return pipelineAggregatorsForBwcSerialization; - } - @Override public String getType() { return getWriteableName(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregations.java index 79dd8d756dede..16d7898118fc3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregations.java @@ -31,14 +31,12 @@ package org.opensearch.search.aggregations; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; -import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.opensearch.search.aggregations.support.AggregationPath; @@ -50,13 +48,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; -import static java.util.Collections.emptyList; -import static java.util.stream.Collectors.toList; - /** * An internal implementation of {@link Aggregations}. * @@ -76,29 +69,11 @@ public final class InternalAggregations extends Aggregations implements Writeabl } }; - /** - * The way to build a tree of pipeline aggregators. Used only for - * serialization backwards compatibility. - */ - private final Supplier pipelineTreeForBwcSerialization; - /** * Constructs a new aggregation. */ - private InternalAggregations(List aggregations) { - super(aggregations); - this.pipelineTreeForBwcSerialization = null; - } - - /** - * Constructs a node in the aggregation tree. - * @param pipelineTreeSource must be null inside the tree or after final reduction. Should reference the - * search request otherwise so we can properly serialize the response to - * versions of OpenSearch that require the pipelines to be serialized. - */ - public InternalAggregations(List aggregations, Supplier pipelineTreeSource) { + public InternalAggregations(List aggregations) { super(aggregations); - this.pipelineTreeForBwcSerialization = pipelineTreeSource; } public static InternalAggregations from(List aggregations) { @@ -110,45 +85,12 @@ public static InternalAggregations from(List aggregations) public static InternalAggregations readFrom(StreamInput in) throws IOException { final InternalAggregations res = from(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class))); - if (in.getVersion().before(LegacyESVersion.V_7_8_0)) { - /* - * Setting the pipeline tree source to null is here is correct but - * only because we don't immediately pass the InternalAggregations - * off to another node. Instead, we always reduce together with - * many aggregations and that always adds the tree read from the - * current request. - */ - in.readNamedWriteableList(PipelineAggregator.class); - } return res; } @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().before(LegacyESVersion.V_7_8_0)) { - if (pipelineTreeForBwcSerialization == null) { - mergePipelineTreeForBWCSerialization(PipelineTree.EMPTY); - out.writeNamedWriteableList(getInternalAggregations()); - out.writeNamedWriteableList(emptyList()); - } else { - PipelineAggregator.PipelineTree pipelineTree = pipelineTreeForBwcSerialization.get(); - mergePipelineTreeForBWCSerialization(pipelineTree); - out.writeNamedWriteableList(getInternalAggregations()); - out.writeNamedWriteableList(pipelineTree.aggregators()); - } - } else { - out.writeNamedWriteableList(getInternalAggregations()); - } - } - - /** - * Merge a {@linkplain PipelineAggregator.PipelineTree} into this - * aggregation result tree before serializing to a node older than - * 7.8.0. - */ - public void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) { - getInternalAggregations().stream() - .forEach(agg -> { agg.mergePipelineTreeForBWCSerialization(pipelineTree.subTree(agg.getName())); }); + out.writeNamedWriteableList(getInternalAggregations()); } /** @@ -160,26 +102,6 @@ public List copyResults() { return new ArrayList<>(getInternalAggregations()); } - /** - * Get the top level pipeline aggregators. - * @deprecated these only exist for BWC serialization - */ - @Deprecated - public List getTopLevelPipelineAggregators() { - if (pipelineTreeForBwcSerialization == null) { - return emptyList(); - } - return pipelineTreeForBwcSerialization.get().aggregators().stream().map(p -> (SiblingPipelineAggregator) p).collect(toList()); - } - - /** - * Get the transient pipeline tree used to serialize pipeline aggregators to old nodes. - */ - @Deprecated - Supplier getPipelineTreeForBwcSerialization() { - return pipelineTreeForBwcSerialization; - } - @SuppressWarnings("unchecked") private List getInternalAggregations() { return (List) aggregations; @@ -208,11 +130,7 @@ public double sortValue(AggregationPath.PathElement head, Iterator aggregationsList, ReduceContext context) { - InternalAggregations reduced = reduce( - aggregationsList, - context, - reducedAggregations -> new InternalAggregations(reducedAggregations, context.pipelineTreeForBwcSerialization()) - ); + InternalAggregations reduced = reduce(aggregationsList, context); if (reduced == null) { return null; } @@ -238,16 +156,8 @@ public static InternalAggregations topLevelReduce(List agg * {@link InternalAggregations} object found in the list. * Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled * separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)} - * @param ctor used to build the {@link InternalAggregations}. The top level reduce specifies a constructor - * that adds pipeline aggregation information that is used to send pipeline aggregations to - * older versions of Elasticsearch that require the pipeline aggregations to be returned - * as part of the aggregation tree */ - public static InternalAggregations reduce( - List aggregationsList, - ReduceContext context, - Function, InternalAggregations> ctor - ) { + public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { if (aggregationsList.isEmpty()) { return null; } @@ -280,14 +190,7 @@ public static InternalAggregations reduce( } } - return ctor.apply(reducedAggregations); - } - - /** - * Version of {@link #reduce(List, ReduceContext, Function)} for nodes inside the aggregation tree. - */ - public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { - return reduce(aggregationsList, context, InternalAggregations::from); + return new InternalAggregations(reducedAggregations); } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java index e9e1f13a80766..e4f0294ae0723 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/AvgBucketPipelineAggregator.java @@ -32,12 +32,10 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; /** @@ -59,18 +57,6 @@ public class AvgBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, format, metadata); } - /** - * Read from a stream. - */ - public AvgBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return AvgBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { count = 0; diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java index 4836428027f08..5a2e1238e19c7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketMetricsPipelineAggregator.java @@ -32,8 +32,6 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregation; import org.opensearch.search.aggregations.Aggregations; @@ -43,7 +41,6 @@ import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.opensearch.search.aggregations.support.AggregationPath; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -70,24 +67,6 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg this.format = format; } - /** - * Read from a stream. - */ - BucketMetricsPipelineAggregator(StreamInput in) throws IOException { - super(in); - format = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - } - - @Override - public final void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(format); - gapPolicy.writeTo(out); - innerWriteTo(out); - } - - protected void innerWriteTo(StreamOutput out) throws IOException {} - @Override public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) { preCollection(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java index abf98b6ea4a6f..41e11e2537845 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java @@ -32,8 +32,6 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.script.BucketAggregationScript; import org.opensearch.script.Script; import org.opensearch.search.DocValueFormat; @@ -43,7 +41,6 @@ import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -79,31 +76,6 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } - /** - * Read from a stream. - */ - @SuppressWarnings("unchecked") - public BucketScriptPipelineAggregator(StreamInput in) throws IOException { - super(in); - script = new Script(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - bucketsPathsMap = (Map) in.readGenericValue(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeGenericValue(bucketsPathsMap); - } - - @Override - public String getWriteableName() { - return BucketScriptPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation originalAgg = diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java index 8f5426eae83a4..24f64bc78815d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSelectorPipelineAggregator.java @@ -32,8 +32,6 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.script.BucketAggregationSelectorScript; import org.opensearch.script.Script; import org.opensearch.search.aggregations.InternalAggregation; @@ -41,7 +39,6 @@ import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -72,29 +69,6 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } - /** - * Read from a stream. - */ - @SuppressWarnings("unchecked") - public BucketSelectorPipelineAggregator(StreamInput in) throws IOException { - super(in); - script = new Script(in); - gapPolicy = GapPolicy.readFrom(in); - bucketsPathsMap = (Map) in.readGenericValue(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - gapPolicy.writeTo(out); - out.writeGenericValue(bucketsPathsMap); - } - - @Override - public String getWriteableName() { - return BucketSelectorPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation originalAgg = diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java index 809514614b1f2..571fbf5ad42a6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java @@ -31,8 +31,6 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; import org.opensearch.search.aggregations.InternalMultiBucketAggregation; @@ -41,7 +39,6 @@ import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.SortOrder; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -75,30 +72,6 @@ public class BucketSortPipelineAggregator extends PipelineAggregator { this.gapPolicy = gapPolicy; } - /** - * Read from a stream. - */ - public BucketSortPipelineAggregator(StreamInput in) throws IOException { - super(in); - sorts = in.readList(FieldSortBuilder::new); - from = in.readVInt(); - size = in.readOptionalVInt(); - gapPolicy = GapPolicy.readFrom(in); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - out.writeList(sorts); - out.writeVInt(from); - out.writeOptionalVInt(size); - gapPolicy.writeTo(out); - } - - @Override - public String getWriteableName() { - return BucketSortPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation originalAgg = diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java index cba35c89cc639..c11ed9d7ded7d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java @@ -32,8 +32,6 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; @@ -43,7 +41,6 @@ import org.opensearch.search.aggregations.bucket.histogram.HistogramFactory; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -65,24 +62,6 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator { this.formatter = formatter; } - /** - * Read from a stream. - */ - public CumulativeSumPipelineAggregator(StreamInput in) throws IOException { - super(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - } - - @Override - public String getWriteableName() { - return CumulativeSumPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation< diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/DerivativePipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/DerivativePipelineAggregator.java index 3603d1c5a0c58..4bdd2c3168fae 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/DerivativePipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/DerivativePipelineAggregator.java @@ -32,8 +32,6 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; @@ -43,7 +41,6 @@ import org.opensearch.search.aggregations.bucket.histogram.HistogramFactory; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -76,28 +73,6 @@ public class DerivativePipelineAggregator extends PipelineAggregator { this.xAxisUnits = xAxisUnits == null ? null : (double) xAxisUnits; } - /** - * Read from a stream. - */ - public DerivativePipelineAggregator(StreamInput in) throws IOException { - super(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - xAxisUnits = in.readOptionalDouble(); - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeOptionalDouble(xAxisUnits); - } - - @Override - public String getWriteableName() { - return DerivativePipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation< diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java index a63a1da332d08..81f4a441e3772 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/ExtendedStatsBucketPipelineAggregator.java @@ -32,13 +32,10 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; /** @@ -66,24 +63,6 @@ public class ExtendedStatsBucketPipelineAggregator extends BucketMetricsPipeline this.sigma = sigma; } - /** - * Read from a stream. - */ - public ExtendedStatsBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - sigma = in.readDouble(); - } - - @Override - protected void innerWriteTo(StreamOutput out) throws IOException { - out.writeDouble(sigma); - } - - @Override - public String getWriteableName() { - return ExtendedStatsBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { sum = 0; diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java index 1e327919755c7..7e63af31a9c86 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MaxBucketPipelineAggregator.java @@ -32,12 +32,10 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -61,18 +59,6 @@ public class MaxBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, formatter, metadata); } - /** - * Read from a stream. - */ - public MaxBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return MaxBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { maxBucketKeys = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java index 2b57aac14ea1e..773fc7441d96e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MinBucketPipelineAggregator.java @@ -32,12 +32,10 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -61,18 +59,6 @@ public class MinBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, formatter, metadata); } - /** - * Read from a stream. - */ - public MinBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return MinBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { minBucketKeys = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovAvgPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovAvgPipelineAggregator.java index 2e3c44ac3902d..aa78439784992 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovAvgPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovAvgPipelineAggregator.java @@ -33,8 +33,6 @@ package org.opensearch.search.aggregations.pipeline; import org.opensearch.common.collect.EvictingQueue; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; @@ -45,7 +43,6 @@ import org.opensearch.search.aggregations.bucket.histogram.HistogramFactory; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -88,34 +85,6 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { this.minimize = minimize; } - /** - * Read from a stream. - */ - public MovAvgPipelineAggregator(StreamInput in) throws IOException { - super(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - window = in.readVInt(); - predict = in.readVInt(); - model = in.readNamedWriteable(MovAvgModel.class); - minimize = in.readBoolean(); - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeVInt(window); - out.writeVInt(predict); - out.writeNamedWriteable(model); - out.writeBoolean(minimize); - } - - @Override - public String getWriteableName() { - return MovAvgPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { InternalMultiBucketAggregation< diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 7b20a796b8134..a4c3c14f3365f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -32,8 +32,6 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.script.Script; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; @@ -42,7 +40,6 @@ import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; import org.opensearch.search.aggregations.bucket.histogram.HistogramFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -98,31 +95,6 @@ public class MovFnPipelineAggregator extends PipelineAggregator { this.shift = shift; } - public MovFnPipelineAggregator(StreamInput in) throws IOException { - super(in); - script = new Script(in); - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = BucketHelpers.GapPolicy.readFrom(in); - bucketsPath = in.readString(); - window = in.readInt(); - shift = in.readInt(); - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - script.writeTo(out); - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeString(bucketsPath); - out.writeInt(window); - out.writeInt(shift); - } - - @Override - public String getWriteableName() { - return MovFnPipelineAggregationBuilder.NAME; - } - @Override public InternalAggregation reduce(InternalAggregation aggregation, InternalAggregation.ReduceContext reduceContext) { InternalMultiBucketAggregation< diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java index 7fad7e233c424..41699ffb8b6b8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/PercentilesBucketPipelineAggregator.java @@ -32,13 +32,10 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -69,26 +66,6 @@ public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAg this.keyed = keyed; } - /** - * Read from a stream. - */ - public PercentilesBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - percents = in.readDoubleArray(); - keyed = in.readBoolean(); - } - - @Override - public void innerWriteTo(StreamOutput out) throws IOException { - out.writeDoubleArray(percents); - out.writeBoolean(keyed); - } - - @Override - public String getWriteableName() { - return PercentilesBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { data = new ArrayList<>(1024); diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/PipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/PipelineAggregator.java index 4b1134d9e1a60..204dcdb7a400f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/PipelineAggregator.java @@ -32,11 +32,7 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.LegacyESVersion; import org.opensearch.common.ParseField; -import org.opensearch.common.io.stream.NamedWriteable; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; @@ -54,7 +50,7 @@ * * @opensearch.internal */ -public abstract class PipelineAggregator implements NamedWriteable { +public abstract class PipelineAggregator { /** * Parse the {@link PipelineAggregationBuilder} from a {@link XContentParser}. * @@ -139,55 +135,6 @@ protected PipelineAggregator(String name, String[] bucketsPaths, Map { diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java index 577f66f94e90f..a01be279853f5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/StatsBucketPipelineAggregator.java @@ -32,12 +32,10 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; /** @@ -61,15 +59,6 @@ public class StatsBucketPipelineAggregator extends BucketMetricsPipelineAggregat super(name, bucketsPaths, gapPolicy, formatter, metadata); } - public StatsBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return StatsBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { sum = 0; diff --git a/server/src/main/java/org/opensearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java index 32a64fafa7a2d..01939b1b1063c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/pipeline/SumBucketPipelineAggregator.java @@ -32,12 +32,10 @@ package org.opensearch.search.aggregations.pipeline; -import org.opensearch.common.io.stream.StreamInput; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import java.io.IOException; import java.util.Map; /** @@ -58,18 +56,6 @@ public class SumBucketPipelineAggregator extends BucketMetricsPipelineAggregator super(name, bucketsPaths, gapPolicy, formatter, metadata); } - /** - * Read from a stream. - */ - public SumBucketPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return SumBucketPipelineAggregationBuilder.NAME; - } - @Override protected void preCollection() { sum = 0; diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java index 31fdc5c9d9e9d..a0c2970625472 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -34,7 +34,6 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; -import org.opensearch.LegacyESVersion; import org.opensearch.common.io.stream.DelayableWriteable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -95,11 +94,7 @@ public QuerySearchResult() { public QuerySearchResult(StreamInput in) throws IOException { super(in); - if (in.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) { - isNull = in.readBoolean(); - } else { - isNull = false; - } + isNull = in.readBoolean(); if (isNull == false) { ShardSearchContextId id = new ShardSearchContextId(in); readFromWithId(id, in); @@ -355,14 +350,8 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc } } setTopDocs(readTopDocs(in)); - if (in.getVersion().before(LegacyESVersion.V_7_7_0)) { - if (hasAggs = in.readBoolean()) { - aggregations = DelayableWriteable.referencing(InternalAggregations.readFrom(in)); - } - } else { - if (hasAggs = in.readBoolean()) { - aggregations = DelayableWriteable.delayed(InternalAggregations::readFrom, in); - } + if (hasAggs = in.readBoolean()) { + aggregations = DelayableWriteable.delayed(InternalAggregations::readFrom, in); } if (in.readBoolean()) { suggest = new Suggest(in); @@ -373,17 +362,13 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc hasProfileResults = profileShardResults != null; serviceTimeEWMA = in.readZLong(); nodeQueueSize = in.readInt(); - if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) { - setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); - setRescoreDocIds(new RescoreDocIds(in)); - } + setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); + setRescoreDocIds(new RescoreDocIds(in)); } @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) { - out.writeBoolean(isNull); - } + out.writeBoolean(isNull); if (isNull == false) { contextId.writeTo(out); writeToNoId(out); @@ -406,12 +391,7 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(false); } else { out.writeBoolean(true); - if (out.getVersion().before(LegacyESVersion.V_7_7_0)) { - InternalAggregations aggs = aggregations.expand(); - aggs.writeTo(out); - } else { - aggregations.writeTo(out); - } + aggregations.writeTo(out); } if (suggest == null) { out.writeBoolean(false); @@ -424,10 +404,8 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeOptionalWriteable(profileShardResults); out.writeZLong(serviceTimeEWMA); out.writeInt(nodeQueueSize); - if (out.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) { - out.writeOptionalWriteable(getShardSearchRequest()); - getRescoreDocIds().writeTo(out); - } + out.writeOptionalWriteable(getShardSearchRequest()); + getRescoreDocIds().writeTo(out); } public TotalHits getTotalHits() { diff --git a/server/src/test/java/org/opensearch/search/SearchModuleTests.java b/server/src/test/java/org/opensearch/search/SearchModuleTests.java index 05d4153949f9a..48a935ea984fa 100644 --- a/server/src/test/java/org/opensearch/search/SearchModuleTests.java +++ b/server/src/test/java/org/opensearch/search/SearchModuleTests.java @@ -56,7 +56,6 @@ import org.opensearch.search.aggregations.bucket.terms.heuristic.ChiSquare; import org.opensearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregator; import org.opensearch.search.aggregations.pipeline.InternalDerivative; import org.opensearch.search.aggregations.pipeline.MovAvgModel; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; @@ -193,7 +192,6 @@ public List getPipelineAggregations() { new PipelineAggregationSpec( DerivativePipelineAggregationBuilder.NAME, DerivativePipelineAggregationBuilder::new, - DerivativePipelineAggregator::new, DerivativePipelineAggregationBuilder::parse ).addResultReader(InternalDerivative::new) ); @@ -375,12 +373,7 @@ public void testRegisterPipelineAggregation() { @Override public List getPipelineAggregations() { return singletonList( - new PipelineAggregationSpec( - "test", - TestPipelineAggregationBuilder::new, - TestPipelineAggregator::new, - TestPipelineAggregationBuilder::fromXContent - ) + new PipelineAggregationSpec("test", TestPipelineAggregationBuilder::new, TestPipelineAggregationBuilder::fromXContent) ); } })); @@ -570,21 +563,10 @@ protected void validate(ValidationContext context) {} * Dummy test {@link PipelineAggregator} used to test registering aggregation builders. */ private static class TestPipelineAggregator extends PipelineAggregator { - /** - * Read from a stream. - */ - TestPipelineAggregator(StreamInput in) throws IOException { - super(in); - } - - @Override - public String getWriteableName() { - return "test"; + TestPipelineAggregator() { + super("test", new String[] {}, null); } - @Override - protected void doWriteTo(StreamOutput out) throws IOException {} - @Override public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { return null; diff --git a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java index bb28c0657ac8e..2d022be10fe29 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java @@ -43,12 +43,9 @@ import org.opensearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.StringTermsTests; -import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.InternalSimpleValueTests; import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; -import org.opensearch.search.aggregations.pipeline.SiblingPipelineAggregator; -import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.InternalAggregationTestCase; @@ -91,7 +88,6 @@ public void testNonFinalReduceTopLevelPipelineAggs() { ); List aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms))); InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction()); - assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(1, reducedAggs.aggregations.size()); } @@ -116,7 +112,6 @@ public void testFinalReduceTopLevelPipelineAggs() { Collections.singletonList(aggs), maxBucketReduceContext().forFinalReduction() ); - assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); } @@ -130,10 +125,6 @@ private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() { } public static InternalAggregations createTestInstance() throws Exception { - return createTestInstance(randomPipelineTree()); - } - - public static InternalAggregations createTestInstance(PipelineAggregator.PipelineTree pipelineTree) throws Exception { List aggsList = new ArrayList<>(); if (randomBoolean()) { StringTermsTests stringTermsTests = new StringTermsTests(); @@ -150,23 +141,7 @@ public static InternalAggregations createTestInstance(PipelineAggregator.Pipelin InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); aggsList.add(simpleValueTests.createTestInstance()); } - return new InternalAggregations(aggsList, () -> pipelineTree); - } - - private static PipelineAggregator.PipelineTree randomPipelineTree() { - List topLevelPipelineAggs = new ArrayList<>(); - if (randomBoolean()) { - if (randomBoolean()) { - topLevelPipelineAggs.add((SiblingPipelineAggregator) new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); - } - if (randomBoolean()) { - topLevelPipelineAggs.add((SiblingPipelineAggregator) new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); - } - if (randomBoolean()) { - topLevelPipelineAggs.add((SiblingPipelineAggregator) new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); - } - } - return new PipelineAggregator.PipelineTree(emptyMap(), topLevelPipelineAggs); + return new InternalAggregations(aggsList); } public void testSerialization() throws Exception { diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java index 9c7196db68e97..c39445fef88c5 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java @@ -32,7 +32,6 @@ package org.opensearch.search.aggregations.bucket.terms; import org.apache.lucene.util.BytesRef; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.common.Strings; import org.opensearch.common.io.stream.InputStreamStreamInput; @@ -57,7 +56,6 @@ import org.opensearch.search.aggregations.bucket.terms.heuristic.MutualInformation; import org.opensearch.search.aggregations.bucket.terms.heuristic.PercentageScore; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; -import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.InternalAggregationTestCase; @@ -95,9 +93,6 @@ public void testStreamResponse() throws Exception { ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); out.setVersion(version); - if (version.before(LegacyESVersion.V_7_8_0)) { - sigTerms.mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree.EMPTY); - } out.writeNamedWriteable(sigTerms); // read diff --git a/test/framework/src/main/java/org/opensearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/opensearch/test/InternalAggregationTestCase.java index 5325c48e16913..2cba15f2e2039 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalAggregationTestCase.java @@ -144,12 +144,10 @@ import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.opensearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.InternalBucketMetricValue; import org.opensearch.search.aggregations.pipeline.InternalSimpleValue; -import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.ParsedBucketMetricValue; import org.opensearch.search.aggregations.pipeline.ParsedDerivative; import org.opensearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; @@ -160,7 +158,6 @@ import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.opensearch.test.hamcrest.OpenSearchAssertions; import java.io.IOException; @@ -175,7 +172,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.opensearch.common.xcontent.XContentHelper.toXContent; import static org.opensearch.search.aggregations.InternalMultiBucketAggregation.countInnerBucket; @@ -482,60 +478,6 @@ public final void testFromXContentWithRandomFields() throws IOException { assertFromXContent(aggregation, parsedAggregation); } - public void testMergePipelineTreeForBWCSerialization() { - T agg = createTestInstance(); - PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(agg); - agg.mergePipelineTreeForBWCSerialization(pipelineTree); - assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree); - } - - public void testMergePipelineTreeTwice() { - T agg = createTestInstance(); - PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(agg); - agg.mergePipelineTreeForBWCSerialization(pipelineTree); - agg.mergePipelineTreeForBWCSerialization(randomPipelineTree(agg)); // This should be ignored - assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree); - } - - public static PipelineAggregator.PipelineTree randomPipelineTree(InternalAggregation aggregation) { - Map subTree = new HashMap<>(); - aggregation.forEachBucket(bucketAggs -> { - for (Aggregation subAgg : bucketAggs) { - if (subTree.containsKey(subAgg.getName())) { - continue; - } - subTree.put(subAgg.getName(), randomPipelineTree((InternalAggregation) subAgg)); - } - }); - return new PipelineAggregator.PipelineTree(emptyMap(), randomPipelineAggregators()); - } - - public static List randomPipelineAggregators() { - List pipelines = new ArrayList<>(); - if (randomBoolean()) { - if (randomBoolean()) { - pipelines.add(new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); - } - if (randomBoolean()) { - pipelines.add(new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); - } - if (randomBoolean()) { - pipelines.add(new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); - } - } - return pipelines; - } - - @SuppressWarnings("deprecation") - private void assertMergedPipelineTreeForBWCSerialization(InternalAggregation agg, PipelineAggregator.PipelineTree pipelineTree) { - assertThat(agg.pipelineAggregatorsForBwcSerialization(), equalTo(pipelineTree.aggregators())); - agg.forEachBucket(bucketAggs -> { - for (Aggregation subAgg : bucketAggs) { - assertMergedPipelineTreeForBWCSerialization((InternalAggregation) subAgg, pipelineTree.subTree(subAgg.getName())); - } - }); - } - protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException; @SuppressWarnings("unchecked")