Skip to content

Commit

Permalink
AA AM Wiring (#469)
Browse files Browse the repository at this point in the history
* Bug Fixes
Code Refactoring

* fix bugs in traces decoupling, fix bugs in alerts connector

* fix more bugs with alerts connector

* more bug fixes, re-introduce metric_key as a value in anomaly and subscription requests

* revert subscription UI workflow to how it was previously, but wired in with new data model

* fix create subscription, add error handling for empty subscription

* fix issue with setState in the new subscription box

* fix issues regarding strong and weak anomalies, slightly modify UI pieces to reflect more information, change wording, refactor backend algorithms

* bug fix for rerendering alert history expand component on alert type tab switch

* bugfix

* switching flatten to merge in api response

* Fixing the arguments to the fetchOperations method.
Removing console.log statement

* fix issues with interval, moving it into search and out of tabs state

* fix bug with interval removing other tabs

* add default value to alertFreqInSec (fixes isUnhealthy value in alerts)

* pass interval into getServiceUnhealthyAlertCount

* Formatting alerts properly
  • Loading branch information
Jason Bulicek authored and absrivastava committed Feb 20, 2019
1 parent a843ddc commit 3138028
Show file tree
Hide file tree
Showing 38 changed files with 502 additions and 518 deletions.
6 changes: 3 additions & 3 deletions deployment/terraform/templates/haystack-ui_json.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
"host": "${graphite_hostname}",
"port": ${graphite_port}
},
"grpcOptions": {
"grpc.max_receive_message_length": 52428800
}
"connectors": {
"traces": {
"connectorName": "haystack",
"haystackHost": "${trace_reader_hostname}",
"haystackPort": ${trace_reader_service_port},
"serviceRefreshIntervalInSecs": 60,
"fieldKeys": [${whitelisted_fields}],
"grpcOptions": {
"grpc.max_receive_message_length": 52428800
}
},
"trends": {
"connectorName": "haystack",
Expand Down
2 changes: 1 addition & 1 deletion haystack-idl
4 changes: 4 additions & 0 deletions server/config/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ module.exports = {
// base64 and periodreplacement are supported, default to noop if none provided
encoder: 'periodreplacement',

grpcOptions: {
'grpc.max_receive_message_length': 10485760
},

// this list defines subsystems for which UI should be enabled
// traces connector must be present in connectors config
connectors: {
Expand Down
136 changes: 87 additions & 49 deletions server/connectors/alerts/haystack/alertsConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const _ = require('lodash');
const grpc = require('grpc');

const config = require('../../../config/config');
const servicesConnector = require('../../services/servicesConnector');
const servicesConnector = config.connectors.traces && require('../../services/servicesConnector'); // eslint-disable-line

const fetcher = require('../../operations/grpcFetcher');
const services = require('../../../../static_codegen/anomaly/anomalyReader_grpc_pb');
Expand All @@ -28,83 +28,120 @@ const MetricpointNameEncoder = require('../../utils/encoders/MetricpointNameEnco

const metricpointNameEncoder = new MetricpointNameEncoder(config.encoder);

const grpcOptions = {
'grpc.max_receive_message_length': 10485760, // todo: do I need these?
...config.connectors.traces.grpcOptions
};
const grpcOptions = config.grpcOptions || {};

const connector = {};
const client = new services.AnomalyReaderClient(
`${config.connectors.alerts.haystackHost}:${config.connectors.alerts.haystackPort}`,
grpc.credentials.createInsecure(),
grpcOptions); // TODO make client secure
const alertTypes = ['durationTP99', 'failureCount'];
const alertTypes = ['duration', 'failure-span'];
const getAnomaliesFetcher = fetcher('getAnomalies', client);
const alertFreqInSec = config.connectors.alerts.alertFreqInSec; // TODO make this based on alert type
const alertFreqInSec = config.connectors.alerts.alertFreqInSec || 300; // TODO make this based on alert type


function fetchOperations(serviceName) {
return servicesConnector.getOperations(serviceName);
return servicesConnector && servicesConnector.getOperations(serviceName);
}

/**
 * Checks whether an anomaly response is labelled with the given operation
 * name and metric key.
 *
 * Labels are read from `alertToCheck.labelsMap`, which is searched as a list
 * of `[key, value]` pairs for the `operationName` and `metric_key` entries.
 *
 * @param {?Object} alertToCheck - anomaly response to inspect; may be null or undefined
 * @param {string} operationName - operation name the response must carry
 * @param {string} type - metric key (alert type) the response must carry
 * @returns {boolean} true only when both labels are present and equal to the
 *     given values; false otherwise (never undefined)
 */
function sameOperationAndType(alertToCheck, operationName, type) {
    // A missing response or a response without a label list can never match.
    if (!alertToCheck || !Array.isArray(alertToCheck.labelsMap)) {
        return false;
    }

    const findLabel = key => alertToCheck.labelsMap.find(label => label[0] === key);
    const operationLabel = findLabel('operationName');
    const typeLabel = findLabel('metric_key');

    // Require both labels to exist before comparing, so an absent label is
    // reported as a plain `false` rather than leaking `undefined` to callers.
    if (!operationLabel || !typeLabel) {
        return false;
    }

    return operationLabel[1] === operationName && typeLabel[1] === type;
}

function parseOperationAlertsResponse(data) {
return data.searchanomalyresponseList.map((anomalyResponse) => {
const labels = anomalyResponse.labels;

const operationName = labels.operationName;
const alertType = labels.alertType;
const latestUnhealthy = _.maxBy(anomalyResponse.anomalies, anomaly => anomaly.timestamp);

const isUnhealthy = (latestUnhealthy && latestUnhealthy.timestamp >= (Date.now() - alertFreqInSec));
const timestamp = latestUnhealthy && latestUnhealthy.timestamp;
return {
operationName,
alertType,
isUnhealthy,
timestamp
};
const fullAnomalyList = data.searchanomalyresponseList;
const mappedAndMergedResponse = fullAnomalyList.map((anomalyResponse, baseIterationIndex) => {
if (anomalyResponse === null) return null;
const operationLabel = anomalyResponse.labelsMap.find(label => label[0] === 'operationName');
if (operationLabel) {
const operationName = operationLabel[1];
const type = anomalyResponse.labelsMap.find(label => label[0] === 'metric_key')[1];
let anomaliesList = anomalyResponse.anomaliesList;

fullAnomalyList.slice(baseIterationIndex + 1, fullAnomalyList.length).forEach((alertToCheck, checkIndex) => {
if (sameOperationAndType(alertToCheck, operationName, type)) {
anomaliesList = _.merge(anomaliesList, alertToCheck.anomaliesList);
fullAnomalyList[baseIterationIndex + checkIndex + 1] = null;
}
});

const latestUnhealthy = _.maxBy(anomaliesList, anomaly => anomaly.timestamp);
const timestamp = latestUnhealthy && latestUnhealthy.timestamp * 1000;
const isUnhealthy = (timestamp && timestamp >= (Date.now() - (alertFreqInSec * 1000)));

return {
operationName,
type,
isUnhealthy,
timestamp
};
}

return null;
});

return _.filter(mappedAndMergedResponse, a => a !== null);
}

function fetchOperationAlerts(serviceName, interval, from) {
function fetchAlerts(serviceName, interval, from, stat, key) {
const request = new messages.SearchAnamoliesRequest();
request.getLabelsMap()
.set('serviceName', metricpointNameEncoder.encodeMetricpointName(decodeURIComponent(serviceName)))
.set('interval', interval)
.set('mtype', 'gauge')
.set('product', 'haystack');
request.setStarttime(from);
request.setEndtime(Date.now());
.set('product', 'haystack')
.set('stat', stat)
.set('metric_key', key);
request.setStarttime(Math.trunc(from / 1000));
request.setEndtime(Math.trunc(Date.now() / 1000));
request.setSize(-1);

return getAnomaliesFetcher
.fetch(request)
.then(pbResult => parseOperationAlertsResponse(messages.SearchAnomaliesResponse.toObject(false, pbResult)));
}

function mergeOperationsWithAlerts({operationAlerts, operations}) {
return _.flatten(operations.map(operation => alertTypes.map((alertType) => {
const operationAlert = operationAlerts.find(alert => (alert.operationName.toLowerCase() === operation.toLowerCase() && alert.type === alertType));
/**
 * Fetches the operation alerts of a service for both supported alert types
 * and returns them as one combined list.
 *
 * Two anomaly searches are issued in parallel through `fetchAlerts`: one with
 * stat `*_99` / metric key `duration` and one with stat `count` / metric key
 * `failure-span`.
 *
 * @param {string} serviceName - service whose alerts are requested
 * @param {string} interval - interval label forwarded to each search
 * @param {number} from - start of the search window, forwarded to `fetchAlerts`
 * @returns {Promise<Array>} promise of the duration alerts followed by the
 *     failure-span alerts
 */
function fetchOperationAlerts(serviceName, interval, from) {
    return Q.all([
        fetchAlerts(serviceName, interval, from, '*_99', 'duration'),
        fetchAlerts(serviceName, interval, from, 'count', 'failure-span')
    ])
    // Concatenate rather than `_.merge`: merge combines arrays index by index,
    // so the failure-span entry at position i would overwrite the duration
    // entry at position i instead of being appended to the list.
    .then(([durationAlerts, failureAlerts]) => [...durationAlerts, ...failureAlerts]);
}

if (operationAlert !== undefined) {
function mergeOperationsWithAlerts({operationAlerts, operations}) {
if (operations && operations.length) {
return _.flatten(operations.map(operation => alertTypes.map((alertType) => {
const operationAlert = operationAlerts.find(alert => (alert.operationName.toLowerCase() === operation.toLowerCase() && alert.type === alertType));

if (operationAlert !== undefined) {
return {
...operationAlert
};
}
return {
...operationAlert
operationName: operation,
type: alertType,
isUnhealthy: false,
timestamp: null
};
}
return {
operationName: operation,
type: alertType,
isUnhealthy: false,
timestamp: null
};
})));
})));
}

return _.flatten(alertTypes.map(alertType => (_.filter(operationAlerts, alert => (alert.type === alertType)))));
}

function returnAnomalies(data) {
if (!data || !data.length || !data[0].length) {
if (!data || !data.length || !data[0].anomaliesList.length) {
return [];
}

return data[0].anomalies;
return _.flatten(data.map((anomaly) => {
const strength = anomaly.labelsMap.find(label => label[0] === 'anomalyLevel')[1];
return anomaly.anomaliesList.map(a => ({strength, ...a}));
}));
}

function getActiveAlertCount(operationAlerts) {
Expand All @@ -113,7 +150,7 @@ function getActiveAlertCount(operationAlerts) {

connector.getServiceAlerts = (serviceName, interval) => {
// todo: calculate "from" value based on selected interval
const oneDayAgo = Math.trunc(Date.now() - (24 * 60 * 60 * 1000));
const oneDayAgo = Math.trunc((Date.now() - (24 * 60 * 60 * 1000)));
return Q.all([fetchOperations(serviceName), fetchOperationAlerts(serviceName, interval, oneDayAgo)])
.then(stats => mergeOperationsWithAlerts({
operations: stats[0],
Expand All @@ -130,20 +167,21 @@ connector.getAnomalies = (serviceName, operationName, alertType, from, interval)
.set('serviceName', metricpointNameEncoder.encodeMetricpointName(decodeURIComponent(serviceName)))
.set('operationName', metricpointNameEncoder.encodeMetricpointName(decodeURIComponent(operationName)))
.set('product', 'haystack')
.set('name', alertType)
.set('metric_key', alertType)
.set('stat', stat)
.set('interval', interval)
.set('mtype', 'gauge');
request.setStarttime(from);
request.setEndtime(Date.now());
request.setStarttime(Math.trunc(from / 1000));
request.setEndtime(Math.trunc(Date.now() / 1000));
request.setSize(-1);

return getAnomaliesFetcher
.fetch(request)
.then(pbResult => returnAnomalies(messages.SearchAnomaliesResponse.toObject(false, pbResult)));
.then(pbResult => returnAnomalies(messages.SearchAnomaliesResponse.toObject(false, pbResult).searchanomalyresponseList));
};

connector.getServiceUnhealthyAlertCount = serviceName =>
fetchOperationAlerts(serviceName, '5m', Math.trunc(Date.now() - (5 * 60 * 1000)))
connector.getServiceUnhealthyAlertCount = (serviceName, interval) =>
fetchOperationAlerts(serviceName, interval, Math.trunc((Date.now() - (5 * 60 * 1000))))
.then(result => getActiveAlertCount(result));

module.exports = connector;
15 changes: 7 additions & 8 deletions server/connectors/alerts/haystack/subscriptionsConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ const putter = require('../../operations/grpcPutter');
const deleter = require('../../operations/grpcDeleter');
const poster = require('../../operations/grpcPoster');

const grpcOptions = {
'grpc.max_receive_message_length': 10485760, // todo: do I need these?
...config.connectors.traces.grpcOptions
};
const grpcOptions = config.grpcOptions || {};

const MetricpointNameEncoder = require('../../utils/encoders/MetricpointNameEncoder');

const metricpointNameEncoder = new MetricpointNameEncoder(config.encoder);

const client = new services.SubscriptionManagementClient(
`${config.connectors.alerts.haystackHost}:${config.connectors.alerts.haystackPort}`,
Expand Down Expand Up @@ -84,9 +84,9 @@ connector.searchSubscriptions = (serviceName, operationName, alertType, interval

const request = new messages.SearchSubscriptionRequest();
request.getLabelsMap()
.set('serviceName', decodeURIComponent(serviceName))
.set('operationName', decodeURIComponent(operationName))
.set('type', alertType)
.set('serviceName', metricpointNameEncoder.encodeMetricpointName(decodeURIComponent(serviceName)))
.set('operationName', metricpointNameEncoder.encodeMetricpointName(decodeURIComponent(operationName)))
.set('metric_key', alertType)
.set('stat', stat)
.set('interval', interval)
.set('product', 'haystack')
Expand All @@ -96,7 +96,6 @@ connector.searchSubscriptions = (serviceName, operationName, alertType, interval
.fetch(request)
.then((result) => {
const pbResult = messages.SearchSubscriptionResponse.toObject(false, result);
console.log(pbResult.subscriptionresponseList.map(pbSubResponse => converter.toSubscriptionJson(pbSubResponse)));
return pbResult.subscriptionresponseList.map(pbSubResponse => converter.toSubscriptionJson(pbSubResponse));
});
};
Expand Down
46 changes: 20 additions & 26 deletions server/connectors/alerts/stub/alertsConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,75 +17,69 @@
const Q = require('q');

function getRandomTimeStamp() {
const currentTime = ((new Date()).getTime()) * 1000;
return (currentTime - Math.floor((Math.random() * 5000 * 60 * 1000)));
const currentTime = ((new Date()).getTime());
return (currentTime - Math.floor((Math.random() * 5000 * 60)));
}

function generateAnomaly() {
const currentTime = ((new Date()).getTime()) * 1000;
const timestamp = (currentTime - Math.floor((Math.random() * 2000000 * 60 * 1000)));
const expectedValue = Math.floor(Math.random() * 100000);
const observedValue = Math.floor(expectedValue * (Math.random() * 100));
const currentTime = ((new Date()).getTime() / 1000);
const timestamp = (currentTime - Math.floor((Math.random() * 2000 * 60)));
const expectedvalue = Math.floor(Math.random() * 100000);
const observedvalue = Math.floor(expectedvalue * (Math.random() * 100));
return {
observedValue,
expectedValue,
timestamp
observedvalue,
expectedvalue,
timestamp,
strength: observedvalue % 2 ? 'STRONG' : 'WEAK'
};
}

function getAlerts() {
return [
{
operationName: 'tarley-1',
type: 'count',
type: 'duration',
isUnhealthy: true,
timestamp: getRandomTimeStamp()
},
{
operationName: 'tarley-1',
type: 'durationTP99',
type: 'failure-span',
isUnhealthy: true,
timestamp: getRandomTimeStamp()
},
{
operationName: 'tarley-1',
type: 'failureCount',
operationName: 'tully-1',
type: 'duration',
isUnhealthy: false,
timestamp: getRandomTimeStamp()
},
{
operationName: 'tully-1',
type: 'count',
type: 'failure-span',
isUnhealthy: false,
timestamp: getRandomTimeStamp()
},
{
operationName: 'tully-1',
type: 'durationTP99',
type: 'duration',
isUnhealthy: false,
timestamp: getRandomTimeStamp()
},
{
}, {
operationName: 'tully-1',
type: 'failureCount',
type: 'failure-span',
isUnhealthy: false,
timestamp: getRandomTimeStamp()
},
{
operationName: 'dondarrion-1',
type: 'count',
isUnhealthy: true,
timestamp: getRandomTimeStamp()
},
{
operationName: 'dondarrion-1',
type: 'durationTP99',
type: 'duration',
isUnhealthy: false,
timestamp: getRandomTimeStamp()
},
{
operationName: 'dondarrion-1',
type: 'failureCount',
type: 'failure-span',
isUnhealthy: false,
timestamp: getRandomTimeStamp()
}
Expand Down
Loading

0 comments on commit 3138028

Please sign in to comment.