From c27d2f82cb508a733121f0bc456af340156bb131 Mon Sep 17 00:00:00 2001
From: Will Kahn-Greene
Date: Wed, 30 Oct 2024 22:17:06 -0400
Subject: [PATCH] bug-1911367: Remove processor heartbeat and process metrics

This removes the processor heartbeat (and the scaffolding for that
heartbeat), which generated process metrics for the processor
container. We no longer need those metrics: the problem they were
added to help us understand is long gone.
---
 socorro/lib/task_manager.py                   | 25 +-------
 socorro/lib/threaded_task_manager.py          | 12 +---
 socorro/processor/processor_app.py            | 61 -------------------
 socorro/statsd_metrics.yaml                   | 24 --------
 socorro/tests/processor/test_processor_app.py | 15 -----
 5 files changed, 4 insertions(+), 133 deletions(-)

diff --git a/socorro/lib/task_manager.py b/socorro/lib/task_manager.py
index 756f32006d..58c8b40a02 100644
--- a/socorro/lib/task_manager.py
+++ b/socorro/lib/task_manager.py
@@ -9,8 +9,6 @@
 
 LOGGER = logging.getLogger(__name__)
 
-HEARTBEAT_INTERVAL = 60
-
 
 def default_task_func(a_param):
     """Default task function.
@@ -21,16 +19,6 @@ def default_task_func(a_param):
     """
 
 
-def default_heartbeat():
-    """Runs once a second from the main thread.
-
-    Note: If this raises an exception, it could kill the process or put it in a
-    weird state.
-
-    """
-    LOGGER.info("THUMP")
-
-
 def default_iterator():
     """Default iterator for tasks.
 
@@ -76,7 +64,6 @@ def __init__(
         idle_delay=7,
         quit_on_empty_queue=False,
         job_source_iterator=default_iterator,
-        heartbeat_func=default_heartbeat,
         task_func=default_task_func,
     ):
         """
@@ -88,14 +75,12 @@ def __init__(
            instantiated with a config object can be iterated. The iterator must
            yield a tuple consisting of a function's tuple of args and, optionally,
            a mapping of kwargs. Ex: (('a', 17), {'x': 23})
-        :arg heartbeat_func: a function to run every second
         :arg task_func: a function that will accept the args and kwargs yielded
            by the job_source_iterator
         """
         self.idle_delay = idle_delay
         self.quit_on_empty_queue = quit_on_empty_queue
         self.job_source_iterator = job_source_iterator
-        self.heartbeat_func = heartbeat_func
         self.task_func = task_func
 
         self._pid = os.getpid()
@@ -109,7 +94,7 @@ def _get_iterator(self):
         job_source_iterator can be one of a few things:
 
         * a class that can be instantiated and iterated over
-        * a function that returns an interator
+        * a function that returns an iterator
         * an actual iterator/generator
         * an iterable collection
 
@@ -124,7 +109,7 @@ def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):
         """Responsive sleep that checks for quit flag
 
-        When there is litte work to do, the queuing thread sleeps a lot. It can't sleep
+        When there is little work to do, the queuing thread sleeps a lot. It can't sleep
         for too long without checking for the quit flag and/or logging about why it is
         sleeping.
 
@@ -132,7 +117,7 @@ def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):
         :arg wait_log_interval: while sleeping, it is helpful if the thread
             periodically announces itself so that we know that it is still alive.
             This number is the time in seconds between log entries.
-        :arg wait_reason: the is for the explaination of why the thread is
+        :arg wait_reason: this is for the explanation of why the thread is
            sleeping. This is likely to be a message like: 'there is no work to do'.
 
         This was also partially motivated by old versions' of Python inability to
@@ -146,14 +131,10 @@ def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):
 
     def blocking_start(self):
         """This function starts the task manager running to do tasks."""
-        next_heartbeat = time.time() + HEARTBEAT_INTERVAL
         self.logger.debug("threadless start")
         try:
             # May never exhaust
             for job_params in self._get_iterator():
