Skip to content

Commit

Permalink
Merge 62d479d into efa0fa0
Browse files Browse the repository at this point in the history
  • Loading branch information
Tjofil authored Jun 16, 2023
2 parents efa0fa0 + 62d479d commit 33b02fc
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 18 deletions.
2 changes: 1 addition & 1 deletion config/rca_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"max-flow-units-per-vertex-buffer": 200,

"tags": {
"locus": "cluster_manager-node"
"locus": "cluster_manager-node,data-node"
},

"remote-peers": ["ip1", "ip2", "ip3"],
Expand Down
2 changes: 1 addition & 1 deletion config/rca_idle_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"max-flow-units-per-vertex-buffer": 200,

"tags": {
"locus": "idle-cluster_manager-node"
"locus": "idle-cluster_manager-node,data-node"
},

"remote-peers": ["ip1", "ip2", "ip3"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ private void start() {
return;
}

subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
subscriptionManager.setCurrentLocus(currentLocus);
this.connectedComponents = getRcaGraphComponents(rcaConf);

// Mute the rca nodes after the graph creation and before the scheduler start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ public final class Version {
* transferred packets should be dropped. Every increment here should be accompanied with a line
* describing the version bump.
*
* Note: The RCA version is agnostic of OpenSearch version.
* <p>Note: The RCA version is agnostic of OpenSearch version.
*/
static final class Major {
// Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change
// Bumping this post the Commons
// Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
// and Service
// Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8)
// change
static final int RCA_MAJ_VERSION = 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,40 @@ public static List<ConnectedComponent> getAnalysisGraphComponents(AnalysisGraph
return Stats.getInstance().getConnectedComponents();
}

/**
* As there is possibility for host locus tags to be hybrid, in terms of rca subscription we
* still have to identify the host with single tag, the most priority one.
*/
public static String getPriorityLocus(String hostLocus) {
if (hostLocus == null || hostLocus.isEmpty()) {
return "";
}
List<String> hostLociStrings =
Arrays.asList(hostLocus.split(RcaConsts.RcaTagConstants.SEPARATOR));
// Non-empty string was split -> guaranteed to be of size at least one.
return hostLociStrings.get(0);
}

public static boolean containsAny(List<String> containerList, List<String> containedList) {
for (String elem : containedList) {
if (containerList.contains(elem)) {
return true;
}
}
return false;
}

public static boolean doTagsMatch(Node<?> node, RcaConf conf) {
Map<String, String> rcaTagMap = conf.getTagMap();
for (Map.Entry<String, String> tag : node.getTags().entrySet()) {
String rcaConfTagvalue = rcaTagMap.get(tag.getKey());
String rcaConfTag = rcaTagMap.get(tag.getKey());
if (rcaConfTag == null) {
return false;
}
List<String> rcaConfTagStrings = Arrays.asList(rcaConfTag.split(","));

return tag.getValue() != null
&& Arrays.asList(tag.getValue().split(",")).contains(rcaConfTagvalue);
&& containsAny(rcaConfTagStrings, Arrays.asList(tag.getValue().split(",")));
}
return true;
}
Expand All @@ -70,12 +98,14 @@ public static boolean shouldExecuteLocally(Node<?> node, RcaConf conf) {
final Map<String, String> nodeTagMap = node.getTags();

if (confTagMap != null && nodeTagMap != null) {
final String hostLocus = confTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
final String hostLoci = confTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
final String nodeLoci = nodeTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
if (nodeLoci != null && !nodeLoci.isEmpty()) {
List<String> nodeLociStrings =
Arrays.asList(nodeLoci.split(RcaConsts.RcaTagConstants.SEPARATOR));
return nodeLociStrings.contains(hostLocus);
List<String> hostLociStrings =
Arrays.asList(hostLoci.split(RcaConsts.RcaTagConstants.SEPARATOR));
return containsAny(hostLociStrings, nodeLociStrings);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void construct() {
// Use EVALUATION_INTERVAL_SECONDS instead of RCA_PERIOD which resolved to 12 seconds.
// This is resulting in this RCA not getting executed in every 5 seconds.
Rca<ResourceFlowUnit<HotNodeSummary>> threadMetricsRca =
new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
new ThreadMetricsRca(
threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
threadMetricsRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
Expand Down Expand Up @@ -502,17 +503,15 @@ private void constructShardResourceUsageGraph() {
Metric cpuUtilization = new CPU_Utilization(EVALUATION_INTERVAL_SECONDS);

cpuUtilization.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

addLeaf(cpuUtilization);

// High CPU Utilization RCA
HotShardRca hotShardRca =
new HotShardRca(EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, cpuUtilization);
hotShardRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
hotShardRca.addAllUpstreams(Arrays.asList(cpuUtilization));

// Hot Shard Cluster RCA which consumes the above
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.performanceanalyzer.rca.scheduler;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -27,6 +29,7 @@
import org.opensearch.performanceanalyzer.rca.framework.core.Node;
import org.opensearch.performanceanalyzer.rca.framework.core.Queryable;
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;
import org.opensearch.performanceanalyzer.rca.framework.util.RcaConsts;
import org.opensearch.performanceanalyzer.rca.framework.util.RcaUtil;
import org.opensearch.performanceanalyzer.rca.messages.DataMsg;
import org.opensearch.performanceanalyzer.rca.messages.IntentMsg;
Expand Down Expand Up @@ -398,4 +401,51 @@ public void mergeLists() {
AssertHelper.compareLists(l1.get(i), ret.get(i));
}
}

@Test
public void testHybridLocusTags() {
Node<MetricFlowUnit> cpuUtilization = new CPU_Utilization(5);
cpuUtilization.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

Node<MetricFlowUnit> hotShardClusterRca = new CPU_Utilization(5);
hotShardClusterRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE);

RcaConf nonDedicatedClusterManagerConf =
new RcaConf() {
@Override
public Map<String, String> getTagMap() {
return new HashMap<String, String>() {
{
this.put(
RcaConsts.RcaTagConstants.TAG_LOCUS,
"cluster_manager-node,data-node");
}
};
}
};

assertTrue(RcaUtil.shouldExecuteLocally(cpuUtilization, nonDedicatedClusterManagerConf));
assertTrue(
RcaUtil.shouldExecuteLocally(hotShardClusterRca, nonDedicatedClusterManagerConf));

RcaConf dedicatedClusterManagerConf =
new RcaConf() {
@Override
public Map<String, String> getTagMap() {
return new HashMap<String, String>() {
{
this.put(
RcaConsts.RcaTagConstants.TAG_LOCUS,
"cluster_manager-node");
}
};
}
};

assertFalse(RcaUtil.shouldExecuteLocally(cpuUtilization, dedicatedClusterManagerConf));
assertTrue(RcaUtil.shouldExecuteLocally(hotShardClusterRca, dedicatedClusterManagerConf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ private List<ConnectedComponent> createAndExecuteRcaGraph(AppContext appContext)

RcaConf rcaConf = new RcaConf(dataNodeRcaConf);
subscriptionManager = new SubscriptionManager(new GRPCConnectionManager(false));
subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
subscriptionManager.setCurrentLocus(currentLocus);

WireHopper wireHopper =
new WireHopper(
Expand Down Expand Up @@ -664,7 +665,8 @@ public void testHotShardClusterApiResponse() throws Exception {
RcaConf rcaConf = new RcaConf(dataNodeRcaConf);
SubscriptionManager subscriptionManager =
new SubscriptionManager(new GRPCConnectionManager(false));
subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
subscriptionManager.setCurrentLocus(currentLocus);

AppContext appContext = RcaTestHelper.setMyIp("192.168.0.1", AllMetrics.NodeRole.DATA);

Expand Down Expand Up @@ -697,7 +699,8 @@ public void testHotShardClusterApiResponse() throws Exception {
RcaConf rcaConf2 = new RcaConf(clusterManagerNodeRcaConf);
SubscriptionManager subscriptionManager2 =
new SubscriptionManager(new GRPCConnectionManager(false));
subscriptionManager2.setCurrentLocus(rcaConf2.getTagMap().get("locus"));
String currentLocus2 = RcaUtil.getPriorityLocus(rcaConf2.getTagMap().get("locus"));
subscriptionManager2.setCurrentLocus(currentLocus2);

AppContext appContextClusterManager =
RcaTestHelper.setMyIp("192.168.0.4", AllMetrics.NodeRole.ELECTED_CLUSTER_MANAGER);
Expand Down

0 comments on commit 33b02fc

Please sign in to comment.