Skip to content

Commit

Permalink
[ML-DataFrame] Fix off-by-one error in checkpoint operations_behind (#…
Browse files Browse the repository at this point in the history
…46235)

Fixes a problem where operations_behind would be one less than
expected per shard in a new index matched by the data frame
transform source pattern.

For example, if a data frame transform had a source of foo*
and a new index foo-new was created with 2 shards and 7 documents
indexed in it then operations_behind would be 5 prior to this
change.

The problem was that an empty index has a global checkpoint
number of -1 and the sequence number of the first document that
is indexed into an index is 0, not 1.  This doesn't matter for
indices included in both the last and next checkpoints, as the
off-by-one errors cancelled, but for a new index it affected
the observed result.
  • Loading branch information
droberts195 authored Sep 3, 2019
1 parent daffcf1 commit 4d67dae
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,28 +279,30 @@ public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFra
throw new IllegalArgumentException("old checkpoint is newer than new checkpoint");
}

// get the sum of shard checkpoints
// get the sum of shard operations (that are fully replicated), which is 1 higher than the global checkpoint for each shard
// note: we require shard checkpoints to strictly increase and never decrease
long oldCheckPointSum = 0;
long newCheckPointSum = 0;
long oldCheckPointOperationsSum = 0;
long newCheckPointOperationsSum = 0;

for (Entry<String, long[]> entry : oldCheckpoint.indicesCheckpoints.entrySet()) {
// ignore entries that aren't part of newCheckpoint, e.g. deleted indices
if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) {
oldCheckPointSum += Arrays.stream(entry.getValue()).sum();
// Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation
oldCheckPointOperationsSum += Arrays.stream(entry.getValue()).sum() + entry.getValue().length;
}
}

for (long[] v : newCheckpoint.indicesCheckpoints.values()) {
newCheckPointSum += Arrays.stream(v).sum();
// Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation
newCheckPointOperationsSum += Arrays.stream(v).sum() + v.length;
}

// this should not be possible
if (newCheckPointSum < oldCheckPointSum) {
if (newCheckPointOperationsSum < oldCheckPointOperationsSum) {
return -1L;
}

return newCheckPointSum - oldCheckPointSum;
return newCheckPointOperationsSum - oldCheckPointOperationsSum;
}

private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testGetBehind() {
List<Long> checkpoints2 = new ArrayList<>();

for (int j = 0; j < shards; ++j) {
long shardCheckpoint = randomLongBetween(0, 1_000_000);
long shardCheckpoint = randomLongBetween(-1, 1_000_000);
checkpoints1.add(shardCheckpoint);
checkpoints2.add(shardCheckpoint + 10);
}
Expand Down Expand Up @@ -152,11 +152,11 @@ public void testGetBehind() {
assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOlderButNewerShardsCheckpoint, checkpointOld));

// test cases where indices sets do not match
// remove something from old, so newer has 1 index more than old
// remove something from old, so newer has 1 index more than old: should be equivalent to old index existing but empty
checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey());
long behind = DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew);
assertTrue("Expected behind (" + behind + ") > sum of shard checkpoints (" + indices * shards * 10L + ")",
behind > indices * shards * 10L);
assertTrue("Expected behind (" + behind + ") >= sum of shard checkpoint differences (" + indices * shards * 10L + ")",
behind >= indices * shards * 10L);

// remove same key: old and new should have equal indices again
checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
String indexName = shard.getShardRouting().getIndexName();

if (userIndices.contains(indexName)) {
// SeqNoStats could be `null`, assume the global checkpoint to be 0 in this case
long globalCheckpoint = shard.getSeqNoStats() == null ? 0 : shard.getSeqNoStats().getGlobalCheckpoint();
// SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case
long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint();
if (checkpointsByIndex.containsKey(indexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
Expand Down Expand Up @@ -215,8 +215,7 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri

userIndicesClone.removeAll(checkpointsByIndex.keySet());
if (userIndicesClone.isEmpty() == false) {
logger.debug("Original set of user indices contained more indexes [{}]",
userIndicesClone);
logger.debug("Original set of user indices contained more indexes [{}]", userIndicesClone);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedC

// add broken seqNoStats if requested
if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) {
checkpoints.add(0L);
checkpoints.add(-1L);
} else {
validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
checkpoints.add(globalCheckpoint);
Expand Down

0 comments on commit 4d67dae

Please sign in to comment.