From 18cb82b59124da2f5ff11ef756c9d08f8ff6abd0 Mon Sep 17 00:00:00 2001 From: Sruti Parthiban Date: Thu, 23 Jul 2020 16:38:12 -0700 Subject: [PATCH] Add cache decider and modify cache action --- .../actions/ModifyCacheCapacityAction.java | 148 ++++++++++++++++ .../deciders/CacheHealthDecider.java | 161 +++++++++++++++++ .../ModifyCacheCapacityActionTest.java | 109 ++++++++++++ .../deciders/CacheHealthDeciderTest.java | 162 ++++++++++++++++++ 4 files changed, 580 insertions(+) create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityAction.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDecider.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityActionTest.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDeciderTest.java diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityAction.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityAction.java new file mode 100644 index 000000000..e6cf611a3 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityAction.java @@ -0,0 +1,148 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ModifyCacheCapacityAction implements Action { + public static final String NAME = "modifyCacheCapacity"; + public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000; + + private long currentCapacityInBytes; + private long desiredCapacityInBytes; + private ResourceEnum cacheType; + private NodeKey esNode; + + private Map stepSizeInBytes = new HashMap<>(); + private Map upperBoundInBytes = new HashMap<>(); + + public ModifyCacheCapacityAction( + final NodeKey esNode, + final ResourceEnum cacheType, + final long currentCapacityInBytes, + final boolean increase) { + // TODO: Also consume NodeConfigurationRca + setBounds(); + setStepSize(); + + this.esNode = esNode; + this.cacheType = cacheType; + this.currentCapacityInBytes = currentCapacityInBytes; + long desiredCapacity = + increase ? currentCapacityInBytes + getStepSize(cacheType) : currentCapacityInBytes; + setDesiredCapacity(desiredCapacity); + } + + @Override + public String name() { + return NAME; + } + + @Override + public boolean isActionable() { + return desiredCapacityInBytes != currentCapacityInBytes; + } + + @Override + public long coolOffPeriodInMillis() { + return COOL_OFF_PERIOD_IN_MILLIS; + } + + @Override + public List impactedNodes() { + return Collections.singletonList(esNode); + } + + @Override + public Map impact() { + final ImpactVector impactVector = new ImpactVector(); + if (desiredCapacityInBytes > currentCapacityInBytes) { + impactVector.increasesPressure(HEAP); + } else if (desiredCapacityInBytes < currentCapacityInBytes) { + impactVector.decreasesPressure(HEAP); + } + return Collections.singletonMap(esNode, impactVector); + } + + @Override + public void execute() { + // Making this a no-op for now + // TODO: Modify based on downstream CoS agent API calls + assert true; + } + + @Override + public String summary() { + if (!isActionable()) { + return String.format("No action to take for: [%s]", NAME); + } + return String.format("Update [%s] capacity from [%d] to [%d] on node [%s]", + cacheType.toString(), currentCapacityInBytes, desiredCapacityInBytes, esNode.getNodeId()); + } + + @Override + public String toString() { + return summary(); + } + + private void setBounds() { + // This is intentionally not made static because different nodes can + // have different bounds based on instance types + + // TODO: Read the upperBound from NodeConfigurationRca. + // Field data cache used when sorting on or computing aggregation on the field (in Bytes) + upperBoundInBytes.put(ResourceEnum.FIELD_DATA_CACHE, 12000 * 1_000_000L); + + // Shard request cache (in Bytes) + upperBoundInBytes.put(ResourceEnum.SHARD_REQUEST_CACHE, 120000 * 1_000L); + } + + private void setStepSize() { + // TODO: Update the step size to also include percentage of heap size along with absolute value + // Field data cache having step size of 512MB + stepSizeInBytes.put(ResourceEnum.FIELD_DATA_CACHE, 512 * 1_000_000L); + + // Shard request cache step size of 512KB + stepSizeInBytes.put(ResourceEnum.SHARD_REQUEST_CACHE, 512 * 1_000L); + } + + private long getStepSize(final ResourceEnum cacheType) { + return stepSizeInBytes.get(cacheType); + } + + private void setDesiredCapacity(final long desiredCapacity) { + this.desiredCapacityInBytes = Math.min(desiredCapacity, upperBoundInBytes.get(cacheType)); + } + + public long getCurrentCapacityInBytes() { + return currentCapacityInBytes; + } + + public long getDesiredCapacityInBytes() { + return desiredCapacityInBytes; + } + + public ResourceEnum getCacheType() { + return cacheType; + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDecider.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDecider.java new file mode 100644 index 000000000..9a4b2e2f3 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDecider.java @@ -0,0 +1,161 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ModifyCacheCapacityAction; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.BaseClusterRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.FieldDataCacheClusterRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.ShardRequestCacheClusterRca; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; + +// TODO: 1. Read current cache capacity, total cache capacity, upper bound, lower bound from NodeConfigurationRca +public class CacheHealthDecider extends Decider { + + public static final String NAME = "cacheHealthDecider"; + + private final FieldDataCacheClusterRca fieldDataCacheClusterRca; + private final ShardRequestCacheClusterRca shardRequestCacheClusterRca; + + List actionsByUserPriority = new ArrayList<>(); + private int counter = 0; + + public CacheHealthDecider(final long evalIntervalSeconds, + final int decisionFrequency, + final FieldDataCacheClusterRca fieldDataCacheClusterRca, + final ShardRequestCacheClusterRca shardRequestCacheClusterRca) { + // TODO: Also consume NodeConfigurationRca + super(evalIntervalSeconds, decisionFrequency); + + this.fieldDataCacheClusterRca = fieldDataCacheClusterRca; + this.shardRequestCacheClusterRca = shardRequestCacheClusterRca; + + configureActionPriority(); + } + + @Override + public String name() { + return NAME; + } + + @Override + public Decision operate() { + final ImmutableList cacheClusterRca = + ImmutableList.builder() + .add(shardRequestCacheClusterRca) + .add(fieldDataCacheClusterRca) + .build(); + + Decision decision = new Decision(System.currentTimeMillis(), NAME); + counter += 1; + if (counter < decisionFrequency) { + return decision; + } + counter = 0; + + // TODO: Tune only one resource at a time based on action priorities + cacheClusterRca.forEach(rca -> getActionsFromRca(rca, decision)); + return decision; + } + + private void getActionsFromRca( + final R cacheClusterRca, final Decision decision) { + if (!cacheClusterRca.getFlowUnits().isEmpty()) { + final ResourceFlowUnit flowUnit = cacheClusterRca.getFlowUnits().get(0); + if (!flowUnit.hasResourceSummary()) { + return; + } + + final HotClusterSummary clusterSummary = flowUnit.getSummary(); + + clusterSummary + .getHotNodeSummaryList() + .forEach( + hotNodeSummary -> { + final NodeKey esNode = + new NodeKey(hotNodeSummary.getNodeID(), hotNodeSummary.getHostAddress()); + for (final HotResourceSummary resource : + hotNodeSummary.getHotResourceSummaryList()) { + decision.addAction( + computeBestAction(esNode, resource.getResource().getResourceEnum())); + } + }); + } + } + + private void configureActionPriority() { + // TODO: Input from user configured yml + this.actionsByUserPriority.add(ModifyCacheCapacityAction.NAME); + } + + /** + * Evaluate the most relevant action for a node + * + *

Action relevance decided based on user configured priorities for now, this can be modified + * to consume better signals going forward. + */ + private Action computeBestAction(final NodeKey esNode, final ResourceEnum cacheType) { + Action action = null; + for (String actionName : actionsByUserPriority) { + action = + getAction(actionName, esNode, cacheType, getNodeCacheCapacityInBytes(esNode, cacheType), true); + if (action != null) { + break; + } + } + return action; + } + + private Action getAction(final String actionName, + final NodeKey esNode, + final ResourceEnum cacheType, + final long currentCapacityInBytes, + final boolean increase) { + if (ModifyCacheCapacityAction.NAME.equals(actionName)) { + return configureCacheCapacity(esNode, cacheType, currentCapacityInBytes, increase); + } + return null; + } + + private ModifyCacheCapacityAction configureCacheCapacity( + final NodeKey esNode, + final ResourceEnum cacheType, + final long currentCapacityInBytes, + final boolean increase) { + final ModifyCacheCapacityAction action = + new ModifyCacheCapacityAction(esNode, cacheType, currentCapacityInBytes, increase); + if (action.isActionable()) { + return action; + } + return null; + } + + private long getNodeCacheCapacityInBytes(final NodeKey esNode, final ResourceEnum cacheType) { + // TODO: use NodeConfigurationRca to return capacity, for now returning random value in Bytes + if (cacheType.equals(ResourceEnum.FIELD_DATA_CACHE)) { + return 1000L; + } + return 1000L; + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityActionTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityActionTest.java new file mode 100644 index 000000000..6a202e23d --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyCacheCapacityActionTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.CPU; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.DISK; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.NETWORK; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.RAM; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Impact; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; +import java.util.Map; +import org.junit.Test; + +public class ModifyCacheCapacityActionTest { + + @Test + public void testIncreaseCapacity() { + NodeKey node1 = new NodeKey("node-1", "1.2.3.4"); + ModifyCacheCapacityAction modifyCacheCapacityAction = + new ModifyCacheCapacityAction(node1, ResourceEnum.FIELD_DATA_CACHE, 5000, true); + assertTrue( + modifyCacheCapacityAction.getDesiredCapacityInBytes() + > modifyCacheCapacityAction.getCurrentCapacityInBytes()); + assertTrue(modifyCacheCapacityAction.isActionable()); + assertEquals(300 * 1_000, modifyCacheCapacityAction.coolOffPeriodInMillis()); + assertEquals(ResourceEnum.FIELD_DATA_CACHE, modifyCacheCapacityAction.getCacheType()); + assertEquals(1, modifyCacheCapacityAction.impactedNodes().size()); + + Map impact = modifyCacheCapacityAction.impact().get(node1).getImpact(); + assertEquals(Impact.INCREASES_PRESSURE, impact.get(HEAP)); + assertEquals(Impact.NO_IMPACT, impact.get(CPU)); + assertEquals(Impact.NO_IMPACT, impact.get(NETWORK)); + assertEquals(Impact.NO_IMPACT, impact.get(RAM)); + assertEquals(Impact.NO_IMPACT, impact.get(DISK)); + } + + @Test + public void testNoIncreaseCapacity() { + NodeKey node1 = new NodeKey("node-1", "1.2.3.4"); + ModifyCacheCapacityAction modifyCacheCapacityAction = + new ModifyCacheCapacityAction(node1, ResourceEnum.FIELD_DATA_CACHE, 5000, false); + assertEquals( + modifyCacheCapacityAction.getDesiredCapacityInBytes(), + modifyCacheCapacityAction.getCurrentCapacityInBytes()); + assertFalse(modifyCacheCapacityAction.isActionable()); + assertEquals(300 * 1_000, modifyCacheCapacityAction.coolOffPeriodInMillis()); + assertEquals(ResourceEnum.FIELD_DATA_CACHE, modifyCacheCapacityAction.getCacheType()); + assertEquals(1, modifyCacheCapacityAction.impactedNodes().size()); + + Map impact = modifyCacheCapacityAction.impact().get(node1).getImpact(); + assertEquals(Impact.NO_IMPACT, impact.get(HEAP)); + assertEquals(Impact.NO_IMPACT, impact.get(CPU)); + assertEquals(Impact.NO_IMPACT, impact.get(NETWORK)); + assertEquals(Impact.NO_IMPACT, impact.get(RAM)); + assertEquals(Impact.NO_IMPACT, impact.get(DISK)); + } + + @Test + public void testBounds() { + // TODO: Move to work with test rcaConf when bounds moved to nodeConfiguration rca + NodeKey node1 = new NodeKey("node-1", "1.2.3.4"); + ModifyCacheCapacityAction fieldCacheIncrease = + new ModifyCacheCapacityAction( + node1, ResourceEnum.FIELD_DATA_CACHE, 12000 * 1_000_000L, true); + assertEquals( + fieldCacheIncrease.getDesiredCapacityInBytes(), + fieldCacheIncrease.getCurrentCapacityInBytes()); + assertFalse(fieldCacheIncrease.isActionable()); + assertNoImpact(node1, fieldCacheIncrease); + + ModifyCacheCapacityAction shardRequestCacheIncrease = + new ModifyCacheCapacityAction( + node1, ResourceEnum.SHARD_REQUEST_CACHE, 120000 * 1_000L, true); + assertEquals( + shardRequestCacheIncrease.getDesiredCapacityInBytes(), + shardRequestCacheIncrease.getCurrentCapacityInBytes()); + assertFalse(shardRequestCacheIncrease.isActionable()); + assertNoImpact(node1, shardRequestCacheIncrease); + } + + private void assertNoImpact(NodeKey node, ModifyCacheCapacityAction modifyCacheCapacityAction) { + Map impact = modifyCacheCapacityAction.impact().get(node).getImpact(); + assertEquals(Impact.NO_IMPACT, impact.get(HEAP)); + assertEquals(Impact.NO_IMPACT, impact.get(CPU)); + assertEquals(Impact.NO_IMPACT, impact.get(NETWORK)); + assertEquals(Impact.NO_IMPACT, impact.get(RAM)); + assertEquals(Impact.NO_IMPACT, impact.get(DISK)); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDeciderTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDeciderTest.java new file mode 100644 index 000000000..d59ed7c47 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/CacheHealthDeciderTest.java @@ -0,0 +1,162 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeRole; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.RcaTestHelper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.FieldDataCacheClusterRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.ShardRequestCacheClusterRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; + +public class CacheHealthDeciderTest { + AppContext appContext; + + @Before + public void setupCluster() throws SQLException, ClassNotFoundException { + ClusterDetailsEventProcessor clusterDetailsEventProcessor = new ClusterDetailsEventProcessor(); + ClusterDetailsEventProcessor.NodeDetails node1 = + new ClusterDetailsEventProcessor.NodeDetails(NodeRole.DATA, "node1", "127.0.0.1", false); + ClusterDetailsEventProcessor.NodeDetails node2 = + new ClusterDetailsEventProcessor.NodeDetails(NodeRole.DATA, "node2", "127.0.0.2", false); + ClusterDetailsEventProcessor.NodeDetails node3 = + new ClusterDetailsEventProcessor.NodeDetails(NodeRole.DATA, "node3", "127.0.0.3", false); + ClusterDetailsEventProcessor.NodeDetails node4 = + new ClusterDetailsEventProcessor.NodeDetails(NodeRole.DATA, "node3", "127.0.0.4", false); + ClusterDetailsEventProcessor.NodeDetails master = + new ClusterDetailsEventProcessor.NodeDetails( + NodeRole.ELECTED_MASTER, "master", "127.0.0.9", true); + + final List nodes = new ArrayList<>(); + nodes.add(node1); + nodes.add(node2); + nodes.add(node3); + nodes.add(node4); + nodes.add(master); + clusterDetailsEventProcessor.setNodesDetails(nodes); + + appContext = new AppContext(); + appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor); + } + + @Test + public void testHighEvictionRemediation() { + RcaTestHelper fieldDataCacheNodeRca = new RcaTestHelper<>("fieldDataCacheNodeRca"); + fieldDataCacheNodeRca.setAppContext(appContext); + + // node1: Field data and Shard request cache unhealthy + // node2: Only field data unhealthy + // node3: Only shard request unhealthy + // node4: all caches healthy + fieldDataCacheNodeRca.mockFlowUnit( + RcaTestHelper.generateFlowUnit( + "node1", + "127.0.0.1", + Resources.State.UNHEALTHY, + ResourceUtil.FIELD_DATA_CACHE_EVICTION), + RcaTestHelper.generateFlowUnit( + "node2", + "127.0.0.2", + Resources.State.UNHEALTHY, + ResourceUtil.FIELD_DATA_CACHE_EVICTION), + RcaTestHelper.generateFlowUnit("node3", "127.0.0.3", Resources.State.HEALTHY), + RcaTestHelper.generateFlowUnit("node4", "127.0.0.4", Resources.State.HEALTHY)); + + RcaTestHelper shardRequestCacheNodeRca = new RcaTestHelper<>("shardRequestCacheNodeRca"); + shardRequestCacheNodeRca.setAppContext(appContext); + + // node1: Field data and Shard request cache unhealthy + // node2: Only shard request eviction unhealthy + // node3: Only shard request hit unhealthy + // node4: all caches healthy + shardRequestCacheNodeRca.mockFlowUnit( + RcaTestHelper.generateFlowUnit( + "node1", + "127.0.0.1", + Resources.State.UNHEALTHY, + ResourceUtil.SHARD_REQUEST_CACHE_EVICTION), + RcaTestHelper.generateFlowUnit("node2", "127.0.0.2", Resources.State.HEALTHY), + RcaTestHelper.generateFlowUnit( + "node3", + "127.0.0.3", + Resources.State.UNHEALTHY, + ResourceUtil.SHARD_REQUEST_CACHE_EVICTION), + RcaTestHelper.generateFlowUnit("node4", "127.0.0.4", Resources.State.HEALTHY)); + + FieldDataCacheClusterRca fieldDataCacheClusterRca = new FieldDataCacheClusterRca(1, fieldDataCacheNodeRca); + fieldDataCacheClusterRca.setAppContext(appContext); + fieldDataCacheClusterRca.generateFlowUnitListFromLocal(null); + + ShardRequestCacheClusterRca shardRequestCacheClusterRca = + new ShardRequestCacheClusterRca(1, shardRequestCacheNodeRca); + shardRequestCacheClusterRca.setAppContext(appContext); + shardRequestCacheClusterRca.generateFlowUnitListFromLocal(null); + + CacheHealthDecider decider = + new CacheHealthDecider(5, 12, fieldDataCacheClusterRca, shardRequestCacheClusterRca); + + // Since deciderFrequency is 12, the first 11 invocations return empty decision + for (int i = 0; i < 11; i++) { + Decision decision = decider.operate(); + assertTrue(decision.isEmpty()); + } + + Decision decision = decider.operate(); + assertEquals(4, decision.getActions().size()); + + Map> nodeActionCounter = new HashMap<>(); + for (Action action : decision.getActions()) { + assertEquals(1, action.impactedNodes().size()); + String nodeId = action.impactedNodes().get(0).getNodeId(); + String summary = action.summary(); + if (summary.contains(ResourceEnum.FIELD_DATA_CACHE.toString())) { + nodeActionCounter + .computeIfAbsent(nodeId, k -> new HashMap<>()) + .merge(ResourceEnum.FIELD_DATA_CACHE, 1, Integer::sum); + } + if (summary.contains(ResourceEnum.SHARD_REQUEST_CACHE.toString())) { + nodeActionCounter + .computeIfAbsent(nodeId, k -> new HashMap<>()) + .merge(ResourceEnum.SHARD_REQUEST_CACHE, 1, Integer::sum); + } + } + + assertEquals(2, nodeActionCounter.get("node1").size()); + assertEquals(1, (int) nodeActionCounter.get("node1").get(ResourceEnum.FIELD_DATA_CACHE)); + assertEquals(1, (int) nodeActionCounter.get("node1").get(ResourceEnum.SHARD_REQUEST_CACHE)); + assertEquals(1, nodeActionCounter.get("node2").size()); + assertEquals(1, (int) nodeActionCounter.get("node2").get(ResourceEnum.FIELD_DATA_CACHE)); + assertEquals(1, nodeActionCounter.get("node3").size()); + assertEquals(1, (int) nodeActionCounter.get("node3").get(ResourceEnum.SHARD_REQUEST_CACHE)); + assertFalse(nodeActionCounter.containsKey("node4")); + } +}