-
Notifications
You must be signed in to change notification settings - Fork 295
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1367 from wufeiqun/master
add workwechat alerter
- Loading branch information
Showing
7 changed files
with
193 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import json | ||
import warnings | ||
|
||
import requests | ||
from elastalert.alerts import Alerter, DateTimeEncoder | ||
from elastalert.util import EAException, elastalert_logger | ||
from requests import RequestException | ||
|
||
|
||
class WorkWechatAlerter(Alerter): | ||
""" Creates a WorkWechat message for each alert """ | ||
required_options = frozenset(['work_wechat_bot_id']) | ||
|
||
def __init__(self, rule): | ||
super(WorkWechatAlerter, self).__init__(rule) | ||
self.work_wechat_bot_id = self.rule.get('work_wechat_bot_id', None) | ||
self.work_wechat_webhook_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.work_wechat_bot_id}' | ||
self.work_wechat_msg_type = 'text' | ||
|
||
def alert(self, matches): | ||
title = self.create_title(matches) | ||
body = self.create_alert_body(matches) | ||
|
||
headers = { | ||
'Content-Type': 'application/json', | ||
'Accept': 'application/json;charset=utf-8' | ||
} | ||
|
||
payload = { | ||
'msgtype': self.work_wechat_msg_type, | ||
"text": { | ||
"content": body | ||
}, | ||
} | ||
|
||
try: | ||
response = requests.post( | ||
self.work_wechat_webhook_url, | ||
data=json.dumps(payload, cls=DateTimeEncoder), | ||
headers=headers) | ||
warnings.resetwarnings() | ||
response.raise_for_status() | ||
except RequestException as e: | ||
raise EAException("Error posting to workwechat: %s" % e) | ||
|
||
elastalert_logger.info("Trigger sent to workwechat") | ||
|
||
def get_info(self): | ||
return { | ||
"type": "workwechat", | ||
"work_wechat_webhook_url": self.work_wechat_webhook_url | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import json | ||
import logging | ||
from unittest import mock | ||
|
||
import pytest | ||
from requests import RequestException | ||
|
||
from elastalert.alerters.workwechat import WorkWechatAlerter | ||
from elastalert.loaders import FileRulesLoader | ||
from elastalert.util import EAException | ||
|
||
|
||
def test_work_wechat_text(caplog): | ||
caplog.set_level(logging.INFO) | ||
rule = { | ||
'name': 'Test WorkWechat Rule', | ||
'type': 'any', | ||
'work_wechat_bot_id': 'xxxxxxx', | ||
'alert': [], | ||
} | ||
rules_loader = FileRulesLoader({}) | ||
rules_loader.load_modules(rule) | ||
alert = WorkWechatAlerter(rule) | ||
match = { | ||
'@timestamp': '2024-01-30T00:00:00', | ||
'somefield': 'foobar' | ||
} | ||
with mock.patch('requests.post') as mock_post_request: | ||
alert.alert([match]) | ||
|
||
expected_data = { | ||
'msgtype': 'text', | ||
'text': { | ||
'content': 'Test WorkWechat Rule\n\n@timestamp: 2024-01-30T00:00:00\nsomefield: foobar\n' | ||
} | ||
} | ||
|
||
mock_post_request.assert_called_once_with( | ||
'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx', | ||
data=mock.ANY, | ||
headers={ | ||
'Content-Type': 'application/json', | ||
'Accept': 'application/json;charset=utf-8' | ||
} | ||
) | ||
|
||
actual_data = json.loads(mock_post_request.call_args_list[0][1]['data']) | ||
assert expected_data == actual_data | ||
assert ('elastalert', logging.INFO, 'Trigger sent to workwechat') == caplog.record_tuples[0] | ||
|
||
|
||
def test_work_wechat_ea_exception(): | ||
with pytest.raises(EAException) as ea: | ||
rule = { | ||
'name': 'Test WorkWechat Rule', | ||
'type': 'any', | ||
'work_wechat_bot_id': 'xxxxxxx', | ||
'alert': [], | ||
} | ||
rules_loader = FileRulesLoader({}) | ||
rules_loader.load_modules(rule) | ||
alert = WorkWechatAlerter(rule) | ||
match = { | ||
'@timestamp': '2024-01-30T00:00:00', | ||
'somefield': 'foobar' | ||
} | ||
mock_run = mock.MagicMock(side_effect=RequestException) | ||
with mock.patch('requests.post', mock_run), pytest.raises(RequestException): | ||
alert.alert([match]) | ||
assert 'Error posting to workwechat: ' in str(ea) | ||
|
||
|
||
def test_work_wechat_getinfo(): | ||
rule = { | ||
'name': 'Test WorkWechat Rule', | ||
'type': 'any', | ||
'work_wechat_bot_id': 'xxxxxxx', | ||
'alert': [], | ||
} | ||
rules_loader = FileRulesLoader({}) | ||
rules_loader.load_modules(rule) | ||
alert = WorkWechatAlerter(rule) | ||
|
||
expected_data = { | ||
'type': 'workwechat', | ||
'work_wechat_webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx' | ||
} | ||
actual_data = alert.get_info() | ||
assert expected_data == actual_data | ||
|
||
|
||
@pytest.mark.parametrize('work_wechat_bot_id, expected_data', [ | ||
('', 'Missing required option(s): work_wechat_bot_id'), | ||
('xxxxxxx', | ||
{ | ||
'type': 'workwechat', | ||
'work_wechat_webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx' | ||
}), | ||
]) | ||
def test_work_wechat_required_error(work_wechat_bot_id, expected_data): | ||
try: | ||
rule = { | ||
'name': 'Test WorkWechat Rule', | ||
'type': 'any', | ||
'alert': [], | ||
} | ||
|
||
if work_wechat_bot_id: | ||
rule['work_wechat_bot_id'] = work_wechat_bot_id | ||
|
||
rules_loader = FileRulesLoader({}) | ||
rules_loader.load_modules(rule) | ||
alert = WorkWechatAlerter(rule) | ||
|
||
actual_data = alert.get_info() | ||
assert expected_data == actual_data | ||
except Exception as ea: | ||
assert expected_data in str(ea) |