Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep commits and translog up to the global checkpoint #27606

Merged
merged 32 commits into from
Dec 13, 2017

Conversation

dnhatn
Copy link
Member

@dnhatn dnhatn commented Nov 30, 2017

I have to include both index commit and translog policies in a single PR because they need one another for testing. This PR is a rework of #27367.

We need to keep index commits and translog operations up to the current global checkpoint to allow us to throw away unsafe operations and increase the operation-based recovery chance. This is achieved by a new index deletion policy.

We need to keep index commits and translog operations up to the current
global checkpoint to allow us to throw away unsafe operations and
increase the operation-based recovery chance. This is achieved by a new
index deletion policy.
Copy link
Contributor

@DaveCTurner DaveCTurner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dnhatn - I left some small feedback points.

* @return a list of index commits that are not deleted by this policy.
*/
List<IndexCommit> onCommit(List<? extends IndexCommit> commits) throws IOException;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface clearly came from IndexDeletionPolicy but I think it's now different enough that it's worth breaking the link entirely and coming up with more descriptive names. The onInit() and onCommit() methods of IndexDeletionPolicy return void but here we're returning a list of the kept commits. Additionally, I think all the implementations just delegate onInit() to onCommit() (except KeepUntilGlobalCheckpointDeletionPolicy that special-cases an empty argument).

import java.util.List;

/**
* An {@link IndexDeletionPolicy} that deletes unneeded index commits, and returns index commits are not deleted by this policy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not an IndexDeletionPolicy any more!

case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just return store.loadSeqNoStats() here rather than breaking and returning at the bottom. Not sure if that's against our style tho?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


/**
* An {@link ESIndexDeletionPolicy} that deletes index commits that are not required for recovery.
* In particular, this policy will delete index commits whose max sequence number is smaller (or equal) than
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/smaller (or equal) than/at most/ ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

commit(translog, generation);
}
long lastGen = randomLongBetween(1, translog.currentFileGeneration());
commit(translog, randomLongBetween(1, lastGen), lastGen); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: } should be on its own line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


final List<IndexCommit> keptCommits = new ArrayList<>();
final int keptPosition = indexOfKeptCommits(commits);
final List<Integer> duplicateIndexes = indexesOfDuplicateCommits(commits);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only call this collection's contains() method so maybe a HashSet<Integer> would be better? Not sure if it gets big enough to make a difference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect to have zero or one element in a list, there should be no difference. However, using Set makes more sense here; I updated. Thank you.

@@ -113,27 +113,10 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
orgReplica.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), u -> {});

final int translogOps;
final int translogOps = 4; // 3 ops + seqno gaps
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes the tests for the cases where we've updated the index settings. Is that ok?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am working to have another test for this as the assumption in this test is no longer correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which other test is that?

/**
* An {@link IndexDeletionPolicy} that deletes unneeded index commits, and returns index commits are not deleted by this policy.
*/
public interface ESIndexDeletionPolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a separate interface? If we want to know which commits can be kept,
we can just do commits.stream().allMatch(c -> c.isDeleted() == false) after invoking the onInit or onCommit methods?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface is to avoid keeping translog of snapshotted index commits. SnapshotDeletionPolicy suppresses the delete() for snapshotted commits.

https://github.com/apache/lucene-solr/blob/e2521b2a8baabdaf43b92192588f51e042d21e97/lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java#L221-L225

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have documented its purpose.

@bleskes
Copy link
Contributor

bleskes commented Dec 1, 2017

@dnhatn I have a question before diving deeper - I expected to find CombinedDeletionPolicy merged with KeepUntilGlobalCheckpointDeletionPolicy into a single class. The hunch was that that will removed the need to look at the index commits to know if there were deleted by another policy, allowing that information to be directly transferred to the translog deletion policy. Did that not work for some reason?

@dnhatn
Copy link
Member Author

dnhatn commented Dec 1, 2017

I like the current design, three policies are decoupled; I tried to keep them separated as before. I will embed KeepUntilGlobalCheckpointDeletionPolicy code to the CombinedDeletionPolicy.

