From cc9e3afdad85c4d088a92dc5192849c583035d25 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 3 Sep 2019 12:41:44 +0100 Subject: [PATCH] [ML-DataFrame] Fix off-by-one error in checkpoint operations_behind (#46235) Fixes a problem where operations_behind would be one less than expected per shard in a new index matched by the data frame transform source pattern. For example, if a data frame transform had a source of foo* and a new index foo-new was created with 2 shards and 7 documents indexed in it then operations_behind would be 5 prior to this change. The problem was that an empty index has a global checkpoint number of -1 and the sequence number of the first document that is indexed into an index is 0, not 1. This doesn't matter for indices included in both the last and next checkpoints, as the off-by-one errors cancelled, but for a new index it affected the observed result. --- .../transforms/DataFrameTransformCheckpoint.java | 16 +++++++++------- .../DataFrameTransformCheckpointTests.java | 8 ++++---- .../checkpoint/DefaultCheckpointProvider.java | 7 +++---- ...ataFrameTransformsCheckpointServiceTests.java | 2 +- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java index 81a06eb4524d1..69877c4cbe700 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java @@ -279,28 +279,30 @@ public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFra throw new IllegalArgumentException("old checkpoint is newer than new checkpoint"); } - // get the sum of of shard checkpoints + // get the sum of shard operations 
(that are fully replicated), which is 1 higher than the global checkpoint for each shard // note: we require shard checkpoints to strictly increase and never decrease - long oldCheckPointSum = 0; - long newCheckPointSum = 0; + long oldCheckPointOperationsSum = 0; + long newCheckPointOperationsSum = 0; for (Entry entry : oldCheckpoint.indicesCheckpoints.entrySet()) { // ignore entries that aren't part of newCheckpoint, e.g. deleted indices if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) { - oldCheckPointSum += Arrays.stream(entry.getValue()).sum(); + // Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation + oldCheckPointOperationsSum += Arrays.stream(entry.getValue()).sum() + entry.getValue().length; } } for (long[] v : newCheckpoint.indicesCheckpoints.values()) { - newCheckPointSum += Arrays.stream(v).sum(); + // Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation + newCheckPointOperationsSum += Arrays.stream(v).sum() + v.length; } // this should not be possible - if (newCheckPointSum < oldCheckPointSum) { + if (newCheckPointOperationsSum < oldCheckPointOperationsSum) { return -1L; } - return newCheckPointSum - oldCheckPointSum; + return newCheckPointOperationsSum - oldCheckPointOperationsSum; } private static Map readCheckpoints(Map readMap) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java index 67cc4b91584c2..298b018ce453b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java @@ -106,7 +106,7 @@ public void testGetBehind() { List 
checkpoints2 = new ArrayList<>(); for (int j = 0; j < shards; ++j) { - long shardCheckpoint = randomLongBetween(0, 1_000_000); + long shardCheckpoint = randomLongBetween(-1, 1_000_000); checkpoints1.add(shardCheckpoint); checkpoints2.add(shardCheckpoint + 10); } @@ -152,11 +152,11 @@ public void testGetBehind() { assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOlderButNewerShardsCheckpoint, checkpointOld)); // test cases where indices sets do not match - // remove something from old, so newer has 1 index more than old + // remove something from old, so newer has 1 index more than old: should be equivalent to old index existing but empty checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey()); long behind = DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew); - assertTrue("Expected behind (" + behind + ") > sum of shard checkpoints (" + indices * shards * 10L + ")", - behind > indices * shards * 10L); + assertTrue("Expected behind (" + behind + ") => sum of shard checkpoint differences (" + indices * shards * 10L + ")", + behind >= indices * shards * 10L); // remove same key: old and new should have equal indices again checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey()); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java index 23b1bdde12b4f..5464304d5b8e0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java @@ -183,8 +183,8 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(indexName); @@ -215,8 +215,7 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set expectedC 
// add broken seqNoStats if requested if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) { - checkpoints.add(0L); + checkpoints.add(-1L); } else { validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); checkpoints.add(globalCheckpoint);