diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java index 81a06eb4524d1..69877c4cbe700 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java @@ -279,28 +279,30 @@ public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFra throw new IllegalArgumentException("old checkpoint is newer than new checkpoint"); } - // get the sum of of shard checkpoints + // get the sum of of shard operations (that are fully replicated), which is 1 higher than the global checkpoint for each shard // note: we require shard checkpoints to strictly increase and never decrease - long oldCheckPointSum = 0; - long newCheckPointSum = 0; + long oldCheckPointOperationsSum = 0; + long newCheckPointOperationsSum = 0; for (Entry entry : oldCheckpoint.indicesCheckpoints.entrySet()) { // ignore entries that aren't part of newCheckpoint, e.g. deleted indices if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) { - oldCheckPointSum += Arrays.stream(entry.getValue()).sum(); + // Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation + oldCheckPointOperationsSum += Arrays.stream(entry.getValue()).sum() + entry.getValue().length; } } for (long[] v : newCheckpoint.indicesCheckpoints.values()) { - newCheckPointSum += Arrays.stream(v).sum(); + // Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation + newCheckPointOperationsSum += Arrays.stream(v).sum() + v.length; } // this should not be possible - if (newCheckPointSum < oldCheckPointSum) { + if (newCheckPointOperationsSum < oldCheckPointOperationsSum) { return -1L; } - return newCheckPointSum - oldCheckPointSum; + return newCheckPointOperationsSum - oldCheckPointOperationsSum; } private static Map readCheckpoints(Map readMap) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java index 67cc4b91584c2..298b018ce453b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java @@ -106,7 +106,7 @@ public void testGetBehind() { List checkpoints2 = new ArrayList<>(); for (int j = 0; j < shards; ++j) { - long shardCheckpoint = randomLongBetween(0, 1_000_000); + long shardCheckpoint = randomLongBetween(-1, 1_000_000); checkpoints1.add(shardCheckpoint); checkpoints2.add(shardCheckpoint + 10); } @@ -152,11 +152,11 @@ public void testGetBehind() { assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOlderButNewerShardsCheckpoint, checkpointOld)); // test cases where indices sets do not match - // remove something from old, so newer has 1 index more than old + // remove something from old, so newer has 1 index more than old: should be equivalent to old index existing but empty checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey()); long behind = DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew); - assertTrue("Expected behind (" + behind + ") > sum of shard checkpoints (" + indices * shards * 10L + ")", - behind > indices * shards * 10L); + assertTrue("Expected behind (" + behind + ") => sum of shard checkpoint differences (" + indices * shards * 10L + ")", + behind >= indices * shards * 10L); // remove same key: old and new should have equal indices again checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey()); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java index 23b1bdde12b4f..5464304d5b8e0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java @@ -183,8 +183,8 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(indexName); @@ -215,8 +215,7 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set expectedC // add broken seqNoStats if requested if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) { - checkpoints.add(0L); + checkpoints.add(-1L); } else { validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); checkpoints.add(globalCheckpoint);