Skip to content

Commit

Permalink
Remove ARS and add tests for zone with undefined weight
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
Anshu Agarwal committed Sep 1, 2022
1 parent 4cdbee3 commit 7a49e5c
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 85 deletions.
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.metadata.MetadataMappingService;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.WeightedRoundRobinRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -191,6 +192,12 @@ public static List<Entry> getNamedWriteables() {
ComposableIndexTemplateMetadata::readDiffFrom
);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(
entries,
WeightedRoundRobinRoutingMetadata.TYPE,
WeightedRoundRobinRoutingMetadata::new,
WeightedRoundRobinRoutingMetadata::readDiffFrom
);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand Down Expand Up @@ -274,6 +281,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
DataStreamMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
new ParseField(WeightedRoundRobinRoutingMetadata.TYPE),
WeightedRoundRobinRoutingMetadata::fromXContent
)
);
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,25 @@
*
* @opensearch.internal
*/
public class WeightedRoundRobinMetadata extends AbstractNamedDiffable<Metadata.Custom> implements Metadata.Custom {
private static final Logger logger = LogManager.getLogger(WeightedRoundRobinMetadata.class);
public class WeightedRoundRobinRoutingMetadata extends AbstractNamedDiffable<Metadata.Custom> implements Metadata.Custom {
private static final Logger logger = LogManager.getLogger(WeightedRoundRobinRoutingMetadata.class);
public static final String TYPE = "wrr_shard_routing";
private WRRWeights wrrWeight;

public WRRWeights getWrrWeight() {
return wrrWeight;
}

public WeightedRoundRobinMetadata setWrrWeight(WRRWeights wrrWeight) {
public WeightedRoundRobinRoutingMetadata setWrrWeight(WRRWeights wrrWeight) {
this.wrrWeight = wrrWeight;
return this;
}

public WeightedRoundRobinMetadata(StreamInput in) throws IOException {
public WeightedRoundRobinRoutingMetadata(StreamInput in) throws IOException {
this.wrrWeight = new WRRWeights(in);
}

public WeightedRoundRobinMetadata(WRRWeights wrrWeight) {
public WeightedRoundRobinRoutingMetadata(WRRWeights wrrWeight) {
this.wrrWeight = wrrWeight;
}

Expand Down Expand Up @@ -79,7 +79,7 @@ public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOE
return readDiffFrom(Metadata.Custom.class, TYPE, in);
}

public static WeightedRoundRobinMetadata fromXContent(XContentParser parser) throws IOException {
public static WeightedRoundRobinRoutingMetadata fromXContent(XContentParser parser) throws IOException {
String attrKey = null;
Object attrValue;
String attributeName = null;
Expand Down Expand Up @@ -117,14 +117,14 @@ public static WeightedRoundRobinMetadata fromXContent(XContentParser parser) thr
}
}
wrrWeight = new WRRWeights(attributeName, weights);
return new WeightedRoundRobinMetadata(wrrWeight);
return new WeightedRoundRobinRoutingMetadata(wrrWeight);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoundRobinMetadata that = (WeightedRoundRobinMetadata) o;
WeightedRoundRobinRoutingMetadata that = (WeightedRoundRobinRoutingMetadata) o;
return wrrWeight.equals(that.wrrWeight);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,13 @@ public ShardIterator activeInitializingShardsRankedIt(

/**
* Returns an iterator over active and initializing shards, shards are ordered by weighted round-robin scheduling
* policy with adaptive replica selection. The output from weighted round-robin is ordered using adaptive replica
* selection to select eligible nodes for better performance.
* policy.
* @param wrrWeight Weighted round-robin weight entity
* @param nodes discovered nodes in the cluster
* @return an iterator over active and initializing shards, ordered by weighted round-robin
* scheduling policy. Making sure that initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsWRR(
WRRWeights wrrWeight,
DiscoveryNodes nodes,
WRRShardsCache cache,
@Nullable ResponseCollectorService collector,
@Nullable Map<String, Long> nodeSearchCounts
) {
public ShardIterator activeInitializingShardsWRR(WRRWeights wrrWeight, DiscoveryNodes nodes, WRRShardsCache cache) {
final int seed = shuffler.nextSeed();
List<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
List<ShardRouting> orderedActiveShards;
Expand All @@ -317,25 +310,10 @@ public ShardIterator activeInitializingShardsWRR(
orderedActiveShards = getShardsWRR(activeShards, wrrWeight, nodes);
cache.getCache().put(new WRRShardsCache.Key(shardId), orderedActiveShards);
}

// In case the shardRouting list returned by weighted round-robin is empty, we fail open and consider all
// activeShards
orderedActiveShards = orderedActiveShards == null || orderedActiveShards.isEmpty() ? activeShards : orderedActiveShards;

// output from weighted round-robin is ordered using adaptive replica selection
orderedActiveShards = rankShardsAndUpdateStats(shuffler.shuffle(orderedActiveShards, seed), collector, nodeSearchCounts);

ordered.addAll(orderedActiveShards);

if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getShardsWRR(allInitializingShards, wrrWeight, nodes);
// In case the shardRouting list returned by weighted round-robin is empty, we fail open and consider all
// initializing shards
orderedInitializingShards = orderedInitializingShards == null || orderedInitializingShards.isEmpty()
? allInitializingShards
: orderedInitializingShards;
// output from weighted round-robin is ordered using adaptive replica selection
orderedInitializingShards = rankShardsAndUpdateStats(orderedInitializingShards, collector, nodeSearchCounts);
ordered.addAll(orderedInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
Expand Down Expand Up @@ -373,11 +351,13 @@ private List<WeightedRoundRobin.Entity<ShardRouting>> calculateShardWeight(
) {
List<WeightedRoundRobin.Entity<ShardRouting>> weightedShards = new ArrayList<>();
for (ShardRouting shard : shards) {
shard.currentNodeId();
DiscoveryNode node = nodes.get(shard.currentNodeId());
String attVal = node.getAttributes().get(wrrWeight.attributeName());
// If weight for a zone is not defined, considering it as 1 by default
Double weight = Double.parseDouble(wrrWeight.weights().getOrDefault(attVal, 1).toString());
// If weight for a zone is not defined, not considering shards from that zone
if (wrrWeight.weights().get(attVal) == null) {
continue;
}
Double weight = Double.parseDouble(wrrWeight.weights().get(attVal).toString());
weightedShards.add(new WeightedRoundRobin.Entity<>(weight, shard));
}
return weightedShards;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.WeightedRoundRobinMetadata;
import org.opensearch.cluster.metadata.WeightedRoundRobinRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -221,10 +221,11 @@ public GroupShardsIterator<ShardIterator> searchShards(
}

private void setWeightedRoundRobinAttributes(ClusterState clusterState, ClusterService clusterService) {
WeightedRoundRobinMetadata weightedRoundRobinMetadata = clusterState.metadata().custom(WeightedRoundRobinMetadata.TYPE);
this.isWeightedRoundRobinEnabled = weightedRoundRobinMetadata != null ? true : false;
WeightedRoundRobinRoutingMetadata weightedRoundRobinRoutingMetadata = clusterState.metadata()
.custom(WeightedRoundRobinRoutingMetadata.TYPE);
this.isWeightedRoundRobinEnabled = weightedRoundRobinRoutingMetadata != null ? true : false;
if (this.isWeightedRoundRobinEnabled) {
this.wrrWeights = weightedRoundRobinMetadata.getWrrWeight();
this.wrrWeights = weightedRoundRobinRoutingMetadata.getWrrWeight();
this.wrrShardsCache = getWrrShardsCache() != null ? getWrrShardsCache() : new WRRShardsCache(clusterService);
}
}
Expand Down Expand Up @@ -351,7 +352,7 @@ private ShardIterator shardRoutings(
@Nullable Map<String, Long> nodeCounts
) {
if (isWeightedRoundRobinEnabled()) {
return indexShard.activeInitializingShardsWRR(getWrrWeights(), nodes, wrrShardsCache, collectorService, nodeCounts);
return indexShard.activeInitializingShardsWRR(getWrrWeights(), nodes, wrrShardsCache);
} else if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
public class WeightedRoundRobin<T> {

private List<WeightedRoundRobin.Entity<T>> entities;
private int turn;

public WeightedRoundRobin(List<WeightedRoundRobin.Entity<T>> entities) {
this.entities = entities;
Expand Down
Loading

0 comments on commit 7a49e5c

Please sign in to comment.