Clickhouse changes #1

Open: wants to merge 13 commits into base branch alertmanager_alerter
1 change: 1 addition & 0 deletions elastalert/config.py
@@ -58,6 +58,7 @@
    'cardinality': ruletypes.CardinalityRule,
    'metric_aggregation': ruletypes.MetricAggregationRule,
    'percentage_match': ruletypes.PercentageMatchRule,
    'error_rate': ruletypes.ErrorRateRule
}

# Used to map names of alerts to their classes
111 changes: 78 additions & 33 deletions elastalert/elastalert.py
@@ -10,11 +10,14 @@
import time
import timeit
import traceback
import requests
from email.mime.text import MIMEText
from smtplib import SMTP
from smtplib import SMTPException
from socket import error



import dateutil.tz
import kibana
import yaml
@@ -28,9 +31,11 @@
from elasticsearch.exceptions import TransportError
from enhancements import DropMatchException
from ruletypes import FlatlineRule
from ruletypes import ErrorRateRule
from util import add_raw_postfix
from util import cronite_datetime_to_timestamp
from util import dt_to_ts
from util import dt_to_ts_with_format
from util import dt_to_unix
from util import EAException
from util import elastalert_logger
@@ -156,6 +161,8 @@ def __init__(self, args):
        self.writeback_es = elasticsearch_client(self.conf)
        self._es_version = None

        self.query_endpoint = self.conf['query_endpoint']
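
Note: query_endpoint is read from the global ElastAlert config here, so a deployment presumably needs a matching entry in the global config file. A hypothetical entry (the key name comes from this diff; the URL is invented):

query_endpoint: http://clickhouse-query-service:8080/query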

        remove = []
        for rule in self.rules:
            if not self.init_rule(rule):
@@ -534,43 +541,77 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None):
        return {endtime: buckets}

    def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_size=None):
sivatarunp (Collaborator) commented on Mar 17, 2022:

You are changing a common method that the existing alert types also use, so I think it will affect the existing alerts. Can you check? Also, is this required only for the response_time alert?

Author (Collaborator) replied:

In the traces use case it will be used only for the response-time average alert. As we already discussed, we will host elastalert for traces separately from logs, so this should not be an issue.

-        rule_filter = copy.copy(rule['filter'])
-        base_query = self.get_query(
-            rule_filter,
-            starttime,
-            endtime,
-            timestamp_field=rule['timestamp_field'],
-            sort=False,
-            to_ts_func=rule['dt_to_ts'],
-            five=rule['five']
-        )
-        if term_size is None:
-            term_size = rule.get('terms_size', 50)
-        query = self.get_aggregation_query(base_query, rule, query_key, term_size, rule['timestamp_field'])
+        agg_key = '{}({})'.format(rule['metric_agg_type'], rule['metric_agg_key'])
+        query = self.get_query_string(rule)
+        aggregation = {"function": rule['metric_agg_type'].upper(), "field": rule['metric_agg_key']}
+        data, count = self.get_ch_data(rule, starttime, endtime, agg_key, query, aggregation)
+
+        if data is None:
+            return {}
+        payload = {rule['metric_agg_key'] + "_" + rule['metric_agg_type']: {'value': data}}
+
+        self.num_hits += count
+        return {endtime: payload}
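
Note: the payload built above appears to keep the {'<agg_key>_<agg_type>': {'value': ...}} shape that the aggregation rule types match against. A minimal sketch with invented values (an avg aggregation over a duration field is an assumption, not from this PR):

# Hypothetical rule settings: metric_agg_type = 'avg', metric_agg_key = 'duration'
# get_ch_data is called with agg_key 'avg(duration)' and might return (152.7, 4821),
# so get_hits_aggregation would yield, keyed by the query window's end time:
#   {endtime: {'duration_avg': {'value': 152.7}}}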

+    def get_error_rate(self, rule, starttime, endtime):
+        elastalert_logger.info("query start time %s and end time %s" % (starttime, endtime))
+        agg_key = '{}({})'.format(rule['total_agg_type'], rule['total_agg_key'])
+        query = self.get_query_string(rule)
+        aggregation = {"function": rule['total_agg_type'].upper(), "field": rule['total_agg_key']}
+
+        total_data, total_count = self.get_ch_data(rule, starttime, endtime, agg_key, query, aggregation)
+
+        elastalert_logger.info("total data is %s" % (total_data))
+        if total_data is None:
+            return {}
+
+        agg_key = "count()"
+        if query:
+            query = '{} AND {}'.format(query, rule['error_condition'])
+        else:
+            query = rule['error_condition']
+
+        aggregation = {"function": "COUNT", "field": "1"}
+
+        error_data, error_count = self.get_ch_data(rule, starttime, endtime, agg_key, query, aggregation)
+
+        elastalert_logger.info("error data is %s" % (error_data))
+        if error_data is None:
+            return {}
+
+        payload = {'error_count': error_data, 'total_count': total_data, 'start_time': starttime, 'end_time': endtime}
+        self.num_hits += int(total_count)
Collaborator commented:

Why is num_hits required here?

Author (Collaborator) replied:

num_hits is mainly used when looping over very large datasets in batches, using scroll in ES. It also appears in the alert. I am updating it here to follow the convention used across the codebase.


+        return {endtime: payload}

+    def get_query_string(self, rule):
+        if rule['filter'] and ('query_string' in rule['filter'][0]) and ('query' in rule['filter'][0]['query_string']):
+            return rule['filter'][0]['query_string']['query']
+        return ""

+    def get_ch_data(self, rule, starttime, endtime, agg_key, freshquery, aggregation):
+        elastalert_logger.info("query start timestamp %s and end timestamp %s" % (dt_to_ts(starttime), dt_to_ts(endtime)))
+        data = {
+            "selects": [],
+            "start_time": dt_to_ts_with_format(starttime, "%Y-%m-%dT%H:%M:%S.%f")[:-3] + 'Z',
+            "end_time": dt_to_ts_with_format(endtime, "%Y-%m-%dT%H:%M:%S.%f")[:-3] + 'Z',
+            "freshquery": freshquery,
+            "group_bys": [],
+            "sort_orders": [{"sort_by": agg_key, "sort_direction": "desc"}],
+            "limit": "500",
+            "aggregations": [aggregation]
+        }
        try:
-            if not rule['five']:
-                res = self.current_es.search(
-                    index=index,
-                    doc_type=rule.get('doc_type'),
-                    body=query,
-                    search_type='count',
-                    ignore_unavailable=True
-                )
-            else:
-                res = self.current_es.search(index=index, doc_type=rule.get('doc_type'), body=query, size=0, ignore_unavailable=True)
-        except ElasticsearchException as e:
+            elastalert_logger.info("request data is %s" % json.dumps(data))
+            res = requests.post(self.query_endpoint, json=data)
+            res.raise_for_status()
+        except requests.exceptions.RequestException as e:
            if len(str(e)) > 1024:
                e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
            self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
-            return None
-        if 'aggregations' not in res:
-            return {}
-        if not rule['five']:
-            payload = res['aggregations']['filtered']
-        else:
-            payload = res['aggregations']
-        self.num_hits += res['hits']['total']
-        return {endtime: payload}
+            return None, 0
+        res = json.loads(res.content)
+        return res['data'][0][agg_key], res['rows']
Collaborator commented:

What would be in these two values?

Author (Collaborator) replied:

The aggregate value, and the number of rows that satisfied the filter criteria.
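
Putting the thread above in concrete terms: judging from the parsing code, the endpoint response presumably looks like the sketch below. Only the data/rows structure is implied by res['data'][0][agg_key] and res['rows']; the values are invented:

# Hypothetical response body from the query endpoint:
# {
#     "data": [{"avg(duration)": 152.7}],   # aggregate value, keyed by agg_key
#     "rows": 4821                          # rows that matched the filter in the window
# }
# get_ch_data would then return (152.7, 4821).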


    def remove_duplicate_events(self, data, rule):
        new_events = []
@@ -616,6 +657,8 @@ def run_query(self, rule, start=None, end=None, scroll=False):
            data = self.get_hits_count(rule, start, end, index)
        elif rule.get('use_terms_query'):
            data = self.get_hits_terms(rule, start, end, index, rule['query_key'])
        elif isinstance(rule_inst, ErrorRateRule):
            data = self.get_error_rate(rule, start, end)
        elif rule.get('aggregation_query_element'):
            data = self.get_hits_aggregation(rule, start, end, index, rule.get('query_key', None))
        else:
@@ -633,6 +676,8 @@
                rule_inst.add_count_data(data)
            elif rule.get('use_terms_query'):
                rule_inst.add_terms_data(data)
            elif isinstance(rule_inst, ErrorRateRule):
                rule_inst.calculate_err_rate(data)
            elif rule.get('aggregation_query_element'):
                rule_inst.add_aggregation_data(data)
            else:
29 changes: 29 additions & 0 deletions elastalert/ruletypes.py
@@ -1019,6 +1019,35 @@ def unwrap_term_buckets(self, timestamp, term_buckets):
    def check_matches(self, timestamp, query_key, aggregation_data):
        raise NotImplementedError()

class ErrorRateRule(BaseAggregationRule):
    """ A rule that alerts when the sampling-adjusted error rate exceeds a threshold """
    required_options = frozenset(['sampling', 'threshold', 'unique_column'])

    def __init__(self, *args):
        super(ErrorRateRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.rules['total_agg_key'] = self.rules['unique_column']
        # hardcoding the uniq aggregation for the total count
        self.rules['total_agg_type'] = "uniq"

        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

    def get_match_str(self, match):
        message = 'Threshold violation, error rate is %s' % (match['error_rate'])
        return message

    def generate_aggregation_query(self):
        return {"function": self.rules['total_agg_type'].upper(), "field": self.rules['total_agg_key']}

    def calculate_err_rate(self, payload):
        for timestamp, payload_data in payload.iteritems():
            if int(payload_data['total_count']) > 0:
                rate = float(payload_data['error_count']) / float(payload_data['total_count'])
                rate = rate / float(self.rules['sampling'])
                rate = rate * 100
                if 'threshold' in self.rules and rate > self.rules['threshold']:
                    match = {self.rules['timestamp_field']: timestamp, 'error_rate': rate, 'from': payload_data['start_time'], 'to': payload_data['end_time']}
                    self.add_match(match)
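
To make the sampling adjustment concrete, a worked example with invented numbers:

# Hypothetical values: sampling = 10, threshold = 0.1, and a payload entry of
# {'error_count': 30, 'total_count': 2000, ...}
#   rate = 30 / 2000.0       # 0.015   raw error fraction
#   rate = 0.015 / 10        # 0.0015  after dividing by the sampling rate
#   rate = 0.0015 * 100      # 0.15    expressed as a percentage
# 0.15 > 0.1, so a match with error_rate = 0.15 is added.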


class MetricAggregationRule(BaseAggregationRule):
""" A rule that matches when there is a low number of events given a timeframe. """
8 changes: 8 additions & 0 deletions elastalert/schema.yaml
@@ -129,6 +129,14 @@ oneOf:
# custom rules include a period in the rule type
type: {pattern: "[.]"}

  - title: Error Rate
    required: [sampling, threshold]
    properties:
      sampling: {type: integer}
      threshold: {type: number}
      error_condition: {type: string}
      unique_column: {type: string}
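
As a usage sketch, a rule file exercising the new type might look like this (every value below is illustrative, not taken from this PR):

name: checkout-error-rate
type: error_rate
index: traces-*                   # plus the usual common settings (alert, etc.)
sampling: 10                      # 1-in-10 trace sampling
threshold: 0.5                    # alert above a 0.5% error rate
unique_column: trace_id           # distinct count used as the total
error_condition: "status >= 500"  # ANDed onto the base query for the error count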

properties:

# Common Settings