From 8e06619074c39bacef87029dc0e7bf18de846f48 Mon Sep 17 00:00:00 2001 From: nsano-rururu Date: Fri, 4 Jun 2021 19:04:01 +0900 Subject: [PATCH] Fixed post_ignore_ssl_errors in http_post alerter --- elastalert/alerters/httppost.py | 6 +- tests/alerters/httppost_test.py | 100 +++++++++++++++++++++----------- 2 files changed, 69 insertions(+), 37 deletions(-) diff --git a/elastalert/alerters/httppost.py b/elastalert/alerters/httppost.py index 74f1635b..d5f4aaff 100644 --- a/elastalert/alerters/httppost.py +++ b/elastalert/alerters/httppost.py @@ -13,11 +13,11 @@ class HTTPPostAlerter(Alerter): def __init__(self, rule): super(HTTPPostAlerter, self).__init__(rule) - post_url = self.rule.get('http_post_url') + post_url = self.rule['http_post_url'] if isinstance(post_url, str): post_url = [post_url] self.post_url = post_url - self.post_proxy = self.rule.get('http_post_proxy') + self.post_proxy = self.rule.get('http_post_proxy', None) self.post_payload = self.rule.get('http_post_payload', {}) self.post_static_payload = self.rule.get('http_post_static_payload', {}) self.post_all_values = self.rule.get('http_post_all_values', not self.post_payload) @@ -41,6 +41,8 @@ def alert(self, matches): verify = self.post_ca_certs else: verify = not self.post_ignore_ssl_errors + if self.post_ignore_ssl_errors: + requests.packages.urllib3.disable_warnings() headers.update(self.post_http_headers) proxies = {'https': self.post_proxy} if self.post_proxy else None diff --git a/tests/alerters/httppost_test.py b/tests/alerters/httppost_test.py index bff4aa20..e84f45fe 100644 --- a/tests/alerters/httppost_test.py +++ b/tests/alerters/httppost_test.py @@ -213,49 +213,31 @@ def test_http_alerter_headers(): assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data']) -def test_http_alerter_post_ca_certs_true(): +@pytest.mark.parametrize('ca_certs, ignore_ssl_errors, excpet_verify', [ + ('', '', True), + ('', True, False), + ('', False, True), + (True, '', True), + (True, True, True), + (True, False, True), + (False, '', True), + (False, True, False), + (False, False, True) +]) +def test_http_alerter_post_ca_certs(ca_certs, ignore_ssl_errors, excpet_verify): rule = { 'name': 'Test HTTP Post Alerter Without Payload', 'type': 'any', 'http_post_url': 'http://test.webhook.url', 'http_post_static_payload': {'name': 'somestaticname'}, - 'http_post_ca_certs': True, 'alert': [] } - rules_loader = FileRulesLoader({}) - rules_loader.load_modules(rule) - alert = HTTPPostAlerter(rule) - match = { - '@timestamp': '2017-01-01T00:00:00', - 'somefield': 'foobarbaz' - } - with mock.patch('requests.post') as mock_post_request: - alert.alert([match]) - expected_data = { - '@timestamp': '2017-01-01T00:00:00', - 'somefield': 'foobarbaz', - 'name': 'somestaticname' - } - mock_post_request.assert_called_once_with( - rule['http_post_url'], - data=mock.ANY, - headers={'Content-Type': 'application/json', 'Accept': 'application/json;charset=utf-8'}, - proxies=None, - timeout=10, - verify=True - ) - assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data']) + if ca_certs != '': + rule['http_post_ca_certs'] = ca_certs + if ignore_ssl_errors != '': + rule['http_post_ignore_ssl_errors'] = ignore_ssl_errors -def test_http_alerter_post_ca_certs_false(): - rule = { - 'name': 'Test HTTP Post Alerter Without Payload', - 'type': 'any', - 'http_post_url': 'http://test.webhook.url', - 'http_post_static_payload': {'name': 'somestaticname'}, - 'http_post_ca_certs': False, - 'alert': [] - } rules_loader = FileRulesLoader({}) rules_loader.load_modules(rule) alert = HTTPPostAlerter(rule) @@ -276,7 +258,7 @@ def test_http_alerter_post_ca_certs_false(): headers={'Content-Type': 'application/json', 'Accept': 'application/json;charset=utf-8'}, proxies=None, timeout=10, - verify=True + verify=excpet_verify ) assert expected_data == json.loads(mock_post_request.call_args_list[0][1]['data']) @@ -303,3 +285,51 @@ def test_http_alerter_post_ea_exception(): alert.alert([match]) except EAException: assert True + + +def test_http_getinfo(): + rule = { + 'name': 'Test HTTP Post Alerter Without Payload', + 'type': 'any', + 'http_post_url': 'http://test.webhook.url', + 'alert': [] + } + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = HTTPPostAlerter(rule) + + expected_data = { + 'type': 'http_post', + 'http_post_webhook_url': ['http://test.webhook.url'] + } + actual_data = alert.get_info() + assert expected_data == actual_data + + +@pytest.mark.parametrize('http_post_webhook_url, expected_data', [ + ('', True), + ('http://test.webhook.url', + { + 'type': 'http_post', + 'http_post_webhook_url': ['http://test.webhook.url'] + }), +]) +def test_http_key_error(http_post_webhook_url, expected_data): + try: + rule = { + 'name': 'Test HTTP Post Alerter Without Payload', + 'type': 'any', + 'alert': [] + } + + if http_post_webhook_url != '': + rule['http_post_webhook_url'] = http_post_webhook_url + + rules_loader = FileRulesLoader({}) + rules_loader.load_modules(rule) + alert = HTTPPostAlerter(rule) + + actual_data = alert.get_info() + assert expected_data == actual_data + except KeyError: + assert expected_data