Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML-DataFrame] Fix off-by-one error in checkpoint operations_behind #46235

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,14 @@ public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFra
}
}

for (long[] v : newCheckpoint.indicesCheckpoints.values()) {
newCheckPointSum += Arrays.stream(v).sum();
for (Entry<String, long[]> entry : newCheckpoint.indicesCheckpoints.entrySet()) {
newCheckPointSum += Arrays.stream(entry.getValue()).sum();
Copy link
Contributor Author

@droberts195 droberts195 Sep 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that for every brand new shard that has never had a document indexed into it this sum will include a value of -1. This is not new - the equivalent line in the old code did this too. Before this PR you could make operations_behind go negative by creating a new index with more shards than documents!

if (oldCheckpoint.indicesCheckpoints.containsKey(entry.getKey()) == false) {
// Sequence numbers start at 0 rather than 1, so if there's no corresponding value
// to subtract then operations behind needs to be the total number of operations on
// the new index and that needs to account for sequence number 0
newCheckPointSum += entry.getValue().length;
}
}

// this should not be possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testGetBehind() {
List<Long> checkpoints2 = new ArrayList<>();

for (int j = 0; j < shards; ++j) {
long shardCheckpoint = randomLongBetween(0, 1_000_000);
long shardCheckpoint = randomLongBetween(-1, 1_000_000);
checkpoints1.add(shardCheckpoint);
checkpoints2.add(shardCheckpoint + 10);
}
Expand Down Expand Up @@ -152,11 +152,11 @@ public void testGetBehind() {
assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOlderButNewerShardsCheckpoint, checkpointOld));

// test cases where indices sets do not match
// remove something from old, so newer has 1 index more than old
// remove something from old, so newer has 1 index more than old: should be equivalent to old index existing but empty
checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey());
long behind = DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew);
assertTrue("Expected behind (" + behind + ") > sum of shard checkpoints (" + indices * shards * 10L + ")",
behind > indices * shards * 10L);
assertTrue("Expected behind (" + behind + ") => sum of shard checkpoint differences (" + indices * shards * 10L + ")",
behind >= indices * shards * 10L);

// remove same key: old and new should have equal indices again
checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
String indexName = shard.getShardRouting().getIndexName();

if (userIndices.contains(indexName)) {
// SeqNoStats could be `null`, assume the global checkpoint to be 0 in this case
long globalCheckpoint = shard.getSeqNoStats() == null ? 0 : shard.getSeqNoStats().getGlobalCheckpoint();
// SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case
long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint();
if (checkpointsByIndex.containsKey(indexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
Expand Down Expand Up @@ -215,8 +215,7 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri

userIndicesClone.removeAll(checkpointsByIndex.keySet());
if (userIndicesClone.isEmpty() == false) {
logger.debug("Original set of user indices contained more indexes [{}]",
userIndicesClone);
logger.debug("Original set of user indices contained more indexes [{}]", userIndicesClone);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedC

// add broken seqNoStats if requested
if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) {
checkpoints.add(0L);
checkpoints.add(-1L);
} else {
validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
checkpoints.add(globalCheckpoint);
Expand Down