Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce retention lease actions #38756

Merged
merged 11 commits into from
Feb 12, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.action;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
Expand Down Expand Up @@ -209,6 +209,7 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
Expand All @@ -220,6 +221,7 @@
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.action.RestFieldCapabilitiesAction;
import org.elasticsearch.rest.action.RestMainAction;
import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction;
import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
Expand Down Expand Up @@ -251,7 +253,6 @@
import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction;
import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction;
import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction;
Expand Down Expand Up @@ -529,6 +530,10 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);

// retention leases
actions.register(RetentionLeaseActions.Add.INSTANCE, RetentionLeaseActions.Add.TransportAction.class);
actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ public enum SearcherScope {
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public abstract Closeable acquireRetentionLockForPeerRecovery();
public abstract Closeable acquireRetentionLock();
jasontedor marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
Expand All @@ -771,6 +771,13 @@ public abstract int estimateNumberOfHistoryOperations(String source,
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Gets the minimum retained sequence number for this engine.
*
* @return the minimum retained sequence number
*/
public abstract long getMinRetainedSeqNo();

public abstract TranslogStats getTranslogStats();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2567,13 +2567,13 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe
* Returns the minimum seqno that is retained in the Lucene index.
* Operations whose seq# are at least this value should exist in the Lucene index.
*/
final long getMinRetainedSeqNo() {
public final long getMinRetainedSeqNo() {
assert softDeleteEnabled : Thread.currentThread().getName();
return softDeletesPolicy.getMinRetainedSeqNo();
}

@Override
public Closeable acquireRetentionLockForPeerRecovery() {
public Closeable acquireRetentionLock() {
if (softDeleteEnabled) {
return softDeletesPolicy.acquireRetentionLock();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void syncTranslog() {
}

@Override
public Closeable acquireRetentionLockForPeerRecovery() {
public Closeable acquireRetentionLock() {
return () -> {};
}

Expand Down Expand Up @@ -311,6 +311,11 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe
return false;
}

@Override
public long getMinRetainedSeqNo() {
throw new UnsupportedOperationException();
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public TranslogStats getTranslogStats() {
return translogStats;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class RetentionLeaseActions {

static abstract class TransportRetentionLeaseAction extends TransportSingleShardAction<Request, Response> {

private Logger logger = LogManager.getLogger(getClass());
jasontedor marked this conversation as resolved.
Show resolved Hide resolved

private final IndicesService indicesService;

@Inject
public TransportRetentionLeaseAction(
final String name,
final ThreadPool threadPool,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(name, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, Request::new, ThreadPool.Names.MANAGEMENT);
this.indicesService = Objects.requireNonNull(indicesService);
}

@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
return state
.routingTable()
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
.primaryShardIt();
}

@Override
protected Response shardOperation(final Request request, final ShardId shardId) {
final IndexService indexService = indicesService.indexServiceSafe(request.getShardId().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShardId().id());

final CompletableFuture<Releasable> permit = new CompletableFuture<>();
final ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {

@Override
public void onResponse(Releasable releasable) {
if (permit.complete(releasable) == false) {
releasable.close();
}
}

@Override
public void onFailure(Exception e) {
permit.completeExceptionally(e);
}

};

indexShard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, request);

// block until we have the permit
try (Releasable ignore = FutureUtils.get(permit)) {
doRetentionLeaseAction(indexShard, request);
} finally {
// just in case we got an exception (likely interrupted) while waiting for the get
permit.whenComplete((r, e) -> {
if (r != null) {
r.close();
}
if (e != null) {
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
}
});
}

return new Response();
}

abstract void doRetentionLeaseAction(IndexShard indexShard, Request request);

@Override
protected Response newResponse() {
return new Response();
}

@Override
protected boolean resolveIndex(final Request request) {
return false;
}

}

public static class Add extends Action<Response> {

public static final Add INSTANCE = new Add();
public static final String NAME = "indices:admin/seq_no/add_retention_lease";

private Add() {
super(NAME);
}

public static class TransportAction extends TransportRetentionLeaseAction {

@Inject
public TransportAction(
final ThreadPool threadPool,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, indicesService);
}

@Override
void doRetentionLeaseAction(final IndexShard indexShard, final Request request) {
indexShard.addRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource(), ActionListener.wrap(() -> {}));
}
}

@Override
public Response newResponse() {
return new Response();
}

}

public static class Renew extends Action<Response> {

public static final Renew INSTANCE = new Renew();
public static final String NAME = "indices:admin/seq_no/renew_retention_lease";

private Renew() {
super(NAME);
}

public static class TransportAction extends TransportRetentionLeaseAction {

@Inject
public TransportAction(
final ThreadPool threadPool,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, indicesService);
}


@Override
void doRetentionLeaseAction(final IndexShard indexShard, final Request request) {
indexShard.renewRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource());
}
}

@Override
public Response newResponse() {
return new Response();
}

}

public static class Request extends SingleShardRequest<Request> {

public static long RETAIN_ALL = -1;

private ShardId shardId;

public ShardId getShardId() {
return shardId;
}

private String id;

public String getId() {
return id;
}

private long retainingSequenceNumber;

public long getRetainingSequenceNumber() {
return retainingSequenceNumber;
}

private String source;

public String getSource() {
return source;
}

public Request() {
}

public Request(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
super(Objects.requireNonNull(shardId).getIndexName());
this.shardId = shardId;
this.id = Objects.requireNonNull(id);
if (retainingSequenceNumber < 0 && retainingSequenceNumber != RETAIN_ALL) {
throw new IllegalArgumentException(
"retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
}
this.retainingSequenceNumber = retainingSequenceNumber;
this.source = Objects.requireNonNull(source);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
id = in.readString();
retainingSequenceNumber = in.readZLong();
source = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(id);
out.writeZLong(retainingSequenceNumber);
out.writeString(source);
}

}

public static class Response extends ActionResponse {

}

}
Loading