Skip to content

Commit

Permalink
Remove code duplication
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Mar 15, 2024
1 parent 6112f4b commit e798d7a
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -721,11 +721,11 @@ public Settings onNodeStopped(String nodeName) throws Exception {
);

assertThat(response.getNodes(), hasSize(1));
assertThat(response.getNodes().get(0).allocationId(), notNullValue());
assertThat(response.getNodes().get(0).getGatewayShardStarted().allocationId(), notNullValue());
if (corrupt) {
assertThat(response.getNodes().get(0).storeException(), notNullValue());
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), notNullValue());
} else {
assertThat(response.getNodes().get(0).storeException(), nullValue());
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), nullValue());
}

// start another node so cluster consistency checks won't time out due to the lack of state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ void finish() {
storeStatuses.add(
new IndicesShardStoresResponse.StoreStatus(
response.getNode(),
response.allocationId(),
response.getGatewayShardStarted().allocationId(),
allocationStatus,
response.storeException()
response.getGatewayShardStarted().storeException()
)
);
}
Expand Down Expand Up @@ -308,7 +308,8 @@ private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationSta
* A shard exists/existed in a node only if shard state file exists in the node
*/
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
return response.storeException() != null || response.allocationId() != null;
return response.getGatewayShardStarted().storeException() != null
|| response.getGatewayShardStarted().allocationId() != null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ private static List<NodeGatewayShardStarted> adaptToNodeStartedShardList(FetchRe
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
nodeShardStates.add(
new NodeGatewayShardStarted(
nodeGatewayStartedShard.allocationId(),
nodeGatewayStartedShard.primary(),
nodeGatewayStartedShard.replicationCheckpoint(),
nodeGatewayStartedShard.storeException(),
nodeGatewayStartedShard.getGatewayShardStarted().allocationId(),
nodeGatewayStartedShard.getGatewayShardStarted().primary(),
nodeGatewayStartedShard.getGatewayShardStarted().replicationCheckpoint(),
nodeGatewayStartedShard.getGatewayShardStarted().storeException(),
node
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,12 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
);
return new NodeGatewayStartedShards(
clusterService.localNode(),
shardInfo.allocationId(),
shardInfo.primary(),
shardInfo.replicationCheckpoint(),
shardInfo.storeException()
new GatewayShardStarted(
shardInfo.allocationId(),
shardInfo.primary(),
shardInfo.replicationCheckpoint(),
shardInfo.storeException()
)
);
} catch (Exception e) {
throw new OpenSearchException("failed to load started shards", e);
Expand Down Expand Up @@ -303,81 +305,51 @@ public String getCustomDataPath() {
* @opensearch.internal
*/
public static class NodeGatewayStartedShards extends BaseNodeResponse {
private final String allocationId;
private final boolean primary;
private final Exception storeException;
private final ReplicationCheckpoint replicationCheckpoint;
private final GatewayShardStarted gatewayShardStarted;

public NodeGatewayStartedShards(StreamInput in) throws IOException {
super(in);
allocationId = in.readOptionalString();
primary = in.readBoolean();
String allocationId = in.readOptionalString();
boolean primary = in.readBoolean();
Exception storeException;
if (in.readBoolean()) {
storeException = in.readException();
} else {
storeException = null;
}
ReplicationCheckpoint replicationCheckpoint;
if (in.getVersion().onOrAfter(Version.V_2_3_0) && in.readBoolean()) {
replicationCheckpoint = new ReplicationCheckpoint(in);
} else {
replicationCheckpoint = null;
}
this.gatewayShardStarted = new GatewayShardStarted(allocationId, primary, replicationCheckpoint, storeException);
}

public NodeGatewayStartedShards(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint
) {
this(node, allocationId, primary, replicationCheckpoint, null);
public GatewayShardStarted getGatewayShardStarted() {
return gatewayShardStarted;
}

public NodeGatewayStartedShards(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint,
Exception storeException
) {
public NodeGatewayStartedShards(DiscoveryNode node, GatewayShardStarted gatewayShardStarted) {
super(node);
this.allocationId = allocationId;
this.primary = primary;
this.replicationCheckpoint = replicationCheckpoint;
this.storeException = storeException;
}

public String allocationId() {
return this.allocationId;
}

public boolean primary() {
return this.primary;
}

public ReplicationCheckpoint replicationCheckpoint() {
return this.replicationCheckpoint;
}

public Exception storeException() {
return this.storeException;
this.gatewayShardStarted = gatewayShardStarted;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(allocationId);
out.writeBoolean(primary);
if (storeException != null) {
out.writeOptionalString(gatewayShardStarted.allocationId());
out.writeBoolean(gatewayShardStarted.primary());
if (gatewayShardStarted.storeException() != null) {
out.writeBoolean(true);
out.writeException(storeException);
out.writeException(gatewayShardStarted.storeException());
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_2_3_0)) {
if (replicationCheckpoint != null) {
if (gatewayShardStarted.replicationCheckpoint() != null) {
out.writeBoolean(true);
replicationCheckpoint.writeTo(out);
gatewayShardStarted.replicationCheckpoint().writeTo(out);
} else {
out.writeBoolean(false);
}
Expand All @@ -395,33 +367,17 @@ public boolean equals(Object o) {

NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;

return primary == that.primary
&& Objects.equals(allocationId, that.allocationId)
&& Objects.equals(storeException, that.storeException)
&& Objects.equals(replicationCheckpoint, that.replicationCheckpoint);
return gatewayShardStarted.equals(that.gatewayShardStarted);
}

@Override
public int hashCode() {
int result = (allocationId != null ? allocationId.hashCode() : 0);
result = 31 * result + (primary ? 1 : 0);
result = 31 * result + (storeException != null ? storeException.hashCode() : 0);
result = 31 * result + (replicationCheckpoint != null ? replicationCheckpoint.hashCode() : 0);
return result;
return gatewayShardStarted.hashCode();
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary);
if (storeException != null) {
buf.append(",storeException=").append(storeException);
}
if (replicationCheckpoint != null) {
buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString());
}
buf.append("]");
return buf.toString();
return gatewayShardStarted.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -843,10 +843,12 @@ public TestAllocator addData(
node,
new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(
node,
allocationId,
primary,
replicationCheckpoint,
storeException
new TransportNodesGatewayStartedShardHelper.GatewayShardStarted(
allocationId,
primary,
replicationCheckpoint,
storeException
)
)
);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PrimaryShardAllocator;
import org.opensearch.gateway.ReplicaShardAllocator;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper;
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
Expand Down Expand Up @@ -91,9 +92,12 @@ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardR
routing -> currentNodes.get(routing.currentNodeId()),
routing -> new NodeGatewayStartedShards(
currentNodes.get(routing.currentNodeId()),
routing.allocationId().getId(),
routing.primary(),
getReplicationCheckpoint(shardId, routing.currentNodeId())
new TransportNodesGatewayStartedShardHelper.GatewayShardStarted(
routing.allocationId().getId(),
routing.primary(),
getReplicationCheckpoint(shardId, routing.currentNodeId()),
null
)
)
)
);
Expand Down

0 comments on commit e798d7a

Please sign in to comment.