Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
rca it first cut, after removing the refactoring changes in the exist…
Browse files Browse the repository at this point in the history
…ing code
  • Loading branch information
yojs committed Jul 24, 2020
1 parent ec29ff6 commit 962031d
Show file tree
Hide file tree
Showing 33 changed files with 1,956 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetClient;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.net.NetServer;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaController;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.MetricsDBProvider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.JvmMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
Expand Down Expand Up @@ -136,12 +137,13 @@ private static void startRcaTopLevelThread(final ClientServers clientServers,
Util.DATA_DIR,
RcaConsts.RCA_STATE_CHECK_INTERVAL_IN_MS,
RcaConsts.nodeRolePollerPeriodicityInSeconds * 1000,
appContext
appContext, new MetricsDBProvider()
);
startRcaTopLevelThread(rcaController, threadProvider);
}

public static Thread startRcaTopLevelThread(final RcaController rcaController1, final ThreadProvider threadProvider) {
public static Thread startRcaTopLevelThread(final RcaController rcaController1,
final ThreadProvider threadProvider) {
Thread rcaControllerThread = threadProvider.createThreadForRunnable(() -> rcaController1.run(),
PerformanceAnalyzerThreads.RCA_CONTROLLER);
rcaControllerThread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static HttpServer createInternalServer(String portFromSetting, String hos
server.setExecutor(Executors.newCachedThreadPool());
return server;
} catch (java.net.BindException ex) {
LOG.error("Could not create HttpServer on port {}", internalPort, ex);
Runtime.getRuntime().halt(1);
} catch (Exception ex) {
ex.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class RcaController {

// This needs to be volatile as the NodeRolePoller writes it but the Nanny reads it.
protected volatile NodeRole currentRole = NodeRole.UNKNOWN;
private volatile List<ConnectedComponent> connectedComponents;

private final ThreadProvider threadProvider;
private RCAScheduler rcaScheduler;
Expand Down Expand Up @@ -125,7 +126,9 @@ public class RcaController {

private final AppContext appContext;

protected Queryable dbProvider = null;
protected volatile Queryable dbProvider = null;

private volatile Persistable persistenceProvider;

public RcaController(
final ThreadProvider threadProvider,
Expand All @@ -135,7 +138,8 @@ public RcaController(
final String rca_enabled_conf_location,
final long rcaStateCheckIntervalMillis,
final long nodeRoleCheckPeriodicityMillis,
final AppContext appContext) {
final AppContext appContext,
final Queryable dbProvider) {
this.threadProvider = threadProvider;
this.appContext = appContext;
this.netOpsExecutorService = netOpsExecutorService;
Expand All @@ -152,6 +156,9 @@ public RcaController(
this.rcaStateCheckIntervalMillis = rcaStateCheckIntervalMillis;
this.roleCheckPeriodicity = nodeRoleCheckPeriodicityMillis;
this.deliberateInterrupt = false;
this.connectedComponents = null;
this.dbProvider = dbProvider;
this.persistenceProvider = null;
}

@VisibleForTesting
Expand All @@ -163,6 +170,7 @@ public RcaController() {
rcaStateCheckIntervalMillis = 0;
roleCheckPeriodicity = 0;
appContext = null;
this.persistenceProvider = null;
}

protected List<ConnectedComponent> getRcaGraphComponents(
Expand All @@ -178,35 +186,32 @@ private void start() {
try {
Objects.requireNonNull(subscriptionManager);
Objects.requireNonNull(rcaConf);
if (dbProvider == null) {
return;
}

subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
List<ConnectedComponent> connectedComponents = getRcaGraphComponents(rcaConf);
this.connectedComponents = getRcaGraphComponents(rcaConf);

// Mute the rca nodes after the graph creation and before the scheduler start
readAndUpdateMutesRcasDuringStart();

Queryable db;
if (dbProvider == null) {
db = new MetricsDBProvider();
} else {
db = dbProvider;
}
ThresholdMain thresholdMain = new ThresholdMain(RcaConsts.THRESHOLDS_PATH, rcaConf);
Persistable persistable = PersistenceFactory.create(rcaConf);
persistenceProvider = PersistenceFactory.create(rcaConf);
networkThreadPoolReference
.set(RcaControllerHelper.buildNetworkThreadPool(rcaConf.getNetworkQueueLength()));
addRcaRequestHandler();
queryRcaRequestHandler.setPersistable(persistable);
queryRcaRequestHandler.setPersistable(persistenceProvider);
receivedFlowUnitStore = new ReceivedFlowUnitStore(rcaConf.getPerVertexBufferLength());
WireHopper net =
new WireHopper(nodeStateManager, rcaNetClient, subscriptionManager,
networkThreadPoolReference, receivedFlowUnitStore, appContext);
this.rcaScheduler =
new RCAScheduler(connectedComponents,
db,
dbProvider,
rcaConf,
thresholdMain,
persistable,
persistenceProvider,
net,
appContext);

Expand All @@ -227,8 +232,9 @@ private void start() {
| MalformedConfig
| SQLException
| IOException e) {
LOG.error("Couldn't build connected components or persistable.. Ran into {}", e.getMessage());
e.printStackTrace();
LOG.error("Couldn't build connected components or persistable..", e);
} catch (Exception ex) {
LOG.error("Couldn't start RcaController", ex);
}
}

Expand Down Expand Up @@ -257,6 +263,10 @@ private void restart() {
StatsCollector.instance().logMetric(RcaConsts.RCA_SCHEDULER_RESTART_METRIC);
}

protected RcaConf getRcaConfForMyRole(NodeRole role) {
return RcaControllerHelper.pickRcaConfForRole(role);
}

public void run() {
long tick = 1;
long nodeRoleCheckInTicks = roleCheckPeriodicity / rcaStateCheckIntervalMillis;
Expand All @@ -274,7 +284,7 @@ public void run() {

// If RCA is enabled, update Analysis graph with Muted RCAs value
if (rcaEnabled) {
rcaConf = RcaControllerHelper.pickRcaConfForRole(currentRole);
rcaConf = getRcaConfForMyRole(currentRole);
LOG.debug("Updating Analysis Graph with Muted RCAs");
readAndUpdateMutesRcas();
}
Expand All @@ -285,14 +295,17 @@ public void run() {
Thread.sleep(rcaStateCheckIntervalMillis - duration);
}
} catch (InterruptedException ie) {
if (!deliberateInterrupt) {
LOG.error("RCA controller thread was interrupted. Reason: {}", ie.getMessage());
LOG.error(ie);
if (deliberateInterrupt) {
// This should only happen in case of tests. So, its okay for this log level to be info.
LOG.info("RcaController thread interrupted..");
} else {
LOG.error("RCA controller thread was interrupted.", ie);
}
break;
}
tick++;
}
LOG.info("RcaController exits..");
}

private void checkUpdateNodeRole(final InstanceDetails currentNode) {
Expand All @@ -311,7 +324,12 @@ private void readRcaEnabledFromConf() {
() -> {
try (Scanner sc = new Scanner(filePath)) {
String nextLine = sc.nextLine();
rcaEnabled = Boolean.parseBoolean(nextLine);
boolean oldVal = rcaEnabled;
boolean newVal = Boolean.parseBoolean(nextLine);
if (oldVal != newVal) {
rcaEnabled = newVal;
LOG.info("RCA enabled changed from {} to {}", oldVal, newVal);
}
} catch (IOException e) {
LOG.error("Error reading file '{}': {}", filePath.toString(), e);
e.printStackTrace();
Expand Down Expand Up @@ -350,8 +368,10 @@ private void readAndUpdateMutesRcasDuringStart() {
*/
private void readAndUpdateMutesRcas() {
try {
if (ConnectedComponent.getNodeNames().isEmpty()) {
LOG.info("Analysis graph not initialized/has been reset; returning.");
Set<String> allNodes = ConnectedComponent.getNodesForAllComponents(this.connectedComponents);

if (allNodes.isEmpty()) {
LOG.debug("Analysis graph not initialized/has been reset; returning.");
return;
}

Expand All @@ -363,18 +383,18 @@ private void readAndUpdateMutesRcas() {
LOG.info("RCAs provided for muting : {}", rcasForMute);

// Update rcasForMute to retain only valid RCAs
rcasForMute.retainAll(ConnectedComponent.getNodeNames());
rcasForMute.retainAll(allNodes);

// If rcasForMute post validation is empty but rcaConf.getMutedRcaList() is not empty
// all the input RCAs are incorrect.
if (rcasForMute.isEmpty() && !rcaConf.getMutedRcaList().isEmpty()) {
if (lastModifiedTimeInMillisInMemory == 0) {
LOG.error("Removing Incorrect RCA(s): {} provided before RCA Scheduler start. Valid RCAs: {}.",
rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames());
rcaConf.getMutedRcaList(), allNodes);

} else {
LOG.error("Incorrect RCA(s): {}, cannot be muted. Valid RCAs: {}, Muted RCAs: {}",
rcaConf.getMutedRcaList(), ConnectedComponent.getNodeNames(), Stats.getInstance().getMutedGraphNodes());
rcaConf.getMutedRcaList(), allNodes, Stats.getInstance().getMutedGraphNodes());
return;
}
}
Expand Down Expand Up @@ -461,4 +481,14 @@ private void addRcaRequestHandler() {
public void setDeliberateInterrupt() {
deliberateInterrupt = true;
}

@VisibleForTesting
public List<ConnectedComponent> getConnectedComponents() {
return connectedComponents;
}

@VisibleForTesting
public Persistable getPersistenceProvider() {
return persistenceProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,19 @@
// TODO: There should be a validation for the expected fields.
@JsonIgnoreProperties(ignoreUnknown = true)
class ConfJsonWrapper {
public static final String RCA_STORE_LOC = "rca-store-location";
public static final String THRESHOLD_STORE_LOC = "threshold-store-location";
public static final String NEW_RCA_CHECK_MINS = "new-rca-check-minutes";
public static final String NEW_THRESHOLDS_CHECK_MINS = "new-threshold-check-minutes";
public static final String TAGS = "tags";
public static final String REMOTE_PEERS = "remote-peers";
public static final String DATASTORE = "datastore";
public static final String ANALYSIS_GRAPH_IMPL = "analysis-graph-implementor";
public static final String NETWORK_QUEUE_LEN = "network-queue-length";
public static final String MAX_FLOW_UNIT_PER_VERTEX = "max-flow-units-per-vertex-buffer";
public static final String RCA_CONFIG_SETTINGS = "rca-config-settings";
public static final String MUTED_RCAS = "muted-rcas";

private static final Logger LOG = LogManager.getLogger(ConfJsonWrapper.class);
private final String rcaStoreLoc;
private final String thresholdStoreLoc;
private final long newRcaCheckPeriocicityMins;
Expand Down Expand Up @@ -96,18 +107,18 @@ Map<String, Object> getRcaConfigSettings() {
}

ConfJsonWrapper(
@JsonProperty("rca-store-location") String rcaStoreLoc,
@JsonProperty("threshold-store-location") String thresholdStoreLoc,
@JsonProperty("new-rca-check-minutes") long newRcaCheckPeriocicityMins,
@JsonProperty("new-threshold-check-minutes") long newThresholdCheckPeriodicityMins,
@JsonProperty("tags") Map<String, String> tags,
@JsonProperty("remote-peers") List<String> peers,
@JsonProperty("datastore") Map<String, String> datastore,
@JsonProperty("analysis-graph-implementor") String analysisGraphEntryPoint,
@JsonProperty("network-queue-length") int networkQueueLength,
@JsonProperty("max-flow-units-per-vertex-buffer") int perVertexBufferLength,
@JsonProperty("rca-config-settings") Map<String, Object> rcaConfigSettings,
@JsonProperty("muted-rcas") List<String> mutedRcas) {
@JsonProperty(RCA_STORE_LOC) String rcaStoreLoc,
@JsonProperty(THRESHOLD_STORE_LOC) String thresholdStoreLoc,
@JsonProperty(NEW_RCA_CHECK_MINS) long newRcaCheckPeriocicityMins,
@JsonProperty(NEW_THRESHOLDS_CHECK_MINS) long newThresholdCheckPeriodicityMins,
@JsonProperty(TAGS) Map<String, String> tags,
@JsonProperty(REMOTE_PEERS) List<String> peers,
@JsonProperty(DATASTORE) Map<String, String> datastore,
@JsonProperty(ANALYSIS_GRAPH_IMPL) String analysisGraphEntryPoint,
@JsonProperty(NETWORK_QUEUE_LEN) int networkQueueLength,
@JsonProperty(MAX_FLOW_UNIT_PER_VERTEX) int perVertexBufferLength,
@JsonProperty(RCA_CONFIG_SETTINGS) Map<String, Object> rcaConfigSettings,
@JsonProperty(MUTED_RCAS) List<String> mutedRcas) {
this.creationTime = System.currentTimeMillis();
this.rcaStoreLoc = rcaStoreLoc;
this.thresholdStoreLoc = thresholdStoreLoc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ConnectedComponent {

/* The elements in the inner list can be executed in parallel. Two inner lists have to be executed in order. */
private List<List<Node<?>>> dependencyOrderedNodes;
private static Set<String> nodeNames = new HashSet<>();
private Set<String> nodeNames = new HashSet<>();
private int graphId;

public Set<Node<?>> getAllNodes() {
Expand Down Expand Up @@ -117,8 +117,18 @@ public List<List<Node<?>>> getAllNodesByDependencyOrder() {
return dependencyOrderedNodes;
}

public static Set<String> getNodeNames() {
public Set<String> getNodeNames() {
return nodeNames;
}

public static Set<String> getNodesForAllComponents(List<ConnectedComponent> connectedComponentList) {
Set<String> allNodes = new HashSet<>();

if (connectedComponentList != null) {
for (ConnectedComponent connectedComponent : connectedComponentList) {
allNodes.addAll(connectedComponent.getNodeNames());
}
}
return allNodes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import org.jooq.Record;
import org.jooq.Result;

public interface Persistable {
/**
Expand Down Expand Up @@ -54,4 +56,10 @@ public interface Persistable {
<T extends ResourceFlowUnit> void write(Node<?> node, T flowUnit) throws SQLException, IOException;

void close() throws SQLException;

List<Result<Record>> getRecordsForAllTables();

Result<Record> getRecordsForTable(String tableName);

List<String> getAllPersistedRcas();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.exception.DataAccessException;

// TODO: Scheme to rotate the current file and garbage collect older files.
Expand Down Expand Up @@ -113,6 +115,8 @@ abstract void createTable(

abstract void createNewDSLContext();

public abstract List<Result<Record>> getRecordsForAllTables();

// Not required for now.
@Override
public List<ResourceFlowUnit> read(Node<?> node) {
Expand Down
Loading

0 comments on commit 962031d

Please sign in to comment.