-                if time.time() > next_heartbeat:
-                    self.heartbeat_func()
-                    next_heartbeat = time.time() + HEARTBEAT_INTERVAL
                 self.logger.debug("received %r", job_params)
                 if job_params is None:
                     if self.quit_on_empty_queue:
diff --git a/socorro/lib/threaded_task_manager.py b/socorro/lib/threaded_task_manager.py
index 2b4ea90202..092a877982 100644
--- a/socorro/lib/threaded_task_manager.py
+++ b/socorro/lib/threaded_task_manager.py
@@ -17,7 +17,6 @@
 import time
 
 from socorro.lib.task_manager import (
-    default_heartbeat,
     default_iterator,
     default_task_func,
     TaskManager,
@@ -26,8 +25,6 @@
 
 STOP_TOKEN = (None, None)
 
-HEARTBEAT_INTERVAL = 60
-
 
 class ThreadedTaskManager(TaskManager):
     """Threaded task manager."""
@@ -39,7 +36,6 @@ def __init__(
         number_of_threads=4,
         maximum_queue_size=8,
         job_source_iterator=default_iterator,
-        heartbeat_func=default_heartbeat,
         task_func=default_task_func,
     ):
         """
@@ -54,7 +50,6 @@ def __init__(
            instantiated with a config object can be iterated. The iterator must
            yield a tuple consisting of a function's tuple of args and, optionally,
            a mapping of kwargs. Ex: (('a', 17), {'x': 23})
-        :arg heartbeat_func: a function to run every second
         :arg task_func: a function that will accept the args and kwargs yielded
            by the job_source_iterator
         """
@@ -71,7 +66,6 @@ def __init__(
             idle_delay=idle_delay,
             quit_on_empty_queue=quit_on_empty_queue,
             job_source_iterator=job_source_iterator,
-            heartbeat_func=heartbeat_func,
             task_func=task_func,
         )
         self.thread_list = []  # the thread object storage
@@ -107,12 +101,8 @@ def wait_for_completion(self):
         if self.queueing_thread is None:
             return
 
-        next_heartbeat = time.time() + HEARTBEAT_INTERVAL
         self.logger.debug("waiting to join queueing_thread")
         while True:
-            if time.time() > next_heartbeat:
-                self.heartbeat_func()
-                next_heartbeat = time.time() + HEARTBEAT_INTERVAL
             try:
                 self.queueing_thread.join(1.0)
                 if not self.queueing_thread.is_alive():
@@ -149,7 +139,7 @@ def wait_for_empty_queue(self, wait_log_interval=0, wait_reason=""):
         :arg wait_log_interval: While sleeping, it is helpful if the thread
             periodically announces itself so that we know that it is still
             alive. This number is the time in seconds between log entries.
-        :arg wait_reason: The is for the explaination of why the thread is sleeping.
+        :arg wait_reason: This is for the explanation of why the thread is sleeping.
            This is likely to be a message like: 'there is no work to do'.
 
""" diff --git a/socorro/processor/processor_app.py b/socorro/processor/processor_app.py index 8444e03ac4..b22ccf445d 100755 --- a/socorro/processor/processor_app.py +++ b/socorro/processor/processor_app.py @@ -27,7 +27,6 @@ from fillmore.libsentry import set_up_sentry from fillmore.scrubber import Scrubber, SCRUB_RULES_DEFAULT -import psutil import sentry_sdk from sentry_sdk.integrations.atexit import AtexitIntegration from sentry_sdk.integrations.dedupe import DedupeIntegration @@ -270,7 +269,6 @@ def _set_up_task_manager(self): manager_settings.update( { "job_source_iterator": self.source_iterator, - "heartbeat_func": self.heartbeat, "task_func": self.transform, } ) @@ -278,65 +276,6 @@ def _set_up_task_manager(self): class_path=manager_class, kwargs=manager_settings ) - def heartbeat(self): - """Runs once a second from the main thread. - - Note: If this raises an exception, it could kill the process or put it in a - weird state. - - """ - try: - processes_by_type = {} - processes_by_status = {} - open_files = 0 - for proc in psutil.process_iter(["cmdline", "status", "open_files"]): - try: - # NOTE(willkg): This is all intertwined with exactly how we run the - # processor in a Docker container. If we ever make changes to that, this - # will change, too. However, even if we never update this, seeing - # "zombie" and "orphaned" as process statuses or seeing lots of - # processes as a type will be really fishy and suggestive that evil is a - # foot. - cmdline = proc.cmdline() or ["unknown"] - - if cmdline[0] in ["/bin/sh", "/bin/bash"]: - proc_type = "shell" - elif cmdline[0] in ["python", "/usr/local/bin/python"]: - proc_type = "python" - elif "stackwalk" in cmdline[0]: - proc_type = "stackwalker" - else: - proc_type = "other" - - open_files_count = len(proc.open_files()) - proc_status = proc.status() - - except psutil.Error: - # For any psutil error, we want to track that we saw a process, but - # the details don't matter - proc_type = "unknown" - proc_status = "unknown" - open_files_count = 0 - - processes_by_type[proc_type] = processes_by_type.get(proc_type, 0) + 1 - processes_by_status[proc_status] = ( - processes_by_status.get(proc_status, 0) + 1 - ) - open_files += open_files_count - - METRICS.gauge("processor.open_files", open_files) - for proc_type, val in processes_by_type.items(): - METRICS.gauge( - "processor.processes_by_type", val, tags=[f"proctype:{proc_type}"] - ) - for status, val in processes_by_status.items(): - METRICS.gauge( - "processor.processes_by_status", val, tags=[f"procstatus:{status}"] - ) - - except Exception as exc: - sentry_sdk.capture_exception(exc) - def close(self): """Clean up the processor on shutdown.""" with suppress(AttributeError): diff --git a/socorro/statsd_metrics.yaml b/socorro/statsd_metrics.yaml index f7418c162c..2da71ea327 100644 --- a/socorro/statsd_metrics.yaml +++ b/socorro/statsd_metrics.yaml @@ -144,30 +144,6 @@ socorro.processor.minidumpstackwalk.run: * ``outcome``: either ``success`` or ``fail`` * ``exitcode``: the exit code of the minidump stackwalk process -socorro.processor.open_files: - type: "gauge" - description: | - Gauge of currently open files for all processes running in the container. - -socorro.processor.processes_by_type: - type: "gauge" - description: | - Gauge of processes by type. - - Tags: - - * ``proctype``: one of ``shell``, ``python``, ``stackwalker``, or ``other`` - -socorro.processor.processes_by_status: - type: "gauge" - description: | - Gauge of processes by process status. 
-
-    Tags:
-
-    * ``procstatus``: one of ``running``, ``sleeping``, or other process
-      statuses.
-
 socorro.processor.process_crash:
   type: "timing"
   description: |
diff --git a/socorro/tests/processor/test_processor_app.py b/socorro/tests/processor/test_processor_app.py
index 53e18296c1..d5f3720023 100644
--- a/socorro/tests/processor/test_processor_app.py
+++ b/socorro/tests/processor/test_processor_app.py
@@ -66,21 +66,6 @@ def test_source_iterator(self, processor_settings):
         assert next(queue) is None
         assert next(queue) == ((3,), {})
 
-    def test_heartbeat(self, sentry_helper):
-        """Basic test to make sure it runs, captures metrics, and doesn't error out"""
-        with sentry_helper.reuse() as sentry_client:
-            with MetricsMock() as metricsmock:
-                app = ProcessorApp()
-                app.heartbeat()
-
-                # Assert it emitted some metrics
-                metricsmock.assert_gauge("socorro.processor.open_files")
-                metricsmock.assert_gauge("socorro.processor.processes_by_type")
-                metricsmock.assert_gauge("socorro.processor.processes_by_status")
-
-                # Assert it didn't throw an exception
-                assert len(sentry_client.envelopes) == 0
-
     def test_transform_success(self, processor_settings):
         app = ProcessorApp()
         app._set_up_source_and_destination()