Skip to content

Commit

Permalink
Retention lease background sync 6.x (elastic#38263)
Browse files Browse the repository at this point in the history
This commit introduces a background sync for retention leases. The idea
here is that we do a heavyweight sync when adding a new retention lease,
and then periodically we want to background sync any retention lease
renewals to the replicas. As long as the background sync interval is
significantly lower than the extended lifetime of a retention lease, it
is okay if from time to time a replica misses a sync (it will still have
an older version of the lease that is retaining more data as we assume
that renewals do not decrease the retaining sequence number). There are
two follow-ups that will come after this commit. The first is to address
the fact that we have not adapted the should periodically flush logic to
possibly flush the retention leases. We want to do something like flush
if we have not flushed in the last five minutes and there are renewed
retention leases since the last time that we flushed. An additional
follow-up will remove the syncing of retention leases when a retention
lease expires. Today this sync could be invoked in the background by a
merge operation. Rather, we will move the syncing of retention lease
expiration to be done under the background sync. The background sync
will use the heavyweight sync (write action) if a lease has expired, and
will use the lightweight background sync (replication action) otherwise.
  • Loading branch information
jasontedor authored Feb 4, 2019
1 parent e811b49 commit d5dd8db
Show file tree
Hide file tree
Showing 20 changed files with 662 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ public void respond(ActionListener<Response> listener) {
}
}

protected static class ReplicaResult {
public static class ReplicaResult {
final Exception finalFailure;

public ReplicaResult(Exception finalFailure) {
Expand Down
53 changes: 48 additions & 5 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand Down Expand Up @@ -196,6 +197,7 @@ public IndexService(
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

Expand Down Expand Up @@ -285,7 +287,8 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
refreshTask,
fsyncTask,
trimTranslogTask,
globalCheckpointTask);
globalCheckpointTask,
retentionLeaseBackgroundSyncTask);
}
}
}
Expand Down Expand Up @@ -402,7 +405,7 @@ public synchronized IndexShard createShard(
searchOperationListeners,
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
(retentionLeases, listener) -> retentionLeaseSyncer.syncRetentionLeasesForShard(shardId, retentionLeases, listener),
retentionLeaseSyncer,
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down Expand Up @@ -764,6 +767,14 @@ private void maybeTrimTranslog() {
}

private void maybeSyncGlobalCheckpoints() {
sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint");
}

private void backgroundSyncRetentionLeases() {
sync(IndexShard::backgroundSyncRetentionLeases, "retention lease");
}

private void sync(final Consumer<IndexShard> sync, final String source) {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
Expand All @@ -777,17 +788,17 @@ private void maybeSyncGlobalCheckpoints() {
case STARTED:
try {
shard.runUnderPrimaryPermit(
() -> shard.maybeSyncGlobalCheckpoint("background"),
() -> sync.accept(shard),
e -> {
if (e instanceof AlreadyClosedException == false
&& e instanceof IndexShardClosedException == false) {
logger.warn(
new ParameterizedMessage(
"{} failed to execute background global checkpoint sync", shard.shardId()), e);
"{} failed to execute background {} sync", shard.shardId(), source), e);
}
},
ThreadPool.Names.SAME,
"background global checkpoint sync");
"background " + source + " sync");
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
Expand Down Expand Up @@ -893,6 +904,15 @@ public String toString() {
Property.Dynamic,
Property.IndexScope);

// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.soft_deletes.retention_lease.sync_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope);

/**
* Background task that syncs the global checkpoint to replicas.
*/
Expand All @@ -919,6 +939,29 @@ public String toString() {
}
}

final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask {

AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) {
super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
}

@Override
protected void runInternal() {
indexService.backgroundSyncRetentionLeases();
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.MANAGEMENT;
}

@Override
public String toString() {
return "retention_lease_background_sync";
}

}

AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
* Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired,
* and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;

/**
* Replication action responsible for background syncing retention leases to replicas. This action is deliberately a replication action so
* that if a replica misses a background retention lease sync then that shard will not be marked as stale. We have some tolerance for a
* shard copy missing renewals of retention leases since the background sync interval is much smaller than the expected lifetime of
* retention leases.
*/
public class RetentionLeaseBackgroundSyncAction extends TransportReplicationAction<
RetentionLeaseBackgroundSyncAction.Request,
RetentionLeaseBackgroundSyncAction.Request,
ReplicationResponse> {

public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";

private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

protected Logger getLogger() {
return LOGGER;
}

@Inject
public RetentionLeaseBackgroundSyncAction(
final Settings settings,
final TransportService transportService,
final ClusterService clusterService,
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);
}

/**
* Background sync the specified retention leases for the specified shard.
*
* @param shardId the shard to sync
* @param retentionLeases the retention leases to sync
*/
public void backgroundSync(
final ShardId shardId,
final RetentionLeases retentionLeases) {
Objects.requireNonNull(shardId);
Objects.requireNonNull(retentionLeases);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
execute(
new Request(shardId, retentionLeases),
ActionListener.wrap(
r -> {},
e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
}
}));
}
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.afterWriteOperation();
return new PrimaryResult<>(request, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.afterWriteOperation();
return new ReplicaResult();
}

public static final class Request extends ReplicationRequest<Request> {

private RetentionLeases retentionLeases;

public RetentionLeases getRetentionLeases() {
return retentionLeases;
}

public Request() {

}

public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
retentionLeases = new RetentionLeases(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(Objects.requireNonNull(out));
retentionLeases.writeTo(out);
}

@Override
public String toString() {
return "Request{" +
"retentionLeases=" + retentionLeases +
", shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", waitForActiveShards=" + waitForActiveShards +
'}';
}

}

@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public RetentionLeaseSyncAction(
* @param retentionLeases the retention leases to sync
* @param listener the callback to invoke when the sync completes normally or abnormally
*/
public void syncRetentionLeasesForShard(
public void sync(
final ShardId shardId,
final RetentionLeases retentionLeases,
final ActionListener<ReplicationResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
* A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on
* the primary.
*/
@FunctionalInterface
public interface RetentionLeaseSyncer {

/**
Expand All @@ -38,9 +37,20 @@ public interface RetentionLeaseSyncer {
* @param retentionLeases the retention leases to sync
* @param listener the callback when sync completes
*/
void syncRetentionLeasesForShard(
ShardId shardId,
RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener);
void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener<ReplicationResponse> listener);

void backgroundSync(ShardId shardId, RetentionLeases retentionLeases);

RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() {
@Override
public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener<ReplicationResponse> listener) {

}

@Override
public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) {

}
};

}
Loading

0 comments on commit d5dd8db

Please sign in to comment.