Skip to content

Commit

Permalink
Merge pull request #12947 from s1monw/expected_shard_size
Browse files Browse the repository at this point in the history
Add `expectedShardSize` to ShardRouting and use it in path.data allocation
  • Loading branch information
s1monw committed Aug 21, 2015
2 parents 703a4b3 + 3dd6c4a commit 3fb2d8e
Show file tree
Hide file tree
Showing 25 changed files with 412 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map
*/
public final class ClusterInfo {
public class ClusterInfo {

private final Map<String, DiskUsage> usages;
final Map<String, Long> shardSizes;
Expand All @@ -54,6 +54,11 @@ public Long getShardSize(ShardRouting shardRouting) {
return shardSizes.get(shardIdentifierFromRouting(shardRouting));
}

/**
 * Returns the size recorded for the given shard, falling back to
 * {@code defaultValue} when no size is known for it.
 *
 * @param shardRouting the shard whose size is looked up
 * @param defaultValue the value to return if the lookup yields {@code null}
 * @return the recorded shard size, or {@code defaultValue} if none is recorded
 */
public long getShardSize(ShardRouting shardRouting, long defaultValue) {
    final Long knownSize = getShardSize(shardRouting);
    if (knownSize != null) {
        return knownSize;
    }
    return defaultValue;
}

/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
Expand All @@ -36,6 +37,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.node.settings.NodeSettingsService;
Expand All @@ -45,6 +47,7 @@
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ public String prettyPrint() {
/**
* Moves a shard from the unassigned state to the initializing state
*/
public void initialize(ShardRouting shard, String nodeId) {
public void initialize(ShardRouting shard, String nodeId, long expectedSize) {
ensureMutable();
assert shard.unassigned() : shard;
shard.initialize(nodeId);
shard.initialize(nodeId, expectedSize);
node(nodeId).add(shard);
inactiveShardCount++;
if (shard.primary()) {
Expand All @@ -362,10 +362,10 @@ public void initialize(ShardRouting shard, String nodeId) {
* shard as well as assigning it. And returning the target initializing
* shard.
*/
public ShardRouting relocate(ShardRouting shard, String nodeId) {
public ShardRouting relocate(ShardRouting shard, String nodeId, long expectedShardSize) {
ensureMutable();
relocatingShards++;
shard.relocate(nodeId);
shard.relocate(nodeId, expectedShardSize);
ShardRouting target = shard.buildTargetRelocatingShard();
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
Expand Down Expand Up @@ -608,16 +608,9 @@ public ShardRouting next() {
/**
* Initializes the current unassigned shard and moves it from the unassigned list.
*/
public void initialize(String nodeId) {
initialize(nodeId, current.version());
}

/**
* Initializes the current unassigned shard and moves it from the unassigned list.
*/
public void initialize(String nodeId, long version) {
public void initialize(String nodeId, long version, long expectedShardSize) {
innerRemove();
nodes.initialize(new ShardRouting(current, version), nodeId);
nodes.initialize(new ShardRouting(current, version), nodeId, expectedShardSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
*/
public final class ShardRouting implements Streamable, ToXContent {

/**
* Used if shard size is not available
*/
public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1;

private String index;
private int shardId;
private String currentNodeId;
Expand All @@ -50,6 +55,7 @@ public final class ShardRouting implements Streamable, ToXContent {
private final transient List<ShardRouting> asList;
private transient ShardId shardIdentifier;
private boolean frozen = false;
private long expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;

private ShardRouting() {
this.asList = Collections.singletonList(this);
Expand All @@ -60,7 +66,7 @@ public ShardRouting(ShardRouting copy) {
}

public ShardRouting(ShardRouting copy, long version) {
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true);
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo(), copy.allocationId(), true, copy.getExpectedShardSize());
}

/**
Expand All @@ -69,7 +75,7 @@ public ShardRouting(ShardRouting copy, long version) {
*/
ShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal) {
UnassignedInfo unassignedInfo, AllocationId allocationId, boolean internal, long expectedShardSize) {
this.index = index;
this.shardId = shardId;
this.currentNodeId = currentNodeId;
Expand All @@ -81,20 +87,24 @@ public ShardRouting(ShardRouting copy, long version) {
this.restoreSource = restoreSource;
this.unassignedInfo = unassignedInfo;
this.allocationId = allocationId;
this.expectedShardSize = expectedShardSize;
assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
if (!internal) {
assert state == ShardRoutingState.UNASSIGNED;
assert currentNodeId == null;
assert relocatingNodeId == null;
assert allocationId == null;
}

}

/**
* Creates a new unassigned shard.
*/
public static ShardRouting newUnassigned(String index, int shardId, RestoreSource restoreSource, boolean primary, UnassignedInfo unassignedInfo) {
return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true);
return new ShardRouting(index, shardId, null, null, restoreSource, primary, ShardRoutingState.UNASSIGNED, 0, unassignedInfo, null, true, UNAVAILABLE_EXPECTED_SHARD_SIZE);
}

/**
Expand Down Expand Up @@ -205,7 +215,7 @@ public String relocatingNodeId() {
public ShardRouting buildTargetRelocatingShard() {
assert relocating();
return new ShardRouting(index, shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, version, unassignedInfo,
AllocationId.newTargetRelocation(allocationId), true);
AllocationId.newTargetRelocation(allocationId), true, expectedShardSize);
}

/**
Expand Down Expand Up @@ -317,6 +327,11 @@ public void readFromThin(StreamInput in) throws IOException {
if (in.readBoolean()) {
allocationId = new AllocationId(in);
}
if (relocating() || initializing()) {
expectedShardSize = in.readLong();
} else {
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
}
freeze();
}

Expand Down Expand Up @@ -368,6 +383,10 @@ public void writeToThin(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
if (relocating() || initializing()) {
out.writeLong(expectedShardSize);
}

}

@Override
Expand Down Expand Up @@ -397,33 +416,36 @@ void moveToUnassigned(UnassignedInfo unassignedInfo) {
relocatingNodeId = null;
this.unassignedInfo = unassignedInfo;
allocationId = null;
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
}

/**
* Initializes an unassigned shard on a node.
*/
void initialize(String nodeId) {
void initialize(String nodeId, long expectedShardSize) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.UNASSIGNED : this;
assert relocatingNodeId == null : this;
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
allocationId = AllocationId.newInitializing();
this.expectedShardSize = expectedShardSize;
}

/**
* Relocate the shard to another node.
*
* @param relocatingNodeId id of the node to relocate the shard
*/
void relocate(String relocatingNodeId) {
void relocate(String relocatingNodeId, long expectedShardSize) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this;
state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId;
this.allocationId = AllocationId.newRelocation(allocationId);
this.expectedShardSize = expectedShardSize;
}

/**
Expand All @@ -436,7 +458,7 @@ void cancelRelocation() {
assert state == ShardRoutingState.RELOCATING : this;
assert assignedToNode() : this;
assert relocatingNodeId != null : this;

expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED;
relocatingNodeId = null;
allocationId = AllocationId.cancelRelocation(allocationId);
Expand Down Expand Up @@ -470,6 +492,7 @@ void moveToStarted() {
// relocation target
allocationId = AllocationId.finishRelocation(allocationId);
}
expectedShardSize = UNAVAILABLE_EXPECTED_SHARD_SIZE;
state = ShardRoutingState.STARTED;
}

Expand Down Expand Up @@ -669,6 +692,9 @@ public String shortSummary() {
if (this.unassignedInfo != null) {
sb.append(", ").append(unassignedInfo.toString());
}
if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) {
sb.append(", expected_shard_size[").append(expectedShardSize).append("]");
}
return sb.toString();
}

Expand All @@ -682,7 +708,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("shard", shardId().id())
.field("index", shardId().index().name())
.field("version", version);

if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE){
builder.field("expected_shard_size_in_bytes", expectedShardSize);
}
if (restoreSource() != null) {
builder.field("restore_source");
restoreSource().toXContent(builder, params);
Expand All @@ -709,4 +737,12 @@ void freeze() {
boolean isFrozen() {
return frozen;
}

/**
* Returns the expected shard size for {@link ShardRoutingState#RELOCATING} and {@link ShardRoutingState#INITIALIZING}
* shards. If its size is not available, {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} will be returned.
*
* @return the expected shard size, or {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} if it is not available
*/
public long getExpectedShardSize() {
return expectedShardSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public boolean move(ShardRouting shard, RoutingNode node ) {
Decision decision = allocation.deciders().canAllocate(shard, target, allocation);
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shard);
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId());
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
currentNode.addShard(targetRelocatingShard, decision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
Expand Down Expand Up @@ -687,7 +687,7 @@ public int compare(ShardRouting o1,
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
changed = true;
continue; // don't add to ignoreUnassigned
} else {
Expand Down Expand Up @@ -779,10 +779,10 @@ private boolean tryRelocateShard(Operation operation, ModelNode minNode, ModelNo
/* now allocate on the cluster - if we are started we need to relocate the shard */
if (candidate.started()) {
RoutingNode lowRoutingNode = routingNodes.node(minNode.getNodeId());
routingNodes.relocate(candidate, lowRoutingNode.nodeId());
routingNodes.relocate(candidate, lowRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));

} else {
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
return true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
}
it.initialize(routingNode.nodeId());
it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
break;
}
return new RerouteExplanation(this, decision);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
if (decision.type() == Decision.Type.THROTTLE) {
// it's being throttled, maybe have a flag to take it into account and fail? for now, just do it since the "user" wants it...
}
allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId());
allocation.routingNodes().relocate(shardRouting, toRoutingNode.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}

if (!found) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Classes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation, long allocateUna
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
// we found a match
changed = true;
unassignedIterator.initialize(nodeWithHighestMatch.nodeId());
unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), shard.version(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -270,7 +271,8 @@ private long getAvgShardSizeInBytes() throws IOException {
}
}

public synchronized IndexShard createShard(int sShardId, boolean primary) {
public synchronized IndexShard createShard(int sShardId, ShardRouting routing) {
final boolean primary = routing.primary();
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
Expand Down Expand Up @@ -299,7 +301,7 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) {
}
}
if (path == null) {
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, getAvgShardSizeInBytes(), this);
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), this);
logger.debug("{} creating using a new path [{}]", shardId, path);
} else {
logger.debug("{} creating using an existing path [{}]", shardId, path);
Expand Down
Loading

0 comments on commit 3fb2d8e

Please sign in to comment.