diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java
index 3e119b79afb97..6464cea4cf024 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java
@@ -74,19 +74,18 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
+                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())),
+                randomBoolean() ? randomZone() : null
             );
         } else {
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
+                new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))),
+                randomBoolean() ? randomZone() : null
             );
         }
 
-        if (randomBoolean()) {
-            dateHistogramGroupSource.setTimeZone(randomZone());
-        }
         return dateHistogramGroupSource;
     }
 
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java
index 44452426d40c3..342c53e4bcbc6 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java
@@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable {
      */
     boolean isValid();
 
+    String getField();
+
     QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint);
 
     QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint);
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java
index d659a1f3905a0..a54507dc66178 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java
@@ -25,7 +25,7 @@
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
 
-public class TimeSyncConfig  implements SyncConfig {
+public class TimeSyncConfig implements SyncConfig {
 
     public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60);
     private static final String NAME = "data_frame_transform_pivot_sync_time";
@@ -37,17 +37,18 @@ public class TimeSyncConfig  implements SyncConfig {
     private static final ConstructingObjectParser<TimeSyncConfig, Void> LENIENT_PARSER = createParser(true);
 
     private static ConstructingObjectParser<TimeSyncConfig, Void> createParser(boolean lenient) {
-        ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient,
-            args -> {
-                String field = (String) args[0];
-                TimeValue delay = (TimeValue) args[1];
-                return new TimeSyncConfig(field, delay);
-            });
+        ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient, args -> {
+            String field = (String) args[0];
+            TimeValue delay = (TimeValue) args[1];
+            return new TimeSyncConfig(field, delay);
+        });
         parser.declareString(constructorArg(), TransformField.FIELD);
-        parser.declareField(optionalConstructorArg(),
+        parser.declareField(
+            optionalConstructorArg(),
             (p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()),
             TransformField.DELAY,
-            ObjectParser.ValueType.STRING);
+            ObjectParser.ValueType.STRING
+        );
         return parser;
     }
 
@@ -65,6 +66,7 @@ public TimeSyncConfig(StreamInput in) throws IOException {
         this.delay = in.readTimeValue();
     }
 
+    @Override
     public String getField() {
         return field;
     }
@@ -105,12 +107,11 @@ public boolean equals(Object other) {
 
         final TimeSyncConfig that = (TimeSyncConfig) other;
 
-        return Objects.equals(this.field, that.field)
-                && Objects.equals(this.delay, that.delay);
+        return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay);
     }
 
     @Override
-    public int hashCode(){
+    public int hashCode() {
         return Objects.hash(field, delay);
     }
 
@@ -139,7 +140,8 @@ public QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint) {
 
     @Override
     public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
-        return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()).lt(newCheckpoint.getTimeUpperBound())
-                .format("epoch_millis");
+        return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound())
+            .lt(newCheckpoint.getTimeUpperBound())
+            .format("epoch_millis");
     }
 }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java
index 6509546a2da5a..f61497e31d3e6 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java
@@ -7,15 +7,18 @@
 
 import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 
@@ -105,6 +108,11 @@ public boolean equals(Object other) {
         public int hashCode() {
             return Objects.hash(interval);
         }
+
+        @Override
+        public String toString() {
+            return interval.toString();
+        }
     }
 
     public static class CalendarInterval implements Interval {
@@ -169,6 +177,11 @@ public boolean equals(Object other) {
         public int hashCode() {
             return Objects.hash(interval);
         }
+
+        @Override
+        public String toString() {
+            return interval.toString();
+        }
     }
 
     private Interval readInterval(StreamInput in) throws IOException {
@@ -195,11 +208,26 @@ private void writeInterval(Interval interval, StreamOutput out) throws IOExcepti
     private static final ConstructingObjectParser<DateHistogramGroupSource, Void> LENIENT_PARSER = createParser(true);
 
     private final Interval interval;
-    private ZoneId timeZone;
+    private final ZoneId timeZone;
+    private Rounding rounding;
 
-    public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval) {
+    public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) {
         super(field, scriptConfig);
         this.interval = interval;
+        this.timeZone = timeZone;
+
+        Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString());
+        final Rounding.Builder roundingBuilder;
+        if (timeUnit != null) {
+            roundingBuilder = new Rounding.Builder(timeUnit);
+        } else {
+            roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName()));
+        }
+
+        if (timeZone != null) {
+            roundingBuilder.timeZone(timeZone);
+        }
+        this.rounding = roundingBuilder.build();
     }
 
     public DateHistogramGroupSource(StreamInput in) throws IOException {
@@ -218,6 +246,7 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
             ScriptConfig scriptConfig = (ScriptConfig) args[1];
             String fixedInterval = (String) args[2];
             String calendarInterval = (String) args[3];
+            ZoneId zoneId = (ZoneId) args[4];
 
             Interval interval = null;
 
@@ -231,7 +260,7 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
                 throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none");
             }
 
