Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Revert "make ClusterDetailsEventProcessor and all its access methods …
Browse files Browse the repository at this point in the history
…`non-static` (#283)"

This reverts commit 700771e.
  • Loading branch information
Sid Narayan committed Jul 24, 2020
1 parent 657da28 commit 7697de3
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,14 @@ public class PerformanceAnalyzerApp {
new RcaStatsReporter(Arrays.asList(RCA_GRAPH_METRICS_AGGREGATOR,
RCA_RUNTIME_METRICS_AGGREGATOR, RCA_VERTICES_METRICS_AGGREGATOR,
ERRORS_AND_EXCEPTIONS_AGGREGATOR, PERIODIC_SAMPLE_AGGREGATOR));
public static PeriodicSamplers PERIODIC_SAMPLERS;
public static final PeriodicSamplers PERIODIC_SAMPLERS =
new PeriodicSamplers(PERIODIC_SAMPLE_AGGREGATOR, getAllSamplers(),
(MetricsConfiguration.CONFIG_MAP.get(StatsCollector.class).samplingInterval) / 2,
TimeUnit.MILLISECONDS);
public static final BlockingQueue<PAThreadException> exceptionQueue =
new ArrayBlockingQueue<>(EXCEPTION_QUEUE_LENGTH);

public static void main(String[] args) {
AppContext appContext = new AppContext();
PERIODIC_SAMPLERS = new PeriodicSamplers(PERIODIC_SAMPLE_AGGREGATOR, getAllSamplers(appContext),
(MetricsConfiguration.CONFIG_MAP.get(StatsCollector.class).samplingInterval) / 2,
TimeUnit.MILLISECONDS);
PluginSettings settings = PluginSettings.instance();
StatsCollector.STATS_TYPE = "agent-stats-metadata";
METRIC_COLLECTOR_EXECUTOR.addScheduledMetricCollector(StatsCollector.instance());
Expand All @@ -113,6 +112,7 @@ public static void main(String[] args) {

final GRPCConnectionManager connectionManager = new GRPCConnectionManager(
settings.getHttpsEnabled());
AppContext appContext = new AppContext();
final ClientServers clientServers = createClientServers(connectionManager, appContext);
startErrorHandlingThread();

Expand Down Expand Up @@ -243,10 +243,10 @@ public static ClientServers createClientServers(final GRPCConnectionManager conn
return new ClientServers(httpServer, netServer, netClient);
}

public static List<ISampler> getAllSamplers(final AppContext appContext) {
private static List<ISampler> getAllSamplers() {
List<ISampler> allSamplers = new ArrayList<>();
allSamplers.addAll(AllJvmSamplers.getJvmSamplers());
allSamplers.add(RcaStateSamplers.getRcaEnabledSampler(appContext));
allSamplers.add(RcaStateSamplers.getRcaEnabledSampler());

return allSamplers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,15 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.samplers;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaRuntimeMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.collectors.SampleAggregator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.emitters.ISampler;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor.NodeDetails;
import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import javax.annotation.Nonnull;

public class RcaEnabledSampler implements ISampler {
private final AppContext appContext;

RcaEnabledSampler(final AppContext appContext) {
Objects.requireNonNull(appContext);
this.appContext = appContext;
}

@Override
public void sample(SampleAggregator sampleCollector) {
Expand All @@ -42,8 +32,8 @@ public void sample(SampleAggregator sampleCollector) {

@VisibleForTesting
boolean isRcaEnabled() {
InstanceDetails currentNode = appContext.getMyInstanceDetails();
if (currentNode != null && currentNode.getIsMaster()) {
NodeDetails currentNode = ClusterDetailsEventProcessor.getCurrentNodeDetails();
if (currentNode != null && currentNode.getIsMasterNode()) {
return RcaController.isRcaEnabled();
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.samplers;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.emitters.ISampler;

public class RcaStateSamplers {

public static ISampler getRcaEnabledSampler(final AppContext appContext) {
return new RcaEnabledSampler(appContext);
public static ISampler getRcaEnabledSampler() {
return new RcaEnabledSampler();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ClusterDetailsEventProcessor implements EventProcessor {
/**
* keep a volatile immutable list to make the read/write to this list thread safe.
*/
private volatile ImmutableList<NodeDetails> nodesDetails = null;
private static volatile ImmutableList<NodeDetails> nodesDetails = null;

@Override
public void initializeProcessing(long startTime, long endTime) {}
Expand Down Expand Up @@ -92,19 +92,19 @@ public void commitBatchIfRequired() {

}

public void setNodesDetails(final List<NodeDetails> nodesDetails) {
this.nodesDetails = ImmutableList.copyOf(nodesDetails);
public static void setNodesDetails(List<NodeDetails> nodesDetails) {
ClusterDetailsEventProcessor.nodesDetails = ImmutableList.copyOf(nodesDetails);
}

public List<NodeDetails> getNodesDetails() {
public static List<NodeDetails> getNodesDetails() {
if (nodesDetails != null) {
return nodesDetails.asList();
} else {
return Collections.emptyList();
}
}

public List<NodeDetails> getDataNodesDetails() {
public static List<NodeDetails> getDataNodesDetails() {
List<NodeDetails> allNodes = getNodesDetails();
if (allNodes.size() > 0) {
return allNodes.stream()
Expand All @@ -115,7 +115,8 @@ public List<NodeDetails> getDataNodesDetails() {
}
}

public NodeDetails getCurrentNodeDetails() {
@Deprecated
public static NodeDetails getCurrentNodeDetails() {
List<NodeDetails> allNodes = getNodesDetails();
if (allNodes.size() > 0) {
return allNodes.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp.PERIODIC_SAMPLE_AGGREGATOR;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.AnalysisGraph;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
Expand All @@ -38,7 +35,6 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.RCASchedulerTask;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.spec.MetricsDBProviderTestHelper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.emitters.PeriodicSamplers;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.measurements.MeasurementSet;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import java.nio.file.Paths;
Expand All @@ -47,7 +43,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -136,11 +131,6 @@ public void rcaGraphMetrics() throws Exception {
}
for (JvmMetrics jvmMetrics1: jvmMetrics) {
if (!verify(jvmMetrics1)) {
PerformanceAnalyzerApp.PERIODIC_SAMPLERS = new PeriodicSamplers(PERIODIC_SAMPLE_AGGREGATOR,
PerformanceAnalyzerApp.getAllSamplers(appContext),
(MetricsConfiguration.CONFIG_MAP.get(StatsCollector.class).samplingInterval) / 2,
TimeUnit.MILLISECONDS);

PerformanceAnalyzerApp.PERIODIC_SAMPLERS.run();
}
Assert.assertTrue(verify(jvmMetrics1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaRuntimeMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.collectors.SampleAggregator;
Expand All @@ -19,31 +18,25 @@

public class RcaEnabledSamplerTest {
private RcaEnabledSampler uut;
private AppContext appContext;

@Mock
private SampleAggregator sampleAggregator;

@Before
public void setup() {
MockitoAnnotations.initMocks(this);
appContext = new AppContext();
uut = new RcaEnabledSampler(appContext);
uut = new RcaEnabledSampler();
}

@Test
public void testIsRcaEnabled() {
assertFalse(uut.isRcaEnabled());
ClusterDetailsEventProcessor.NodeDetails details =
ClusterDetailsEventProcessorTestHelper.newNodeDetails("", "", false);

ClusterDetailsEventProcessor clusterDetailsEventProcessor = new ClusterDetailsEventProcessor();
clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details));
appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor);

ClusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details));
assertFalse(uut.isRcaEnabled());
details = ClusterDetailsEventProcessorTestHelper.newNodeDetails("", "", true);
clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details));
ClusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(details));
assertEquals(RcaController.isRcaEnabled(), uut.isRcaEnabled());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -17,6 +16,6 @@ public class RcaStateSamplersTest {
public void testGetRcaEnabledSampler() { // done for constructor coverage
uut = new RcaStateSamplers();
assertSame(uut.getClass(), RcaStateSamplers.class);
assertTrue(RcaStateSamplers.getRcaEnabledSampler(new AppContext()) instanceof RcaEnabledSampler);
assertTrue(RcaStateSamplers.getRcaEnabledSampler() instanceof RcaEnabledSampler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public class ClusterUtilsTest {
private static final String HOST2 = "127.0.0.2";
private static final ClusterDetailsEventProcessor.NodeDetails EMPTY_DETAILS =
ClusterDetailsEventProcessorTestHelper.newNodeDetails("", "", false);
private ClusterDetailsEventProcessor clusterDetailsEventProcessor;

private List<InstanceDetails> getInstancesFromHost(List<String> hostIps) {
List<InstanceDetails> instances = new ArrayList<>();
Expand All @@ -30,16 +29,15 @@ private List<InstanceDetails> getInstancesFromHost(List<String> hostIps) {

@Before
public void setup() {
clusterDetailsEventProcessor = new ClusterDetailsEventProcessor();
clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(EMPTY_DETAILS));
ClusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(EMPTY_DETAILS));
}

@Test
public void testIsHostAddressInCluster() {
// method should return false when there are no peers
Assert.assertFalse(ClusterUtils.isHostAddressInCluster(HOST, getInstancesFromHost(Collections.EMPTY_LIST)));
// method should properly recognize which hosts are peers and which aren't
clusterDetailsEventProcessor.setNodesDetails(Lists.newArrayList(
ClusterDetailsEventProcessor.setNodesDetails(Lists.newArrayList(
ClusterDetailsEventProcessorTestHelper.newNodeDetails(null, HOST, false)
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public static ClusterDetailsEventProcessor.NodeDetails newNodeDetails(final Stri
return createNodeDetails(nodeId, address, isMasterNode);
}

public ClusterDetailsEventProcessor generateClusterDetailsEvent() {
public void generateClusterDetailsEvent() {
if (nodeDetails.isEmpty()) {
return new ClusterDetailsEventProcessor();
return;
}
StringBuilder stringBuilder = new StringBuilder().append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());
nodeDetails.stream().forEach(
Expand All @@ -58,6 +58,5 @@ public ClusterDetailsEventProcessor generateClusterDetailsEvent() {
Event testEvent = new Event("", stringBuilder.toString(), 0);
ClusterDetailsEventProcessor clusterDetailsEventProcessor = new ClusterDetailsEventProcessor();
clusterDetailsEventProcessor.processEvent(testEvent);
return clusterDetailsEventProcessor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@ public void testProcessEvent() throws Exception {
String address2 = "10.212.52.241";
boolean isMasterNode2 = false;

ClusterDetailsEventProcessor clusterDetailsEventProcessor;
try {
ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper();
clusterDetailsEventProcessorTestHelper.addNodeDetails(nodeId1, address1, isMasterNode1);
clusterDetailsEventProcessorTestHelper.addNodeDetails(nodeId2, address2, isMasterNode2);
clusterDetailsEventProcessor = clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent();
clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent();
} catch (Exception e) {
Assert.assertTrue("got exception when generating cluster details event", false);
return;
}

List<NodeDetails> nodes = clusterDetailsEventProcessor.getNodesDetails();
List<NodeDetails> nodes = ClusterDetailsEventProcessor.getNodesDetails();

assertEquals(nodeId1, nodes.get(0).getId());
assertEquals(address1, nodes.get(0).getHostAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ public void testRemoveNodeFromCluster() throws SQLException, ClassNotFoundExcept
Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy());
Assert.assertEquals(2, flowUnit.getSummary().getNumOfUnhealthyNodes());

ClusterDetailsEventProcessor clusterDetailsEventProcessor = removeNodeFromCluster();
appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor);
removeNodeFromCluster();

nodeRca.mockFlowUnit();
flowUnit = clusterRca.operate();
Expand All @@ -269,10 +268,9 @@ public void testAddNewNodeIntoCluster() throws SQLException, ClassNotFoundExcept
Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes());
Assert.assertTrue(compareNodeSummary("node1", type1, flowUnit.getSummary().getHotNodeSummaryList().get(0)));

ClusterDetailsEventProcessor clusterDetailsEventProcessor = addNewNodeIntoCluster();
addNewNodeIntoCluster();

nodeRca.mockFlowUnit();
appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor);
flowUnit = clusterRca.operate();
Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy());
Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes());
Expand All @@ -287,22 +285,22 @@ public void testAddNewNodeIntoCluster() throws SQLException, ClassNotFoundExcept
Assert.assertTrue(compareNodeSummary("node4", type2, clusterSummary.getHotNodeSummaryList().get(1)));
}

private ClusterDetailsEventProcessor removeNodeFromCluster() throws SQLException, ClassNotFoundException {
private void removeNodeFromCluster() throws SQLException, ClassNotFoundException {
ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper();
clusterDetailsEventProcessorTestHelper.addNodeDetails("node2", "127.0.0.1", false);
clusterDetailsEventProcessorTestHelper.addNodeDetails("node3", "127.0.0.2", false);
clusterDetailsEventProcessorTestHelper.addNodeDetails("master", "127.0.0.9", NodeRole.ELECTED_MASTER, true);
return clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent();
clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent();
}

private ClusterDetailsEventProcessor addNewNodeIntoCluster() throws SQLException, ClassNotFoundException {
private void addNewNodeIntoCluster() throws SQLException, ClassNotFoundException {
ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper();
clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false);
clusterDetailsEventProcessorTestHelper.addNodeDetails("node2", "127.0.0.1", false);
clusterDetailsEventProcessorTestHelper.addNodeDetails("node3", "127.0.0.2", false);
clusterDetailsEventProcessorTestHelper.addNodeDetails("node4", "127.0.0.3", false);
clusterDetailsEventProcessorTestHelper.addNodeDetails("master", "127.0.0.9", NodeRole.ELECTED_MASTER, true);
return clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent();
clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent();
}

private boolean compareResourceSummary(Resource resource, HotResourceSummary resourceSummary) {
Expand Down

0 comments on commit 7697de3

Please sign in to comment.