Implement follower rate limiting for file restore (elastic#37449)
This is related to elastic#35975. This commit implements rate limiting on the
follower side using a new class `CombinedRateLimiter`.
Tim-Brooks committed Jan 21, 2019
1 parent aa46a6f commit 9b9e7bc
Showing 5 changed files with 189 additions and 21 deletions.
CombinedRateLimiter.java (new file)
@@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util;

import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.util.concurrent.atomic.AtomicLong;

/**
* A rate limiter designed for multiple concurrent users.
*/
public class CombinedRateLimiter {

    // TODO: This rate limiter has some concurrency issues between the two maybePause operations

    private final AtomicLong bytesSinceLastPause = new AtomicLong();
    private final RateLimiter.SimpleRateLimiter rateLimiter;
    private volatile boolean rateLimit;

    public CombinedRateLimiter(ByteSizeValue maxBytesPerSec) {
        rateLimit = maxBytesPerSec.getBytes() > 0;
        rateLimiter = new RateLimiter.SimpleRateLimiter(maxBytesPerSec.getMbFrac());
    }

    public long maybePause(int bytes) {
        if (rateLimit) {
            long bytesSincePause = bytesSinceLastPause.addAndGet(bytes);
            if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) {
                // Time to pause
                bytesSinceLastPause.addAndGet(-bytesSincePause);
                return Math.max(rateLimiter.pause(bytesSincePause), 0);
            }
        }
        return 0;
    }

    public void setMBPerSec(ByteSizeValue maxBytesPerSec) {
        rateLimit = maxBytesPerSec.getBytes() > 0;
        rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
    }
}
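
A minimal usage sketch of `CombinedRateLimiter` (not part of this commit): the caller, chunk size, and `fetchChunk` helper below are hypothetical and only illustrate the pattern the restore path uses further down, where `maybePause` is invoked before each chunk request and the returned pause time is accumulated for throttle stats.

import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;

// Hypothetical caller, for illustration only.
public class RateLimitedCopyExample {

    // Fetch a number of 64 KB chunks, pausing whenever the limiter asks for it,
    // and return the total time spent throttled in nanoseconds.
    public static long copyWithThrottle(int totalChunks) {
        CombinedRateLimiter limiter = new CombinedRateLimiter(new ByteSizeValue(10, ByteSizeUnit.KB));
        int chunkSize = 1 << 16; // same buffer size the restore session below uses
        long totalNanosPaused = 0;
        for (int i = 0; i < totalChunks; i++) {
            // maybePause sleeps once enough bytes have accumulated and reports the pause length (0 if no pause).
            totalNanosPaused += limiter.maybePause(chunkSize);
            fetchChunk(chunkSize);
        }
        return totalNanosPaused;
    }

    private static void fetchChunk(int bytes) {
        // placeholder for the actual network read (hypothetical)
    }
}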
Ccr.java
@@ -122,6 +122,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
private final SetOnce<CcrSettings> ccrSettings = new SetOnce<>();
private Client client;

private final boolean tribeNode;
@@ -169,6 +170,8 @@ public Collection<Object> createComponents(

CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService();
this.restoreSourceService.set(restoreSourceService);
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
this.ccrSettings.set(ccrSettings);
return Arrays.asList(
ccrLicenseChecker,
restoreSourceService,
@@ -303,7 +306,8 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {

@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings);
Repository.Factory repositoryFactory =
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get());
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}

CcrSettings.java
@@ -5,9 +5,14 @@
*/
package org.elasticsearch.xpack.ccr;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
@@ -18,11 +23,6 @@
*/
public final class CcrSettings {

// prevent construction
private CcrSettings() {

}

/**
* Index setting for a following index.
*/
@@ -35,6 +35,14 @@ private CcrSettings() {
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);


/**
* Max bytes a node can recover per second.
*/
public static final Setting<ByteSizeValue> RECOVERY_MAX_BYTES_PER_SECOND =
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* The settings defined by CCR.
*
@@ -44,7 +52,23 @@ static List<Setting<?>> getSettings() {
        return Arrays.asList(
            XPackSettings.CCR_ENABLED_SETTING,
            CCR_FOLLOWING_INDEX_SETTING,
            RECOVERY_MAX_BYTES_PER_SECOND,
            CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
    }

    private final CombinedRateLimiter ccrRateLimiter;

    public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
        this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
        clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
    }

    private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
        ccrRateLimiter.setMBPerSec(maxBytesPerSec);
    }

    public CombinedRateLimiter getRateLimiter() {
        return ccrRateLimiter;
    }

}
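
A rough sketch (assumed usage, not part of this change) of how the dynamic wiring above behaves: because the constructor registers an update consumer for `ccr.indices.recovery.max_bytes_per_sec`, a later cluster-settings update re-applies the limit at runtime through `CombinedRateLimiter#setMBPerSec`. The helper class below is hypothetical and assumes it sits in the `org.elasticsearch.xpack.ccr` package so the package-private `getSettings()` is visible.

package org.elasticsearch.xpack.ccr;

import java.util.HashSet;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CombinedRateLimiter;

// Hypothetical helper, for illustration only.
class CcrSettingsSketch {

    static CombinedRateLimiter sketchDynamicUpdate() {
        Settings initial = Settings.builder()
            .put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "40mb")
            .build();
        // Register the CCR settings so the update consumer added in the CcrSettings constructor can attach.
        ClusterSettings clusterSettings = new ClusterSettings(initial, new HashSet<>(CcrSettings.getSettings()));
        CcrSettings ccrSettings = new CcrSettings(initial, clusterSettings);

        // Simulate a dynamic cluster-settings change; the registered consumer lowers the limiter's rate.
        clusterSettings.applySettings(Settings.builder()
            .put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10kb")
            .build());
        return ccrSettings.getRateLimiter();
    }
}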
CcrRepository.java
@@ -24,8 +24,10 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineException;
@@ -49,6 +51,7 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
@@ -66,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

/**
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
@@ -79,12 +83,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST);

private final RepositoryMetaData metadata;
private final CcrSettings ccrSettings;
private final String remoteClusterAlias;
private final Client client;
private final CcrLicenseChecker ccrLicenseChecker;

public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) {
private final CounterMetric throttledTime = new CounterMetric();

public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
CcrSettings ccrSettings) {
this.metadata = metadata;
this.ccrSettings = ccrSettings;
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
this.ccrLicenseChecker = ccrLicenseChecker;
@@ -206,7 +215,7 @@ public long getSnapshotThrottleTimeInNanos() {

@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
return throttledTime.count();
}

@Override
@@ -257,7 +266,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
String name = metadata.name();
try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
restoreSession.restoreFiles();
} catch (Exception e) {
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
@@ -287,6 +296,15 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
}
}

    private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
                                       RecoveryState recoveryState) {
        String sessionUUID = UUIDs.randomBase64UUID();
        PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
            new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
        return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
            response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
    }

private static class RestoreSession extends FileRestoreContext implements Closeable {

private static final int BUFFER_SIZE = 1 << 16;
@@ -295,23 +313,19 @@ private static class RestoreSession extends FileRestoreContext implements Closea
private final String sessionUUID;
private final DiscoveryNode node;
private final Store.MetadataSnapshot sourceMetaData;
private final CombinedRateLimiter rateLimiter;
private final LongConsumer throttleListener;

RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) {
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter,
LongConsumer throttleListener) {
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
this.sourceMetaData = sourceMetaData;
}

static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RecoveryState recoveryState) {
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
response.getStoreFileMetaData());
this.rateLimiter = rateLimiter;
this.throttleListener = throttleListener;
}

void restoreFiles() throws IOException {
@@ -326,7 +340,7 @@ void restoreFiles() throws IOException {

@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata());
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener);
}

@Override
Expand All @@ -343,14 +357,19 @@ private static class RestoreFileInputStream extends InputStream {
private final String sessionUUID;
private final DiscoveryNode node;
private final StoreFileMetaData fileToRecover;
private final CombinedRateLimiter rateLimiter;
private final LongConsumer throttleListener;

private long pos = 0;

private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) {
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
this.fileToRecover = fileToRecover;
this.rateLimiter = rateLimiter;
this.throttleListener = throttleListener;
}


@@ -367,6 +386,10 @@ public int read(byte[] bytes, int off, int len) throws IOException {
}

int bytesRequested = (int) Math.min(remainingBytes, len);

long nanosPaused = rateLimiter.maybePause(bytesRequested);
throttleListener.accept(nanosPaused);

String fileName = fileToRecover.name();
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
@@ -390,5 +413,6 @@ public int read(byte[] bytes, int off, int len) throws IOException {

return bytesReceived;
}

}
}
CCR restore integration test
@@ -21,6 +21,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -38,6 +39,8 @@
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -235,6 +238,60 @@ public void testDocsAreRecovered() throws Exception {
thread.join();
}

    public void testRateLimitingIsEmployed() throws Exception {
        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
        settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
        assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());

        String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
        String leaderIndex = "index1";
        String followerIndex = "index2";

        final int numberOfPrimaryShards = randomIntBetween(1, 3);
        final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
        ensureLeaderGreen(leaderIndex);

        final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
        final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

        List<CcrRepository> repositories = new ArrayList<>();

        for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
            Repository repository = repositoriesService.repository(leaderClusterRepoName);
            repositories.add((CcrRepository) repository);
        }

        logger.info("--> indexing some data");
        for (int i = 0; i < 100; i++) {
            final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
            leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
        }

        leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();

        Settings.Builder settingsBuilder = Settings.builder()
            .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
            .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
        RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
            CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
            "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
            false, true, settingsBuilder.build(), new String[0],
            "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");

        PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
        restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
        future.actionGet();

        assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));

        settingsRequest = new ClusterUpdateSettingsRequest();
        ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
        settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
        assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
    }

public void testFollowerMappingIsUpdated() throws IOException {
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