-            return new DateHistogramGroupSource(field, scriptConfig, interval);
+            return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId);
         });
 
         declareValuesSourceFields(parser, lenient);
@@ -239,7 +268,7 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
         parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME));
         parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME));
 
-        parser.declareField(DateHistogramGroupSource::setTimeZone, p -> {
+        parser.declareField(optionalConstructorArg(), p -> {
             if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
                 return ZoneId.of(p.text());
             } else {
@@ -267,8 +296,8 @@ public ZoneId getTimeZone() {
         return timeZone;
     }
 
-    public void setTimeZone(ZoneId timeZone) {
-        this.timeZone = timeZone;
+    public Rounding getRounding() {
+        return rounding;
     }
 
     @Override
@@ -315,9 +344,16 @@ public int hashCode() {
     }
 
     @Override
-    public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
-        // no need for an extra range filter as this is already done by checkpoints
-        return null;
+    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    ) {
+        if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) {
+            return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis");
+        } else {
+            return null;
+        }
     }
 
     @Override
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java
index 0eba51cb51293..4f62faa7cb09b 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java
@@ -101,7 +101,11 @@ public int hashCode() {
     }
 
     @Override
-    public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
+    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    ) {
         // histograms are simple and cheap, so we skip this optimization
         return null;
     }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java
index bd215126119d5..569bf08d8209a 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java
@@ -116,7 +116,11 @@ public void writeTo(StreamOutput out) throws IOException {
 
     public abstract boolean supportsIncrementalBucketUpdate();
 
-    public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets);
+    public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    );
 
     public String getField() {
         return field;
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java
index e0d7c8a4f9c57..17a173836cb95 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java
@@ -54,8 +54,15 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean
     }
 
     @Override
-    public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
-        return new TermsQueryBuilder(field, changedBuckets);
+    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    ) {
+        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
+            return new TermsQueryBuilder(field, changedBuckets);
+        }
+        return null;
     }
 
     @Override
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java
index 59a7d1861d329..56886bf5bd972 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java
@@ -21,9 +21,9 @@
 import java.util.Collections;
 import java.util.Map;
 
-import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
 import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
 import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
+import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
 import static org.hamcrest.Matchers.equalTo;
 
 public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase<TransformConfigUpdate> {
@@ -184,6 +184,11 @@ public String getWriteableName() {
             return "foo";
         }
 
+        @Override
+        public String getField() {
+            return "foo";
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {}
 
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java
index 6ce9a9373c750..265f2a9446828 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java
@@ -10,11 +10,17 @@
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 
 import java.io.IOException;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+
+import static org.hamcrest.Matchers.equalTo;
 
 public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource> {
 
@@ -26,19 +32,20 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
+                new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))),
+                randomBoolean() ? randomZone() : null
             );
         } else {
             dateHistogramGroupSource = new DateHistogramGroupSource(
                 field,
                 scriptConfig,
-                new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
+                new DateHistogramGroupSource.CalendarInterval(
+                    new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y"))
+                ),
+                randomBoolean() ? randomZone() : null
             );
         }
 
-        if (randomBoolean()) {
-            dateHistogramGroupSource.setTimeZone(randomZone());
-        }
         return dateHistogramGroupSource;
     }
 
@@ -70,4 +77,56 @@ protected Reader<DateHistogramGroupSource> instanceReader() {
         return DateHistogramGroupSource::new;
     }
 
