Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Fix logic for items in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec committed Sep 30, 2022
1 parent 7ac1ac8 commit acaef88
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ If the filename ends with `.url` suffix, the content will be processed as a URL
| `REQ_RETRY_CONNECT` | How many connection-related errors to retry on for any http request (`*.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `10` | integer |
| `REQ_RETRY_READ` | How many times to retry on read errors for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `5` | integer |
| `REQ_RETRY_BACKOFF_FACTOR` | A backoff factor to apply between attempts after the second try for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `1.1` | float |
| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to ignore requests for first events or initial list for configmaps and secrets events. Applicable only when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean |
| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to ignore requests for first events or initial list for each configmap and secret. For listing configmaps/secrets, request will be skipped only if all items in the batch weren't discovered yet. Applicable only when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean |
| `REQ_TIMEOUT` | How many seconds to wait for the server to send data before giving up for `.url` triggered requests or requests to `REQ_URI` (does not apply to k8s api requests) | false | `10` | float |
| `REQ_USERNAME` | Username to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
| `REQ_PASSWORD` | Password to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
Expand Down
44 changes: 25 additions & 19 deletions src/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
ret = getattr(v1, _list_namespace[namespace][resource])(**additional_args)

files_changed = False
ignore_request = False
should_do_request = False

# For all the found resources
for item in ret.items:
Expand All @@ -96,13 +96,16 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
# Ignore already processed resource
# Avoid numerous logs about useless resource processing each time the LIST loop reconnects
if ignore_already_processed:
if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version:
logger.debug(f"Ignoring {resource} {metadata.namespace}/{metadata.name}")
resource_version_map_key = metadata.namespace + metadata.name
if _resources_version_map.get(resource_version_map_key) == metadata.resource_version:
logger.debug(f"Ignoring already processed resource version for {resource} {metadata.namespace}/{metadata.name}")
continue

logger.debug(f"Initial list for {resource} {metadata.namespace}/{metadata.name}")
ignore_request = True
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version
if resource_version_map_key in _resources_version_map:
logger.debug(f"Item is already in the resource version map: {resource} {metadata.namespace}/{metadata.name}")
should_do_request = True

_resources_version_map[resource_version_map_key] = metadata.resource_version

logger.debug(f"Working on {resource}: {metadata.namespace}/{metadata.name}")

Expand All @@ -117,11 +120,12 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
if script and files_changed:
execute(script)

if request_ignore_initial_event and ignore_request:
logger.debug(f"Ignoring sending request for initial list {resource} {metadata.namespace}/{metadata.name}")
if request_ignore_initial_event and not should_do_request:
logger.debug(f"Ignoring sending request for initial list for all items")
return

if request_url and files_changed:
logger.debug("Doing request as files changed")
request(request_url, request_method, enable_5xx, request_payload)


Expand Down Expand Up @@ -226,31 +230,32 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if namespace != "ALL":
additional_args['namespace'] = namespace

ignore_request = False

stream = watch.Watch().stream(getattr(v1, _list_namespace[namespace][resource]), **additional_args)

# Process events
for event in stream:
should_do_request = False
item = event['object']
metadata = item.metadata
event_type = event['type']

# Ignore already processed resource
# Avoid numerous logs about useless resource processing each time the WATCH loop reconnects
if ignore_already_processed:
if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version:
resource_version_map_key = metadata.namespace + metadata.name
if _resources_version_map.get(resource_version_map_key) == metadata.resource_version:
if event_type == "ADDED" or event_type == "MODIFIED":
logger.debug(f"Ignoring {event_type} {resource} {metadata.namespace}/{metadata.name}")
logger.debug(f"Ignoring already processed resource version for {event_type} {resource} {metadata.namespace}/{metadata.name}")
continue
elif event_type == "DELETED":
_resources_version_map.pop(metadata.namespace + metadata.name)
_resources_version_map.pop(resource_version_map_key)

if resource_version_map_key in _resources_version_map:
logger.debug(f"Item is already in the resource version map: {resource} {metadata.namespace}/{metadata.name}")
should_do_request = True

if event_type == "ADDED" or event_type == "MODIFIED":
if request_ignore_initial_event and _resources_version_map.get(metadata.namespace + metadata.name) is None:
logger.debug(f"Initial event for {event_type} {resource} {metadata.namespace}/{metadata.name}")
ignore_request = True
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version
_resources_version_map[resource_version_map_key] = metadata.resource_version

logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}")

Expand All @@ -269,11 +274,12 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if script and files_changed:
execute(script)

if request_ignore_initial_event and ignore_request:
logger.debug(f"Ignoring sending request for initial {event_type} {resource} {metadata.namespace}/{metadata.name}")
if request_ignore_initial_event and not should_do_request:
logger.debug(f"Ignoring sending request for initial event for all items")
return

if request_url and files_changed:
logger.debug("Doing request as files changed")
request(request_url, request_method, enable_5xx, request_payload)


Expand Down
2 changes: 1 addition & 1 deletion src/sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def main():
request_ignore_initial_event = os.getenv(REQ_IGNORE_INITIAL_EVENT) and ignore_already_processed

if request_ignore_initial_event:
logger.debug("Initial list or first event for a given resource will skip requests to a given URL.")
logger.debug("Requests for initial list or first event for every resource will be skipped.")

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
namespace = os.getenv("NAMESPACE", f.read())
Expand Down

0 comments on commit acaef88

Please sign in to comment.