Skip to content

Commit

Permalink
Migrate peer recovery from translog to retention lease (elastic#49448)
Browse files Browse the repository at this point in the history
Since 7.4, we switch from translog to Lucene as the source of history
for peer recoveries. However, we reduce the likelihood of
operation-based recoveries when performing a full cluster restart from
pre-7.4 because existing copies do not have PRRLs (peer recovery retention leases).

To remedy this issue, we fall back to using the translog in peer recoveries if
the recovering replica does not have a peer recovery retention lease and the
replication group has not yet fully migrated to PRRLs.

Relates elastic#45136
  • Loading branch information
dnhatn committed Dec 16, 2019
1 parent 404d14e commit a02fc62
Show file tree
Hide file tree
Showing 20 changed files with 446 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestGetAction;
Expand Down Expand Up @@ -1267,6 +1268,12 @@ private void indexRandomDocuments(
}
}

/**
 * Indexes a single {@code {"f": "v"}} document under the given id into the test index
 * and asserts the request succeeded.
 */
private void indexDocument(String id) throws IOException {
    final String endpoint = "/" + index + "/" + type + "/" + id;
    final Request request = new Request("POST", endpoint);
    final String body = Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject());
    request.setJsonEntity(body);
    assertOK(client().performRequest(request));
}

/**
 * Returns the document count previously recorded in the "count" info document.
 */
private int countOfIndexedRandomDocuments() throws IOException {
    final String recordedCount = loadInfoDocument("count");
    return Integer.parseInt(recordedCount);
}
Expand Down Expand Up @@ -1362,4 +1369,66 @@ public void testPeerRecoveryRetentionLeases() throws IOException {
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
}
}

/**
 * Tests that, with or without soft-deletes, an operation-based (sequence-number based)
 * recovery is performed after a full cluster restart when the amount of uncommitted
 * history is small (i.e. under the ~10% file-based recovery threshold relative to the
 * committed documents). This is important when we move from translog based to retention
 * leases based peer recoveries.
 */
public void testOperationBasedRecovery() throws Exception {
    if (isRunningAgainstOldCluster()) {
        // Old-cluster phase: create a 1-shard/1-replica index. Soft-deletes can only be
        // toggled when the old cluster supports the setting (6.7.0+).
        final Settings.Builder settings = Settings.builder()
            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1);
        if (getOldClusterVersion().onOrAfter(Version.V_6_7_0)) {
            settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
        }
        createIndex(index, settings.build());
        ensureGreen(index);
        int committedDocs = randomIntBetween(100, 200);
        for (int i = 0; i < committedDocs; i++) {
            indexDocument(Integer.toString(i));
            if (rarely()) {
                // Occasional flushes vary how history is spread across commits.
                flush(index, randomBoolean());
            }
        }
        // Commit everything indexed so far, and make sure peer recovery retention leases
        // are renewed and synced before the restart.
        flush(index, true);
        ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
        // Keep uncommitted docs at no more than 10% of the committed docs
        // (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING) so that an
        // operation-based recovery stays preferable to copying files.
        int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
        for (int i = 0; i < uncommittedDocs; i++) {
            final String id = Integer.toString(randomIntBetween(1, 100));
            indexDocument(id);
        }
    } else {
        // Restarted-cluster phase: every shard copy must have recovered without falling
        // back to a file-based recovery.
        ensureGreen(index);
        assertNoFileBasedRecovery(index, n -> true);
    }
}

/**
 * Verifies that once all shard copies are on the new version, translog retention is
 * effectively turned off for indices with soft-deletes enabled (history is then retained
 * via soft-deletes rather than the translog).
 */
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
    assumeTrue("requires soft-deletes and retention leases", getOldClusterVersion().onOrAfter(Version.V_6_7_0));
    if (isRunningAgainstOldCluster()) {
        createIndex(index, Settings.builder()
            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
            .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
        ensureGreen(index);
        // Build up some translog history on the old cluster; occasional flushes vary how
        // much of it remains uncommitted at restart time.
        int numDocs = randomIntBetween(10, 100);
        for (int i = 0; i < numDocs; i++) {
            indexDocument(Integer.toString(randomIntBetween(1, 100)));
            if (rarely()) {
                flush(index, randomBoolean());
            }
        }
    } else {
        // After the restart onto the new version, a flush should leave the translog empty
        // because operations no longer need to be retained there.
        ensureGreen(index);
        flush(index, true);
        assertEmptyTranslog(index);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,10 @@ public void testClosedIndexNoopRecovery() throws Exception {
switch (CLUSTER_TYPE) {
case OLD: break;
case MIXED:
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
break;
case UPGRADED:
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME));
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME));
break;
}
}
Expand Down Expand Up @@ -692,7 +692,7 @@ public void testUpdateDoc() throws Exception {
}
}

private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
private void assertNoopRecoveries(String indexName, Predicate<String> targetNode) throws IOException {
Map<String, Object> recoveries = entityAsMap(client()
.performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));

Expand Down Expand Up @@ -723,4 +723,55 @@ private void assertNoFileBasedRecovery(String indexName, Predicate<String> targe

assertTrue("must find replica", foundReplica);
}