+    public void testRoundingDateHistogramFixedInterval() {
+        String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
+        DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource(
+            field,
+            null,
+            new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")),
+            null
+        );
+
+        // not meant to be complete rounding tests, see {@link RoundingTests} for more
+        assertNotNull(dateHistogramGroupSource.getRounding());
+
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")),
+            equalTo(time("2020-03-26T00:00:00.000Z"))
+        );
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-26T00:00:01.000Z")),
+            equalTo(time("2020-03-26T00:00:00.000Z"))
+        );
+    }
+
+    public void testRoundingDateHistogramCalendarInterval() {
+        String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
+        DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource(
+            field,
+            null,
+            new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")),
+            null
+        );
+
+        // not meant to be complete rounding tests, see {@link RoundingTests} for more
+        assertNotNull(dateHistogramGroupSource.getRounding());
+
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")),
+            equalTo(time("2020-03-23T00:00:00.000Z"))
+        );
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-29T23:59:59.000Z")),
+            equalTo(time("2020-03-23T00:00:00.000Z"))
+        );
+        assertThat(
+            dateHistogramGroupSource.getRounding().round(time("2020-03-23T00:00:01.000Z")),
+            equalTo(time("2020-03-23T00:00:00.000Z"))
+        );
+    }
+
+    private static long time(String time) {
+        TemporalAccessor accessor = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC).parse(time);
+        return DateFormatters.from(accessor).toInstant().toEpochMilli();
+    }
 }
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java
index 60afee087fb0c..80b9c9ca1ddba 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java
@@ -64,15 +64,11 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
      * which query filters to run and which index requests to send
      */
     private enum RunState {
-        // do a complete query/index, this is used for batch transforms and for bootstrapping (1st run)
-        FULL_RUN,
+        // apply bucket results
+        APPLY_BUCKET_RESULTS,
 
-        // Partial run modes in 2 stages:
-        // identify buckets that have changed
-        PARTIAL_RUN_IDENTIFY_CHANGES,
-
-        // recalculate buckets based on the update list
-        PARTIAL_RUN_APPLY_CHANGES
+        // identify buckets that have changed, used for continuous if terms is used in group_by
+        IDENTIFY_CHANGES,
     }
 
     public static final int MINIMUM_PAGE_SIZE = 10;
