Batching of snapshot state updates
Yannick Welsch committed May 15, 2015
1 parent c19b7a2 commit 692f610
Showing 2 changed files with 176 additions and 71 deletions.
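
Before this change, RestoreService applied every shard-level restore status update in its own cluster state update task, so a restore touching many shards produced one cluster state change per shard. The diff below instead pushes each incoming request onto a BlockingQueue and lets whichever update task runs first drain the whole backlog and fold it into a single cluster state change; drained requests are marked as processed so the tasks submitted for them become no-ops. The following is a minimal, self-contained sketch of that pattern; the class and method names are illustrative stand-ins, not the actual Elasticsearch API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchedStateUpdateSketch {

    /** Stand-in for UpdateIndexShardRestoreStatusRequest. */
    static final class StatusUpdate {
        final String shardId;
        final String state;
        volatile boolean processed; // set once some batch has applied this update

        StatusUpdate(String shardId, String state) {
            this.shardId = shardId;
            this.state = state;
        }
    }

    private final BlockingQueue<StatusUpdate> queue = new LinkedBlockingQueue<>();

    /** Each incoming shard status update is enqueued; one update task is submitted per request. */
    void enqueue(StatusUpdate update) {
        queue.add(update);
    }

    /**
     * Models the execute() phase of a cluster state update task: whichever task runs
     * first drains the whole backlog and applies it as a single state change; tasks
     * that run afterwards find nothing left to do.
     */
    void runStateUpdateTask() {
        List<StatusUpdate> drained = new ArrayList<>();
        queue.drainTo(drained);
        if (drained.isEmpty()) {
            System.out.println("nothing to do, backlog already applied by an earlier task");
            return;
        }
        for (StatusUpdate update : drained) {
            update.processed = true; // later tasks can skip this request
            System.out.println("applying shard " + update.shardId + " -> " + update.state);
        }
        System.out.println("published one state change covering " + drained.size() + " updates");
    }

    public static void main(String[] args) {
        BatchedStateUpdateSketch sketch = new BatchedStateUpdateSketch();
        // Three updates arrive before the master runs the first task...
        sketch.enqueue(new StatusUpdate("[logs][0]", "SUCCESS"));
        sketch.enqueue(new StatusUpdate("[logs][1]", "SUCCESS"));
        sketch.enqueue(new StatusUpdate("[logs][2]", "FAILURE"));
        // ...so the first task applies all three in one batch,
        sketch.runStateUpdateTask();
        // and the tasks submitted for the remaining updates become no-ops.
        sketch.runStateUpdateTask();
        sketch.runStateUpdateTask();
    }
}

The volatile processed flag in the diff plays the same role as in the sketch: a request that was already drained and applied by an earlier task is skipped when the task submitted for it eventually runs, which is why the field is a plain in-memory state marker and is never serialized.
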
171 changes: 114 additions & 57 deletions src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
@@ -38,6 +39,7 @@
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,6 +48,7 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -54,6 +57,8 @@

import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

import static com.google.common.collect.Lists.newArrayList;
@@ -116,6 +121,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateListener {
private final DynamicSettings dynamicSettings;

private final CopyOnWriteArrayList<ActionListener<RestoreCompletionResponse>> listeners = new CopyOnWriteArrayList<>();

private final BlockingQueue<UpdateIndexShardRestoreStatusRequest> updatedSnapshotStateQueue = ConcurrentCollections.newBlockingQueue();

@Inject
public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService,
@@ -459,42 +466,75 @@ public RestoreInfo getRestoreInfo() {
* @param request update shard status request
*/
private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest request) {
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {
logger.trace("received updated snapshot restore state [{}]", request);
updatedSnapshotStateQueue.add(request);

private RestoreInfo restoreInfo = null;
private Map<ShardId, ShardRestoreStatus> shards = null;
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {
private final List<UpdateIndexShardRestoreStatusRequest> drainedRequests = new ArrayList<>();
private Map<SnapshotId, Tuple<RestoreInfo, Map<ShardId, ShardRestoreStatus>>> batchedRestoreInfo = null;

@Override
public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RestoreMetaData restore = metaData.custom(RestoreMetaData.TYPE);

if (request.processed) {
return currentState;
}

updatedSnapshotStateQueue.drainTo(drainedRequests);

final int batchSize = drainedRequests.size();

// nothing to process (a previous event has processed it already)
if (batchSize == 0) {
return currentState;
}

final MetaData metaData = currentState.metaData();
final RestoreMetaData restore = metaData.custom(RestoreMetaData.TYPE);
if (restore != null) {
boolean changed = false;
boolean found = false;
ArrayList<RestoreMetaData.Entry> entries = newArrayList();
int changedCount = 0;
final List<RestoreMetaData.Entry> entries = newArrayList();
for (RestoreMetaData.Entry entry : restore.entries()) {
if (entry.snapshotId().equals(request.snapshotId())) {
assert !found;
found = true;
Map<ShardId, ShardRestoreStatus> shards = newHashMap(entry.shards());
logger.trace("[{}] Updating shard [{}] with status [{}]", request.snapshotId(), request.shardId(), request.status().state());
shards.put(request.shardId(), request.status());
Map<ShardId, ShardRestoreStatus> shards = null;

for (int i = 0; i < batchSize; i++) {
final UpdateIndexShardRestoreStatusRequest updateSnapshotState = drainedRequests.get(i);
updateSnapshotState.processed = true;

if (entry.snapshotId().equals(updateSnapshotState.snapshotId())) {
logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshotId(), updateSnapshotState.shardId(), updateSnapshotState.status().state());
if (shards == null) {
shards = newHashMap(entry.shards());
}
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
changedCount++;
}
}

if (shards != null) {
if (!completed(shards)) {
entries.add(new RestoreMetaData.Entry(entry.snapshotId(), RestoreMetaData.State.STARTED, entry.indices(), ImmutableMap.copyOf(shards)));
} else {
logger.info("restore [{}] is done", request.snapshotId());
restoreInfo = new RestoreInfo(entry.snapshotId().getSnapshot(), entry.indices(), shards.size(), shards.size() - failedShards(shards));
this.shards = shards;
logger.info("restore [{}] is done", entry.snapshotId());
if (batchedRestoreInfo == null) {
batchedRestoreInfo = newHashMap();
}
assert !batchedRestoreInfo.containsKey(entry.snapshotId());
batchedRestoreInfo.put(entry.snapshotId(),
new Tuple<RestoreInfo, Map<ShardId, ShardRestoreStatus>>(
new RestoreInfo(entry.snapshotId().getSnapshot(), entry.indices(), shards.size(), shards.size() - failedShards(shards)),
shards));
}
changed = true;
} else {
entries.add(entry);
}
}
if (changed) {
restore = new RestoreMetaData(entries.toArray(new RestoreMetaData.Entry[entries.size()]));
mdBuilder.putCustom(RestoreMetaData.TYPE, restore);

if (changedCount > 0) {
logger.warn("changed cluster state triggered by {} snapshot restore state updates", changedCount);

final RestoreMetaData updatedRestore = new RestoreMetaData(entries.toArray(new RestoreMetaData.Entry[entries.size()]));
final MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()).putCustom(RestoreMetaData.TYPE, updatedRestore);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}
}
@@ -503,48 +543,58 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.warn("[{}][{}] failed to update snapshot status to [{}]", t, request.snapshotId(), request.shardId(), request.status());
for (UpdateIndexShardRestoreStatusRequest request : drainedRequests) {
logger.warn("[{}][{}] failed to update snapshot status to [{}]", t, request.snapshotId(), request.shardId(), request.status());
}
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (restoreInfo != null) {
RoutingTable routingTable = newState.getRoutingTable();
final List<ShardId> waitForStarted = newArrayList();
for (Map.Entry<ShardId, ShardRestoreStatus> shard : shards.entrySet()) {
if (shard.getValue().state() == RestoreMetaData.State.SUCCESS ) {
ShardId shardId = shard.getKey();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
if (shardRouting != null && !shardRouting.active()) {
logger.trace("[{}][{}] waiting for the shard to start", request.snapshotId(), shardId);
waitForStarted.add(shardId);
if (batchedRestoreInfo != null) {
for (final Entry<SnapshotId, Tuple<RestoreInfo, Map<ShardId, ShardRestoreStatus>>> entry : batchedRestoreInfo.entrySet()) {
final SnapshotId snapshotId = entry.getKey();
final RestoreInfo restoreInfo = entry.getValue().v1();
final Map<ShardId, ShardRestoreStatus> shards = entry.getValue().v2();
if (restoreInfo == null) {
continue;
}
RoutingTable routingTable = newState.getRoutingTable();
final List<ShardId> waitForStarted = newArrayList();
for (Map.Entry<ShardId, ShardRestoreStatus> shard : shards.entrySet()) {
if (shard.getValue().state() == RestoreMetaData.State.SUCCESS ) {
ShardId shardId = shard.getKey();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
if (shardRouting != null && !shardRouting.active()) {
logger.trace("[{}][{}] waiting for the shard to start", snapshotId, shardId);
waitForStarted.add(shardId);
}
}
}
}
if (waitForStarted.isEmpty()) {
notifyListeners();
} else {
clusterService.addLast(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.routingTableChanged()) {
RoutingTable routingTable = event.state().getRoutingTable();
for (Iterator<ShardId> iterator = waitForStarted.iterator(); iterator.hasNext();) {
ShardId shardId = iterator.next();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
// Shard disappeared (index deleted) or became active
if (shardRouting == null || shardRouting.active()) {
iterator.remove();
logger.trace("[{}][{}] shard disappeared or started - removing", request.snapshotId(), shardId);
if (waitForStarted.isEmpty()) {
notifyListeners(snapshotId, restoreInfo);
} else {
clusterService.addLast(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.routingTableChanged()) {
RoutingTable routingTable = event.state().getRoutingTable();
for (Iterator<ShardId> iterator = waitForStarted.iterator(); iterator.hasNext();) {
ShardId shardId = iterator.next();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
// Shard disappeared (index deleted) or became active
if (shardRouting == null || shardRouting.active()) {
iterator.remove();
logger.trace("[{}][{}] shard disappeared or started - removing", snapshotId, shardId);
}
}
}
if (waitForStarted.isEmpty()) {
notifyListeners(snapshotId, restoreInfo);
clusterService.remove(this);
}
}
if (waitForStarted.isEmpty()) {
notifyListeners();
clusterService.remove(this);
}
}
});
});
}
}
}
}
Expand All @@ -560,10 +610,10 @@ private ShardRouting findPrimaryShard(RoutingTable routingTable, ShardId shardId
return null;
}

private void notifyListeners() {
private void notifyListeners(SnapshotId snapshotId, RestoreInfo restoreInfo) {
for (ActionListener<RestoreCompletionResponse> listener : listeners) {
try {
listener.onResponse(new RestoreCompletionResponse(request.snapshotId, restoreInfo));
listener.onResponse(new RestoreCompletionResponse(snapshotId, restoreInfo));
} catch (Throwable e) {
logger.warn("failed to update snapshot status for [{}]", e, listener);
}
@@ -945,6 +995,8 @@ private static class UpdateIndexShardRestoreStatusRequest extends TransportRequest {
private SnapshotId snapshotId;
private ShardId shardId;
private ShardRestoreStatus status;

volatile boolean processed; // state field, no need to serialize

private UpdateIndexShardRestoreStatusRequest() {

@@ -983,6 +1035,11 @@ public ShardId shardId() {
public ShardRestoreStatus status() {
return status;
}

@Override
public String toString() {
return "" + snapshotId + ", shardId [" + shardId + "], status [" + status.state() + "]";
}
}

/**