/**
 * Tests that, with or without soft-deletes, an operation-based recovery is performed when
 * there were some but not too many uncommitted documents (i.e. under the ~10% file-based
 * recovery threshold relative to committed documents) before each node is upgraded. This
 * is important when we move from translog based to retention leases based peer recoveries.
 */
public void testOperationBasedRecovery() throws Exception {
    final String index = "test_operation_based_recovery";
    if (CLUSTER_TYPE == ClusterType.OLD) {
        createIndex(index, Settings.builder()
            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
            .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()).build());
        ensureGreen(index);
        indexDocs(index, 0, randomIntBetween(100, 200));
        flush(index, randomBoolean());
        ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
        // uncommitted docs must be less than 10% of committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
        indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
    } else {
        ensureGreen(index);
        // Only recoveries onto already-upgraded nodes are required to be operation-based:
        // in the UPGRADED phase that is every node; in the MIXED phase it is node 0
        // (first round) and additionally node 1 once tests.first_round is false.
        assertNoFileBasedRecovery(index, nodeName ->
            CLUSTER_TYPE == ClusterType.UPGRADED
                || nodeName.startsWith(CLUSTER_NAME + "-0")
                || (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
        // Add a little more uncommitted history so the next upgrade round is also tested.
        indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
    }
}

/**
 * Verifies that once all shard copies are on the new version, translog retention is turned
 * off for indices with soft-deletes enabled, so the translog can be emptied after a flush.
 */
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
    final String index = "turn_off_translog_retention";
    if (CLUSTER_TYPE == ClusterType.OLD) {
        createIndex(index, Settings.builder()
            .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
            .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 2))
            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
        ensureGreen(index);
        // Seed committed history, then leave some operations uncommitted in the translog.
        indexDocs(index, 0, randomIntBetween(100, 200));
        flush(index, randomBoolean());
        indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 100));
    }
    // Nothing to assert in the MIXED phase; only once every node is upgraded should the
    // translog be left empty after a flush.
    if (CLUSTER_TYPE == ClusterType.UPGRADED) {
        ensureGreen(index);
        flush(index, true);
        assertEmptyTranslog(index);
    }
}
}
24 changes: 15 additions & 9 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,21 +252,22 @@ public final class IndexSettings {
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries for indices with soft-deletes disabled.
* This setting will be ignored if soft-deletes is enabled.
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age",
settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), TimeValue.MINUS_ONE,
Property.Dynamic, Property.IndexScope);
settings -> shouldDisableTranslogRetention(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12),
TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope);

/**
* Controls how many translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries for indices with soft-deletes disabled.
* This setting will be ignored if soft-deletes is enabled.
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
Setting.byteSizeSetting("index.translog.retention.size",
settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB",
Property.Dynamic, Property.IndexScope);

/**
Expand Down Expand Up @@ -587,7 +588,7 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
this.translogRetentionSize = new ByteSizeValue(-1);
} else {
Expand All @@ -596,7 +597,7 @@ private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
}

private void setTranslogRetentionAge(TimeValue age) {
if (softDeleteEnabled && age.millis() >= 0) {
if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
this.translogRetentionAge = TimeValue.MINUS_ONE;
} else {
Expand Down Expand Up @@ -784,7 +785,7 @@ public TimeValue getRefreshInterval() {
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() {
assert softDeleteEnabled == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
return translogRetentionSize;
}

Expand All @@ -793,7 +794,7 @@ public ByteSizeValue getTranslogRetentionSize() {
* around
*/
public TimeValue getTranslogRetentionAge() {
assert softDeleteEnabled == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
return translogRetentionAge;
}

Expand All @@ -805,6 +806,11 @@ public int getTranslogRetentionTotalFiles() {
return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
}

/**
 * Whether the translog retention settings should be ignored for these index settings:
 * true when soft-deletes are enabled and the index was created on or after 7.4 (the
 * version from which Lucene, not the translog, is the source of history for peer
 * recoveries).
 */
private static boolean shouldDisableTranslogRetention(Settings settings) {
    final boolean softDeletesEnabled = INDEX_SOFT_DELETES_SETTING.get(settings);
    final Version createdVersion = IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings);
    return softDeletesEnabled && createdVersion.onOrAfter(Version.V_7_4_0);
}

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
Expand Down
24 changes: 17 additions & 7 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -729,7 +730,7 @@ public enum SearcherScope {
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public abstract Closeable acquireRetentionLock();
public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource);

/**
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
Expand All @@ -742,19 +743,20 @@ public abstract Translog.Snapshot newChangesSnapshot(String source, MapperServic
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public abstract Translog.Snapshot readHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;
public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
*/
public abstract int estimateNumberOfHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;
public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
public abstract boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Gets the minimum retained sequence number for this engine.
Expand Down Expand Up @@ -1816,7 +1818,8 @@ public IndexCommit getIndexCommit() {
}
}

public void onSettingsChanged() {
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {

}

/**
Expand Down Expand Up @@ -1950,4 +1953,11 @@ public interface TranslogRecoveryRunner {
* to advance this marker to at least the given sequence number.
*/
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);

/**
 * The source a recovery reads history operations from: the on-disk translog, or the
 * Lucene index (soft-deletes based history).
 */
public enum HistorySource {
    TRANSLOG, INDEX
}
}
Loading

0 comments on commit a02fc62

Please sign in to comment.