From e0b5e75d3b0a42d6c7feec3899fdc7b8bdd0700f Mon Sep 17 00:00:00 2001 From: nsano-rururu Date: Wed, 7 Jul 2021 22:19:26 +0900 Subject: [PATCH] =?UTF-8?q?Improved=20test=20code=20coverage=2070%=20?= =?UTF-8?q?=E2=86=92=2072%?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/alerters/chatwork_test.py | 44 +++++++ tests/alerters/discord_test.py | 58 +++++++++ tests/alerters/line_test.py | 45 +++++++ tests/alerters/opsgenie_test.py | 52 ++++++++ tests/alerters/telegram_test.py | 49 ++++++++ tests/alerters/thehive_test.py | 91 ++++++++++++++ tests/alerts_test.py | 15 +++ tests/util_test.py | 212 +++++++++++++++++++++++++++++++- 8 files changed, 561 insertions(+), 5 deletions(-) diff --git a/tests/alerters/chatwork_test.py b/tests/alerters/chatwork_test.py index e74532ab..9e8a8969 100644 --- a/tests/alerters/chatwork_test.py +++ b/tests/alerters/chatwork_test.py @@ -193,3 +193,47 @@ def test_chatwork_maxlength(): actual_data = mock_post_request.call_args_list[0][1]['params'] assert expected_data == actual_data + + +def test_chatwork_matchs(): + rule = { + 'name': 'Test Chatwork Rule', + 'type': 'any', + 'chatwork_apikey': 'xxxx1', + 'chatwork_room_id': 'xxxx2', + 'alert': [] + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = ChatworkAlerter(rule) + match = { + '@timestamp': '2021-01-01T00:00:00', + 'somefield': 'foobarbaz' + } + with mock.patch('requests.post') as mock_post_request: + alert.alert([match, match]) + expected_data = { + 'body': 'Test Chatwork Rule\n' + + '\n' + + '@timestamp: 2021-01-01T00:00:00\n' + + 'somefield: foobarbaz\n' + + '\n' + + '----------------------------------------\n' + + 'Test Chatwork Rule\n' + + '\n' + + '@timestamp: 2021-01-01T00:00:00\n' + + 'somefield: foobarbaz\n' + + '\n' + + '----------------------------------------\n', + } + + mock_post_request.assert_called_once_with( + 'https://api.chatwork.com/v2/rooms/xxxx2/messages', + params=mock.ANY, + headers={'X-ChatWorkToken': 'xxxx1'}, + proxies=None, + auth=None + ) + + actual_data = mock_post_request.call_args_list[0][1]['params'] + assert expected_data == actual_data diff --git a/tests/alerters/discord_test.py b/tests/alerters/discord_test.py index 42f7f968..bec8b67f 100644 --- a/tests/alerters/discord_test.py +++ b/tests/alerters/discord_test.py @@ -260,3 +260,61 @@ def test_discord_required_error(discord_webhook_url, expected_data): assert expected_data == actual_data except Exception as ea: assert expected_data in str(ea) + + +def test_discord_matches(): + rule = { + 'name': 'Test Discord Rule', + 'type': 'any', + 'discord_webhook_url': 'http://xxxxxxx', + 'discord_emoji_title': ':warning:', + 'discord_embed_color': 0xffffff, + 'discord_embed_footer': 'footer', + 'discord_embed_icon_url': 'http://xxxx/image.png', + 'alert': [], + 'alert_subject': 'Test Discord' + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = DiscordAlerter(rule) + match = { + '@timestamp': '2021-01-01T00:00:00', + 'somefield': 'foobarbaz' + } + with mock.patch('requests.post') as mock_post_request: + alert.alert([match, match]) + + expected_data = { + 'content': ':warning: Test Discord :warning:', + 'embeds': + [{ + 'description': 'Test Discord Rule\n' + + '\n' + + '@timestamp: 2021-01-01T00:00:00\n' + + 'somefield: foobarbaz\n' + + '\n' + + '----------------------------------------\n' + + 'Test Discord Rule\n' + + '\n' + + '@timestamp: 2021-01-01T00:00:00\n' + + 'somefield: foobarbaz\n' + + '\n' + + '----------------------------------------\n', + 'color': 0xffffff, + 'footer': { + 'text': 'footer', + 'icon_url': 'http://xxxx/image.png' + } + }] + } + + mock_post_request.assert_called_once_with( + rule['discord_webhook_url'], + data=mock.ANY, + headers={'Content-Type': 'application/json'}, + proxies=None, + auth=None + ) + + actual_data = json.loads(mock_post_request.call_args_list[0][1]['data']) + assert expected_data == actual_data diff --git a/tests/alerters/line_test.py b/tests/alerters/line_test.py index 511bead0..8e4ba3cc 100644 --- a/tests/alerters/line_test.py +++ b/tests/alerters/line_test.py @@ -149,3 +149,48 @@ def test_line_notify_maxlength(): actual_data = mock_post_request.call_args_list[0][1]['data'] assert expected_data == actual_data + + +def test_line_notify_matchs(): + rule = { + 'name': 'Test LineNotify Rule', + 'type': 'any', + 'linenotify_access_token': 'xxxxx', + 'alert': [] + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = LineNotifyAlerter(rule) + match = { + '@timestamp': '2021-01-01T00:00:00', + 'somefield': 'foobarbaz' + } + with mock.patch('requests.post') as mock_post_request: + alert.alert([match, match]) + + expected_data = { + 'message': 'Test LineNotify Rule\n' + '\n' + '@timestamp: 2021-01-01T00:00:00\n' + 'somefield: foobarbaz\n' + '\n' + '----------------------------------------\n' + 'Test LineNotify Rule\n' + '\n' + '@timestamp: 2021-01-01T00:00:00\n' + 'somefield: foobarbaz\n' + '\n' + '----------------------------------------\n' + } + + mock_post_request.assert_called_once_with( + 'https://notify-api.line.me/api/notify', + data=mock.ANY, + headers={ + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': 'Bearer {}'.format('xxxxx') + } + ) + + actual_data = mock_post_request.call_args_list[0][1]['data'] + assert expected_data == actual_data diff --git a/tests/alerters/opsgenie_test.py b/tests/alerters/opsgenie_test.py index 65ce7bcc..10947a95 100644 --- a/tests/alerters/opsgenie_test.py +++ b/tests/alerters/opsgenie_test.py @@ -908,3 +908,55 @@ def test_opsgenie_substitution(opsgenie_entity, expected_entity, opsgenie_priori assert mcal[0][1]['json']['entity'] == expected_entity assert mcal[0][1]['json']['priority'] == expected_priority + + +def test_opsgenie_details_with_constant_value_matchs(): + rule = { + 'name': 'Opsgenie Details', + 'type': mock_rule(), + 'opsgenie_account': 'genies', + 'opsgenie_key': 'ogkey', + 'opsgenie_details': {'Foo': 'Bar'} + } + match = { + '@timestamp': '2014-10-31T00:00:00' + } + alert = OpsGenieAlerter(rule) + + with mock.patch('requests.post') as mock_post_request: + alert.alert([match, match]) + + mock_post_request.assert_called_once_with( + 'https://api.opsgenie.com/v2/alerts', + headers={ + 'Content-Type': 'application/json', + 'Authorization': 'GenieKey ogkey' + }, + json=mock.ANY, + proxies=None + ) + + expected_json = { + 'description': 'Opsgenie Details\n' + '\n' + "{'@timestamp': '2014-10-31T00:00:00'}\n" + '\n' + '@timestamp: 2014-10-31T00:00:00\n' + '\n' + '----------------------------------------\n' + 'Opsgenie Details\n' + '\n' + "{'@timestamp': '2014-10-31T00:00:00'}\n" + '\n' + '@timestamp: 2014-10-31T00:00:00\n' + '\n' + '----------------------------------------\n', + 'details': {'Foo': 'Bar'}, + 'message': 'ElastAlert: Opsgenie Details', + 'priority': None, + 'source': 'ElastAlert', + 'tags': ['ElastAlert', 'Opsgenie Details'], + 'user': 'genies' + } + actual_json = mock_post_request.call_args_list[0][1]['json'] + assert expected_json == actual_json diff --git a/tests/alerters/telegram_test.py b/tests/alerters/telegram_test.py index fb1ef459..1831832d 100644 --- a/tests/alerters/telegram_test.py +++ b/tests/alerters/telegram_test.py @@ -200,3 +200,52 @@ def test_telegram_required_error(telegram_bot_token, telegram_room_id, expected_ assert expected_data == actual_data except Exception as ea: assert expected_data in str(ea) + + +def test_telegram_matchs(): + rule = { + 'name': 'Test Telegram Rule', + 'type': 'any', + 'telegram_bot_token': 'xxxxx1', + 'telegram_room_id': 'xxxxx2', + 'alert': [] + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = TelegramAlerter(rule) + match = { + '@timestamp': '2021-01-01T00:00:00', + 'somefield': 'foobarbaz' + } + with mock.patch('requests.post') as mock_post_request: + alert.alert([match, match]) + expected_data = { + 'chat_id': rule['telegram_room_id'], + 'text': '⚠ *Test Telegram Rule* ⚠ ```\n' + + 'Test Telegram Rule\n' + + '\n' + + '@timestamp: 2021-01-01T00:00:00\n' + + 'somefield: foobarbaz\n' + + '\n' + + '----------------------------------------\n' + + 'Test Telegram Rule\n' + + '\n' + + '@timestamp: 2021-01-01T00:00:00\n' + + 'somefield: foobarbaz\n' + + '\n' + + '----------------------------------------\n' + + ' ```', + 'parse_mode': 'markdown', + 'disable_web_page_preview': True + } + + mock_post_request.assert_called_once_with( + 'https://api.telegram.org/botxxxxx1/sendMessage', + data=mock.ANY, + headers={'content-type': 'application/json'}, + proxies=None, + auth=None + ) + + actual_data = json.loads(mock_post_request.call_args_list[0][1]['data']) + assert expected_data == actual_data diff --git a/tests/alerters/thehive_test.py b/tests/alerters/thehive_test.py index 9a14f16a..2663fe91 100644 --- a/tests/alerters/thehive_test.py +++ b/tests/alerters/thehive_test.py @@ -172,3 +172,94 @@ def test_thehive_getinfo(hive_host, expect): expected_data = expect actual_data = alert.get_info() assert expected_data == actual_data + + +def test_thehive_alerter2(): + rule = {'alert': [], + 'alert_text': '', + 'alert_text_type': 'alert_text_only', + 'description': 'test', + 'hive_alert_config': {'customFields': [{'name': 'test', + 'type': 'string', + 'value': 2}], + 'follow': True, + 'severity': 2, + 'source': 'elastalert', + 'status': 'New', + 'tags': ['test.port'], + 'tlp': 3, + 'type': 'external'}, + 'hive_connection': {'hive_apikey': '', + 'hive_host': 'https://localhost', + 'hive_port': 9000}, + 'hive_observable_data_mapping': [{'ip': 'test.ip', 'autonomous-system': 'test.as_number'}], + 'name': 'test-thehive', + 'tags': ['a', 'b'], + 'type': 'any'} + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = HiveAlerter(rule) + match = { + "test": { + "ip": "127.0.0.1", + "port": 9876, + "as_number": 1234 + }, + "@timestamp": "2021-05-09T14:43:30", + } + with mock.patch('requests.post') as mock_post_request: + alert.alert([match]) + + expected_data = { + "artifacts": [ + { + "data": "127.0.0.1", + "dataType": "ip", + "message": None, + "tags": [], + "tlp": 2 + }, + { + "data": "1234", + "dataType": "autonomous-system", + "message": None, + "tags": [], + "tlp": 2 + } + ], + "customFields": { + "test": { + "order": 0, + "string": 2 + } + }, + "description": "\n\n", + "follow": True, + "severity": 2, + "source": "elastalert", + "status": "New", + "tags": [ + "9876" + ], + "title": "test-thehive", + "tlp": 3, + "type": "external" + } + + conn_config = rule['hive_connection'] + alert_url = f"{conn_config['hive_host']}:{conn_config['hive_port']}/api/alert" + mock_post_request.assert_called_once_with( + alert_url, + data=mock.ANY, + headers={'Content-Type': 'application/json', + 'Authorization': 'Bearer '}, + verify=False, + proxies={'http': '', 'https': ''} + ) + + actual_data = json.loads(mock_post_request.call_args_list[0][1]['data']) + # The date and sourceRef are autogenerated, so we can't expect them to be a particular value + del actual_data['date'] + del actual_data['sourceRef'] + + assert expected_data == actual_data diff --git a/tests/alerts_test.py b/tests/alerts_test.py index ed07f7b5..2c1e25ab 100644 --- a/tests/alerts_test.py +++ b/tests/alerts_test.py @@ -296,3 +296,18 @@ def test_alert_subject_with_jinja(): assert "Test alert for the_owner;" in alertsubject assert "field field_value;" in alertsubject assert "Abc: abc from match" in alertsubject + + +def test_alert_getinfo(): + rule = { + 'name': 'test_rule', + 'type': mock_rule(), + 'owner': 'the_owner', + 'priority': 2, + 'alert_subject': 'A very long subject', + 'alert_subject_max_len': 5 + } + alert = Alerter(rule) + actual_data = alert.get_info() + expected_data = {'type': 'Unknown'} + assert expected_data == actual_data diff --git a/tests/util_test.py b/tests/util_test.py index 893278b0..ffffb188 100644 --- a/tests/util_test.py +++ b/tests/util_test.py @@ -1,15 +1,24 @@ # -*- coding: utf-8 -*- +import logging +import os +import pytest + from datetime import datetime from datetime import timedelta -from unittest import mock -import pytest from dateutil.parser import parse as dt +from unittest import mock + from elastalert.util import add_raw_postfix +from elastalert.util import build_es_conn_config +from elastalert.util import dt_to_ts from elastalert.util import dt_to_ts_with_format +from elastalert.util import EAException +from elastalert.util import elasticsearch_client from elastalert.util import flatten_dict from elastalert.util import format_index +from elastalert.util import get_module from elastalert.util import lookup_es_key from elastalert.util import parse_deadline from elastalert.util import parse_duration @@ -19,6 +28,7 @@ from elastalert.util import set_es_key from elastalert.util import should_scrolling_continue from elastalert.util import ts_to_dt_with_format +from elastalert.util import ts_utc_to_tz @pytest.mark.parametrize('spec, expected_delta', [ @@ -234,20 +244,212 @@ def test_should_scrolling_continue(): assert should_scrolling_continue(rule_over_max_scrolling) is False -def test_ts_to_dt_with_format(): +def test_ts_to_dt_with_format1(): assert ts_to_dt_with_format('2021/02/01 12:30:00', '%Y/%m/%d %H:%M:%S') == dt('2021-02-01 12:30:00+00:00') + + +def test_ts_to_dt_with_format2(): assert ts_to_dt_with_format('01/02/2021 12:30:00', '%d/%m/%Y %H:%M:%S') == dt('2021-02-01 12:30:00+00:00') -def test_dt_to_ts_with_format(): +def test_ts_to_dt_with_format3(): + date = datetime(2021, 7, 6, hour=0, minute=0, second=0) + assert ts_to_dt_with_format(date, '') == dt('2021-7-6 00:00') + + +def test_ts_to_dt_with_format4(): + assert ts_to_dt_with_format('01/02/2021 12:30:00 +0900', '%d/%m/%Y %H:%M:%S %z') == dt('2021-02-01 12:30:00+09:00') + + +def test_dt_to_ts_with_format1(): assert dt_to_ts_with_format(dt('2021-02-01 12:30:00+00:00'), '%Y/%m/%d %H:%M:%S') == '2021/02/01 12:30:00' + + +def test_dt_to_ts_with_format2(): assert dt_to_ts_with_format(dt('2021-02-01 12:30:00+00:00'), '%d/%m/%Y %H:%M:%S') == '01/02/2021 12:30:00' +def test_dt_to_ts_with_format3(): + assert dt_to_ts_with_format('2021-02-01 12:30:00+00:00', '%d/%m/%Y %H:%M:%S') == '2021-02-01 12:30:00+00:00' + + def test_flatten_dict(): assert flatten_dict({'test': 'value1', 'test2': 'value2'}) == {'test': 'value1', 'test2': 'value2'} -def test_pytzfy(): +def test_pytzfy1(): assert pytzfy(dt('2021-02-01 12:30:00+00:00')) == dt('2021-02-01 12:30:00+00:00') + + +def test_pytzfy2(): assert pytzfy(datetime(2018, 12, 31, 5, 0, 30, 1000)) == dt('2018-12-31 05:00:30.001000') + + +def test_get_module(): + with pytest.raises(EAException) as ea: + get_module('test') + assert 'Could not import module' in str(ea) + + +def test_dt_to_ts(caplog): + caplog.set_level(logging.WARNING) + dt_to_ts('a') + user, level, message = caplog.record_tuples[0] + assert 'elastalert' == user + assert logging.WARNING == level + assert 'Expected datetime, got' in message + + +def test_ts_utc_to_tz(): + date = datetime(2021, 7, 6, hour=0, minute=0, second=0) + actual_data = ts_utc_to_tz(date, 'Europe/Istanbul') + assert '2021-07-06 03:00:00+03:00' == str(actual_data) + + +test_build_es_conn_config_param = 'es_host, es_port, es_conn_timeout, es_send_get_body_as, ssl_show_warn, es_username, ' +test_build_es_conn_config_param += 'es_password, es_api_key, es_bearer, aws_region, profile, use_ssl, verify_certs, ' +test_build_es_conn_config_param += 'ca_certs, client_cert,client_key,es_url_prefix, expected_data' + + +@pytest.mark.parametrize(test_build_es_conn_config_param, [ + ('', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', True), + ('localhost', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', True), + ('localhost', 9200, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', + { + 'use_ssl': False, + 'verify_certs': True, + 'ca_certs': None, + 'client_cert': None, + 'client_key': None, + 'http_auth': None, + 'es_username': None, + 'es_password': None, + 'es_api_key': None, + 'es_bearer': None, + 'aws_region': None, + 'profile': None, + 'headers': None, + 'es_host': 'localhost', + 'es_port': 9200, + 'es_url_prefix': '', + 'es_conn_timeout': 20, + 'send_get_body_as': 'GET', + 'ssl_show_warn': True + }), + ('localhost', 9200, 30, 'POST', False, 'user', 'pass', 'key', 'bearer', 'us-east-1', 'default', + True, False, '/path/to/cacert.pem', '/path/to/client_cert.pem', '/path/to/client_key.key', 'elasticsearch', + { + 'use_ssl': True, + 'verify_certs': False, + 'ca_certs': '/path/to/cacert.pem', + 'client_cert': '/path/to/client_cert.pem', + 'client_key': '/path/to/client_key.key', + 'http_auth': None, + 'es_username': 'user', + 'es_password': 'pass', + 'es_api_key': 'key', + 'es_bearer': 'bearer', + 'aws_region': 'us-east-1', + 'profile': 'default', + 'headers': None, + 'es_host': 'localhost', + 'es_port': 9200, + 'es_url_prefix': 'elasticsearch', + 'es_conn_timeout': 30, + 'send_get_body_as': 'POST', + 'ssl_show_warn': False + }), +]) +def test_build_es_conn_config(es_host, es_port, es_conn_timeout, es_send_get_body_as, ssl_show_warn, es_username, + es_password, es_api_key, es_bearer, aws_region, profile, use_ssl, verify_certs, + ca_certs, client_cert, client_key, es_url_prefix, expected_data): + try: + conf = {} + if es_host: + conf['es_host'] = es_host + if es_port: + conf['es_port'] = es_port + if es_conn_timeout: + conf['es_conn_timeout'] = es_conn_timeout + if es_send_get_body_as: + conf['es_send_get_body_as'] = es_send_get_body_as + if ssl_show_warn != '': + conf['ssl_show_warn'] = ssl_show_warn + if es_username: + conf['es_username'] = es_username + if es_password: + conf['es_password'] = es_password + if es_api_key: + conf['es_api_key'] = es_api_key + if es_bearer: + conf['es_bearer'] = es_bearer + if aws_region: + conf['aws_region'] = aws_region + if profile: + conf['profile'] = profile + if use_ssl != '': + conf['use_ssl'] = use_ssl + if verify_certs != '': + conf['verify_certs'] = verify_certs + if ca_certs: + conf['ca_certs'] = ca_certs + if client_cert: + conf['client_cert'] = client_cert + if client_key: + conf['client_key'] = client_key + if es_url_prefix: + conf['es_url_prefix'] = es_url_prefix + actual = build_es_conn_config(conf) + assert expected_data == actual + except KeyError: + assert expected_data + + +@mock.patch.dict(os.environ, {'ES_USERNAME': 'USER', + 'ES_PASSWORD': 'PASS', + 'ES_API_KEY': 'KEY', + 'ES_BEARER': 'BEARE'}) +def test_build_es_conn_config2(): + conf = {} + conf['es_host'] = 'localhost' + conf['es_port'] = 9200 + expected = { + 'use_ssl': False, + 'verify_certs': True, + 'ca_certs': None, + 'client_cert': None, + 'client_key': None, + 'http_auth': None, + 'es_username': 'USER', + 'es_password': 'PASS', + 'es_api_key': 'KEY', + 'es_bearer': 'BEARE', + 'aws_region': None, + 'profile': None, + 'headers': None, + 'es_host': 'localhost', + 'es_port': 9200, + 'es_url_prefix': '', + 'es_conn_timeout': 20, + 'send_get_body_as': 'GET', + 'ssl_show_warn': True + } + actual = build_es_conn_config(conf) + assert expected == actual + + +@pytest.mark.parametrize('es_host, es_port, es_bearer, es_api_key', [ + ('localhost', 9200, '', ''), + ('localhost', 9200, 'bearer', 'bearer') +]) +@mock.patch.dict(os.environ, {'AWS_DEFAULT_REGION': ''}) +def test_elasticsearch_client(es_host, es_port, es_bearer, es_api_key): + conf = {} + conf['es_host'] = es_host + conf['es_port'] = es_port + if es_bearer: + conf['es_bearer'] = es_bearer + if es_api_key: + conf['es_api_key'] = es_api_key + acutual = elasticsearch_client(conf) + assert None is not acutual