Skip to content

Commit

Permalink
reafactor how actions handle failures, better response when non activ…
Browse files Browse the repository at this point in the history
…e shards exists, also, default logging to have action set to DEBUG so exceptions in actions are logged in the server
  • Loading branch information
kimchy committed Apr 6, 2010
1 parent 1a9c5d6 commit 2bb31fe
Show file tree
Hide file tree
Showing 42 changed files with 628 additions and 239 deletions.
2 changes: 2 additions & 0 deletions config/logging.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
rootLogger: INFO, console, file
logger:
jgroups: WARN
# log action execution errors for easier debugging
action : DEBUG

appender:
console:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
*/
public class NoShardAvailableActionException extends IndexShardException {

public NoShardAvailableActionException(ShardId shardId, String msg) {
super(shardId, msg);
}

public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class BroadcastPingResponse extends BroadcastOperationResponse {

}

public BroadcastPingResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
public BroadcastPingResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override public void readFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
successfulShards++;
}
}
return new BroadcastPingResponse(successfulShards, failedShards, shardFailures);
return new BroadcastPingResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override protected BroadcastShardPingRequest newShardRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public int shardId() {
super.writeTo(out);
out.writeVInt(shardId);
}

@Override public String toString() {
return "[" + index + "][" + shardId + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class ClearIndicesCacheResponse extends BroadcastOperationResponse {

}

ClearIndicesCacheResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
ClearIndicesCacheResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override public void readFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
return new ClearIndicesCacheRequest();
}

@Override protected boolean ignoreNonActiveExceptions() {
return true;
}

@Override protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
Expand All @@ -84,7 +88,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
successfulShards++;
}
}
return new ClearIndicesCacheResponse(successfulShards, failedShards, shardFailures);
return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override protected ShardClearIndicesCacheRequest newShardRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class FlushResponse extends BroadcastOperationResponse {

}

FlushResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
FlushResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override public void readFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,18 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
return new FlushRequest();
}

@Override protected boolean ignoreNonActiveExceptions() {
return true;
}

@Override protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// a non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
Expand All @@ -83,7 +87,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
successfulShards++;
}
}
return new FlushResponse(successfulShards, failedShards, shardFailures);
return new FlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override protected ShardFlushRequest newShardRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public int shardId() {
super.writeTo(out);
out.writeVInt(shardId);
}

@Override public String toString() {
return "[" + index + "][" + shardId + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class OptimizeResponse extends BroadcastOperationResponse {

}

OptimizeResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
OptimizeResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override public void readFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
return new OptimizeRequest();
}

@Override protected boolean ignoreNonActiveExceptions() {
return true;
}

@Override protected OptimizeResponse newResponse(OptimizeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// a non active shard, ignore...
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
Expand All @@ -84,7 +88,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
successfulShards++;
}
}
return new OptimizeResponse(successfulShards, failedShards, shardFailures);
return new OptimizeResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override protected ShardOptimizeRequest newShardRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class RefreshResponse extends BroadcastOperationResponse {

}

RefreshResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
RefreshResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override public void readFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
return new RefreshRequest();
}

@Override protected boolean ignoreNonActiveExceptions() {
return true;
}

@Override protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
Expand All @@ -84,7 +88,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
successfulShards++;
}
}
return new RefreshResponse(successfulShards, failedShards, shardFailures);
return new RefreshResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override protected ShardRefreshRequest newShardRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import static org.elasticsearch.util.settings.ImmutableSettings.*;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class IndicesStatusResponse extends BroadcastOperationResponse {

Expand All @@ -50,8 +50,8 @@ public class IndicesStatusResponse extends BroadcastOperationResponse {
IndicesStatusResponse() {
}

IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
indicesSettings = newHashMap();
for (ShardStatus shard : shards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import static com.google.common.collect.Lists.*;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportIndicesStatusAction extends TransportBroadcastOperationAction<IndicesStatusRequest, IndicesStatusResponse, TransportIndicesStatusAction.IndexShardStatusRequest, ShardStatus> {

Expand All @@ -65,6 +65,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
return new IndicesStatusRequest();
}

@Override protected boolean ignoreNonActiveExceptions() {
return true;
}

@Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
Expand All @@ -73,7 +77,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
Expand All @@ -85,7 +89,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
successfulShards++;
}
}
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards, shardFailures);
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override protected IndexShardStatusRequest newShardRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,6 @@ public CountRequest types(String... types) {
}

@Override public String toString() {
return "[" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "], querySource[" + Unicode.fromBytes(querySource) + "]";
return "[" + Arrays.toString(indices) + "]" + Arrays.toString(types) + ", querySource[" + Unicode.fromBytes(querySource) + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class CountResponse extends BroadcastOperationResponse {

}

CountResponse(long count, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
CountResponse(long count, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.count = count;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
successfulShards++;
}
}
return new CountResponse(count, successfulShards, failedShards, shardFailures);
return new CountResponse(count, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticSearchException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;

import static org.elasticsearch.action.Actions.*;

Expand Down Expand Up @@ -115,4 +117,8 @@ public String[] types() {
out.writeUTF(type);
}
}

@Override public String toString() {
return "[" + index + "]" + Arrays.toString(types) + ", query [" + Unicode.fromBytes(querySource) + "]";
}
}
Loading

0 comments on commit 2bb31fe

Please sign in to comment.