diff --git a/extensions/eda/plugins/event_filter/dashes_to_underscores.py b/extensions/eda/plugins/event_filter/dashes_to_underscores.py index 07919c30..a76748a9 100644 --- a/extensions/eda/plugins/event_filter/dashes_to_underscores.py +++ b/extensions/eda/plugins/event_filter/dashes_to_underscores.py @@ -12,7 +12,7 @@ import multiprocessing as mp -def main(event: dict, overwrite: bool = True) -> dict: +def main(event: dict, overwrite: bool = True) -> dict: # noqa: FBT001, FBT002 """Change dashes in keys to underscores.""" logger = mp.get_logger() logger.info("dashes_to_underscores") diff --git a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py index 9e30f5b8..a4eee94e 100644 --- a/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py +++ b/extensions/eda/plugins/event_filter/insert_hosts_to_meta.py @@ -31,8 +31,8 @@ def main( event: dict[str, Any], - host_path: str = None, - host_separator: str = None, + host_path: str | None = None, + host_separator: str | None = None, path_separator: str = ".", ) -> dict[str, Any]: """Extract hosts from event data and insert into meta dict.""" diff --git a/extensions/eda/plugins/event_filter/json_filter.py b/extensions/eda/plugins/event_filter/json_filter.py index 59a84d6e..71b7fd34 100644 --- a/extensions/eda/plugins/event_filter/json_filter.py +++ b/extensions/eda/plugins/event_filter/json_filter.py @@ -24,7 +24,11 @@ def _matches_exclude_keys(exclude_keys: list, s: str) -> bool: return any(fnmatch.fnmatch(s, pattern) for pattern in exclude_keys) -def main(event: dict, exclude_keys: list = None, include_keys: list = None) -> dict: +def main( + event: dict, + exclude_keys: list | None = None, + include_keys: list | None = None, +) -> dict: """Filter keys out of events.""" if exclude_keys is None: exclude_keys = [] diff --git a/extensions/eda/plugins/event_filter/normalize_keys.py b/extensions/eda/plugins/event_filter/normalize_keys.py index 42b764a5..0ac0af97 100644 --- a/extensions/eda/plugins/event_filter/normalize_keys.py +++ b/extensions/eda/plugins/event_filter/normalize_keys.py @@ -37,20 +37,25 @@ """ +import logging import multiprocessing as mp import re normalize_regex = re.compile("[^0-9a-zA-Z_]+") -def main(event: dict, overwrite: bool = True) -> dict: +def main(event: dict, overwrite: bool = True) -> dict: # noqa: FBT001, FBT002 """Change keys that contain non-alphanumeric characters to underscores.""" logger = mp.get_logger() logger.info("normalize_keys") return _normalize_embedded_keys(event, overwrite, logger) -def _normalize_embedded_keys(obj: dict, overwrite: bool, logger) -> dict: +def _normalize_embedded_keys( + obj: dict, + overwrite: bool, # noqa: FBT001 + logger: logging.Logger, +) -> dict: if isinstance(obj, dict): new_dict = {} original_keys = list(obj.keys()) @@ -66,6 +71,6 @@ def _normalize_embedded_keys(obj: dict, overwrite: bool, logger) -> dict: ) logger.warning("Replacing existing key %s", new_key) return new_dict - elif isinstance(obj, list): + if isinstance(obj, list): return [_normalize_embedded_keys(item, overwrite, logger) for item in obj] return obj diff --git a/extensions/eda/plugins/event_source/alertmanager.py b/extensions/eda/plugins/event_source/alertmanager.py index c6374442..189f91db 100644 --- a/extensions/eda/plugins/event_source/alertmanager.py +++ b/extensions/eda/plugins/event_source/alertmanager.py @@ -33,6 +33,7 @@ """ import asyncio +import logging from typing import Any from aiohttp import web @@ -42,7 +43,7 @@ @routes.get("/") -async def status(request: web.Request) -> web.Response: +async def status(_request: web.Request) -> web.Response: """Return status of a web request.""" return web.Response(status=200, text="up") @@ -96,7 +97,9 @@ async def webhook(request: web.Request) -> web.Response: { "alert": alert, "meta": { - "endpoint": endpoint, "headers": dict(request.headers), "hosts": hosts, + "endpoint": endpoint, + "headers": dict(request.headers), + "hosts": hosts, }, }, ) @@ -130,7 +133,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: try: await asyncio.Future() except asyncio.CancelledError: - print("Plugin Task Cancelled") + logging.getLogger().info("Plugin Task Cancelled") finally: await runner.cleanup() @@ -141,7 +144,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/aws_cloudtrail.py b/extensions/eda/plugins/event_source/aws_cloudtrail.py index 7e6aebda..79e4a331 100644 --- a/extensions/eda/plugins/event_source/aws_cloudtrail.py +++ b/extensions/eda/plugins/event_source/aws_cloudtrail.py @@ -36,6 +36,7 @@ from datetime import datetime from typing import Any +from aiobotocore.client import BaseClient from aiobotocore.session import get_session @@ -45,7 +46,7 @@ def _cloudtrail_event_to_dict(event: dict) -> dict: return event -def get_events(events, last_event_ids): +def _get_events(events: list[dict], last_event_ids: list) -> list: event_time = None event_ids = [] result = [] @@ -62,7 +63,7 @@ def get_events(events, last_event_ids): return result, event_time, event_ids -async def get_cloudtrail_events(client, params): +async def _get_cloudtrail_events(client: BaseClient, params: dict) -> list[dict]: paginator = client.get_paginator("lookup_events") results = await paginator.paginate(**params).build_full_result() return results.get("Events", []) @@ -84,17 +85,17 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: if args.get(k) is not None: params[v] = args.get(k) - params["StartTime"] = datetime.utcnow() + params["StartTime"] = datetime.utcnow() # noqa: DTZ003 async with session.create_client("cloudtrail", **connection_args(args)) as client: event_time = None event_ids = [] while True: - events = await get_cloudtrail_events(client, params) + events = await _get_cloudtrail_events(client, params) if event_time is not None: params["StartTime"] = event_time - events, c_event_time, c_event_ids = get_events(events, event_ids) + events, c_event_time, c_event_ids = _get_events(events, event_ids) for event in events: await queue.put(_cloudtrail_event_to_dict(event)) @@ -130,7 +131,7 @@ def connection_args(args: dict[str, Any]) -> dict[str, Any]: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/aws_sqs_queue.py b/extensions/eda/plugins/event_source/aws_sqs_queue.py index 94a1d5f4..630a8990 100644 --- a/extensions/eda/plugins/event_source/aws_sqs_queue.py +++ b/extensions/eda/plugins/event_source/aws_sqs_queue.py @@ -49,7 +49,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: err.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue" ): - raise ValueError("Queue %s does not exist" % queue_name) + raise ValueError("Queue %s does not exist" % queue_name) from None raise queue_url = response["QueueUrl"] @@ -108,7 +108,7 @@ def connection_args(args: dict[str, Any]) -> dict[str, Any]: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/azure_service_bus.py b/extensions/eda/plugins/event_source/azure_service_bus.py index 36d77d20..6a55d763 100644 --- a/extensions/eda/plugins/event_source/azure_service_bus.py +++ b/extensions/eda/plugins/event_source/azure_service_bus.py @@ -63,7 +63,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put_nowait(self, event: dict) -> None: + async def put_nowait(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/file.py b/extensions/eda/plugins/event_source/file.py index 8a3519f0..8a49816d 100644 --- a/extensions/eda/plugins/event_source/file.py +++ b/extensions/eda/plugins/event_source/file.py @@ -15,16 +15,16 @@ """ -import os +import pathlib import yaml from watchdog.events import RegexMatchingEventHandler from watchdog.observers import Observer -def send_facts(queue, filename: str) -> None: +def send_facts(queue, filename: str) -> None: # noqa: ANN001 """Send facts to the queue.""" - with open(filename) as f: + with pathlib.Path(filename).open() as f: data = yaml.safe_load(f.read()) if data is None: return @@ -32,52 +32,54 @@ def send_facts(queue, filename: str) -> None: queue.put(data) else: if not isinstance(data, list): - msg = f"Unsupported facts type, expects a list of dicts found {type(data)}" - raise Exception( - msg, + msg = ( + "Unsupported facts type, expects a list of dicts found " + f"{type(data)}" ) + raise TypeError(msg) if not all(bool(isinstance(item, dict)) for item in data): msg = f"Unsupported facts type, expects a list of dicts found {data}" - raise Exception( - msg, - ) + raise TypeError(msg) for item in data: queue.put(item) -def main(queue, args: dict) -> None: +def main(queue, args: dict) -> None: # noqa: ANN001 """Load facts from YAML files initially and when the file changes.""" - files = [os.path.abspath(f) for f in args.get("files", [])] + files = [pathlib.Path(f).resolve().as_posix() for f in args.get("files", [])] if not files: return for filename in files: send_facts(queue, filename) + _observe_files(queue, files) + +def _observe_files(queue, files: list[str]) -> None: # noqa: ANN001 class Handler(RegexMatchingEventHandler): - def __init__(self, **kwargs) -> None: + def __init__(self: "Handler", **kwargs) -> None: # noqa: ANN003 RegexMatchingEventHandler.__init__(self, **kwargs) - def on_created(self, event: dict) -> None: + def on_created(self: "Handler", event: dict) -> None: if event.src_path in files: send_facts(queue, event.src_path) - def on_deleted(self, event: dict) -> None: + def on_deleted(self: "Handler", event: dict) -> None: pass - def on_modified(self, event: dict) -> None: + def on_modified(self: "Handler", event: dict) -> None: if event.src_path in files: send_facts(queue, event.src_path) - def on_moved(self, event: dict) -> None: + def on_moved(self: "Handler", event: dict) -> None: pass observer = Observer() handler = Handler() for filename in files: - directory = os.path.dirname(filename) + directory = pathlib.Path(filename).parent observer.schedule(handler, directory, recursive=False) observer.start() @@ -94,7 +96,7 @@ def on_moved(self, event: dict) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/file_watch.py b/extensions/eda/plugins/event_source/file_watch.py index 4e8b379a..ed760fc5 100644 --- a/extensions/eda/plugins/event_source/file_watch.py +++ b/extensions/eda/plugins/event_source/file_watch.py @@ -20,20 +20,24 @@ import asyncio import concurrent.futures +from typing import Optional from watchdog.events import RegexMatchingEventHandler from watchdog.observers import Observer -def watch(loop: asyncio.events.AbstractEventLoop, queue: asyncio.Queue, args: dict) -> None: +def watch( + loop: asyncio.events.AbstractEventLoop, + queue: asyncio.Queue, args: dict, +) -> None: """Watch for changes and put events on the queue.""" root_path = args["path"] class Handler(RegexMatchingEventHandler): - def __init__(self, **kwargs) -> None: + def __init__(self: "Handler", **kwargs: Optional(list[str])) -> None: RegexMatchingEventHandler.__init__(self, **kwargs) - def on_created(self, event: dict) -> None: + def on_created(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -44,7 +48,7 @@ def on_created(self, event: dict) -> None: }, ) - def on_deleted(self, event: dict) -> None: + def on_deleted(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -55,7 +59,7 @@ def on_deleted(self, event: dict) -> None: }, ) - def on_modified(self, event: dict) -> None: + def on_modified(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -66,7 +70,7 @@ def on_modified(self, event: dict) -> None: }, ) - def on_moved(self, event: dict) -> None: + def on_moved(self: "Handler", event: dict) -> None: loop.call_soon_threadsafe( queue.put_nowait, { @@ -103,8 +107,8 @@ async def main(queue: asyncio.Queue, args: dict) -> None: class MockQueue: """A fake queue.""" - async def put_nowait(self, event: dict) -> None: + async def put_nowait(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 - asyncio.run(main(MockQueue(), {"path": "/tmp", "recursive": True})) + asyncio.run(main(MockQueue(), {"path": "/tmp", "recursive": True})) # noqa: S108 diff --git a/extensions/eda/plugins/event_source/generic.py b/extensions/eda/plugins/event_source/generic.py index 7e6c31bc..ba83f65b 100644 --- a/extensions/eda/plugins/event_source/generic.py +++ b/extensions/eda/plugins/event_source/generic.py @@ -86,21 +86,12 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: if not event: continue for _ignore in range(repeat_count): - data = {} - if create_index: - data[create_index] = index - if add_timestamp: - if time_format == "local": - data["timestamp"] = str(datetime.now()) - elif time_format == "epoch": - data["timestamp"] = int(time.time()) - elif time_format == "iso8601": - data["timestamp"] = datetime.now().isoformat() + data = _create_data(create_index, index, add_timestamp, time_format) index += 1 data.update(event) if display: - print(data) + print(data) # noqa: T201 await queue.put(data) await asyncio.sleep(repeat_delay) @@ -109,13 +100,32 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: await asyncio.sleep(shutdown_after) +def _create_data( + create_index: str, + index: int, + add_timestamp: str, + time_format: str, +) -> dict: + data = {} + if create_index: + data[create_index] = index + if add_timestamp: + if time_format == "local": + data["timestamp"] = str(datetime.now()) # noqa: DTZ005 + elif time_format == "epoch": + data["timestamp"] = int(time.time()) + elif time_format == "iso8601": + data["timestamp"] = datetime.now(tz=None).isoformat() # noqa: DTZ005 + return data + + if __name__ == "__main__": """MockQueue if running directly.""" class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/journald.py b/extensions/eda/plugins/event_source/journald.py index 02ace505..a74a423a 100644 --- a/extensions/eda/plugins/event_source/journald.py +++ b/extensions/eda/plugins/event_source/journald.py @@ -60,7 +60,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/kafka.py b/extensions/eda/plugins/event_source/kafka.py index 9128a48e..72500ed5 100644 --- a/extensions/eda/plugins/event_source/kafka.py +++ b/extensions/eda/plugins/event_source/kafka.py @@ -51,7 +51,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: if offset not in ("latest", "earliest"): msg = f"Invalid offset option: {offset}" - raise Exception(msg) + raise ValueError(msg) if cafile: context = create_ssl_context( @@ -83,8 +83,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: data = json.loads(value) except json.decoder.JSONDecodeError: data = value - except UnicodeError as e: - logger.error(e) + except UnicodeError: + logger.exception("Unicode Error") if data: await queue.put({"body": data}) @@ -100,7 +100,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/range.py b/extensions/eda/plugins/event_source/range.py index 53b2b17b..7bd51020 100644 --- a/extensions/eda/plugins/event_source/range.py +++ b/extensions/eda/plugins/event_source/range.py @@ -33,7 +33,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/tick.py b/extensions/eda/plugins/event_source/tick.py index b4e99ef1..ca6ec4b3 100644 --- a/extensions/eda/plugins/event_source/tick.py +++ b/extensions/eda/plugins/event_source/tick.py @@ -33,7 +33,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/url_check.py b/extensions/eda/plugins/event_source/url_check.py index f8142071..cca28abd 100644 --- a/extensions/eda/plugins/event_source/url_check.py +++ b/extensions/eda/plugins/event_source/url_check.py @@ -71,7 +71,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201 diff --git a/extensions/eda/plugins/event_source/webhook.py b/extensions/eda/plugins/event_source/webhook.py index ac5a005c..996f4da9 100644 --- a/extensions/eda/plugins/event_source/webhook.py +++ b/extensions/eda/plugins/event_source/webhook.py @@ -41,19 +41,24 @@ async def webhook(request: web.Request) -> web.Response: return web.Response(text=endpoint) +def _parse_token(request: web.Request) -> (str, str): + scheme, token = request.headers["Authorization"].strip().split(" ") + if scheme != "Bearer": + raise web.HTTPUnauthorized(text="Only Bearer type is accepted") + if token != request.app["token"]: + raise web.HTTPUnauthorized(text="Invalid authorization token") + return scheme, token + + @web.middleware -async def bearer_auth(request: web.Request, handler: Callable): +async def bearer_auth(request: web.Request, handler: Callable) -> web.StreamResponse: """Verify authorization is Bearer type.""" try: - scheme, token = request.headers["Authorization"].strip().split(" ") - if scheme != "Bearer": - raise web.HTTPUnauthorized(text="Only Bearer type is accepted") - elif token != request.app["token"]: - raise web.HTTPUnauthorized(text="Invalid authorization token") + scheme, token = _parse_token(request) except KeyError: - raise web.HTTPUnauthorized(reason="Missing authorization token") + raise web.HTTPUnauthorized(reason="Missing authorization token") from None except ValueError: - raise web.HTTPUnauthorized(text="Invalid authorization token") + raise web.HTTPUnauthorized(text="Invalid authorization token") from None return await handler(request) @@ -81,14 +86,14 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: try: context.load_cert_chain(certfile, keyfile, password) except Exception: - logger.error("Failed to load certificates. Check they are valid") + logger.exception("Failed to load certificates. Check they are valid") raise runner = web.AppRunner(app) await runner.setup() site = web.TCPSite( runner, - args.get("host") or "0.0.0.0", + args.get("host") or "127.0.0.1", args.get("port"), ssl_context=context, ) @@ -108,7 +113,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: class MockQueue: """A fake queue.""" - async def put(self, event: dict) -> None: + async def put(self: "MockQueue", event: dict) -> None: """Print the event.""" print(event) # noqa: T201