@@ -112,7 +108,7 @@ private enum RunState {
     private volatile RunState runState;
 
     // hold information for continuous mode (partial updates)
-    private volatile Map<String, Set<String>> changedBuckets;
+    private volatile Map<String, Set<String>> changedBuckets = Collections.emptyMap();
     private volatile Map<String, Object> changedBucketsAfterKey;
 
     private volatile long lastCheckpointCleanup = 0L;
@@ -146,7 +142,7 @@ public TransformIndexer(
         this.context = ExceptionsHelper.requireNonNull(context, "context");
 
         // give runState a default
-        this.runState = RunState.FULL_RUN;
+        this.runState = RunState.APPLY_BUCKET_RESULTS;
     }
 
     public int getPageSize() {
@@ -342,7 +338,7 @@ protected void onFinish(ActionListener<Void> listener) {
             // reset the page size, so we do not memorize a low page size forever
             pageSize = pivot.getInitialPageSize();
             // reset the changed bucket to free memory
-            changedBuckets = null;
+            changedBuckets = Collections.emptyMap();
 
             long checkpoint = context.getAndIncrementCheckpoint();
             lastCheckpoint = getNextCheckpoint();
@@ -414,11 +410,9 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea
         final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
 
         switch (runState) {
-            case FULL_RUN:
+            case APPLY_BUCKET_RESULTS:
                 return processBuckets(agg);
-            case PARTIAL_RUN_APPLY_CHANGES:
-                return processPartialBucketUpdates(agg);
-            case PARTIAL_RUN_IDENTIFY_CHANGES:
+            case IDENTIFY_CHANGES:
                 return processChangedBuckets(agg);
 
             default:
@@ -582,7 +576,19 @@ private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
     private IterationResult<TransformIndexerPosition> processBuckets(final CompositeAggregation agg) {
         // we reached the end
         if (agg.getBuckets().isEmpty()) {
-            return new IterationResult<>(Collections.emptyList(), null, true);
+            if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || pivot.supportsIncrementalBucketUpdate() == false) {
+                return new IterationResult<>(Collections.emptyList(), null, true);
+            }
+
+            // cleanup changed Buckets
+            changedBuckets = Collections.emptyMap();
+
+            // reset the runState to fetch changed buckets
+            runState = RunState.IDENTIFY_CHANGES;
+
+            // advance the cursor for changed bucket detection
+            return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false);
+
         }
 
         long docsBeforeProcess = getStats().getNumDocuments();
@@ -608,21 +614,6 @@ private IterationResult<TransformIndexerPosition> processBuckets(final Composite
         return result;
     }
 
-    private IterationResult<TransformIndexerPosition> processPartialBucketUpdates(final CompositeAggregation agg) {
-        // we reached the end
-        if (agg.getBuckets().isEmpty()) {
-            // cleanup changed Buckets
-            changedBuckets = null;
-
-            // reset the runState to fetch changed buckets
-            runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
-            // advance the cursor for changed bucket detection
-            return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false);
-        }
-
-        return processBuckets(agg);
-    }
-
     private IterationResult<TransformIndexerPosition> processChangedBuckets(final CompositeAggregation agg) {
         // initialize the map of changed buckets, the map might be empty if source do not require/implement
         // changed bucket detection
@@ -631,7 +622,7 @@ private IterationResult<TransformIndexerPosition> processChangedBuckets(final Co
         // reached the end?
         if (agg.getBuckets().isEmpty()) {
             // reset everything and return the end marker
-            changedBuckets = null;
+            changedBuckets = Collections.emptyMap();
             changedBucketsAfterKey = null;
             return new IterationResult<>(Collections.emptyList(), null, true);
         }
@@ -644,7 +635,7 @@ private IterationResult<TransformIndexerPosition> processChangedBuckets(final Co
         changedBucketsAfterKey = agg.afterKey();
 
         // reset the runState to fetch the partial updates next
-        runState = RunState.PARTIAL_RUN_APPLY_CHANGES;
+        runState = RunState.APPLY_BUCKET_RESULTS;
 
         return new IterationResult<>(Collections.emptyList(), getPosition(), false);
     }
@@ -721,15 +712,12 @@ protected SearchRequest buildSearchRequest() {
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0);
 
         switch (runState) {
-            case FULL_RUN:
-                buildFullRunQuery(sourceBuilder);
+            case APPLY_BUCKET_RESULTS:
+                buildUpdateQuery(sourceBuilder);
                 break;
-            case PARTIAL_RUN_IDENTIFY_CHANGES:
+            case IDENTIFY_CHANGES:
                 buildChangedBucketsQuery(sourceBuilder);
                 break;
-            case PARTIAL_RUN_APPLY_CHANGES:
-                buildPartialUpdateQuery(sourceBuilder);
-                break;
             default:
                 // Any other state is a bug, should not happen
                 logger.warn("Encountered unexpected run state [" + runState + "]");
@@ -740,26 +728,6 @@ protected SearchRequest buildSearchRequest() {
         return searchRequest;
     }
 
-    private SearchSourceBuilder buildFullRunQuery(SearchSourceBuilder sourceBuilder) {
-        TransformIndexerPosition position = getPosition();
-
-        sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
-        TransformConfig config = getConfig();
-
-        QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
-        if (isContinuous()) {
-            BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder)
-                .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));
-            sourceBuilder.query(filteredQuery);
-        } else {
-            sourceBuilder.query(pivotQueryBuilder);
-        }
-
-        logger.trace("running full run query: {}", sourceBuilder);
-
-        return sourceBuilder;
-    }
-
     private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceBuilder) {
         assert isContinuous();
 
@@ -781,9 +749,7 @@ private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceB
         return sourceBuilder;
     }
 
-    private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBuilder) {
-        assert isContinuous();
-
+    private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) {
         TransformIndexerPosition position = getPosition();
 
         sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
@@ -791,18 +757,28 @@ private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBu
 
         QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
 
+        // if its either the 1st run or not continuous, do not apply extra filters
+        if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
+            sourceBuilder.query(pivotQueryBuilder);
+            logger.trace("running query: {}", sourceBuilder);
+
+            return sourceBuilder;
+        }
+
         BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder)
             .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));
 
