From 15edc8075749df3b549652e643911a23a3e58e22 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Thu, 15 Jun 2023 10:30:21 -0700 Subject: [PATCH 01/24] Add tox.yml and update tox.ini based on eda-partner-testing --- .github/workflows/tox.ini | 10 ++++++---- .github/workflows/tox.yml | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) create mode 100644 .github/workflows/tox.yml diff --git a/.github/workflows/tox.ini b/.github/workflows/tox.ini index 44587b26..42fee1c4 100644 --- a/.github/workflows/tox.ini +++ b/.github/workflows/tox.ini @@ -1,4 +1,7 @@ -# this template can be adjusted as needed for your CI environment. the paths in the `commands` of each tox env linter can be changed if the tox file sits in a different location than expected in this template +# Recommended usage of this file is detailed in https://github.com/ansible/eda-partner-testing/blob/main/README.md. +# The linter paths can be changed, but may result in false passes. +# {posargs} in this case would be the path to collection root relative from the .github/workflows dir (`../..`) + [tox] envlist = ruff, darglint, pylint-event-source, pylint-event-filter requires = @@ -15,9 +18,8 @@ commands = ruff check --select ALL --ignore INP001 -q {posargs}/extensions/eda/p deps = darglint commands = darglint -s numpy -z full {posargs}/extensions/eda/plugins -# depending on what kind of plugins you have, remove the line you don't need (i.e event_sources or event_filters, remove the pylint call for the other one) -# depending on how your collection and repo is structured, you may need to change the path to each type of plugin -# if pylint warns about missing __init__.py files in directories, there's no need to include them if you ensure that the paths in the below pylint `commands` point directly to the *.py files under the event_source/ and event_filter/ dirs, as shown in the template path here + +# If you dont have any event_source or event_filter plugins, remove the corresponding testenv [testenv:pylint-event-source] deps = pylint commands = pylint {posargs}/extensions/eda/plugins/event_source/*.py --output-format=parseable -sn --disable R0801 diff --git a/.github/workflows/tox.yml b/.github/workflows/tox.yml new file mode 100644 index 00000000..b9395ab8 --- /dev/null +++ b/.github/workflows/tox.yml @@ -0,0 +1,14 @@ +--- +on: [push, pull_request] +name: Tox +jobs: + tox: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install deps + run: python -m pip install tox + - name: Move to tox conf file and run tox + run: | + cd .github/workflows + python -m tox -- ../.. From 4c65c7185373b53acf6683f11a8b5ac5857141cf Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Thu, 15 Jun 2023 11:31:24 -0700 Subject: [PATCH 02/24] Fix all auto-fixable ruff issues --- .../event_filter/dashes_to_underscores.py | 6 +- .../event_filter/insert_hosts_to_meta.py | 19 ++++--- .../eda/plugins/event_filter/json_filter.py | 15 ++--- extensions/eda/plugins/event_filter/noop.py | 4 +- .../plugins/event_filter/normalize_keys.py | 12 ++-- .../eda/plugins/event_source/alertmanager.py | 22 +++---- .../plugins/event_source/aws_cloudtrail.py | 12 ++-- .../eda/plugins/event_source/aws_sqs_queue.py | 15 +++-- .../plugins/event_source/azure_service_bus.py | 22 +++---- extensions/eda/plugins/event_source/file.py | 17 +++--- .../eda/plugins/event_source/file_watch.py | 57 ++++++++++--------- .../eda/plugins/event_source/generic.py | 37 ++++++------ .../eda/plugins/event_source/journald.py | 14 +++-- extensions/eda/plugins/event_source/kafka.py | 15 ++--- extensions/eda/plugins/event_source/range.py | 14 ++--- extensions/eda/plugins/event_source/tick.py | 14 ++--- .../eda/plugins/event_source/url_check.py | 41 +++++++------ .../eda/plugins/event_source/webhook.py | 14 +++-- 18 files changed, 179 insertions(+), 171 deletions(-) diff --git a/extensions/eda/plugins/event_filter/dashes_to_underscores.py b/extensions/eda/plugins/event_filter/dashes_to_underscores.py index a9b47100..b3010de2 100644 --- a/extensions/eda/plugins/event_filter/dashes_to_underscores.py +++ b/extensions/eda/plugins/event_filter/dashes_to_underscores.py @@ -1,10 +1,12 @@ -""" -dashes_to_underscores.py: +"""dashes_to_underscores.py. + An event filter that changes dashes in keys to underscores. For instance, the key X-Y becomes the new key X_Y. Arguments: +--------- * overwrite: Overwrite the values if there is a collision with a new key. + """ import multiprocessing as mp diff --git a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py index 9ba2ee84..f650d1b9 100644 --- a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py +++ b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py @@ -1,11 +1,11 @@ -""" -insert_hosts_to_meta.py +"""insert_hosts_to_meta.py. An ansible-rulebook event filter that extract hosts from the event data and insert them to the meta dict. Ansible-rulebook will limit an ansible action running on hosts in the meta dict. Arguments: +--------- host_path: The json path inside the event data to find hosts. Do nothing if the key is not present or does exist in event path_separator: The separator to interpret host_path. Default to "." @@ -16,6 +16,7 @@ parameter is not present. Example: +------- - ansible.eda.insert_hosts_to_meta host_path: app.target path_separator: . @@ -23,17 +24,17 @@ """ -from typing import Any, Dict +from typing import Any import dpath def main( - event: Dict[str, Any], + event: dict[str, Any], host_path: str = None, host_separator: str = None, path_separator: str = ".", -) -> Dict[str, Any]: +) -> dict[str, Any]: if not host_path: return event @@ -45,12 +46,14 @@ def main( if isinstance(hosts, str): hosts = hosts.split(host_separator) if host_separator else [hosts] - elif isinstance(hosts, list) or isinstance(hosts, tuple): + elif isinstance(hosts, list | tuple): for h in hosts: if not isinstance(h, str): - raise TypeError(f"{h} is not a valid hostname") + msg = f"{h} is not a valid hostname" + raise TypeError(msg) else: - raise TypeError(f"{hosts} is not a valid hostname") + msg = f"{hosts} is not a valid hostname" + raise TypeError(msg) if "meta" not in event: event["meta"] = {} diff --git a/extensions/eda/plugins/event_filter/json_filter.py b/extensions/eda/plugins/event_filter/json_filter.py index 0b044209..ba922e8e 100644 --- a/extensions/eda/plugins/event_filter/json_filter.py +++ b/extensions/eda/plugins/event_filter/json_filter.py @@ -1,5 +1,4 @@ -""" -json_filter.py: An event filter that filters keys out of events. +"""json_filter.py: An event filter that filters keys out of events. Includes override excludes. @@ -7,26 +6,22 @@ engine. Arguments: +--------- * exclude_keys = a list of strings or patterns to remove * include_keys = a list of strings or patterns to keep even if it matches exclude_keys patterns. + """ import fnmatch def matches_include_keys(include_keys, s): - for pattern in include_keys: - if fnmatch.fnmatch(s, pattern): - return True - return False + return any(fnmatch.fnmatch(s, pattern) for pattern in include_keys) def matches_exclude_keys(exclude_keys, s): - for pattern in exclude_keys: - if fnmatch.fnmatch(s, pattern): - return True - return False + return any(fnmatch.fnmatch(s, pattern) for pattern in exclude_keys) def main(event, exclude_keys=None, include_keys=None): diff --git a/extensions/eda/plugins/event_filter/noop.py b/extensions/eda/plugins/event_filter/noop.py index f1288294..91331aca 100644 --- a/extensions/eda/plugins/event_filter/noop.py +++ b/extensions/eda/plugins/event_filter/noop.py @@ -1,6 +1,4 @@ -""" -noop.py: An event filter that does nothing to the input. -""" +"""noop.py: An event filter that does nothing to the input.""" def main(event): diff --git a/extensions/eda/plugins/event_filter/normalize_keys.py b/extensions/eda/plugins/event_filter/normalize_keys.py index da6c5fad..d0d1f0e7 100644 --- a/extensions/eda/plugins/event_filter/normalize_keys.py +++ b/extensions/eda/plugins/event_filter/normalize_keys.py @@ -1,18 +1,18 @@ -""" -normalize_keys.py: +"""normalize_keys.py: An event filter that changes keys that contain non alpha numeric or underscore to undersocres. For instance, the key server-name becomes the new key server_name If there are consecutive non alpa numeric or under score, they would be coalesced into a single underscore For instance the key server.com/&abc becomes server_com_abc - instead of server_com__abc + instead of server_com__abc. If there is a existing key with the normalized name, it will get overwritten by default. If you don't want to over write it you can pass in overwrite: False The default value of overwrite is True. Arguments: +--------- * overwrite: Overwrite the values if there is a collision with a new key. Usage in a rulebook, a filter is usually attached to a source in the rulebook: @@ -50,17 +50,17 @@ def main(event, overwrite=True): def _normalize_embedded_keys(obj, overwrite, logger): if isinstance(obj, dict): - new_dict = dict() + new_dict = {} original_keys = list(obj.keys()) for key in original_keys: new_key = normalize_regex.sub("_", key) if new_key == key or new_key not in original_keys: new_dict[new_key] = _normalize_embedded_keys( - obj[key], overwrite, logger + obj[key], overwrite, logger, ) elif new_key in original_keys and overwrite: new_dict[new_key] = _normalize_embedded_keys( - obj[key], overwrite, logger + obj[key], overwrite, logger, ) logger.warning("Replacing existing key %s", new_key) return new_dict diff --git a/extensions/eda/plugins/event_source/alertmanager.py b/extensions/eda/plugins/event_source/alertmanager.py index 652aa802..ca3a99aa 100644 --- a/extensions/eda/plugins/event_source/alertmanager.py +++ b/extensions/eda/plugins/event_source/alertmanager.py @@ -1,10 +1,10 @@ -""" -alertmanager.py +"""alertmanager.py. An ansible-rulebook event source module for receiving events via a webhook from alertmanager or alike system. Arguments: +--------- host: The webserver hostname to listen to. Set to 0.0.0.0 to listen on all interfaces. Defaults to 127.0.0.1 port: The TCP port to listen to. Defaults to 5000 @@ -22,7 +22,7 @@ data and each parsed alert item to the queue. Example: - +------- - ansible.eda.alertmanager: host: 0.0.0.0 port: 8000 @@ -33,7 +33,7 @@ """ import asyncio -from typing import Any, Dict +from typing import Any from aiohttp import web from dpath import util @@ -91,12 +91,12 @@ async def webhook(request: web.Request): pass await request.app["queue"].put( - dict( - alert=alert, - meta=dict( - endpoint=endpoint, headers=dict(request.headers), hosts=hosts - ), - ) + { + "alert": alert, + "meta": { + "endpoint": endpoint, "headers": dict(request.headers), "hosts": hosts, + }, + }, ) return web.Response(status=202, text="Received") @@ -109,7 +109,7 @@ def clean_host(host): return host -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): app = web.Application() app["queue"] = queue app["data_host_path"] = str(args.get("data_host_path", "labels.instance")) diff --git a/extensions/eda/plugins/event_source/aws_cloudtrail.py b/extensions/eda/plugins/event_source/aws_cloudtrail.py index 4ffe969c..198fdc29 100644 --- a/extensions/eda/plugins/event_source/aws_cloudtrail.py +++ b/extensions/eda/plugins/event_source/aws_cloudtrail.py @@ -1,9 +1,9 @@ -""" -aws_cloudtrail.py +"""aws_cloudtrail.py. An ansible-rulebook event source module for getting events from an AWS CloudTrail Arguments: +--------- access_key: Optional AWS access key ID secret_key: Optional AWS secret access key session_token: Optional STS session token for use with temporary credentials @@ -19,7 +19,7 @@ event_category: The optional event category to return. (e.g. 'insight') Example: - +------- - ansible.eda.aws_cloudtrail: region: us-east-1 lookup_attributes: @@ -34,7 +34,7 @@ import asyncio import json from datetime import datetime -from typing import Any, Dict +from typing import Any from aiobotocore.session import get_session @@ -74,7 +74,7 @@ async def get_cloudtrail_events(client, params): } -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): delay = int(args.get("delay_seconds", 10)) session = get_session() @@ -103,7 +103,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): await asyncio.sleep(delay) -def connection_args(args: Dict[str, Any]) -> Dict[str, Any]: +def connection_args(args: dict[str, Any]) -> dict[str, Any]: selected_args = {} # Best Practice: get credentials from ~/.aws/credentials or the environment diff --git a/extensions/eda/plugins/event_source/aws_sqs_queue.py b/extensions/eda/plugins/event_source/aws_sqs_queue.py index 50fbf6ba..43bb2093 100644 --- a/extensions/eda/plugins/event_source/aws_sqs_queue.py +++ b/extensions/eda/plugins/event_source/aws_sqs_queue.py @@ -1,9 +1,9 @@ -""" -aws_sqs_queue.py +"""aws_sqs_queue.py. An ansible-rulebook event source plugin for receiving events via an AWS SQS queue. Arguments: +--------- access_key: Optional AWS access key ID secret_key: Optional AWS secret access key session_token: Optional STS session token for use with temporary credentials @@ -13,26 +13,29 @@ delay_seconds: The SQS long polling duration. Set to 0 to disable. Defaults to 2. Example: +------- - ansible.eda.aws_sqs: region: us-east-1 name: eda delay_seconds: 10 + """ import asyncio import json import logging -from typing import Any, Dict +from typing import Any import botocore.exceptions from aiobotocore.session import get_session -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): logger = logging.getLogger() if "name" not in args: - raise ValueError("Missing queue name") + msg = "Missing queue name" + raise ValueError(msg) queue_name = str(args.get("name")) wait_seconds = int(args.get("delay_seconds", 2)) @@ -79,7 +82,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger.debug("No messages in queue") -def connection_args(args: Dict[str, Any]) -> Dict[str, Any]: +def connection_args(args: dict[str, Any]) -> dict[str, Any]: selected_args = {} # Best Practice: get credentials from ~/.aws/credentials or the environment diff --git a/extensions/eda/plugins/event_source/azure_service_bus.py b/extensions/eda/plugins/event_source/azure_service_bus.py index 0802c910..ec5dd2fd 100644 --- a/extensions/eda/plugins/event_source/azure_service_bus.py +++ b/extensions/eda/plugins/event_source/azure_service_bus.py @@ -1,15 +1,15 @@ -""" -azure_service_bus.py +"""azure_service_bus.py. An ansible-rulebook event source module for receiving events from an Azure service bus Arguments: +--------- conn_str: The connection string to connect to the Azure service bus queue_name: The name of the queue to pull messages from logging_enable: Whether to turn on logging. Default to True Example: - +------- - ansible.eda.azure_service_bus: conn_str: "{{connection_str}}" queue_name: "{{queue_name}}" @@ -18,17 +18,18 @@ import asyncio import concurrent.futures +import contextlib import json -from typing import Any, Dict +from typing import Any from azure.servicebus import ServiceBusClient def receive_events( - loop: asyncio.events.AbstractEventLoop, queue: asyncio.Queue, args: Dict[str, Any] + loop: asyncio.events.AbstractEventLoop, queue: asyncio.Queue, args: dict[str, Any], ): servicebus_client = ServiceBusClient.from_connection_string( - conn_str=args["conn_str"], logging_enable=bool(args.get("logging_enable", True)) + conn_str=args["conn_str"], logging_enable=bool(args.get("logging_enable", True)), ) with servicebus_client: @@ -37,17 +38,16 @@ def receive_events( for msg in receiver: meta = {"message_id": msg.message_id} body = str(msg) - try: + with contextlib.suppress(json.JSONDecodeError): body = json.loads(body) - except json.JSONDecodeError: - pass + loop.call_soon_threadsafe( - queue.put_nowait, {"body": body, "meta": meta} + queue.put_nowait, {"body": body, "meta": meta}, ) receiver.complete_message(msg) -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor(max_workers=1) as task_pool: diff --git a/extensions/eda/plugins/event_source/file.py b/extensions/eda/plugins/event_source/file.py index fc7a827f..5d48cfc3 100644 --- a/extensions/eda/plugins/event_source/file.py +++ b/extensions/eda/plugins/event_source/file.py @@ -1,14 +1,14 @@ -""" -file.py +"""file.py. An ansible-rulebook event source plugin for loading facts from YAML files initially and when the file changes. Arguments: +--------- files - a list of YAML files Example: - +------- - ansible.eda.file: files: - fact.yml @@ -31,13 +31,14 @@ def send_facts(queue, filename): queue.put(data) else: if not isinstance(data, list): + msg = f"Unsupported facts type, expects a list of dicts found {type(data)}" raise Exception( - "Unsupported facts type, expects a list of dicts found" - f" {type(data)}" + msg, ) - if not all(True if isinstance(item, dict) else False for item in data): + if not all(bool(isinstance(item, dict)) for item in data): + msg = f"Unsupported facts type, expects a list of dicts found {data}" raise Exception( - f"Unsupported facts type, expects a list of dicts found {data}" + msg, ) for item in data: queue.put(item) @@ -53,7 +54,7 @@ def main(queue, args): send_facts(queue, filename) class Handler(RegexMatchingEventHandler): - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: RegexMatchingEventHandler.__init__(self, **kwargs) def on_created(self, event): diff --git a/extensions/eda/plugins/event_source/file_watch.py b/extensions/eda/plugins/event_source/file_watch.py index 77dce5f5..60650159 100644 --- a/extensions/eda/plugins/event_source/file_watch.py +++ b/extensions/eda/plugins/event_source/file_watch.py @@ -1,20 +1,21 @@ -""" -file_watch.py +"""file_watch.py. An ansible-rulebook event source plugin for watching file system changes. Arguments: +--------- path: The directory to watch for changes. ignore_regexes: A list of regular expressions to ignore changes recursive: Recursively watch the path if true Example: - +------- - name: file_watch file_watch: path: "{{src_path}}" recursive: true ignore_regexes: ['.*\\.pytest.*', '.*__pycache__.*', '.*/.git.*'] + """ import asyncio @@ -28,51 +29,51 @@ def watch(loop, queue, args): root_path = args["path"] class Handler(RegexMatchingEventHandler): - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: RegexMatchingEventHandler.__init__(self, **kwargs) def on_created(self, event): loop.call_soon_threadsafe( queue.put_nowait, - dict( - change="created", - src_path=event.src_path, - type=event.__class__.__name__, - root_path=root_path, - ), + { + "change": "created", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, ) def on_deleted(self, event): loop.call_soon_threadsafe( queue.put_nowait, - dict( - change="deleted", - src_path=event.src_path, - type=event.__class__.__name__, - root_path=root_path, - ), + { + "change": "deleted", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, ) def on_modified(self, event): loop.call_soon_threadsafe( queue.put_nowait, - dict( - change="modified", - src_path=event.src_path, - type=event.__class__.__name__, - root_path=root_path, - ), + { + "change": "modified", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, ) def on_moved(self, event): loop.call_soon_threadsafe( queue.put_nowait, - dict( - change="moved", - src_path=event.src_path, - type=event.__class__.__name__, - root_path=root_path, - ), + { + "change": "moved", + "src_path": event.src_path, + "type": event.__class__.__name__, + "root_path": root_path, + }, ) observer = Observer() diff --git a/extensions/eda/plugins/event_source/generic.py b/extensions/eda/plugins/event_source/generic.py index f8c2db0d..b4fd6ace 100644 --- a/extensions/eda/plugins/event_source/generic.py +++ b/extensions/eda/plugins/event_source/generic.py @@ -16,7 +16,7 @@ import random import time from datetime import datetime -from typing import Any, Dict +from typing import Any """ A generic source plugin that allows you to insert custom data @@ -48,7 +48,7 @@ """ -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): payload = args.get("payload") randomize = args.get("randomize", False) display = args.get("display", False) @@ -65,7 +65,8 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): loop_count = int(args.get("loop_count", 1)) # -1 infinite repeat_count = int(args.get("repeat_count", 1)) if time_format not in ["local", "iso8601", "epoch"]: - raise ValueError("time_format must be one of local, iso8601, epoch") + msg = "time_format must be one of local, iso8601, epoch" + raise ValueError(msg) if not isinstance(payload, list): payload = [payload] @@ -116,19 +117,19 @@ async def put(self, event): asyncio.run( main( MockQueue(), - dict( - randomize=True, - startup_delay=1, - create_index="my_index", - loop_count=2, - repeat_count=2, - repeat_delay=1, - event_delay=2, - loop_delay=3, - shutdown_after=11, - timestamp=True, - display=True, - payload=[dict(i=1), dict(f=3.14159), dict(b=False)], - ), - ) + { + "randomize": True, + "startup_delay": 1, + "create_index": "my_index", + "loop_count": 2, + "repeat_count": 2, + "repeat_delay": 1, + "event_delay": 2, + "loop_delay": 3, + "shutdown_after": 11, + "timestamp": True, + "display": True, + "payload": [{"i": 1}, {"f": 3.14159}, {"b": False}], + }, + ), ) diff --git a/extensions/eda/plugins/event_source/journald.py b/extensions/eda/plugins/event_source/journald.py index 5070bdd9..22079ca0 100644 --- a/extensions/eda/plugins/event_source/journald.py +++ b/extensions/eda/plugins/event_source/journald.py @@ -1,11 +1,12 @@ -""" -journald.py +"""journald.py An ansible-events event source plugin that tails systemd journald logs. Arguments: +--------- match - return messages that matches this field, see https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html # noqa Examples: +-------- - name: Return severity 6 messages ansible.eda.journald: match: "PRIORITY=6" @@ -17,15 +18,16 @@ - name: Return all messages ansible.eda.journald: match: "ALL" + """ import asyncio -from typing import Any, Dict +from typing import Any from systemd import journal -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): delay = args.get("delay", 0) match = args.get("match", []) @@ -36,14 +38,14 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): journal_stream.seek_tail() while True: - if not match == "ALL": + if match != "ALL": journal_stream.add_match(match) for entry in journal_stream: stream_dict = {} for field in entry: stream_dict[field.lower()] = f"{entry[field]}" - await queue.put(dict(journald=stream_dict)) + await queue.put({"journald": stream_dict}) await asyncio.sleep(delay) stream_dict.clear() diff --git a/extensions/eda/plugins/event_source/kafka.py b/extensions/eda/plugins/event_source/kafka.py index a896bc82..500147c2 100644 --- a/extensions/eda/plugins/event_source/kafka.py +++ b/extensions/eda/plugins/event_source/kafka.py @@ -1,9 +1,9 @@ -""" -kafka.py +"""kafka.py. An ansible-rulebook event source plugin for receiving events via a kafka topic. Arguments: +--------- host: The host where the kafka topic is hosted port: The port where the kafka server is listening cafile: The optional certificate authority file path containing certificates @@ -27,13 +27,13 @@ import asyncio import json import logging -from typing import Any, Dict +from typing import Any from aiokafka import AIOKafkaConsumer from aiokafka.helpers import create_ssl_context -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): logger = logging.getLogger() topic = args.get("topic") @@ -49,7 +49,8 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): encoding = args.get("encoding", "utf-8") if offset not in ("latest", "earliest"): - raise Exception(f"Invalid offset option: {offset}") + msg = f"Invalid offset option: {offset}" + raise Exception(msg) if cafile: context = create_ssl_context( @@ -62,7 +63,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): kafka_consumer = AIOKafkaConsumer( topic, - bootstrap_servers="{0}:{1}".format(host, port), + bootstrap_servers=f"{host}:{port}", group_id=group_id, enable_auto_commit=True, max_poll_records=1, @@ -102,5 +103,5 @@ async def put(self, event): main( MockQueue(), {"topic": "eda", "host": "localhost", "port": "9092", "group_id": "test"}, - ) + ), ) diff --git a/extensions/eda/plugins/event_source/range.py b/extensions/eda/plugins/event_source/range.py index 080ddb4f..71933272 100644 --- a/extensions/eda/plugins/event_source/range.py +++ b/extensions/eda/plugins/event_source/range.py @@ -1,28 +1,28 @@ -""" -range.py +"""range.py. An ansible-rulebook event source plugin for generating events with an increasing index i. Arguments: +--------- limit: The upper limit of the range of the index. Example: - +------- - ansible.eda.range: limit: 5 """ import asyncio -from typing import Any, Dict +from typing import Any -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): delay = args.get("delay", 0) for i in range(int(args["limit"])): - await queue.put(dict(i=i)) + await queue.put({"i": i}) await asyncio.sleep(delay) @@ -32,4 +32,4 @@ class MockQueue: async def put(self, event): print(event) - asyncio.run(main(MockQueue(), dict(limit=5))) + asyncio.run(main(MockQueue(), {"limit": 5})) diff --git a/extensions/eda/plugins/event_source/tick.py b/extensions/eda/plugins/event_source/tick.py index 4910b11e..130d8179 100644 --- a/extensions/eda/plugins/event_source/tick.py +++ b/extensions/eda/plugins/event_source/tick.py @@ -1,28 +1,28 @@ -""" -tick.py +"""tick.py. An ansible-rulebook event source plugin for generating events with an increasing index i that never ends. Arguments: +--------- delay: time between ticks Example: - +------- - ansible.eda.tick: delay: 5 """ import asyncio import itertools -from typing import Any, Dict +from typing import Any -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): delay = args.get("delay", 1) for i in itertools.count(start=1): - await queue.put(dict(i=i)) + await queue.put({"i": i}) await asyncio.sleep(delay) @@ -32,4 +32,4 @@ class MockQueue: async def put(self, event): print(event) - asyncio.run(main(MockQueue(), dict(delay=1))) + asyncio.run(main(MockQueue(), {"delay": 1})) diff --git a/extensions/eda/plugins/event_source/url_check.py b/extensions/eda/plugins/event_source/url_check.py index f6393398..2d5b9301 100644 --- a/extensions/eda/plugins/event_source/url_check.py +++ b/extensions/eda/plugins/event_source/url_check.py @@ -1,32 +1,31 @@ -""" - -url_check.py +"""url_check.py. An ansible-rulebook event source plugin that polls a set of URLs and sends events with their status. Arguments: - +--------- urls - a list of urls to poll delay - the number of seconds to wait between polling verify_ssl - verify SSL certificate Example: - +------- - name: check web server ansible.eda.url_check: urls: - http://44.201.5.56:8000/docs delay: 10 + """ import asyncio -from typing import Any, Dict +from typing import Any import aiohttp -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): urls = args.get("urls", []) delay = int(args.get("delay", 1)) verify_ssl = args.get("verify_ssl", True) @@ -40,25 +39,25 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): for url in urls: async with session.get(url, verify_ssl=verify_ssl) as resp: await queue.put( - dict( - url_check=dict( - url=url, - status="up" if resp.status == 200 else "down", - status_code=resp.status, - ) - ) + { + "url_check": { + "url": url, + "status": "up" if resp.status == 200 else "down", + "status_code": resp.status, + }, + }, ) except aiohttp.ClientError as e: client_error = str(e) await queue.put( - dict( - url_check=dict( - url=url, - status="down", - error_msg=client_error, - ) - ) + { + "url_check": { + "url": url, + "status": "down", + "error_msg": client_error, + }, + }, ) await asyncio.sleep(delay) diff --git a/extensions/eda/plugins/event_source/webhook.py b/extensions/eda/plugins/event_source/webhook.py index 800c698a..6f69685b 100644 --- a/extensions/eda/plugins/event_source/webhook.py +++ b/extensions/eda/plugins/event_source/webhook.py @@ -1,9 +1,9 @@ -""" -webhook.py +"""webhook.py. An ansible-rulebook event source module for receiving events via a webhook. Arguments: +--------- host: The hostname to listen to. Set to 0.0.0.0 to listen on all interfaces. Defaults to 127.0.0.1 port: The TCP port to listen to. Defaults to 5000 @@ -17,7 +17,8 @@ import asyncio import logging import ssl -from typing import Any, Callable, Dict +from collections.abc import Callable +from typing import Any from aiohttp import web @@ -55,9 +56,10 @@ async def bearer_auth(request: web.Request, handler: Callable): return await handler(request) -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]): if "port" not in args: - raise ValueError("Missing required argument: port") + msg = "Missing required argument: port" + raise ValueError(msg) if "token" in args: app = web.Application(middlewares=[bearer_auth]) app["token"] = args["token"] @@ -112,5 +114,5 @@ async def put(self, event): "certfile": "cert.pem", "keyfile": "key.pem", }, - ) + ), ) From 50af8ffd2b81d89c8af5afaa42b3305722eda1cc Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:35:46 -0700 Subject: [PATCH 03/24] Change tox to only run ruff for now --- .github/workflows/tox.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/tox.ini b/.github/workflows/tox.ini index 42fee1c4..1c6ad867 100644 --- a/.github/workflows/tox.ini +++ b/.github/workflows/tox.ini @@ -3,7 +3,8 @@ # {posargs} in this case would be the path to collection root relative from the .github/workflows dir (`../..`) [tox] -envlist = ruff, darglint, pylint-event-source, pylint-event-filter +# envlist = ruff, darglint, pylint-event-source, pylint-event-filter +envlist = ruff requires = ruff darglint From 4f43f788074f0d4b9fc4db7b775c8eedc1faefc1 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:36:12 -0700 Subject: [PATCH 04/24] Fix some ruff issues in dashes_to_underscores.py Remaining issues in this file: 15:23: FBT001 Boolean positional arg in function definition 15:41: FBT002 Boolean default value in function definition --- .../eda/plugins/event_filter/dashes_to_underscores.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/extensions/eda/plugins/event_filter/dashes_to_underscores.py b/extensions/eda/plugins/event_filter/dashes_to_underscores.py index b3010de2..07919c30 100644 --- a/extensions/eda/plugins/event_filter/dashes_to_underscores.py +++ b/extensions/eda/plugins/event_filter/dashes_to_underscores.py @@ -12,7 +12,8 @@ import multiprocessing as mp -def main(event, overwrite=True): +def main(event: dict, overwrite: bool = True) -> dict: + """Change dashes in keys to underscores.""" logger = mp.get_logger() logger.info("dashes_to_underscores") q = [] @@ -26,10 +27,7 @@ def main(event, overwrite=True): if "-" in key: new_key = key.replace("-", "_") del o[key] - if new_key in o and overwrite: - o[new_key] = value - logger.info("Replacing %s with %s", key, new_key) - elif new_key not in o: + if (new_key in o and overwrite) or (new_key not in o): o[new_key] = value logger.info("Replacing %s with %s", key, new_key) From 5586984a21c04cc3116d9f80c4ac5727d0b6b598 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:38:13 -0700 Subject: [PATCH 05/24] Fix ruff issues in insert_hosts_to_meta.py --- extensions/eda/plugins/event_filter/insert_hosts_to_meta.py | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py index f650d1b9..9e30f5b8 100644 --- a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py +++ b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py @@ -35,6 +35,7 @@ def main( host_separator: str = None, path_separator: str = ".", ) -> dict[str, Any]: + """Extract hosts from event data and insert into meta dict.""" if not host_path: return event From a25b4b0f3b8460cdc1c2e86ae3cce7caf460253a Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:39:42 -0700 Subject: [PATCH 06/24] Fix ruff issues in json_filter.py --- .../eda/plugins/event_filter/json_filter.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/extensions/eda/plugins/event_filter/json_filter.py b/extensions/eda/plugins/event_filter/json_filter.py index ba922e8e..59a84d6e 100644 --- a/extensions/eda/plugins/event_filter/json_filter.py +++ b/extensions/eda/plugins/event_filter/json_filter.py @@ -16,15 +16,16 @@ import fnmatch -def matches_include_keys(include_keys, s): +def _matches_include_keys(include_keys: list, s: str) -> bool: return any(fnmatch.fnmatch(s, pattern) for pattern in include_keys) -def matches_exclude_keys(exclude_keys, s): +def _matches_exclude_keys(exclude_keys: list, s: str) -> bool: return any(fnmatch.fnmatch(s, pattern) for pattern in exclude_keys) -def main(event, exclude_keys=None, include_keys=None): +def main(event: dict, exclude_keys: list = None, include_keys: list = None) -> dict: + """Filter keys out of events.""" if exclude_keys is None: exclude_keys = [] @@ -37,13 +38,9 @@ def main(event, exclude_keys=None, include_keys=None): o = q.pop() if isinstance(o, dict): for i in list(o.keys()): - if i in include_keys: + if (i in include_keys) or _matches_include_keys(include_keys, i): q.append(o[i]) - elif matches_include_keys(include_keys, i): - q.append(o[i]) - elif i in exclude_keys: - del o[i] - elif matches_exclude_keys(exclude_keys, i): + elif (i in exclude_keys) or _matches_exclude_keys(exclude_keys, i): del o[i] else: q.append(o[i]) From cfd38604ad0fec2ad77392377e15f524c245d4ec Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:40:00 -0700 Subject: [PATCH 07/24] Fix ruff issues in noop.py --- extensions/eda/plugins/event_filter/noop.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_filter/noop.py b/extensions/eda/plugins/event_filter/noop.py index 91331aca..57c32df9 100644 --- a/extensions/eda/plugins/event_filter/noop.py +++ b/extensions/eda/plugins/event_filter/noop.py @@ -1,5 +1,6 @@ """noop.py: An event filter that does nothing to the input.""" -def main(event): +def main(event: dict) -> dict: + """Return the input.""" return event From f061721fd7cb0c5ccd2a1ff4410c0c3db324e201 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:40:22 -0700 Subject: [PATCH 08/24] Fix some ruff issues in normalize_keys.py Remaining issues: 46:23: FBT001 Boolean positional arg in function definition 46:41: FBT002 Boolean default value in function definition 53:41: FBT001 Boolean positional arg in function definition 53:58: ANN001 Missing type annotation for function argument `logger` 69:5: RET505 Unnecessary `elif` after `return` statement --- extensions/eda/plugins/event_filter/normalize_keys.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/extensions/eda/plugins/event_filter/normalize_keys.py b/extensions/eda/plugins/event_filter/normalize_keys.py index d0d1f0e7..42b764a5 100644 --- a/extensions/eda/plugins/event_filter/normalize_keys.py +++ b/extensions/eda/plugins/event_filter/normalize_keys.py @@ -1,4 +1,5 @@ -"""normalize_keys.py: +"""normalize_keys.py. + An event filter that changes keys that contain non alpha numeric or underscore to undersocres. For instance, the key server-name becomes the new key server_name @@ -42,13 +43,14 @@ normalize_regex = re.compile("[^0-9a-zA-Z_]+") -def main(event, overwrite=True): +def main(event: dict, overwrite: bool = True) -> dict: + """Change keys that contain non-alphanumeric characters to underscores.""" logger = mp.get_logger() logger.info("normalize_keys") return _normalize_embedded_keys(event, overwrite, logger) -def _normalize_embedded_keys(obj, overwrite, logger): +def _normalize_embedded_keys(obj: dict, overwrite: bool, logger) -> dict: if isinstance(obj, dict): new_dict = {} original_keys = list(obj.keys()) From 5a93c3f65906a893c4de43ca613a26b9595eeed9 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:41:24 -0700 Subject: [PATCH 09/24] Fix some ruff issues in alertmanager.py Remaining issues: 45:18: ARG001 Unused function argument: `request` 99:89: E501 Line too long (91 > 88 characters) 133:9: T201 `print` found 144:23: ANN101 Missing type annotation for `self` in method --- .../eda/plugins/event_source/alertmanager.py | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/extensions/eda/plugins/event_source/alertmanager.py b/extensions/eda/plugins/event_source/alertmanager.py index ca3a99aa..c6374442 100644 --- a/extensions/eda/plugins/event_source/alertmanager.py +++ b/extensions/eda/plugins/event_source/alertmanager.py @@ -42,12 +42,14 @@ @routes.get("/") -async def status(request: web.Request): +async def status(request: web.Request) -> web.Response: + """Return status of a web request.""" return web.Response(status=200, text="up") @routes.post("/{endpoint}") -async def webhook(request: web.Request): +async def webhook(request: web.Request) -> web.Response: + """Read events from webhook.""" payload = await request.json() endpoint = request.match_info["endpoint"] @@ -102,14 +104,15 @@ async def webhook(request: web.Request): return web.Response(status=202, text="Received") -def clean_host(host): +def clean_host(host: str) -> str: + """Remove port from host string if it exists.""" if ":" in host: return host.split(":")[0] - else: - return host + return host -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Receive events via alertmanager webhook.""" app = web.Application() app["queue"] = queue app["data_host_path"] = str(args.get("data_host_path", "labels.instance")) @@ -133,9 +136,13 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {})) From 97625e15f3f8b565a825596e5a46e611fdba5e2a Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:41:58 -0700 Subject: [PATCH 10/24] Fix some ruff issues in aws_cloudtrail.py Remaining issues: 48:5: ANN201 Missing return type annotation for public function `get_events` 48:5: D103 Missing docstring in public function 48:16: ANN001 Missing type annotation for function argument `events` 48:24: ANN001 Missing type annotation for function argument `last_event_ids` 65:11: ANN201 Missing return type annotation for public function `get_cloudtrail_events` 65:11: D103 Missing docstring in public function 65:33: ANN001 Missing type annotation for function argument `client` 65:41: ANN001 Missing type annotation for function argument `params` 87:27: DTZ003 The use of `datetime.datetime.utcnow()` is not allowed, use `datetime.datetime.now(tz=)` instead 133:23: ANN101 Missing type annotation for `self` in method --- .../eda/plugins/event_source/aws_cloudtrail.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/extensions/eda/plugins/event_source/aws_cloudtrail.py b/extensions/eda/plugins/event_source/aws_cloudtrail.py index 198fdc29..7e6aebda 100644 --- a/extensions/eda/plugins/event_source/aws_cloudtrail.py +++ b/extensions/eda/plugins/event_source/aws_cloudtrail.py @@ -39,7 +39,7 @@ from aiobotocore.session import get_session -def _cloudtrail_event_to_dict(event): +def _cloudtrail_event_to_dict(event: dict) -> dict: event["EventTime"] = event["EventTime"].isoformat() event["CloudTrailEvent"] = json.loads(event["CloudTrailEvent"]) return event @@ -74,7 +74,8 @@ async def get_cloudtrail_events(client, params): } -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Receive events via AWS CloudTrail.""" delay = int(args.get("delay_seconds", 10)) session = get_session() @@ -104,6 +105,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): def connection_args(args: dict[str, Any]) -> dict[str, Any]: + """Provide connection arguments to AWS CloudTrail.""" selected_args = {} # Best Practice: get credentials from ~/.aws/credentials or the environment @@ -123,9 +125,13 @@ def connection_args(args: dict[str, Any]) -> dict[str, Any]: if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {})) From 498d8756c4335eec10178273cc74d78f84630d15 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:43:02 -0700 Subject: [PATCH 11/24] Fix some ruff issues in aws_sqs_queue.py Remaining issues: 52:17: B904 Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling 52:17: TRY200 Use `raise from` to specify exception cause 111:23: ANN101 Missing type annotation for `self` in method --- .../eda/plugins/event_source/aws_sqs_queue.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/extensions/eda/plugins/event_source/aws_sqs_queue.py b/extensions/eda/plugins/event_source/aws_sqs_queue.py index 43bb2093..94a1d5f4 100644 --- a/extensions/eda/plugins/event_source/aws_sqs_queue.py +++ b/extensions/eda/plugins/event_source/aws_sqs_queue.py @@ -30,7 +30,8 @@ from aiobotocore.session import get_session -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Receive events via an AWS SQS queue.""" logger = logging.getLogger() if "name" not in args: @@ -49,8 +50,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): == "AWS.SimpleQueueService.NonExistentQueue" ): raise ValueError("Queue %s does not exist" % queue_name) - else: - raise + raise queue_url = response["QueueUrl"] @@ -83,6 +83,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): def connection_args(args: dict[str, Any]) -> dict[str, Any]: + """Provide connection arguments to AWS SQS queue.""" selected_args = {} # Best Practice: get credentials from ~/.aws/credentials or the environment @@ -102,9 +103,13 @@ def connection_args(args: dict[str, Any]) -> dict[str, Any]: if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"region": "us-east-1", "name": "eda"})) From c3425681ead852987cc9d4645e3a8c1812e9ca0b Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:44:02 -0700 Subject: [PATCH 12/24] Fix some ruff issues in azure_service_bus.py Remaining issues: 66:30: ANN101 Missing type annotation for `self` in method --- .../plugins/event_source/azure_service_bus.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/extensions/eda/plugins/event_source/azure_service_bus.py b/extensions/eda/plugins/event_source/azure_service_bus.py index ec5dd2fd..36d77d20 100644 --- a/extensions/eda/plugins/event_source/azure_service_bus.py +++ b/extensions/eda/plugins/event_source/azure_service_bus.py @@ -27,9 +27,11 @@ def receive_events( loop: asyncio.events.AbstractEventLoop, queue: asyncio.Queue, args: dict[str, Any], -): +) -> None: + """Receive events from service bus.""" servicebus_client = ServiceBusClient.from_connection_string( - conn_str=args["conn_str"], logging_enable=bool(args.get("logging_enable", True)), + conn_str=args["conn_str"], + logging_enable=bool(args.get("logging_enable", True)), ) with servicebus_client: @@ -47,7 +49,8 @@ def receive_events( receiver.complete_message(msg) -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Receive events from service bus in a loop.""" loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor(max_workers=1) as task_pool: @@ -55,10 +58,14 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - def put_nowait(self, event): - print(event) + """A fake queue.""" + + async def put_nowait(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 args = { "conn_str": "Endpoint=sb://foo.servicebus.windows.net/", From 3a2ed23ec05035863cef16e505bdb5405bac427e Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:44:37 -0700 Subject: [PATCH 13/24] Fix some ruff issues in file.py Remaining issues: 25:16: ANN001 Missing type annotation for function argument `queue` 27:10: PTH123 `open()` should be replaced by `Path.open()` 35:89: E501 Line too long (91 > 88 characters) 36:17: TRY004 Prefer `TypeError` exception for invalid type 36:23: TRY002 Create your own exception 41:23: TRY002 Create your own exception 48:5: C901 `main` is too complex (11 > 10) 48:10: ANN001 Missing type annotation for function argument `queue` 50:14: PTH100 `os.path.abspath()` should be replaced by `Path.resolve()` 59:22: ANN101 Missing type annotation for `self` in method 59:30: ANN003 Missing type annotation for `**kwargs` 62:24: ANN101 Missing type annotation for `self` in method 66:24: ANN101 Missing type annotation for `self` in method 69:25: ANN101 Missing type annotation for `self` in method 73:22: ANN101 Missing type annotation for `self` in method 80:21: PTH120 `os.path.dirname()` should be replaced by `Path.parent` 97:23: ANN101 Missing type annotation for `self` in method --- extensions/eda/plugins/event_source/file.py | 22 +++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/extensions/eda/plugins/event_source/file.py b/extensions/eda/plugins/event_source/file.py index 5d48cfc3..8a3519f0 100644 --- a/extensions/eda/plugins/event_source/file.py +++ b/extensions/eda/plugins/event_source/file.py @@ -22,7 +22,8 @@ from watchdog.observers import Observer -def send_facts(queue, filename): +def send_facts(queue, filename: str) -> None: + """Send facts to the queue.""" with open(filename) as f: data = yaml.safe_load(f.read()) if data is None: @@ -44,7 +45,8 @@ def send_facts(queue, filename): queue.put(item) -def main(queue, args): +def main(queue, args: dict) -> None: + """Load facts from YAML files initially and when the file changes.""" files = [os.path.abspath(f) for f in args.get("files", [])] if not files: @@ -57,18 +59,18 @@ class Handler(RegexMatchingEventHandler): def __init__(self, **kwargs) -> None: RegexMatchingEventHandler.__init__(self, **kwargs) - def on_created(self, event): + def on_created(self, event: dict) -> None: if event.src_path in files: send_facts(queue, event.src_path) - def on_deleted(self, event): + def on_deleted(self, event: dict) -> None: pass - def on_modified(self, event): + def on_modified(self, event: dict) -> None: if event.src_path in files: send_facts(queue, event.src_path) - def on_moved(self, event): + def on_moved(self, event: dict) -> None: pass observer = Observer() @@ -87,9 +89,13 @@ def on_moved(self, event): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 main(MockQueue(), {"files": ["facts.yml"]}) From 4ac56fa25a7155716f9f126b2da9a12386833766 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:47:15 -0700 Subject: [PATCH 14/24] Fix some ruff issues in file_watch.py Remaining issues: 28:89: E501 Line too long (92 > 88 characters) 33:22: ANN101 Missing type annotation for `self` in method 33:30: ANN003 Missing type annotation for `**kwargs` 36:24: ANN101 Missing type annotation for `self` in method 47:24: ANN101 Missing type annotation for `self` in method 58:25: ANN101 Missing type annotation for `self` in method 69:22: ANN101 Missing type annotation for `self` in method 106:30: ANN101 Missing type annotation for `self` in method 110:44: S108 Probable insecure usage of temporary file or directory: "/tmp" --- .../eda/plugins/event_source/file_watch.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/extensions/eda/plugins/event_source/file_watch.py b/extensions/eda/plugins/event_source/file_watch.py index 60650159..4e8b379a 100644 --- a/extensions/eda/plugins/event_source/file_watch.py +++ b/extensions/eda/plugins/event_source/file_watch.py @@ -1,4 +1,4 @@ -"""file_watch.py. +r"""file_watch.py. An ansible-rulebook event source plugin for watching file system changes. @@ -25,14 +25,15 @@ from watchdog.observers import Observer -def watch(loop, queue, args): +def watch(loop: asyncio.events.AbstractEventLoop, queue: asyncio.Queue, args: dict) -> None: + """Watch for changes and put events on the queue.""" root_path = args["path"] class Handler(RegexMatchingEventHandler): def __init__(self, **kwargs) -> None: RegexMatchingEventHandler.__init__(self, **kwargs) - def on_created(self, event): + def on_created(self, event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -43,7 +44,7 @@ def on_created(self, event): }, ) - def on_deleted(self, event): + def on_deleted(self, event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -54,7 +55,7 @@ def on_deleted(self, event): }, ) - def on_modified(self, event): + def on_modified(self, event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -65,7 +66,7 @@ def on_modified(self, event): }, ) - def on_moved(self, event): + def on_moved(self, event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -88,7 +89,8 @@ def on_moved(self, event): observer.join() -async def main(queue, args): +async def main(queue: asyncio.Queue, args: dict) -> None: + """Watch for changes to a file and put events on the queue.""" loop = asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor(max_workers=1) as task_pool: @@ -96,9 +98,13 @@ async def main(queue, args): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - def put_nowait(self, event): - print(event) + """A fake queue.""" + + async def put_nowait(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"path": "/tmp", "recursive": True})) From 44328436b69e0f0b37d957feaec767981ca9b9b6 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:48:03 -0700 Subject: [PATCH 15/24] Fix some ruff issues in generic.py Remaining issues: 51:11: C901 `main` is too complex (15 > 10) 51:11: PLR0912 Too many branches (14 > 12) 94:49: DTZ005 The use of `datetime.datetime.now()` without `tz` argument is not allowed 98:45: DTZ005 The use of `datetime.datetime.now()` without `tz` argument is not allowed 103:21: T201 `print` found 118:23: ANN101 Missing type annotation for `self` in method --- .../eda/plugins/event_source/generic.py | 69 ++++++++++--------- 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/extensions/eda/plugins/event_source/generic.py b/extensions/eda/plugins/event_source/generic.py index b4fd6ace..7e6c31bc 100644 --- a/extensions/eda/plugins/event_source/generic.py +++ b/extensions/eda/plugins/event_source/generic.py @@ -1,3 +1,32 @@ +"""A generic source plugin that allows you to insert custom data. + +The event data to insert into the queue is specified in the required +parameter payload and is an array of events. + +Optional Parameters: +randomize True|False Randomize the events in the payload, default False +display True|False Display the event data in stdout, default False +add_timestamp True|False Add an event timestamp, default False +time_format local|iso8601|epoch The time format of event timestamp, + default local +create_index str The index to create for each event starts at 0 +startup_delay float Number of seconds to wait before injecting events + into the queue. Default 0 +event_delay float Number of seconds to wait before injecting the next + event from the payload. Default 0 +repeat_delay float Number of seconds to wait before injecting a repeated + event from the payload. Default 0 +loop_delay float Number of seconds to wait before inserting the + next set of events. Default 0 +shutdown_after float Number of seconds to wait before shutting down the + plugin. Default 0 +loop_count int Number of times the set of events in the playload + should be repeated. Default 0 +repeat_count int Number of times each individual event in the playload + should be repeated. Default 1 + +""" + # Copyright 2022 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,37 +47,9 @@ from datetime import datetime from typing import Any -""" A generic source plugin that allows you to insert custom data - - The event data to insert into the queue is specified in the required - parameter payload and is an array of events. - - Optional Parameters: - randomize True|False Randomize the events in the payload, default False - display True|False Display the event data in stdout, default False - add_timestamp True|False Add an event timestamp, default False - time_format local|iso8601|epoch The time format of event timestamp, - default local - create_index str The index to create for each event starts at 0 - startup_delay float Number of seconds to wait before injecting events - into the queue. Default 0 - event_delay float Number of seconds to wait before injecting the next - event from the payload. Default 0 - repeat_delay float Number of seconds to wait before injecting a repeated - event from the payload. Default 0 - loop_delay float Number of seconds to wait before inserting the - next set of events. Default 0 - shutdown_after float Number of seconds to wait before shutting down the - plugin. Default 0 - loop_count int Number of times the set of events in the playload - should be repeated. Default 0 - repeat_count int Number of times each individual event in the playload - should be repeated. Default 1 -""" - - -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Insert event data into the queue.""" payload = args.get("payload") randomize = args.get("randomize", False) display = args.get("display", False) @@ -109,10 +110,14 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run( main( From 8f10635a049b4a73448400b72d4ad05ebbd11d07 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:48:44 -0700 Subject: [PATCH 16/24] Fix some ruff issues in journald.py Remaining issues: 63:23: ANN101 Missing type annotation for `self` in method --- extensions/eda/plugins/event_source/journald.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/extensions/eda/plugins/event_source/journald.py b/extensions/eda/plugins/event_source/journald.py index 22079ca0..02ace505 100644 --- a/extensions/eda/plugins/event_source/journald.py +++ b/extensions/eda/plugins/event_source/journald.py @@ -1,9 +1,11 @@ -"""journald.py +"""journald.py. + An ansible-events event source plugin that tails systemd journald logs. Arguments: --------- - match - return messages that matches this field, see https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html # noqa + match - return messages that matches this field, + see https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html Examples: -------- @@ -27,7 +29,8 @@ from systemd import journal -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Tail systemd journald logs.""" delay = args.get("delay", 0) match = args.get("match", []) @@ -52,9 +55,13 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"match": "ALL"})) From 821a059e6f58630d9efcd0631952005c823e4eae Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:49:05 -0700 Subject: [PATCH 17/24] Fix some ruff issues in kafka.py Remaining issues: 54:15: TRY002 Create your own exception 87:17: TRY400 Use `logging.exception` instead of `logging.error` 103:23: ANN101 Missing type annotation for `self` in method --- extensions/eda/plugins/event_source/kafka.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/extensions/eda/plugins/event_source/kafka.py b/extensions/eda/plugins/event_source/kafka.py index 500147c2..9128a48e 100644 --- a/extensions/eda/plugins/event_source/kafka.py +++ b/extensions/eda/plugins/event_source/kafka.py @@ -33,7 +33,8 @@ from aiokafka.helpers import create_ssl_context -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Receive events via a kafka topic.""" logger = logging.getLogger() topic = args.get("topic") @@ -94,10 +95,14 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run( main( From 8dbc2d4fa7cc137159737c982ae9b92a2aab8efe Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:49:39 -0700 Subject: [PATCH 18/24] Fix some ruff issues in range.py Remaining issues: 36:23: ANN101 Missing type annotation for `self` in method --- extensions/eda/plugins/event_source/range.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/extensions/eda/plugins/event_source/range.py b/extensions/eda/plugins/event_source/range.py index 71933272..53b2b17b 100644 --- a/extensions/eda/plugins/event_source/range.py +++ b/extensions/eda/plugins/event_source/range.py @@ -18,7 +18,8 @@ from typing import Any -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Generate events with an increasing index i with a limit.""" delay = args.get("delay", 0) for i in range(int(args["limit"])): @@ -27,9 +28,13 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"limit": 5})) From fc3bb7b6d90210aa18a6c7556923dc5df27997e3 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:50:34 -0700 Subject: [PATCH 19/24] Fix some ruff issues in tick.py Remaining issues: 36:23: ANN101 Missing type annotation for `self` in method --- extensions/eda/plugins/event_source/tick.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/extensions/eda/plugins/event_source/tick.py b/extensions/eda/plugins/event_source/tick.py index 130d8179..b4e99ef1 100644 --- a/extensions/eda/plugins/event_source/tick.py +++ b/extensions/eda/plugins/event_source/tick.py @@ -18,7 +18,8 @@ from typing import Any -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Generate events with an increasing index i and a time between ticks.""" delay = args.get("delay", 1) for i in itertools.count(start=1): @@ -27,9 +28,13 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"delay": 1})) From eeb61973f8b575c35567dfca922eadf5a50cadd4 Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:50:56 -0700 Subject: [PATCH 20/24] Fix some ruff issues in url_check.py Remaining issues: 74:23: ANN101 Missing type annotation for `self` in method --- extensions/eda/plugins/event_source/url_check.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/extensions/eda/plugins/event_source/url_check.py b/extensions/eda/plugins/event_source/url_check.py index 2d5b9301..f8142071 100644 --- a/extensions/eda/plugins/event_source/url_check.py +++ b/extensions/eda/plugins/event_source/url_check.py @@ -24,8 +24,10 @@ import aiohttp +OK = 200 -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Poll a set of URLs and send events with status.""" urls = args.get("urls", []) delay = int(args.get("delay", 1)) verify_ssl = args.get("verify_ssl", True) @@ -42,7 +44,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): { "url_check": { "url": url, - "status": "up" if resp.status == 200 else "down", + "status": "up" if resp.status == OK else "down", "status_code": resp.status, }, }, @@ -64,9 +66,13 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"urls": ["http://redhat.com"]})) From 9c17bd95c49ed8a51fee986d90d68840ba6adcea Mon Sep 17 00:00:00 2001 From: Erik Clarizio Date: Tue, 20 Jun 2023 09:51:14 -0700 Subject: [PATCH 21/24] Fix some ruff issues in webhook.py Remaining issues: 45:11: ANN201 Missing return type annotation for public function `bearer_auth` 50:13: TRY301 Abstract `raise` to an inner function 51:9: RET506 Unnecessary `elif` after `raise` statement 52:13: TRY301 Abstract `raise` to an inner function 54:9: B904 Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling 54:9: TRY200 Use `raise from` to specify exception cause 56:9: B904 Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling 56:9: TRY200 Use `raise from` to specify exception cause 84:13: TRY400 Use `logging.exception` instead of `logging.error` 91:29: S104 Possible binding to all interfaces 111:23: ANN101 Missing type annotation for `self` in method --- extensions/eda/plugins/event_source/webhook.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/extensions/eda/plugins/event_source/webhook.py b/extensions/eda/plugins/event_source/webhook.py index 6f69685b..ac5a005c 100644 --- a/extensions/eda/plugins/event_source/webhook.py +++ b/extensions/eda/plugins/event_source/webhook.py @@ -27,7 +27,8 @@ @routes.post(r"/{endpoint:.*}") -async def webhook(request: web.Request): +async def webhook(request: web.Request) -> web.Response: + """Return response to webhook request.""" payload = await request.json() endpoint = request.match_info["endpoint"] headers = dict(request.headers) @@ -42,6 +43,7 @@ async def webhook(request: web.Request): @web.middleware async def bearer_auth(request: web.Request, handler: Callable): + """Verify authorization is Bearer type.""" try: scheme, token = request.headers["Authorization"].strip().split(" ") if scheme != "Bearer": @@ -56,7 +58,8 @@ async def bearer_auth(request: web.Request, handler: Callable): return await handler(request) -async def main(queue: asyncio.Queue, args: dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Receive events via webhook.""" if "port" not in args: msg = "Missing required argument: port" raise ValueError(msg) @@ -100,10 +103,14 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]): if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self, event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run( main( From 55d18b3993dedff07ed53dc23ed4bf80202aa86c Mon Sep 17 00:00:00 2001 From: Bill Wei Date: Thu, 22 Jun 2023 14:06:45 -0400 Subject: [PATCH 22/24] Fix remaining ruff issues --- .../event_filter/dashes_to_underscores.py | 2 +- .../event_filter/insert_hosts_to_meta.py | 6 ++- .../eda/plugins/event_filter/json_filter.py | 8 +++- .../plugins/event_filter/normalize_keys.py | 19 ++++++--- .../eda/plugins/event_source/alertmanager.py | 13 +++--- .../plugins/event_source/aws_cloudtrail.py | 15 +++---- .../eda/plugins/event_source/aws_sqs_queue.py | 6 +-- .../plugins/event_source/azure_service_bus.py | 11 +++-- extensions/eda/plugins/event_source/file.py | 40 ++++++++++--------- .../eda/plugins/event_source/file_watch.py | 23 ++++++----- .../eda/plugins/event_source/generic.py | 36 +++++++++++------ .../eda/plugins/event_source/journald.py | 4 +- extensions/eda/plugins/event_source/kafka.py | 10 ++--- extensions/eda/plugins/event_source/range.py | 4 +- extensions/eda/plugins/event_source/tick.py | 4 +- .../eda/plugins/event_source/url_check.py | 5 ++- .../eda/plugins/event_source/webhook.py | 29 ++++++++------ 17 files changed, 141 insertions(+), 94 deletions(-) diff --git a/extensions/eda/plugins/event_filter/dashes_to_underscores.py b/extensions/eda/plugins/event_filter/dashes_to_underscores.py index 07919c30..d36d7577 100644 --- a/extensions/eda/plugins/event_filter/dashes_to_underscores.py +++ b/extensions/eda/plugins/event_filter/dashes_to_underscores.py @@ -12,7 +12,7 @@ import multiprocessing as mp -def main(event: dict, overwrite: bool = True) -> dict: +def main(event: dict, overwrite: bool = True) -> dict: # noqa: FBT001, FBT002 """Change dashes in keys to underscores.""" logger = mp.get_logger() logger.info("dashes_to_underscores") diff --git a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py index 9e30f5b8..9fc9575e 100644 --- a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py +++ b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py @@ -24,6 +24,8 @@ """ +from __future__ import annotations + from typing import Any import dpath @@ -31,8 +33,8 @@ def main( event: dict[str, Any], - host_path: str = None, - host_separator: str = None, + host_path: str | None = None, + host_separator: str | None = None, path_separator: str = ".", ) -> dict[str, Any]: """Extract hosts from event data and insert into meta dict.""" diff --git a/extensions/eda/plugins/event_filter/json_filter.py b/extensions/eda/plugins/event_filter/json_filter.py index 59a84d6e..bf86ce8b 100644 --- a/extensions/eda/plugins/event_filter/json_filter.py +++ b/extensions/eda/plugins/event_filter/json_filter.py @@ -13,6 +13,8 @@ """ +from __future__ import annotations + import fnmatch @@ -24,7 +26,11 @@ def _matches_exclude_keys(exclude_keys: list, s: str) -> bool: return any(fnmatch.fnmatch(s, pattern) for pattern in exclude_keys) -def main(event: dict, exclude_keys: list = None, include_keys: list = None) -> dict: +def main( + event: dict, + exclude_keys: list | None = None, + include_keys: list | None = None, +) -> dict: """Filter keys out of events.""" if exclude_keys is None: exclude_keys = [] diff --git a/extensions/eda/plugins/event_filter/normalize_keys.py b/extensions/eda/plugins/event_filter/normalize_keys.py index 42b764a5..2c6409b5 100644 --- a/extensions/eda/plugins/event_filter/normalize_keys.py +++ b/extensions/eda/plugins/event_filter/normalize_keys.py @@ -37,20 +37,25 @@ """ +import logging import multiprocessing as mp import re normalize_regex = re.compile("[^0-9a-zA-Z_]+") -def main(event: dict, overwrite: bool = True) -> dict: +def main(event: dict, overwrite: bool = True) -> dict: # noqa: FBT001, FBT002 """Change keys that contain non-alphanumeric characters to underscores.""" logger = mp.get_logger() logger.info("normalize_keys") return _normalize_embedded_keys(event, overwrite, logger) -def _normalize_embedded_keys(obj: dict, overwrite: bool, logger) -> dict: +def _normalize_embedded_keys( + obj: dict, + overwrite: bool, # noqa: FBT001 + logger: logging.Logger, +) -> dict: if isinstance(obj, dict): new_dict = {} original_keys = list(obj.keys()) @@ -58,14 +63,18 @@ def _normalize_embedded_keys(obj: dict, overwrite: bool, logger) -> dict: new_key = normalize_regex.sub("_", key) if new_key == key or new_key not in original_keys: new_dict[new_key] = _normalize_embedded_keys( - obj[key], overwrite, logger, + obj[key], + overwrite, + logger, ) elif new_key in original_keys and overwrite: new_dict[new_key] = _normalize_embedded_keys( - obj[key], overwrite, logger, + obj[key], + overwrite, + logger, ) logger.warning("Replacing existing key %s", new_key) return new_dict - elif isinstance(obj, list): + if isinstance(obj, list): return [_normalize_embedded_keys(item, overwrite, logger) for item in obj] return obj diff --git a/extensions/eda/plugins/event_source/alertmanager.py b/extensions/eda/plugins/event_source/alertmanager.py index c6374442..7384b347 100644 --- a/extensions/eda/plugins/event_source/alertmanager.py +++ b/extensions/eda/plugins/event_source/alertmanager.py @@ -33,6 +33,7 @@ """ import asyncio +import logging from typing import Any from aiohttp import web @@ -42,7 +43,7 @@ @routes.get("/") -async def status(request: web.Request) -> web.Response: +async def status(_request: web.Request) -> web.Response: """Return status of a web request.""" return web.Response(status=200, text="up") @@ -96,7 +97,9 @@ async def webhook(request: web.Request) -> web.Response: { "alert": alert, "meta": { - "endpoint": endpoint, "headers": dict(request.headers), "hosts": hosts, + "endpoint": endpoint, + "headers": dict(request.headers), + "hosts": hosts, }, }, ) @@ -130,7 +133,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: try: await asyncio.Future() except asyncio.CancelledError: - print("Plugin Task Cancelled") + logging.getLogger().info("Plugin Task Cancelled") finally: await runner.cleanup() @@ -141,8 +144,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {})) diff --git a/extensions/eda/plugins/event_source/aws_cloudtrail.py b/extensions/eda/plugins/event_source/aws_cloudtrail.py index 7e6aebda..810ae4a2 100644 --- a/extensions/eda/plugins/event_source/aws_cloudtrail.py +++ b/extensions/eda/plugins/event_source/aws_cloudtrail.py @@ -36,6 +36,7 @@ from datetime import datetime from typing import Any +from aiobotocore.client import BaseClient from aiobotocore.session import get_session @@ -45,7 +46,7 @@ def _cloudtrail_event_to_dict(event: dict) -> dict: return event -def get_events(events, last_event_ids): +def _get_events(events: list[dict], last_event_ids: list) -> list: event_time = None event_ids = [] result = [] @@ -62,7 +63,7 @@ def get_events(events, last_event_ids): return result, event_time, event_ids -async def get_cloudtrail_events(client, params): +async def _get_cloudtrail_events(client: BaseClient, params: dict) -> list[dict]: paginator = client.get_paginator("lookup_events") results = await paginator.paginate(**params).build_full_result() return results.get("Events", []) @@ -84,17 +85,17 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: if args.get(k) is not None: params[v] = args.get(k) - params["StartTime"] = datetime.utcnow() + params["StartTime"] = datetime.utcnow() # noqa: DTZ003 async with session.create_client("cloudtrail", **connection_args(args)) as client: event_time = None event_ids = [] while True: - events = await get_cloudtrail_events(client, params) + events = await _get_cloudtrail_events(client, params) if event_time is not None: params["StartTime"] = event_time - events, c_event_time, c_event_ids = get_events(events, event_ids) + events, c_event_time, c_event_ids = _get_events(events, event_ids) for event in events: await queue.put(_cloudtrail_event_to_dict(event)) @@ -130,8 +131,8 @@ def connection_args(args: dict[str, Any]) -> dict[str, Any]: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {})) diff --git a/extensions/eda/plugins/event_source/aws_sqs_queue.py b/extensions/eda/plugins/event_source/aws_sqs_queue.py index 94a1d5f4..30f50421 100644 --- a/extensions/eda/plugins/event_source/aws_sqs_queue.py +++ b/extensions/eda/plugins/event_source/aws_sqs_queue.py @@ -49,7 +49,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: err.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue" ): - raise ValueError("Queue %s does not exist" % queue_name) + raise ValueError("Queue %s does not exist" % queue_name) from None raise queue_url = response["QueueUrl"] @@ -108,8 +108,8 @@ def connection_args(args: dict[str, Any]) -> dict[str, Any]: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"region": "us-east-1", "name": "eda"})) diff --git a/extensions/eda/plugins/event_source/azure_service_bus.py b/extensions/eda/plugins/event_source/azure_service_bus.py index 36d77d20..f4ab6498 100644 --- a/extensions/eda/plugins/event_source/azure_service_bus.py +++ b/extensions/eda/plugins/event_source/azure_service_bus.py @@ -26,7 +26,9 @@ def receive_events( - loop: asyncio.events.AbstractEventLoop, queue: asyncio.Queue, args: dict[str, Any], + loop: asyncio.events.AbstractEventLoop, + queue: asyncio.Queue, + args: dict[str, Any], ) -> None: """Receive events from service bus.""" servicebus_client = ServiceBusClient.from_connection_string( @@ -44,7 +46,8 @@ def receive_events( body = json.loads(body) loop.call_soon_threadsafe( - queue.put_nowait, {"body": body, "meta": meta}, + queue.put_nowait, + {"body": body, "meta": meta}, ) receiver.complete_message(msg) @@ -63,9 +66,9 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put_nowait(self, event: dict) -> None: + async def put_nowait(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 args = { "conn_str": "Endpoint=sb://foo.servicebus.windows.net/", diff --git a/extensions/eda/plugins/event_source/file.py b/extensions/eda/plugins/event_source/file.py index 8a3519f0..314a14b9 100644 --- a/extensions/eda/plugins/event_source/file.py +++ b/extensions/eda/plugins/event_source/file.py @@ -15,16 +15,16 @@ """ -import os +import pathlib import yaml from watchdog.events import RegexMatchingEventHandler from watchdog.observers import Observer -def send_facts(queue, filename: str) -> None: +def send_facts(queue, filename: str) -> None: # noqa: ANN001 """Send facts to the queue.""" - with open(filename) as f: + with pathlib.Path(filename).open() as f: data = yaml.safe_load(f.read()) if data is None: return @@ -32,52 +32,54 @@ def send_facts(queue, filename: str) -> None: queue.put(data) else: if not isinstance(data, list): - msg = f"Unsupported facts type, expects a list of dicts found {type(data)}" - raise Exception( - msg, + msg = ( + "Unsupported facts type, expects a list of dicts found " + f"{type(data)}" ) + raise TypeError(msg) if not all(bool(isinstance(item, dict)) for item in data): msg = f"Unsupported facts type, expects a list of dicts found {data}" - raise Exception( - msg, - ) + raise TypeError(msg) for item in data: queue.put(item) -def main(queue, args: dict) -> None: +def main(queue, args: dict) -> None: # noqa: ANN001 """Load facts from YAML files initially and when the file changes.""" - files = [os.path.abspath(f) for f in args.get("files", [])] + files = [pathlib.Path(f).resolve().as_posix() for f in args.get("files", [])] if not files: return for filename in files: send_facts(queue, filename) + _observe_files(queue, files) + +def _observe_files(queue, files: list[str]) -> None: # noqa: ANN001 class Handler(RegexMatchingEventHandler): - def __init__(self, **kwargs) -> None: + def __init__(self: "Handler", **kwargs) -> None: # noqa: ANN003 RegexMatchingEventHandler.__init__(self, **kwargs) - def on_created(self, event: dict) -> None: + def on_created(self: "Handler", event: dict) -> None: if event.src_path in files: send_facts(queue, event.src_path) - def on_deleted(self, event: dict) -> None: + def on_deleted(self: "Handler", event: dict) -> None: pass - def on_modified(self, event: dict) -> None: + def on_modified(self: "Handler", event: dict) -> None: if event.src_path in files: send_facts(queue, event.src_path) - def on_moved(self, event: dict) -> None: + def on_moved(self: "Handler", event: dict) -> None: pass observer = Observer() handler = Handler() for filename in files: - directory = os.path.dirname(filename) + directory = pathlib.Path(filename).parent observer.schedule(handler, directory, recursive=False) observer.start() @@ -94,8 +96,8 @@ def on_moved(self, event: dict) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 main(MockQueue(), {"files": ["facts.yml"]}) diff --git a/extensions/eda/plugins/event_source/file_watch.py b/extensions/eda/plugins/event_source/file_watch.py index 4e8b379a..6e23c997 100644 --- a/extensions/eda/plugins/event_source/file_watch.py +++ b/extensions/eda/plugins/event_source/file_watch.py @@ -20,20 +20,25 @@ import asyncio import concurrent.futures +from typing import Optional from watchdog.events import RegexMatchingEventHandler from watchdog.observers import Observer -def watch(loop: asyncio.events.AbstractEventLoop, queue: asyncio.Queue, args: dict) -> None: +def watch( + loop: asyncio.events.AbstractEventLoop, + queue: asyncio.Queue, + args: dict, +) -> None: """Watch for changes and put events on the queue.""" root_path = args["path"] class Handler(RegexMatchingEventHandler): - def __init__(self, **kwargs) -> None: + def __init__(self: "Handler", **kwargs: Optional(list[str])) -> None: RegexMatchingEventHandler.__init__(self, **kwargs) - def on_created(self, event: dict) -> None: + def on_created(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -44,7 +49,7 @@ def on_created(self, event: dict) -> None: }, ) - def on_deleted(self, event: dict) -> None: + def on_deleted(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -55,7 +60,7 @@ def on_deleted(self, event: dict) -> None: }, ) - def on_modified(self, event: dict) -> None: + def on_modified(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -66,7 +71,7 @@ def on_modified(self, event: dict) -> None: }, ) - def on_moved(self, event: dict) -> None: + def on_moved(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -103,8 +108,8 @@ async def main(queue: asyncio.Queue, args: dict) -> None: class MockQueue: """A fake queue.""" - async def put_nowait(self, event: dict) -> None: + async def put_nowait(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 - asyncio.run(main(MockQueue(), {"path": "/tmp", "recursive": True})) + asyncio.run(main(MockQueue(), {"path": "/tmp", "recursive": True})) # noqa: S108 diff --git a/extensions/eda/plugins/event_source/generic.py b/extensions/eda/plugins/event_source/generic.py index 7e6c31bc..fb3ca10e 100644 --- a/extensions/eda/plugins/event_source/generic.py +++ b/extensions/eda/plugins/event_source/generic.py @@ -86,21 +86,12 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: if not event: continue for _ignore in range(repeat_count): - data = {} - if create_index: - data[create_index] = index - if add_timestamp: - if time_format == "local": - data["timestamp"] = str(datetime.now()) - elif time_format == "epoch": - data["timestamp"] = int(time.time()) - elif time_format == "iso8601": - data["timestamp"] = datetime.now().isoformat() + data = _create_data(create_index, index, add_timestamp, time_format) index += 1 data.update(event) if display: - print(data) + print(data) # noqa: T201 await queue.put(data) await asyncio.sleep(repeat_delay) @@ -109,15 +100,34 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: await asyncio.sleep(shutdown_after) +def _create_data( + create_index: str, + index: int, + add_timestamp: str, + time_format: str, +) -> dict: + data = {} + if create_index: + data[create_index] = index + if add_timestamp: + if time_format == "local": + data["timestamp"] = str(datetime.now()) # noqa: DTZ005 + elif time_format == "epoch": + data["timestamp"] = int(time.time()) + elif time_format == "iso8601": + data["timestamp"] = datetime.now(tz=None).isoformat() # noqa: DTZ005 + return data + + if __name__ == "__main__": """MockQueue if running directly.""" class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run( main( diff --git a/extensions/eda/plugins/event_source/journald.py b/extensions/eda/plugins/event_source/journald.py index 02ace505..21c4cef6 100644 --- a/extensions/eda/plugins/event_source/journald.py +++ b/extensions/eda/plugins/event_source/journald.py @@ -60,8 +60,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"match": "ALL"})) diff --git a/extensions/eda/plugins/event_source/kafka.py b/extensions/eda/plugins/event_source/kafka.py index 9128a48e..7d1106a6 100644 --- a/extensions/eda/plugins/event_source/kafka.py +++ b/extensions/eda/plugins/event_source/kafka.py @@ -51,7 +51,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: if offset not in ("latest", "earliest"): msg = f"Invalid offset option: {offset}" - raise Exception(msg) + raise ValueError(msg) if cafile: context = create_ssl_context( @@ -83,8 +83,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: data = json.loads(value) except json.decoder.JSONDecodeError: data = value - except UnicodeError as e: - logger.error(e) + except UnicodeError: + logger.exception("Unicode Error") if data: await queue.put({"body": data}) @@ -100,9 +100,9 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run( main( diff --git a/extensions/eda/plugins/event_source/range.py b/extensions/eda/plugins/event_source/range.py index 53b2b17b..9ac01c33 100644 --- a/extensions/eda/plugins/event_source/range.py +++ b/extensions/eda/plugins/event_source/range.py @@ -33,8 +33,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"limit": 5})) diff --git a/extensions/eda/plugins/event_source/tick.py b/extensions/eda/plugins/event_source/tick.py index b4e99ef1..48632dc7 100644 --- a/extensions/eda/plugins/event_source/tick.py +++ b/extensions/eda/plugins/event_source/tick.py @@ -33,8 +33,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"delay": 1})) diff --git a/extensions/eda/plugins/event_source/url_check.py b/extensions/eda/plugins/event_source/url_check.py index f8142071..8c6c7b89 100644 --- a/extensions/eda/plugins/event_source/url_check.py +++ b/extensions/eda/plugins/event_source/url_check.py @@ -26,6 +26,7 @@ OK = 200 + async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: """Poll a set of URLs and send events with status.""" urls = args.get("urls", []) @@ -71,8 +72,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run(main(MockQueue(), {"urls": ["http://redhat.com"]})) diff --git a/extensions/eda/plugins/event_source/webhook.py b/extensions/eda/plugins/event_source/webhook.py index ac5a005c..c32c918d 100644 --- a/extensions/eda/plugins/event_source/webhook.py +++ b/extensions/eda/plugins/event_source/webhook.py @@ -41,19 +41,24 @@ async def webhook(request: web.Request) -> web.Response: return web.Response(text=endpoint) +def _parse_token(request: web.Request) -> (str, str): + scheme, token = request.headers["Authorization"].strip().split(" ") + if scheme != "Bearer": + raise web.HTTPUnauthorized(text="Only Bearer type is accepted") + if token != request.app["token"]: + raise web.HTTPUnauthorized(text="Invalid authorization token") + return scheme, token + + @web.middleware -async def bearer_auth(request: web.Request, handler: Callable): +async def bearer_auth(request: web.Request, handler: Callable) -> web.StreamResponse: """Verify authorization is Bearer type.""" try: - scheme, token = request.headers["Authorization"].strip().split(" ") - if scheme != "Bearer": - raise web.HTTPUnauthorized(text="Only Bearer type is accepted") - elif token != request.app["token"]: - raise web.HTTPUnauthorized(text="Invalid authorization token") + scheme, token = _parse_token(request) except KeyError: - raise web.HTTPUnauthorized(reason="Missing authorization token") + raise web.HTTPUnauthorized(reason="Missing authorization token") from None except ValueError: - raise web.HTTPUnauthorized(text="Invalid authorization token") + raise web.HTTPUnauthorized(text="Invalid authorization token") from None return await handler(request) @@ -81,14 +86,14 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: try: context.load_cert_chain(certfile, keyfile, password) except Exception: - logger.error("Failed to load certificates. Check they are valid") + logger.exception("Failed to load certificates. Check they are valid") raise runner = web.AppRunner(app) await runner.setup() site = web.TCPSite( runner, - args.get("host") or "0.0.0.0", + args.get("host") or "127.0.0.1", args.get("port"), ssl_context=context, ) @@ -108,9 +113,9 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" - print(event) # noqa: T201 + print(event) # noqa: T201 asyncio.run( main( From 85a6b4adf7ff33f86e1f9d22d4d9b47ec26dcfc6 Mon Sep 17 00:00:00 2001 From: Bill Wei Date: Mon, 26 Jun 2023 13:34:30 -0400 Subject: [PATCH 23/24] Drop python3.8 for tests --- .github/workflows/integration-tests.yml | 2 +- .github/workflows/tests.yml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index b42e3072..47a90576 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -22,7 +22,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.9" - name: Install package dependencies run: | diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ed0866ff..7a8df0ac 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,7 +15,6 @@ jobs: python-version: - "3.10" - "3.9" - - "3.8" defaults: run: From 1e64fa418cae5452343f26028db20059baf39b77 Mon Sep 17 00:00:00 2001 From: Bill Wei Date: Mon, 26 Jun 2023 16:12:50 -0400 Subject: [PATCH 24/24] Ignore UP038 because python3.9 does not support it --- extensions/eda/plugins/event_filter/insert_hosts_to_meta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py index 9fc9575e..65aadfa3 100644 --- a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py +++ b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py @@ -49,7 +49,7 @@ def main( if isinstance(hosts, str): hosts = hosts.split(host_separator) if host_separator else [hosts] - elif isinstance(hosts, list | tuple): + elif isinstance(hosts, (list, tuple)): # noqa: UP038 for h in hosts: if not isinstance(h, str): msg = f"{h} is not a valid hostname"