diff --git a/Alerting/Sample Watches/load_watch.sh b/Alerting/Sample Watches/load_watch.sh index 9c205ab8..d6740ac4 100755 --- a/Alerting/Sample Watches/load_watch.sh +++ b/Alerting/Sample Watches/load_watch.sh @@ -1,3 +1,5 @@ +#!/bin/bash + if [ -z "$1" ] ; then echo "USAGE: load_watch.sh : " echo "eg: ./load_watch.sh port_scan elastic changeme my_remote_cluster.mydomain:9200 https" @@ -19,11 +21,11 @@ fi port=9200 endpoint=localhost if [ "$4" ] ; then - if ":" in $4; then - endpoint=${4%":"*} # extractthe host value from the provided endpoint + if ":" in "$4"; then + endpoint=${4%":"*} # extract the host value from the provided endpoint port=${4#*":"} # extract the port value if provided in endpoint:port format if [ "$port" == "" ]; then - # if port is blank, due to endpoint provided as localhost: or no port providedthen use default port + # if port is blank, due to endpoint provided as localhost: or no port provided then use default port port=9200 fi else @@ -46,13 +48,13 @@ fi echo "Loading $1 scripts" shopt -s nullglob -for script in $1/scripts/*.json +for script in "$1/scripts"/*.json do filename=$(basename "$script") scriptname="${filename%.*}" - echo $scriptname - es_response=$(curl -H "Content-Type: application/json" -s -X POST $protocol$endpoint:$port/_scripts/$scriptname -u $username:$password -d @$script) - if [ 0 -eq $? ] && [ $es_response = '{"acknowledged":true}' ]; then + echo "$scriptname" + es_response=$(curl -H "Content-Type: application/json" -s -X POST "$protocol$endpoint:$port/_scripts/$scriptname" -u "$username:$password" -d "@$script") + if [ 0 -eq $? ] && [ "$es_response" = '{"acknowledged":true}' ]; then echo "Loading $scriptname script...OK" else echo "Loading $scriptname script...FAILED" @@ -62,9 +64,9 @@ done echo "Removing existing $1 watch " -curl -H "Content-Type: application/json" -s -X DELETE $protocol$endpoint:$port/_xpack/watcher/watch/$1 -u $username:$password +curl -H "Content-Type: application/json" -s -X DELETE "$protocol$endpoint:$port/_xpack/watcher/watch/$1" -u "$username:$password" echo "Loading $1 watch " -es_response=$(curl -H "Content-Type: application/json" --w "%{http_code}" -s -o /dev/null -X PUT $protocol$endpoint:$port/_xpack/watcher/watch/$1 -u $username:$password -d @$1/watch.json) +es_response=$(curl -H "Content-Type: application/json" --w "%{http_code}" -s -o /dev/null -X PUT "$protocol$endpoint:$port/_xpack/watcher/watch/$1" -u "$username:$password" -d "@$1/watch.json") if [ 0 -eq $? ] && [ $es_response = "201" ]; then echo "Loading $1 watch...OK" exit 0 diff --git a/Alerting/Sample Watches/run_all_tests.sh b/Alerting/Sample Watches/run_all_tests.sh index 20faad44..3d300fcf 100755 --- a/Alerting/Sample Watches/run_all_tests.sh +++ b/Alerting/Sample Watches/run_all_tests.sh @@ -1,2 +1,4 @@ #!/usr/bin/env bash -./run_test.sh '**' $1 $2 $3 $4 $5 +set -o nounset -o pipefail -o errexit + +./run_test.sh '**' "${1:-}" "${2:-}" "${3:-}" "${4:-}" "${5:-}" diff --git a/Alerting/Sample Watches/run_test.py b/Alerting/Sample Watches/run_test.py old mode 100644 new mode 100755 index 0460c71b..67b0c589 --- a/Alerting/Sample Watches/run_test.py +++ b/Alerting/Sample Watches/run_test.py @@ -1,81 +1,198 @@ -import sys +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# SPDX-FileCopyrightText: 2017 Dale McDiarmid +# SPDX-FileCopyrightText: 2017-2020 Robin Schneider +# SPDX-FileCopyrightText: 2020 Dan Roscigno +# SPDX-License-Identifier: Apache-2.0 -__author__ = 'dalem@elastic.co' +from __future__ import (print_function, unicode_literals, + absolute_import, division) import datetime +import json +import logging +import subprocess +import sys + +import yaml + from elasticsearch7 import Elasticsearch from elasticsearch7.client.ingest import IngestClient -import argparse -import json -parser = argparse.ArgumentParser(description='Index Connection Log data into ES with the last event at the current time') -parser.add_argument('--user',help='user') -parser.add_argument('--password',help='password') -parser.add_argument('--endpoint',help='endpoint') -parser.add_argument('--port',help='port') -parser.add_argument('--protocol',help='protocol') -parser.add_argument('--test_file',help='test file') - -parser.set_defaults(endpoint='localhost',port="9200",protocol="http",test_file='data.json',user='elastic',password='changeme') -args = parser.parse_args() -es = Elasticsearch([args.protocol+"://"+args.endpoint+":"+args.port],http_auth=(args.user, args.password)) - -def find_item(list, key): - for item in list: - if key in item: - return item - return None - -with open(args.test_file,'r') as test_file: - test=json.loads(test_file.read()) - try: - es.indices.delete(test['index']) - except: - print("Unable to delete current dataset") - pass - with open(test['mapping_file'],'r') as mapping_file: - es.indices.create(index=test["index"],body=json.loads(mapping_file.read())) - params={} - if "ingest_pipeline_file" in test: - with open(test['ingest_pipeline_file'],'r') as ingest_pipeline_file: - pipeline=json.loads(ingest_pipeline_file.read()) + +def set_value_as_default_for_leaf(nested_dict, path_exp, value): + if len(path_exp) == 1: + nested_dict.setdefault(path_exp[0], value) + elif path_exp[0] in nested_dict: + set_value_as_default_for_leaf(nested_dict[path_exp[0]], path_exp[1:], value) + + +def load_file(serialized_file): + with open(serialized_file, 'r') as serialized_file_fh: + if serialized_file.endswith('.json'): + decoded_object = json.loads(serialized_file_fh.read()) + elif serialized_file.endswith('.yml') or serialized_file.endswith('.yaml'): + decoded_object = yaml.safe_load(serialized_file_fh) + return decoded_object + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='Index Connection Log data into ES with the last event at the current time') + parser.add_argument('-v', '--verbose', help='verbose output', action='store_true') + parser.add_argument('--endpoint', help='endpoint') + parser.add_argument('--port', help='port') + parser.add_argument('--protocol', help='protocol') + parser.add_argument('--cacert', help='CA certificate to trust for HTTPS') + parser.add_argument('--user', help='user') + parser.add_argument('--password', help='password') + parser.add_argument('--test_file', help='test file') + parser.add_argument( + '--minify-scripts', + help='Minify script source code as workaround for' + + ' "Scripts may be no longer than 16384 characters." in ES < v6.6.', + action='store_true') + # Ref: https://github.com/elastic/elasticsearch/pull/35184 + parser.add_argument('--keep-index', help='Keep the index where test documents have been loaded to after the test', action='store_true') + parser.add_argument('--metadata-git-commit', help='Include the git commit hash in the metadata field of the watcher', action='store_true') + parser.add_argument('--modify-watch-by-eval', help='Python code to modify the watch before loading it into Elastic') + parser.add_argument( + '--no-test-index', + help='Don’t put the test data into an index.', + action='store_false', + dest='test_index') + parser.add_argument( + '--no-execute-watch', + help='Do not force watch execution. This can be useful when you use this script to deploy the watch.', + action='store_false', + dest='execute_watch') + + parser.set_defaults(endpoint='localhost', port="9200", protocol="http", test_file='data.json', user='elastic', password='changeme') + args = parser.parse_args() + + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + + es = Elasticsearch([args.protocol+"://"+args.endpoint+":"+args.port], http_auth=(args.user, args.password), ca_certs=args.cacert) + + test = load_file(args.test_file) + + if args.test_index: + # Load Mapping + try: + es.indices.delete(test['index']) + except Exception as err: + print("Unable to delete current dataset") + pass + index_template = load_file(test['mapping_file']) + for unneeded_keys in ['order', 'version', 'index_patterns']: + index_template.pop(unneeded_keys, None) + es.indices.create(index=test["index"], body=index_template) + + # Load pipeline if its declared + params = {} + if "ingest_pipeline_file" in test: + pipeline = load_file(test['ingest_pipeline_file']) p = IngestClient(es) - p.put_pipeline(id=test["watch_name"],body=pipeline) - params["pipeline"]=test["watch_name"] - current_data=last_time=datetime.datetime.utcnow() - i=0 - time_field = test["time_field"] if "time_field" in test else "@timestamp" - for event in test['events']: - event_time=current_data+datetime.timedelta(seconds=int(event['offset'] if 'offset' in event else 0)) - event[time_field]=event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if not time_field in event else event[time_field] - es.index(index=test['index'],body=event,id=event['id'] if "id" in event else i,params=params) - i+=1 - es.indices.refresh(index=test["index"]) + p.put_pipeline(id=test["watch_name"], body=pipeline) + params["pipeline"] = test["watch_name"] + + # Index data + current_data = last_time = datetime.datetime.utcnow() + i = 0 + time_fields = test.get('time_fields', test.get('time_field', '@timestamp')) + time_fields = set([time_fields] if isinstance(time_fields, str) else time_fields) + for event in test['events']: + # All offsets are in seconds. + event_time = current_data+datetime.timedelta(seconds=int(event.get('offset', 0))) + for time_field in time_fields: + time_field = time_field.split('.') + set_value_as_default_for_leaf(event, time_field, event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + es.index(index=test['index'], body=event, id=event.get('id', i), params=params) + i += 1 + es.indices.refresh(index=test["index"]) + + # Load Scripts if 'scripts' in test: for script in test['scripts']: - with open(script['path'], 'r') as script_file: - es.put_script(id=script["name"],body=json.loads(script_file.read())) + script_content = load_file(script['path']) + if args.minify_scripts: + # https://stackoverflow.com/questions/30795954/how-to-uglify-or-minify-c-code + p = subprocess.Popen(['gcc', '-fpreprocessed', '-dD', '-E', '-P', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + script_content['script']['source'] = p.communicate(input=script_content['script']['source'].encode('utf-8'))[0].decode('utf-8') + es.put_script(id=script["name"], body=script_content) - with open(test['watch_file'],'r') as watch_file: - watch=json.loads(watch_file.read()) - es.watcher.put_watch(id=test["watch_name"],body=watch) - response=es.watcher.execute_watch(id=test["watch_name"]) + # Load Watch and Execute + watch = load_file(test['watch_file']) + if args.modify_watch_by_eval: + eval(compile(args.modify_watch_by_eval, '', 'exec')) + + if args.metadata_git_commit: + watch.setdefault('metadata', {}) + watch['metadata']['git_commit_hash'] = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']).strip() + watch['metadata']['git_uncommitted_changes'] = True if len(subprocess.check_output(['git', 'status', '--porcelain']).strip()) > 0 else False + + es.watcher.put_watch(id=test["watch_name"], body=watch) + + if args.execute_watch: + response = es.watcher.execute_watch(id=test["watch_name"]) + + # Cleanup after the test to not pollute the environment for other tests. + if not args.keep_index: + try: + es.indices.delete(test['index']) + except Exception as err: + print("Unable to delete current dataset") + pass + + # Confirm Matches match = test['match'] if 'match' in test else True - print("Expected: Watch Condition: %s"%match) - if not 'condition' in response['watch_record']['result']: - print("Condition not evaluated due to watch error") + print("Expected: Watch Condition: {}".format(match)) + if 'condition' not in response['watch_record']['result']: + print("Condition not evaluated due to watch error: {}".format( + json.dumps(response['watch_record'], sort_keys=True, indent=2) + )) print("TEST FAIL") sys.exit(1) - met=response['watch_record']['result']['condition']['met'] - print("Received: Watch Condition: %s"%met) + met = response['watch_record']['result']['condition']['met'] + print("Received: Watch Condition: {}".format(met)) if match: if met and response['watch_record']['result']['condition']['status'] == "success": - print("Expected: %s"%test['expected_response']) - logging=find_item(response['watch_record']['result']['actions'],'logging')['logging'] + print("Expected: {}".format(test.get('expected_response'))) + if len(response['watch_record']['result']['actions']) == 0: + if response['watch_record']['result']['transform']['status'] == 'failure': + print("No actions where taken because transform failed: {}".format( + json.dumps(response['watch_record']['result'], sort_keys=True, indent=2) + )) + else: + print("No actions where taken: {}".format( + json.dumps(response['watch_record']['result'], sort_keys=True, indent=2) + )) + print("TEST FAIL") + sys.exit(1) + + logging_action = next((action for action in response['watch_record']['result']['actions'] if action["type"] == "logging"), None) + if logging_action is None: + print("No logging actions was taken. This test framework uses the logging action for comparison so you might need enable this action.") + print("TEST FAIL") + sys.exit(1) + if logging_action.get('transform', {}).get('status', 'success') != 'success': + print("Logging transform script failed: {}".format( + json.dumps(logging_action.get('transform', {}), sort_keys=True, indent=2), + )) + print("TEST FAIL") + sys.exit(1) + if 'logging' not in logging_action: + print("Logging action is not present: {}".format(logging_action)) + print("TEST FAIL") + sys.exit(1) + logging = logging_action['logging'] if logging: - print("Received: %s"%logging['logged_text']) - if logging['logged_text'] == test['expected_response']: + print("Received: {}".format(logging['logged_text'])) + if logging['logged_text'] == test.get('expected_response'): print("TEST PASS") sys.exit(0) else: @@ -83,5 +200,5 @@ def find_item(list, key): print("TEST FAIL") sys.exit(1) else: - print("TEST %s"%("PASS" if not response['watch_record']['result']['condition']['met'] else "FAIL")) + print("TEST {}".format("FAIL" if response['watch_record']['result']['condition']['met'] else "PASS")) sys.exit(met) diff --git a/Alerting/Sample Watches/run_test.sh b/Alerting/Sample Watches/run_test.sh index 120a2fcf..d3ea9e38 100755 --- a/Alerting/Sample Watches/run_test.sh +++ b/Alerting/Sample Watches/run_test.sh @@ -1,25 +1,26 @@ if [ -z "$1" ]; then -echo "Specify watch name e.g. run_test.sh " + echo "Specify watch name e.g. run_test.sh " + exit 1 fi username=elastic if [ "$2" ] ; then - username=$2 + username="$2" fi password=changeme if [ "$3" ] ; then - password=$3 + password="$3" fi port=9200 endpoint=localhost if [ "$4" ] ; then - if ":" in $4; then - endpoint=${4%":"*} # extractthe host value from the provided endpoint + if ":" in "$4"; then + endpoint=${4%":"*} # extract the host value from the provided endpoint port=${4#*":"} # extract the port value if provided in endpoint:port format if [ "$port" == "" ]; then - # if port is blank, due to endpoint provided as localhost: or no port providedthen use default port + # if port is blank, due to endpoint provided as localhost: or no port provided then use default port port=9200 fi else @@ -36,26 +37,22 @@ num_tests=0 pass=0 fails=0 echo "--------------------------------------------------" -for test in `ls $1/tests/*.json`; do -echo "Running test $test" -python3 run_test.py --user $username --password $password --endpoint $endpoint --port $port --protocol $protocol --test_file $test +# shellcheck disable=SC2231 +for test in $1/tests/*.json; do + echo "Running test $test" -if [ $? -eq 0 ]; then -let pass=pass+1 -else -let fails=fails+1 -fi -let num_tests=num_tests+1 -echo "--------------------------------------------------" -done; + if python3 run_test.py --user "$username" --password "$password" --endpoint "$endpoint" --port "$port" --protocol "$protocol" --test_file "$test"; then + pass=$(( pass+1 )) + else + fails=$(( fails+1 )) + fi + num_tests=$(( num_tests+1 )) + echo "--------------------------------------------------" +done echo "$num_tests tests run: $pass passed. $fails failed." if [ $fails -eq 0 ]; then -exit 0 + exit 0 else -exit 1 + exit 1 fi - - - -