Skip to content

Commit

Permalink
Reduce recovery time with compress or secure transport.
Browse files Browse the repository at this point in the history
  • Loading branch information
kovrus committed Sep 11, 2019
1 parent 4f93e02 commit 041d4d4
Show file tree
Hide file tree
Showing 15 changed files with 700 additions and 158 deletions.
9 changes: 8 additions & 1 deletion blob/src/main/java/io/crate/blob/BlobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING;

public class BlobService extends AbstractLifecycleComponent {

private final BlobIndicesService blobIndicesService;
Expand All @@ -50,6 +53,7 @@ public class BlobService extends AbstractLifecycleComponent {
private final BlobTransferTarget blobTransferTarget;
private final Client client;
private final PipelineRegistry piplineRegistry;
private final Settings settings;

@Inject
public BlobService(ClusterService clusterService,
Expand All @@ -59,7 +63,8 @@ public BlobService(ClusterService clusterService,
TransportService transportService,
BlobTransferTarget blobTransferTarget,
Client client,
PipelineRegistry pipelineRegistry) {
PipelineRegistry pipelineRegistry,
Settings settings) {
this.clusterService = clusterService;
this.blobIndicesService = blobIndicesService;
this.blobHeadRequestHandler = blobHeadRequestHandler;
Expand All @@ -68,6 +73,7 @@ public BlobService(ClusterService clusterService,
this.blobTransferTarget = blobTransferTarget;
this.client = client;
this.piplineRegistry = pipelineRegistry;
this.settings = settings;
}

public RemoteDigestBlob newBlob(String index, String digest) {
Expand All @@ -92,6 +98,7 @@ protected void doStart() throws ElasticsearchException {
recoveryTarget,
request,
fileChunkSizeInBytes,
INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings),
transportService,
blobTransferTarget,
blobIndicesService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ public BlobRecoveryHandler(IndexShard shard,
RecoveryTargetHandler recoveryTarget,
StartRecoveryRequest request,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks,
final TransportService transportService,
BlobTransferTarget blobTransferTarget,
BlobIndicesService blobIndicesService) {
super(shard, recoveryTarget, request, fileChunkSizeInBytes);
super(shard, recoveryTarget, request, fileChunkSizeInBytes, maxConcurrentFileChunks);
assert BlobIndex.isBlobIndex(shard.shardId().getIndexName()) : "Shard must belong to a blob index";
this.blobShard = blobIndicesService.blobShardSafe(request.shardId());
this.request = request;
Expand Down
18 changes: 18 additions & 0 deletions es/es-server/src/main/java/org/elasticsearch/common/Numbers.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ public static long bytesToLong(BytesRef bytes) {
return (((long) high) << 32) | (low & 0x0ffffffffL);
}

/**
* Converts a long to a byte array.
*
* @param val The long to convert to a byte array
* @return The byte array converted
*/
public static byte[] longToBytes(long val) {
byte[] arr = new byte[8];
arr[0] = (byte) (val >>> 56);
arr[1] = (byte) (val >>> 48);
arr[2] = (byte) (val >>> 40);
arr[3] = (byte) (val >>> 32);
arr[4] = (byte) (val >>> 24);
arr[5] = (byte) (val >>> 16);
arr[6] = (byte) (val >>> 8);
arr[7] = (byte) (val);
return arr;
}

/** Return the long that {@code n} stores, or throws an exception if the
* stored value cannot be converted to a long that stores the exact same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,19 @@ synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request,

private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));

// CRATE_PATCH: used to inject BlobRecoveryHandler
int recoveryChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
handler = getCustomRecoverySourceHandler(
final RemoteRecoveryTargetHandler recoveryTarget = new RemoteRecoveryTargetHandler(
request.recoveryId(),
request.shardId(),
transportService,
request.targetNode(),
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
handler = new RecoverySourceHandler(
shard,
recoveryTarget,
request,
recoveryChunkSizeInBytes
);

if (handler != null){
return handler;
}
return new RecoverySourceHandler(shard, recoveryTarget, request, recoveryChunkSizeInBytes);
request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks());
return handler;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -571,8 +573,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler<Recove

@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.target();
final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
Expand All @@ -591,8 +592,17 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
}
}

recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(),
request.lastChunk(), request.totalTranslogOps()
final ActionListener<TransportResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
recoveryTarget.writeFileChunk(
request.metadata(),
request.position(),
request.content(),
request.lastChunk(),
request.totalTranslogOps(),
ActionListener.wrap(
nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE),
listener::onFailure)
);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public class RecoverySettings {
Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Property.Dynamic, Property.NodeScope);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);

/**
* how long to wait before retrying after issues cause by cluster state syncing between nodes
* i.e., local node is not yet known on remote node, remote shard not yet started etc.
Expand Down Expand Up @@ -78,6 +84,7 @@ public class RecoverySettings {
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

private volatile ByteSizeValue maxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile SimpleRateLimiter rateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
Expand All @@ -92,6 +99,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
// and we want to give the master time to remove a faulty node
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);

this.internalActionTimeout = INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.get(settings);
Expand All @@ -109,6 +117,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);
Expand Down Expand Up @@ -180,4 +189,12 @@ private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
}
}

public int getMaxConcurrentFileChunks() {
return maxConcurrentFileChunks;
}

private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
}
}
Loading

0 comments on commit 041d4d4

Please sign in to comment.