diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java
new file mode 100644
index 0000000000000..961d7c99a621a
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.terms;
+
+import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.search.aggregations.BucketOrder;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.InternalOrder;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.search.aggregations.InternalOrder.isKeyAsc;
+import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
+
+/**
+ * Base class for the terms and multi_terms aggregations that handles their common reduce logic
+ */
+public abstract class AbstractInternalTerms<
+ A extends AbstractInternalTerms<A, B>,
+ B extends AbstractInternalTerms.AbstractTermsBucket
+ > extends InternalMultiBucketAggregation<A, B> {
+
+ public AbstractInternalTerms(String name,
+ Map<String, Object> metadata) {
+ super(name, metadata);
+ }
+
+ protected AbstractInternalTerms(StreamInput in) throws IOException {
+ super(in);
+ }
+
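+ /**
+ * Base bucket class shared by the concrete terms implementations; it exposes
+ * the per-bucket doc count error bookkeeping that the common reduce logic needs.
+ */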
+ public abstract static class AbstractTermsBucket extends InternalMultiBucketAggregation.InternalBucket {
+
+ protected abstract void updateDocCountError(long docCountErrorDiff);
+
+ protected abstract void setDocCountError(long docCountError);
+
+ protected abstract boolean getShowDocCountError();
+
+ protected abstract long getDocCountError();
+ }
+
+ /**
+ * Create an array to hold some buckets. Used in collecting the results.
+ */
+ protected abstract B[] createBucketsArray(int size);
+
+ /**
+ * Creates InternalTerms at the end of the merge
+ */
+ protected abstract A create(String name, List<B> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount);
+
+ protected abstract int getShardSize();
+
+ protected abstract BucketOrder getReduceOrder();
+
+ protected abstract BucketOrder getOrder();
+
+ protected abstract long getSumOfOtherDocCounts();
+
+ protected abstract long getDocCountError();
+
+ protected abstract void setDocCountError(long docCountError);
+
+ protected abstract long getMinDocCount();
+
+ protected abstract int getRequiredSize();
+
+ abstract B createBucket(long docCount, InternalAggregations aggs, long docCountError, B prototype);
+
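+ /**
+ * Merges buckets that share the same key, summing their doc counts and
+ * reducing their sub-aggregations; a doc count error of -1 on any input
+ * bucket makes the merged error unknown (-1) as well.
+ */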
+ @Override
+ public B reduceBucket(List<B> buckets, ReduceContext context) {
+ assert buckets.size() > 0;
+ long docCount = 0;
+ // For the per term doc count error we add up the errors from the
+ // shards that did not respond with the term. To do this we add up
+ // the errors from the shards that did respond with the terms and
+ // subtract that from the sum of the error from all shards
+ long docCountError = 0;
+ List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
+ for (B bucket : buckets) {
+ docCount += bucket.getDocCount();
+ if (docCountError != -1) {
+ if (bucket.getShowDocCountError() == false || bucket.getDocCountError() == -1) {
+ docCountError = -1;
+ } else {
+ docCountError += bucket.getDocCountError();
+ }
+ }
+ aggregationsList.add((InternalAggregations) bucket.getAggregations());
+ }
+ InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
+ return createBucket(docCount, aggs, docCountError, buckets.get(0));
+ }
+
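+ /**
+ * Returns the reduce order common to all non-empty shard results, falling
+ * back to the requested order if the shards disagree.
+ */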
+ private BucketOrder getReduceOrder(List<InternalAggregation> aggregations) {
+ BucketOrder thisReduceOrder = null;
+ for (InternalAggregation aggregation : aggregations) {
+ @SuppressWarnings("unchecked")
+ A terms = (A) aggregation;
+ if (terms.getBuckets().size() == 0) {
+ continue;
+ }
+ if (thisReduceOrder == null) {
+ thisReduceOrder = terms.getReduceOrder();
+ } else if (thisReduceOrder.equals(terms.getReduceOrder()) == false) {
+ return getOrder();
+ }
+ }
+ return thisReduceOrder != null ? thisReduceOrder : getOrder();
+ }
+
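+ /**
+ * Estimates the doc count error of a single shard result: 0 if the shard
+ * returned fewer buckets than its shard size (nothing was cut off) or is
+ * sorted by key, the doc count of the smallest returned term when sorted by
+ * descending count, and -1 (unknown) for any other order.
+ */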
+ private long getDocCountError(A terms) {
+ int size = terms.getBuckets().size();
+ if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.getOrder())) {
+ return 0;
+ } else if (InternalOrder.isCountDesc(terms.getOrder())) {
+ if (terms.getDocCountError() > 0) {
+ // If there is an existing docCountError for this agg then
+ // use this as the error for this aggregation
+ return terms.getDocCountError();
+ } else {
+ // otherwise use the doc count of the last term in the
+ // aggregation
+ return terms.getBuckets().stream().mapToLong(AbstractTermsBucket::getDocCount).min().getAsLong();
+ }
+ } else {
+ return -1;
+ }
+ }
+
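+ /**
+ * Reduces key-sorted shard results with a k-way merge: a priority queue
+ * keeps the per-shard iterators ordered by their current bucket, and runs
+ * of buckets with equal keys are buffered and reduced together.
+ */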
+ private List<B> reduceMergeSort(List<InternalAggregation> aggregations,
+ BucketOrder thisReduceOrder, InternalAggregation.ReduceContext reduceContext) {
+ assert isKeyOrder(thisReduceOrder);
+ final Comparator<MultiBucketsAggregation.Bucket> cmp = thisReduceOrder.comparator();
+ final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<>(aggregations.size()) {
+ @Override
+ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
+ return cmp.compare(a.current(), b.current()) < 0;
+ }
+ };
+ for (InternalAggregation aggregation : aggregations) {
+ @SuppressWarnings("unchecked")
+ A terms = (A) aggregation;
+ if (terms.getBuckets().isEmpty() == false) {
+ pq.add(new IteratorAndCurrent<>(terms.getBuckets().iterator()));
+ }
+ }
+ List<B> reducedBuckets = new ArrayList<>();
+ // list of buckets coming from different shards that have the same key
+ List<B> currentBuckets = new ArrayList<>();
+ B lastBucket = null;
+ while (pq.size() > 0) {
+ final IteratorAndCurrent<B> top = pq.top();
+ assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0;
+ if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
+ // the key changes, reduce what we already buffered and reset the buffer for current buckets
+ final B reduced = reduceBucket(currentBuckets, reduceContext);
+ reducedBuckets.add(reduced);
+ currentBuckets.clear();
+ }
+ lastBucket = top.current();
+ currentBuckets.add(top.current());
+ if (top.hasNext()) {
+ top.next();
+ assert cmp.compare(top.current(), lastBucket) > 0 : "shards must return data sorted by key";
+ pq.updateTop();
+ } else {
+ pq.pop();
+ }
+ }
+
+ if (currentBuckets.isEmpty() == false) {
+ final B reduced = reduceBucket(currentBuckets, reduceContext);
+ reducedBuckets.add(reduced);
+ }
+ return reducedBuckets;
+ }
+
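+ /**
+ * Reduces shard results that do not share a key order by grouping buckets
+ * with equal keys in a map and reducing each group.
+ */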
+ private List<B> reduceLegacy(List<InternalAggregation> aggregations, InternalAggregation.ReduceContext reduceContext) {
+ Map<Object, List<B>> bucketMap = new HashMap<>();