From 7ac1ac8b0af649bba47a60d1f1cd03937ff5fb53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Kosiec?= Date: Tue, 27 Sep 2022 15:41:01 +0200 Subject: [PATCH 1/2] Introduce ability to ignore requests on initial list and first event --- README.md | 1 + src/resources.py | 28 ++++++++++++++++++++++------ src/sidecar.py | 8 +++++++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index fdf4dcd3..204e4313 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ If the filename ends with `.url` suffix, the content will be processed as a URL | `REQ_RETRY_CONNECT` | How many connection-related errors to retry on for any http request (`*.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `10` | integer | | `REQ_RETRY_READ` | How many times to retry on read errors for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `5` | integer | | `REQ_RETRY_BACKOFF_FACTOR` | A backoff factor to apply between attempts after the second try for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `1.1` | float | +| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to ignore requests for first events or initial list for configmaps and secrets events. Applicable only when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean | | `REQ_TIMEOUT` | How many seconds to wait for the server to send data before giving up for `.url` triggered requests or requests to `REQ_URI` (does not apply to k8s api requests) | false | `10` | float | | `REQ_USERNAME` | Username to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string | | `REQ_PASSWORD` | Password to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string | diff --git a/src/resources.py b/src/resources.py index 3da08acd..39735f9e 100755 --- a/src/resources.py +++ b/src/resources.py @@ -73,7 +73,7 @@ def _get_destination_folder(metadata, default_folder, folder_annotation): def list_resources(label, label_value, target_folder, request_url, request_method, request_payload, namespace, folder_annotation, resource, unique_filenames, script, enable_5xx, - ignore_already_processed): + ignore_already_processed, request_ignore_initial_event): v1 = client.CoreV1Api() # Filter resources based on label and value or just label label_selector = f"{label}={label_value}" if label_value else label @@ -87,6 +87,7 @@ def list_resources(label, label_value, target_folder, request_url, request_metho ret = getattr(v1, _list_namespace[namespace][resource])(**additional_args) files_changed = False + ignore_request = False # For all the found resources for item in ret.items: @@ -99,6 +100,8 @@ def list_resources(label, label_value, target_folder, request_url, request_metho logger.debug(f"Ignoring {resource} {metadata.namespace}/{metadata.name}") continue + logger.debug(f"Initial list for {resource} {metadata.namespace}/{metadata.name}") + ignore_request = True _resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version logger.debug(f"Working on {resource}: {metadata.namespace}/{metadata.name}") @@ -114,6 +117,10 @@ def list_resources(label, label_value, target_folder, request_url, request_metho if script and files_changed: execute(script) + if request_ignore_initial_event and ignore_request: + logger.debug(f"Ignoring sending request for initial list {resource} {metadata.namespace}/{metadata.name}") + return + if request_url and files_changed: request(request_url, request_method, enable_5xx, request_payload) @@ -206,7 +213,7 @@ def _update_file(data_key, data_content, dest_folder, metadata, resource, def _watch_resource_iterator(label, label_value, target_folder, request_url, request_method, request_payload, namespace, folder_annotation, resource, unique_filenames, script, enable_5xx, - ignore_already_processed): + ignore_already_processed, request_ignore_initial_event): v1 = client.CoreV1Api() # Filter resources based on label and value or just label label_selector = f"{label}={label_value}" if label_value else label @@ -219,6 +226,8 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req if namespace != "ALL": additional_args['namespace'] = namespace + ignore_request = False + stream = watch.Watch().stream(getattr(v1, _list_namespace[namespace][resource]), **additional_args) # Process events @@ -238,6 +247,9 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req _resources_version_map.pop(metadata.namespace + metadata.name) if event_type == "ADDED" or event_type == "MODIFIED": + if request_ignore_initial_event and _resources_version_map.get(metadata.namespace + metadata.name) is None: + logger.debug(f"Initial event for {event_type} {resource} {metadata.namespace}/{metadata.name}") + ignore_request = True _resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}") @@ -257,6 +269,10 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req if script and files_changed: execute(script) + if request_ignore_initial_event and ignore_request: + logger.debug(f"Ignoring sending request for initial {event_type} {resource} {metadata.namespace}/{metadata.name}") + return + if request_url and files_changed: request(request_url, request_method, enable_5xx, request_payload) @@ -287,11 +303,11 @@ def _watch_resource_loop(mode, *args): def watch_for_changes(mode, label, label_value, target_folder, request_url, request_method, request_payload, current_namespace, folder_annotation, resources, unique_filenames, script, enable_5xx, - ignore_already_processed): + ignore_already_processed, request_ignore_initial_event): processes = _start_watcher_processes(current_namespace, folder_annotation, label, label_value, request_method, mode, request_payload, resources, target_folder, unique_filenames, script, request_url, enable_5xx, - ignore_already_processed) + ignore_already_processed, request_ignore_initial_event) while True: died = False @@ -311,14 +327,14 @@ def watch_for_changes(mode, label, label_value, target_folder, request_url, requ def _start_watcher_processes(namespace, folder_annotation, label, label_value, request_method, mode, request_payload, resources, target_folder, unique_filenames, script, request_url, - enable_5xx, ignore_already_processed): + enable_5xx, ignore_already_processed, request_ignore_initial_event): processes = [] for resource in resources: for ns in namespace.split(','): proc = Process(target=_watch_resource_loop, args=(mode, label, label_value, target_folder, request_url, request_method, request_payload, ns, folder_annotation, resource, unique_filenames, script, enable_5xx, - ignore_already_processed) + ignore_already_processed, request_ignore_initial_event) ) proc.daemon = True proc.start() diff --git a/src/sidecar.py b/src/sidecar.py index 4810fee3..67d9680b 100755 --- a/src/sidecar.py +++ b/src/sidecar.py @@ -23,6 +23,7 @@ REQ_PAYLOAD = "REQ_PAYLOAD" REQ_URL = "REQ_URL" REQ_METHOD = "REQ_METHOD" +REQ_IGNORE_INITIAL_EVENT = "REQ_IGNORE_INITIAL_EVENT" SCRIPT = "SCRIPT" ENABLE_5XX = "ENABLE_5XX" IGNORE_ALREADY_PROCESSED = "IGNORE_ALREADY_PROCESSED" @@ -103,6 +104,11 @@ def main(): if not ignore_already_processed: logger.debug("Ignore already processed resource version will not be enabled.") + request_ignore_initial_event = os.getenv(REQ_IGNORE_INITIAL_EVENT) and ignore_already_processed + + if request_ignore_initial_event: + logger.debug("Initial list or first event for a given resource will skip requests to a given URL.") + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f: namespace = os.getenv("NAMESPACE", f.read()) @@ -116,7 +122,7 @@ def main(): else: watch_for_changes(method, label, label_value, target_folder, request_url, request_method, request_payload, namespace, folder_annotation, resources, unique_filenames, script, enable_5xx, - ignore_already_processed) + ignore_already_processed, request_ignore_initial_event) def _initialize_kubeclient_configuration(): From acaef8867927b9a6613e6aed34ca12b9abc7e5a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Kosiec?= Date: Fri, 30 Sep 2022 11:49:38 +0200 Subject: [PATCH 2/2] Fix logic for items in batch --- README.md | 2 +- src/resources.py | 44 +++++++++++++++++++++++++------------------- src/sidecar.py | 2 +- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 204e4313..975efecc 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ If the filename ends with `.url` suffix, the content will be processed as a URL | `REQ_RETRY_CONNECT` | How many connection-related errors to retry on for any http request (`*.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `10` | integer | | `REQ_RETRY_READ` | How many times to retry on read errors for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `5` | integer | | `REQ_RETRY_BACKOFF_FACTOR` | A backoff factor to apply between attempts after the second try for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `1.1` | float | -| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to ignore requests for first events or initial list for configmaps and secrets events. Applicable only when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean | +| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to ignore requests for first events or initial list for each configmap and secret. For listing configmaps/secrets, request will be skipped only if all items in the batch weren't discovered yet. Applicable only when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean | | `REQ_TIMEOUT` | How many seconds to wait for the server to send data before giving up for `.url` triggered requests or requests to `REQ_URI` (does not apply to k8s api requests) | false | `10` | float | | `REQ_USERNAME` | Username to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string | | `REQ_PASSWORD` | Password to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string | diff --git a/src/resources.py b/src/resources.py index 39735f9e..0a36d43d 100755 --- a/src/resources.py +++ b/src/resources.py @@ -87,7 +87,7 @@ def list_resources(label, label_value, target_folder, request_url, request_metho ret = getattr(v1, _list_namespace[namespace][resource])(**additional_args) files_changed = False - ignore_request = False + should_do_request = False # For all the found resources for item in ret.items: @@ -96,13 +96,16 @@ def list_resources(label, label_value, target_folder, request_url, request_metho # Ignore already processed resource # Avoid numerous logs about useless resource processing each time the LIST loop reconnects if ignore_already_processed: - if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version: - logger.debug(f"Ignoring {resource} {metadata.namespace}/{metadata.name}") + resource_version_map_key = metadata.namespace + metadata.name + if _resources_version_map.get(resource_version_map_key) == metadata.resource_version: + logger.debug(f"Ignoring already processed resource version for {resource} {metadata.namespace}/{metadata.name}") continue - logger.debug(f"Initial list for {resource} {metadata.namespace}/{metadata.name}") - ignore_request = True - _resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version + if resource_version_map_key in _resources_version_map: + logger.debug(f"Item is already in the resource version map: {resource} {metadata.namespace}/{metadata.name}") + should_do_request = True + + _resources_version_map[resource_version_map_key] = metadata.resource_version logger.debug(f"Working on {resource}: {metadata.namespace}/{metadata.name}") @@ -117,11 +120,12 @@ def list_resources(label, label_value, target_folder, request_url, request_metho if script and files_changed: execute(script) - if request_ignore_initial_event and ignore_request: - logger.debug(f"Ignoring sending request for initial list {resource} {metadata.namespace}/{metadata.name}") + if request_ignore_initial_event and not should_do_request: + logger.debug(f"Ignoring sending request for initial list for all items") return if request_url and files_changed: + logger.debug("Doing request as files changed") request(request_url, request_method, enable_5xx, request_payload) @@ -226,12 +230,11 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req if namespace != "ALL": additional_args['namespace'] = namespace - ignore_request = False - stream = watch.Watch().stream(getattr(v1, _list_namespace[namespace][resource]), **additional_args) # Process events for event in stream: + should_do_request = False item = event['object'] metadata = item.metadata event_type = event['type'] @@ -239,18 +242,20 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req # Ignore already processed resource # Avoid numerous logs about useless resource processing each time the WATCH loop reconnects if ignore_already_processed: - if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version: + resource_version_map_key = metadata.namespace + metadata.name + if _resources_version_map.get(resource_version_map_key) == metadata.resource_version: if event_type == "ADDED" or event_type == "MODIFIED": - logger.debug(f"Ignoring {event_type} {resource} {metadata.namespace}/{metadata.name}") + logger.debug(f"Ignoring already processed resource version for {event_type} {resource} {metadata.namespace}/{metadata.name}") continue elif event_type == "DELETED": - _resources_version_map.pop(metadata.namespace + metadata.name) + _resources_version_map.pop(resource_version_map_key) + + if resource_version_map_key in _resources_version_map: + logger.debug(f"Item is already in the resource version map: {resource} {metadata.namespace}/{metadata.name}") + should_do_request = True if event_type == "ADDED" or event_type == "MODIFIED": - if request_ignore_initial_event and _resources_version_map.get(metadata.namespace + metadata.name) is None: - logger.debug(f"Initial event for {event_type} {resource} {metadata.namespace}/{metadata.name}") - ignore_request = True - _resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version + _resources_version_map[resource_version_map_key] = metadata.resource_version logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}") @@ -269,11 +274,12 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req if script and files_changed: execute(script) - if request_ignore_initial_event and ignore_request: - logger.debug(f"Ignoring sending request for initial {event_type} {resource} {metadata.namespace}/{metadata.name}") + if request_ignore_initial_event and not should_do_request: + logger.debug(f"Ignoring sending request for initial event for all items") return if request_url and files_changed: + logger.debug("Doing request as files changed") request(request_url, request_method, enable_5xx, request_payload) diff --git a/src/sidecar.py b/src/sidecar.py index 67d9680b..23f081c6 100755 --- a/src/sidecar.py +++ b/src/sidecar.py @@ -107,7 +107,7 @@ def main(): request_ignore_initial_event = os.getenv(REQ_IGNORE_INITIAL_EVENT) and ignore_already_processed if request_ignore_initial_event: - logger.debug("Initial list or first event for a given resource will skip requests to a given URL.") + logger.debug("Requests for initial list or first event for every resource will be skipped.") with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f: namespace = os.getenv("NAMESPACE", f.read())