@dnhatn
Copy link
Member Author

dnhatn commented Dec 3, 2017

I expected to find CombinedDeletionPolicy merged with KeepUntilGlobalCheckpointDeletionPolicy into a single class

@bleskes, I have merged the KeepUntilGlobalCheckpointDeletionPolicy with CombinedDeletionPolicy. Could you please take a look? Thank you.

@dnhatn
Copy link
Member Author

dnhatn commented Dec 3, 2017

@DaveCTurner I've addressed your comments but not all as I've removed the new interface.

@bleskes
Copy link
Contributor

bleskes commented Dec 3, 2017

I like the current design, three policies are decoupled; I tried to keep them separated as before. I will embed KeepUntilGlobalCheckpointDeletionPolicy code to the CombinedDeletionPolicy.

I also find the composition of deletion policies elegant. Sadly elegance is an all or nothing thing. Adding extra wraps or copy interface means you lose it...

@bleskes, I have merged the KeepUntilGlobalCheckpointDeletionPolicy with CombinedDeletionPolicy. Could you please take a look? Thank you.

Thanks! I will look tomorrow morning.

@DaveCTurner
Copy link
Contributor

Yannick said:

Do we need a separate interface? If we want to know which commits can be kept,
we can just do commits.stream().allMatch(c -> c.isDeleted() == false) after invoking the onInit or onCommit methods?

The interface has gone (👍) and the second half of this idea seems like a good one. The meaning of the return value of deleteOldIndexCommits() requires reading the comments/implementation as it's not clear from the name, whereas commits.stream().allMatch(c -> c.isDeleted() == false) is transparent.

Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a question about a potential simpler approach. Also, I see you added some clean up w.r.t to duplicate commits above the global checkpoint with the same max seq no - can you elaborate why this is needed? Can't we just leave them alone?

}

@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
indexDeletionPolicy.onInit(commits);
final List<IndexCommit> keptCommits = deleteOldIndexCommits(commits);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rely on indexOfKeptCommits to give use the index of the first commit to be kept and use that to:

  1. Delete all commits before
  2. Set the translog deletion policy generation based on the commit the index points to.

Copy link
Member Author

@dnhatn dnhatn Dec 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I agree. Let's remove the duplicate logic first.

@dnhatn
Copy link
Member Author

dnhatn commented Dec 4, 2017

@bleskes, There are some cases we may end up with multiple index commits having max_seqno.

  1. If a (force) merge is executed, and the global checkpoint is still behind the max_seqno, we will have two commits with the same max_seqno. These two commits may require more disk space. When the global checkpoint catches up the max_seqno, we won't clean up the old commits unless a new commit is created. I added the duplicate logic to avoid this scenario.

  2. A recovering commit from a local store or local shard is immediately replaced by a fresh commit with the same max_seqno. This is not a disk space issue as these two commits should share the same underlying files. The issue is the recovering does not have TRANSLOG_GEN in its commit which requires a special handling when processing translog generation. We can solve this without the duplicate logic.

/** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */
private void updateWriterOnOpen() throws IOException {
Objects.requireNonNull(historyUUID);
final Map<String, String> commitUserData = commitDataAsMap(indexWriter);
boolean needsCommit = false;
if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) {
needsCommit = true;
} else {
assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change";
assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid";
}
if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
needsCommit = true;
} else {
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode;
}
if (needsCommit) {
commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? commitUserData.get(SYNC_COMMIT_ID) : null);
}
}

@bleskes
Copy link
Contributor

bleskes commented Dec 4, 2017

@bleskes, There are some cases we may end up with multiple index commits having max_seqno.

Thanks. Let's keep this a simple as possible. If those duplicate commit become a problem we can fix it later. Note though that when those duplicate commits are the "good ones", we should keep just one of them by choosing the commit appropriately. This is simple as it's just a question of select the commit index correctly.

@dnhatn
Copy link
Member Author

dnhatn commented Dec 12, 2017

@elasticmachine please test this.

Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @dnhatn !

assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
|| openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
: "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID is true";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add the current open mode to the message please

@dnhatn
Copy link
Member Author

dnhatn commented Dec 12, 2017

@DaveCTurner Could you please have a look? Thank you.

Copy link
Contributor

@DaveCTurner DaveCTurner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've lost the thread on this PR, but I requested changes on an earlier version that are either done or superseded.

@dnhatn
Copy link
Member Author

dnhatn commented Dec 13, 2017

Thanks @bleskes and @DaveCTurner.

@dnhatn dnhatn merged commit 57fc705 into elastic:master Dec 13, 2017
@dnhatn dnhatn deleted the gcp-deletion-policy branch December 13, 2017 00:20
dnhatn added a commit that referenced this pull request Dec 13, 2017
We need to keep index commits and translog operations up to the current 
global checkpoint to allow us to throw away unsafe operations and
increase the operation-based recovery chance. This is achieved by a new
index deletion policy.

Relates #10708
dnhatn added a commit that referenced this pull request Dec 13, 2017
The test testWithRandomException was not updated accordingly to the
latest translog policy. Method setTranslogGenerationOfLastCommit should
be called before whenever setMinTranslogGenerationForRecovery is called.

Relates #27606
dnhatn added a commit that referenced this pull request Dec 13, 2017
The test testWithRandomException was not updated accordingly to the
latest translog policy. Method setTranslogGenerationOfLastCommit should
be called before whenever setMinTranslogGenerationForRecovery is called.

Relates #27606
dnhatn added a commit that referenced this pull request Dec 16, 2017
Today we use the in-memory global checkpoint from SequenceNumbersService
to clean up unneeded commit points, however the latest global checkpoint
may haven't fsynced to the disk yet. If the translog checkpoint fsync
failed and we already use a higher global checkpoint to clean up commit
points, then we may have removed a safe commit which we try to keep for
recovery.

This commit updates the deletion policy using lastSyncedGlobalCheckpoint
from Translog rather the in memory global checkpoint.

Relates #27606
dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Dec 18, 2017
Today we use the in-memory global checkpoint from SequenceNumbersService
to clean up unneeded commit points, however the latest global checkpoint
may haven't fsynced to the disk yet. If the translog checkpoint fsync
failed and we already use a higher global checkpoint to clean up commit
points, then we may have removed a safe commit which we try to keep for
recovery.

This commit updates the deletion policy using lastSyncedGlobalCheckpoint
from Translog rather the in memory global checkpoint.

Relates elastic#27606
dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Jan 11, 2018
Currently we keep a 5.x index commit as a safe commit until we have a
6.x safe commit. During that time, if peer-recovery happens, a primary
will send a 5.x commit in file-based sync and the recovery will even
fail as the snapshotted commit does not have sequence number tags.

This commit updates the combined deletion policy to delete legacy
commits if there are 6.x commits.

Relates elastic#27606
Relates elastic#28038
dnhatn added a commit that referenced this pull request Jan 11, 2018
Currently we keep a 5.x index commit as a safe commit until we have a
6.x safe commit. During that time, if peer-recovery happens, a primary
will send a 5.x commit in file-based sync and the recovery will even
fail as the snapshotted commit does not have sequence number tags.

This commit updates the combined deletion policy to delete legacy
commits if there are 6.x commits.

Relates #27606
Relates #28038
dnhatn added a commit that referenced this pull request Jan 12, 2018
Currently we keep a 5.x index commit as a safe commit until we have a
6.x safe commit. During that time, if peer-recovery happens, a primary
will send a 5.x commit in file-based sync and the recovery will even
fail as the snapshotted commit does not have sequence number tags.

This commit updates the combined deletion policy to delete legacy
commits if there are 6.x commits.

Relates #27606
Relates #28038
@clintongormley clintongormley added :Distributed Indexing/Distributed A catch all label for anything in the Distributed Area. Please avoid if you can. :Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. and removed :Translog :Distributed Indexing/Distributed A catch all label for anything in the Distributed Area. Please avoid if you can. labels Feb 13, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. >enhancement v6.2.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants