Skip to content

Commit

Permalink
idk
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Feb 6, 2019
1 parent 56326cb commit e60eb03
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.elasticsearch.action;

import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -9,15 +10,19 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class RetentionLeaseAction extends Action<RetentionLeaseAction.Response> {

Expand All @@ -44,11 +49,47 @@ protected TransportAction(
}

@Override
protected Response shardOperation(final Request request, final ShardId shardId) throws IOException {
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
return state
.routingTable()
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
.primaryShardIt();
}

@Override
protected Response shardOperation(final Request request, final ShardId shardId) {
final IndexService indexService = indicesService.indexServiceSafe(request.getShardId().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShardId().id());
indexShard.acquireRetentionLockForPeerRecovery();
indexShard.addRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource(), ActionListener.wrap(() -> {}));

final CompletableFuture<Releasable> permit = new CompletableFuture<>();
final ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
if (permit.complete(releasable) == false) {
releasable.close();
}
}

@Override
public void onFailure(Exception e) {
permit.completeExceptionally(e);
}
};
indexShard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, request);
try (Releasable ignore = FutureUtils.get(permit)) {
indexShard.addRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource(), ActionListener.wrap(() -> {}));
} finally {
// just in case we got an exception (likely interrupted) while waiting for the get
permit.whenComplete((r, e) -> {
if (r != null) {
r.close();
}
if (e != null) {
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
}
});
}

return new Response();
}

Expand All @@ -62,15 +103,12 @@ protected boolean resolveIndex(Request request) {
return false;
}

@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
return null;
}

}

public static class Request extends SingleShardRequest<Request> {

public static long RETAIN_ALL = -1;

private ShardId shardId;

public ShardId getShardId() {
Expand Down Expand Up @@ -101,7 +139,7 @@ public Request() {
public Request(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
this.shardId = Objects.requireNonNull(shardId);
this.id = Objects.requireNonNull(id);
if (retainingSequenceNumber < 0) {
if (retainingSequenceNumber < -1) {
throw new IllegalArgumentException(
"retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
}
Expand All @@ -119,7 +157,7 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
id = in.readString();
retainingSequenceNumber = in.readVLong();
retainingSequenceNumber = in.readZLong();
source = in.readString();
}

Expand All @@ -128,7 +166,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(id);
out.writeVLong(retainingSequenceNumber);
out.writeZLong(retainingSequenceNumber);
out.writeString(source);
}

Expand All @@ -144,5 +182,4 @@ public Response newResponse() {
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,8 @@ public abstract int estimateNumberOfHistoryOperations(String source,
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

public abstract long getMinRetainedSeqNo();

public abstract TranslogStats getTranslogStats();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2562,7 +2562,7 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe
* Returns the minimum seqno that is retained in the Lucene index.
* Operations whose seq# are at least this value should exist in the Lucene index.
*/
final long getMinRetainedSeqNo() {
public final long getMinRetainedSeqNo() {
assert softDeleteEnabled : Thread.currentThread().getName();
return softDeletesPolicy.getMinRetainedSeqNo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,11 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe
return false;
}

@Override
public long getMinRetainedSeqNo() {
return Long.MAX_VALUE;
}

@Override
public TranslogStats getTranslogStats() {
return translogStats;
Expand Down
30 changes: 27 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -153,6 +154,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1739,7 +1741,7 @@ public void onSettingsChanged() {
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public Closeable acquireRetentionLockForPeerRecovery() {
public Closeable acquireRetentionLock() {
return getEngine().acquireRetentionLockForPeerRecovery();
}

Expand All @@ -1760,12 +1762,16 @@ public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo)

/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireRetentionLockForPeerRecovery()}
* This method should be called after acquiring the retention lock; See {@link #acquireRetentionLock()}
*/
public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo);
}

public long getMinRetainedSeqNo() {
return getEngine().getMinRetainedSeqNo();
}

/**
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
Expand Down Expand Up @@ -1938,7 +1944,25 @@ public RetentionLease addRetentionLease(
Objects.requireNonNull(listener);
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, listener);
try (Closeable ignore = acquireRetentionLock()) {
final long minimumRetainingSequenceNumber = getMinRetainedSeqNo();
final long actualRetainingSequenceNumber;
if (retainingSequenceNumber == -1) {
actualRetainingSequenceNumber = minimumRetainingSequenceNumber;
} else {
if (retainingSequenceNumber < minimumRetainingSequenceNumber) {
final String message = String.format(
"retaining sequence number [%d] can not be satisfied on the primary shard, can only guarantee [%d]",
retainingSequenceNumber,
minimumRetainingSequenceNumber);
throw new IllegalArgumentException(message);
}
actualRetainingSequenceNumber = retainingSequenceNumber;
}
return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener);
} catch (final IOException e) {
throw new AssertionError(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
shard, cancellableThreads, logger);
final Closeable retentionLock = shard.acquireRetentionLockForPeerRecovery();
final Closeable retentionLock = shard.acquireRetentionLock();
resources.add(retentionLock);
final long startingSeqNo;
final long requiredSeqNoRangeStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
Expand Down

0 comments on commit e60eb03

Please sign in to comment.