Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce retention leases versioning #37951

Merged
merged 31 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
98f4239
Introduce retention leases versioning
jasontedor Jan 29, 2019
38dffb5
Add Javadocs
jasontedor Jan 29, 2019
c715b5e
License header
jasontedor Jan 29, 2019
f8e8547
Relocate methods
jasontedor Jan 29, 2019
4aa6b77
Merge branch 'master' into retention-leases-version
jasontedor Jan 31, 2019
6ab494d
Add primary term
jasontedor Jan 31, 2019
2d3772c
Merge remote-tracking branch 'elastic/master' into retention-leases-v…
jasontedor Jan 31, 2019
954db81
Remove dead constructor
jasontedor Jan 31, 2019
9d09989
Partition leases when calculating expiration
jasontedor Jan 31, 2019
17d2095
Remove unnecessary map
jasontedor Jan 31, 2019
e724746
Fix test
jasontedor Jan 31, 2019
e8971b9
Fix spelling and imports
jasontedor Jan 31, 2019
f3cb4eb
Add tests and docs for supersedes
jasontedor Jan 31, 2019
0a7d19b
Adjust primary term limit
jasontedor Jan 31, 2019
034a840
Add missing newline
jasontedor Jan 31, 2019
05b69c0
Update tests
jasontedor Jan 31, 2019
600809c
Fix test
jasontedor Jan 31, 2019
39023d8
Fix imports
jasontedor Jan 31, 2019
80b08b3
Fix some naming
jasontedor Jan 31, 2019
ae0fc2a
Fix checkstyle
jasontedor Jan 31, 2019
08095cb
Update server/src/test/java/org/elasticsearch/index/seqno/Replication…
jasontedor Feb 1, 2019
8d9deae
Fix compilation
jasontedor Feb 1, 2019
fce4da6
Fix checkstyle
jasontedor Feb 1, 2019
76bb46b
Inline
jasontedor Feb 1, 2019
44c5e0f
Inline
jasontedor Feb 1, 2019
bc96632
Fix test
jasontedor Feb 1, 2019
becb055
Adjust docs
jasontedor Feb 1, 2019
73fb8f5
Disable BWC
jasontedor Feb 1, 2019
7cf145c
Merge branch 'master' into retention-leases-version
jasontedor Feb 1, 2019
686d35e
Merge branch 'master' into retention-leases-version
jasontedor Feb 1, 2019
be0edfc
Merge remote-tracking branch 'elastic/master' into retention-leases-v…
jasontedor Feb 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -81,15 +80,15 @@ public final class EngineConfig {
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
* soft deleted should be retained.
*
* @return a supplier of outstanding retention leases
*/
public Supplier<Collection<RetentionLease>> retentionLeasesSupplier() {
public Supplier<RetentionLeases> retentionLeasesSupplier() {
return retentionLeasesSupplier;
}

Expand Down Expand Up @@ -141,7 +140,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
Supplier<Collection<RetentionLease>> retentionLeasesSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand Down Expand Up @@ -2348,9 +2348,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
* retained sequence number, and the retention leases.
*/
final Tuple<Long, Collection<RetentionLease>> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2()));
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;

Expand All @@ -47,13 +48,13 @@ final class SoftDeletesPolicy {
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;

SoftDeletesPolicy(
final LongSupplier globalCheckpointSupplier,
final long minRetainedSeqNo,
final long retentionOperations,
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
final Supplier<RetentionLeases> retentionLeasesSupplier) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
Expand Down Expand Up @@ -110,12 +111,12 @@ synchronized long getMinRetainedSeqNo() {
return getRetentionPolicy().v1();
}

public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
*/
final Collection<RetentionLease> retentionLeases = retentionLeasesSupplier.get();
final RetentionLeases retentionLeases = retentionLeasesSupplier.get();
// do not advance if the retention lock is held
if (retentionLockCount == 0) {
/*
Expand All @@ -130,6 +131,7 @@ public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy()

// calculate the minimum sequence number to retain based on retention leases
final long minimumRetainingSequenceNumber = retentionLeases
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -38,9 +39,7 @@
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -54,6 +53,7 @@
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

/**
* This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
Expand Down Expand Up @@ -157,7 +157,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
Expand All @@ -170,48 +170,47 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
volatile ReplicationGroup replicationGroup;

private final Map<String, RetentionLease> retentionLeases = new HashMap<>();

private Collection<RetentionLease> copyRetentionLeases() {
assert Thread.holdsLock(this);
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
}
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/
public Collection<RetentionLease> getRetentionLeases() {
public RetentionLeases getRetentionLeases() {
final boolean wasPrimaryMode;
final Collection<RetentionLease> nonExpiredRetentionLeases;
final RetentionLeases nonExpiredRetentionLeases;
synchronized (this) {
if (primaryMode) {
// the primary calculates the non-expired retention leases and syncs them to replicas
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Collection<RetentionLease> expiredRetentionLeases = retentionLeases

final Map<String, RetentionLease> leases = RetentionLeases.toMap(retentionLeases);

final Collection<RetentionLease> expiredRetentionLeases = leases
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
.values()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis)
.collect(Collectors.toList());
if (expiredRetentionLeases.isEmpty()) {
// early out as no retention leases have expired
return copyRetentionLeases();
return retentionLeases;
}
// clean up the expired retention leases
for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) {
retentionLeases.remove(expiredRetentionLease.id());
leases.remove(expiredRetentionLease.id());
}
retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, leases.values());
}
/*
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
* non-expired retention leases, instead receiving them on syncs from the primary.
*/
wasPrimaryMode = primaryMode;
nonExpiredRetentionLeases = copyRetentionLeases();
nonExpiredRetentionLeases = retentionLeases;
}
if (wasPrimaryMode) {
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
Expand All @@ -236,15 +235,18 @@ public RetentionLease addRetentionLease(
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLease retentionLease;
final Collection<RetentionLease> currentRetentionLeases;
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.containsKey(id)) {
if (retentionLeases.contains(id)) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists");
}
retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
retentionLeases.put(id, retentionLease);
currentRetentionLeases = copyRetentionLeases();
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
currentRetentionLeases = retentionLeases;
}
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
Expand All @@ -261,18 +263,25 @@ public RetentionLease addRetentionLease(
*/
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
if (retentionLeases.containsKey(id) == false) {
if (retentionLeases.contains(id) == false) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
}
final RetentionLease retentionLease =
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease);
final RetentionLease existingRetentionLease = retentionLeases.get(id);
assert existingRetentionLease != null;
assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
"retention lease renewal for [" + id + "]"
+ " from [" + source + "]"
+ " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
+ " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(
retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false),
Stream.of(retentionLease))
.collect(Collectors.toList()));
return retentionLease;
}

Expand All @@ -281,10 +290,11 @@ public synchronized RetentionLease renewRetentionLease(final String id, final lo
*
* @param retentionLeases the retention leases
*/
public synchronized void updateRetentionLeasesOnReplica(final Collection<RetentionLease> retentionLeases) {
public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) {
assert primaryMode == false;
this.retentionLeases.clear();
this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())));
if (retentionLeases.supercedes(this.retentionLeases)) {
this.retentionLeases = retentionLeases;
}
}

public static class CheckpointState implements Writeable {
Expand Down Expand Up @@ -565,7 +575,7 @@ public ReplicationTracker(
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -168,18 +167,6 @@ static String encodeRetentionLease(final RetentionLease retentionLease) {
retentionLease.source());
}

/**
* Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The
* encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}.
*
* @param retentionLeases the retention leases
* @return the encoding of the retention leases
*/
public static String encodeRetentionLeases(final Collection<RetentionLease> retentionLeases) {
Objects.requireNonNull(retentionLeases);
return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","));
}

/**
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
*
Expand All @@ -201,23 +188,6 @@ static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
}

/**
* Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}.
*
* @param encodedRetentionLeases an encoded collection of retention leases
* @return the decoded retention leases
*/
public static Collection<RetentionLease> decodeRetentionLeases(final String encodedRetentionLeases) {
Objects.requireNonNull(encodedRetentionLeases);
if (encodedRetentionLeases.isEmpty()) {
return Collections.emptyList();
}
assert Arrays.stream(encodedRetentionLeases.split(","))
.allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
: encodedRetentionLeases;
return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList());
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
Expand All @@ -244,14 +214,4 @@ public String toString() {
'}';
}

/**
* A utility method to convert a collection of retention leases to a map from retention lease ID to retention lease.
*
* @param leases the leases
* @return the map from retention lease ID to retention lease
*/
static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) {
return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
}

}
Loading