-        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
-            QueryBuilder pivotFilter = pivot.filterBuckets(changedBuckets);
-            if (pivotFilter != null) {
-                filteredQuery.filter(pivotFilter);
-            }
+        QueryBuilder pivotFilter = pivot.filterBuckets(
+            changedBuckets,
+            config.getSyncConfig().getField(),
+            lastCheckpoint.getTimeUpperBound()
+        );
+        if (pivotFilter != null) {
+            filteredQuery.filter(pivotFilter);
         }
 
         sourceBuilder.query(filteredQuery);
-        logger.trace("running partial update query: {}", sourceBuilder);
+        logger.trace("running query: {}", sourceBuilder);
 
         return sourceBuilder;
     }
@@ -903,16 +879,16 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) {
     private RunState determineRunStateAtStart() {
         // either 1st run or not a continuous transform
         if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
-            return RunState.FULL_RUN;
+            return RunState.APPLY_BUCKET_RESULTS;
         }
 
-        // if incremental update is not supported, do a full run
+        // if incremental update is not supported, do a normal run
         if (pivot.supportsIncrementalBucketUpdate() == false) {
-            return RunState.FULL_RUN;
+            return RunState.APPLY_BUCKET_RESULTS;
         }
 
         // continuous mode: we need to get the changed buckets first
-        return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
+        return RunState.IDENTIFY_CHANGES;
     }
 
     /**
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java
index 47259ee8c8142..16bd7d9f6a935 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java
@@ -8,6 +8,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchAction;
@@ -174,7 +175,6 @@ public Stream<Map<String, Object>> extractResults(
         Map<String, String> fieldTypeMap,
         TransformIndexerStats transformIndexerStats
     ) {
-
         GroupConfig groups = config.getGroupConfig();
         Collection<AggregationBuilder> aggregationBuilders = config.getAggregationConfig().getAggregatorFactories();
         Collection<PipelineAggregationBuilder> pipelineAggregationBuilders = config.getAggregationConfig().getPipelineAggregatorFactories();
@@ -189,44 +189,31 @@ public Stream<Map<String, Object>> extractResults(
         );
     }
 
-    public QueryBuilder filterBuckets(Map<String, Set<String>> changedBuckets) {
-
-        if (changedBuckets == null || changedBuckets.isEmpty()) {
-            return null;
-        }
+    public QueryBuilder filterBuckets(
+        Map<String, Set<String>> changedBuckets,
+        String synchronizationField,
+        long lastSynchronizationCheckpoint
+    ) {
+        assert changedBuckets != null;
 
         if (config.getGroupConfig().getGroups().size() == 1) {
             Entry<String, SingleGroupSource> entry = config.getGroupConfig().getGroups().entrySet().iterator().next();
-            // it should not be possible to get into this code path
-            assert (entry.getValue().supportsIncrementalBucketUpdate());
-
-            logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField());
-            if (changedBuckets.containsKey(entry.getKey())) {
-                return entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey()));
-            } else {
-                // should never happen
-                throw new RuntimeException("Could not find bucket value for key " + entry.getKey());
-            }
+            logger.trace(() -> new ParameterizedMessage("filter by bucket: {}/{}", entry.getKey(), entry.getValue().getField()));
+            Set<String> changedBucketsByGroup = changedBuckets.get(entry.getKey());
+            return entry.getValue()
+                .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint);
         }
 
         // else: more than 1 group by, need to nest it
         BoolQueryBuilder filteredQuery = new BoolQueryBuilder();
         for (Entry<String, SingleGroupSource> entry : config.getGroupConfig().getGroups().entrySet()) {
-            if (entry.getValue().supportsIncrementalBucketUpdate() == false) {
-                continue;
-            }
-
-            if (changedBuckets.containsKey(entry.getKey())) {
-                QueryBuilder sourceQueryFilter = entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey()));
-                // the source might not define an filter optimization
-                if (sourceQueryFilter != null) {
-                    filteredQuery.filter(sourceQueryFilter);
-                }
-            } else {
-                // should never happen
-                throw new RuntimeException("Could not find bucket value for key " + entry.getKey());
+            Set<String> changedBucketsByGroup = changedBuckets.get(entry.getKey());
+            QueryBuilder sourceQueryFilter = entry.getValue()
+                .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint);
+            // the source might not define a filter optimization
+            if (sourceQueryFilter != null) {
+                filteredQuery.filter(sourceQueryFilter);
             }
-
         }
 
         return filteredQuery;