Merge branch 'master' into retention-lease-exceptions
* master:
  Enable removal of retention leases (elastic#38751)
  Make the 'get templates' types deprecation message consistent. (elastic#38533)
  Copy retention leases when trim unsafe commits (elastic#37995)
  Fix the version check for LegacyGeoShapeFieldMapper (elastic#38547)
jasontedor committed Feb 12, 2019
2 parents cd8ca81 + 58a7716 commit 5f674dc
Showing 9 changed files with 329 additions and 14 deletions.
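The headline change in this merge (elastic#38751) is an explicit removal path for retention leases, surfaced below on ReplicationTracker and IndexShard. The following is a rough sketch of how a caller might drive the new method, assuming a primary IndexShard in hand; the wrapper class, lease id, retaining sequence number, and source string are made-up values, not part of this commit.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.index.shard.IndexShard;

public final class RetentionLeaseRemovalSketch {

    /**
     * Adds a lease for a hypothetical consumer and later removes it; the shard syncs the
     * updated lease collection to its replicas and then completes the supplied listeners.
     */
    static void addThenRemove(final IndexShard primary) {
        final ActionListener<ReplicationResponse> onAdded = ActionListener.wrap(
                response -> {},                     // lease added and synced to replicas
                e -> { throw new AssertionError(e); });
        primary.addRetentionLease("consumer-1", 42L, "example-source", onAdded);

        final ActionListener<ReplicationResponse> onRemoved = ActionListener.wrap(
                response -> {},                     // lease removed and synced to replicas
                e -> { throw new AssertionError(e); });
        primary.removeRetentionLease("consumer-1", onRemoved);
    }
}

Per the removeRetentionLease implementation further down, removing an id that does not exist throws an IllegalArgumentException.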
@@ -190,7 +190,7 @@ public Mapper.Builder parse(String name, Map<String, Object> node, ParserContext
}
}
final Builder builder;
if (parsedDeprecatedParams || parserContext.indexVersionCreated().before(Version.V_7_0_0)) {
if (parsedDeprecatedParams || parserContext.indexVersionCreated().before(Version.V_6_6_0)) {
// Legacy index-based shape
builder = new LegacyGeoShapeFieldMapper.Builder(name, deprecatedParameters);
} else {
@@ -156,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final LongSupplier currentTimeMillisSupplier;

/**
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
* retention leases to replicas.
* A callback when a new retention lease is created or an existing retention lease is removed. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease;
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@@ -246,7 +246,7 @@ public RetentionLease addRetentionLease(
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
currentRetentionLeases = retentionLeases;
}
onAddRetentionLease.accept(currentRetentionLeases, listener);
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}

@@ -283,6 +283,29 @@ public synchronized RetentionLease renewRetentionLease(final String id, final lo
return retentionLease;
}

/**
* Removes an existing retention lease.
*
* @param id the identifier of the retention lease
* @param listener the callback when the retention lease is successfully removed and synced to replicas
*/
public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.contains(id) == false) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
}
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false).collect(Collectors.toList()));
currentRetentionLeases = retentionLeases;
}
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
}

/**
* Updates retention leases on a replica.
*
@@ -563,7 +586,7 @@ private static long inSyncCheckpointStates(
* @param indexSettings the index settings
* @param operationPrimaryTerm the current primary term
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onAddRetentionLease a callback when a new retention lease is created or an existing retention lease expires
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
*/
public ReplicationTracker(
final ShardId shardId,
@@ -573,7 +596,7 @@ public ReplicationTracker(
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease) {
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
@@ -585,7 +608,7 @@ public ReplicationTracker(
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease);
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
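Both addRetentionLease and the new removeRetentionLease funnel the full, updated lease collection through the renamed onSyncRetentionLeases callback. Below is a minimal sketch of how the tracker's owner might supply that callback; the LeaseSyncAction interface is a stand-in for the real sync machinery and is purely illustrative.

import java.util.function.BiConsumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.index.seqno.RetentionLeases;

public final class RetentionLeaseSyncWiring {

    /** Stand-in for whatever action actually pushes retention leases to the replicas. */
    interface LeaseSyncAction {
        void sync(RetentionLeases leases, ActionListener<ReplicationResponse> listener);
    }

    /**
     * Builds the BiConsumer handed to the ReplicationTracker constructor: every add or
     * remove of a lease forwards the current lease set through the same sync path.
     */
    static BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases(
            final LeaseSyncAction syncAction) {
        return (leases, listener) -> syncAction.sync(leases, listener);
    }
}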
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1956,6 +1956,19 @@ public RetentionLease renewRetentionLease(final String id, final long retainingS
return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source);
}

/**
* Removes an existing retention lease.
*
* @param id the identifier of the retention lease
* @param listener the callback when the retention lease is successfully removed and synced to replicas
*/
public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.removeRetentionLease(id, listener);
}

/**
* Updates retention leases on a replica.
*
14 changes: 11 additions & 3 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -1521,7 +1521,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
if (existingCommits.isEmpty()) {
throw new IllegalArgumentException("No index found to trim");
}
final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit startingIndexCommit;
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
// is not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
@@ -1546,7 +1547,14 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
+ startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid ["
+ translogUUID + "]");
}
if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) {
if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
/*
* Unlike other commit tags, the retention-leases tag is not restored when an engine is
* recovered from translog. We need to manually copy it from the last commit to the safe commit;
* otherwise we might lose the latest committed retention leases when re-opening an engine.
*/
final Map<String, String> userData = new HashMap<>(startingIndexCommit.getUserData());
userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, ""));
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it makes sure the starting commit will be opened
@@ -1557,7 +1565,7 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long

// The new commit will use segment files from the starting commit but userData from the last commit by default.
// Thus, we need to manually set the userData from the starting commit to the new commit.
writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
}
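The comment added above spells out why the retention-leases tag has to be carried forward by hand: the re-commit installs the safe commit's userData, which may predate the leases committed only in the last commit. Below is a stripped-down sketch of the same copy-the-tag pattern against the plain Lucene API; the helper name, generic key parameter, and writer configuration are illustrative assumptions, not the Store implementation.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;

public final class CopyCommitTagSketch {

    /**
     * Opens an appending writer on top of the safe commit and re-commits with the safe
     * commit's userData, except for one key that is copied forward from the last commit.
     */
    static void recommitWithCopiedTag(
            final Directory directory,
            final IndexCommit safeCommit,
            final IndexCommit lastCommit,
            final String key) throws IOException {
        final Map<String, String> userData = new HashMap<>(safeCommit.getUserData());
        userData.put(key, lastCommit.getUserData().getOrDefault(key, ""));
        final IndexWriterConfig config = new IndexWriterConfig(null)   // no analyzer needed for an append-only commit
                .setOpenMode(IndexWriterConfig.OpenMode.APPEND)
                .setIndexCommit(safeCommit)
                .setCommitOnClose(false);
        try (IndexWriter writer = new IndexWriter(directory, config)) {
            writer.setLiveCommitData(userData.entrySet());
            writer.commit();
        }
    }
}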
@@ -51,8 +51,8 @@ public class RestGetIndexTemplateAction extends BaseRestHandler {
Collections.singleton(INCLUDE_TYPE_NAME_PARAMETER), Settings.FORMAT_PARAMS));
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(
LogManager.getLogger(RestGetIndexTemplateAction.class));
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal]" +
" Specifying include_type_name in get index template requests is deprecated.";
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Using include_type_name in get " +
"index template requests is deprecated. The parameter will be removed in the next major version.";

public RestGetIndexTemplateAction(final Settings settings, final RestController controller) {
super(settings);
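For context, the reworded constant is the warning the handler emits through its DeprecationLogger when include_type_name is supplied on a get-templates request. Below is a small self-contained sketch of that emission point; the wrapper class and method are purely illustrative, since the handler code around the constant is not part of this diff.

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.admin.indices.RestGetIndexTemplateAction;

public final class TypesDeprecationWarningSketch {

    private static final DeprecationLogger deprecationLogger =
            new DeprecationLogger(LogManager.getLogger(TypesDeprecationWarningSketch.class));

    /** Emits the deprecation warning only when the request actually carries the parameter. */
    static void warnIfTypeNameRequested(final RestRequest request) {
        if (request.hasParam("include_type_name")) {
            deprecationLogger.deprecated(RestGetIndexTemplateAction.TYPES_DEPRECATION_MESSAGE);
        }
    }
}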
@@ -87,7 +87,7 @@ public ExternalMapper build(BuilderContext context) {
BinaryFieldMapper binMapper = binBuilder.build(context);
BooleanFieldMapper boolMapper = boolBuilder.build(context);
GeoPointFieldMapper pointMapper = latLonPointBuilder.build(context);
BaseGeoShapeFieldMapper shapeMapper = (context.indexCreatedVersion().before(Version.V_7_0_0))
BaseGeoShapeFieldMapper shapeMapper = (context.indexCreatedVersion().before(Version.V_6_6_0))
? legacyShapeBuilder.build(context)
: shapeBuilder.build(context);
FieldMapper stringMapper = (FieldMapper)stringBuilder.build(context);
@@ -193,6 +193,105 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() {
}
}

public void testRemoveRetentionLease() {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
}

for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
/*
* Remove from the end since it will make the following assertion easier; we want to ensure that only the intended lease was
* removed.
*/
replicationTracker.removeRetentionLease(Integer.toString(length - i - 1), ActionListener.wrap(() -> {}));
assertRetentionLeases(
replicationTracker,
length - i - 1,
minimumRetainingSequenceNumbers,
primaryTerm,
1 + length + i,
true,
false);
}
}

public void testRemoveRetentionLeaseCausesRetentionLeaseSync() {
final AllocationId allocationId = AllocationId.newInitializing();
final Map<String, Long> retainingSequenceNumbers = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();
final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
randomNonNegativeLong(),
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {
// we do not want to hold a lock on the replication tracker in the callback!
assertFalse(Thread.holdsLock(reference.get()));
invoked.set(true);
assertThat(
leases.leases()
.stream()
.collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
equalTo(retainingSequenceNumbers));
});
reference.set(replicationTracker);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);

final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
retainingSequenceNumbers.put(id, retainingSequenceNumber);
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
// assert that the new retention lease callback was invoked
assertTrue(invoked.get());

// reset the invocation marker so that we can assert the callback is invoked again when removing the lease
invoked.set(false);
retainingSequenceNumbers.remove(id);
replicationTracker.removeRetentionLease(id, ActionListener.wrap(() -> {}));
assertTrue(invoked.get());
}
}

public void testExpirationOnPrimary() {
runExpirationTest(true);
}
@@ -126,6 +126,68 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
}
}

public void testRetentionLeaseSyncedOnRemove() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.build();
createIndex("index", settings);
ensureGreen("index");
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final int length = randomIntBetween(1, 8);
final Map<String, RetentionLease> currentRetentionLeases = new HashMap<>();
for (int i = 0; i < length; i++) {
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
}

for (int i = 0; i < length; i++) {
final String id = randomFrom(currentRetentionLeases.keySet());
final CountDownLatch latch = new CountDownLatch(1);
primary.removeRetentionLease(id, ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
currentRetentionLeases.remove(id);
latch.await();
retentionLock.close();

// check retention leases have been committed on the primary
final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));

// check current retention leases have been synced to all replicas
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
final String replicaShardNodeId = replicaShard.currentNodeId();
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
final IndexShard replica = internalCluster()
.getInstance(IndicesService.class, replicaShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));

// check retention leases have been committed on the replica
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
}
}
}

public void testRetentionLeasesSyncOnExpiration() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);