
bug-1911367: Remove processor heartbeat and process metrics #6776

Merged · 1 commit · Oct 31, 2024
25 changes: 3 additions & 22 deletions socorro/lib/task_manager.py
@@ -9,8 +9,6 @@

LOGGER = logging.getLogger(__name__)

HEARTBEAT_INTERVAL = 60


def default_task_func(a_param):
"""Default task function.
@@ -21,16 +19,6 @@ def default_task_func(a_param):
"""


def default_heartbeat():
"""Runs once a second from the main thread.

Note: If this raises an exception, it could kill the process or put it in a
weird state.

"""
LOGGER.info("THUMP")


def default_iterator():
"""Default iterator for tasks.

@@ -76,7 +64,6 @@ def __init__(
idle_delay=7,
quit_on_empty_queue=False,
job_source_iterator=default_iterator,
heartbeat_func=default_heartbeat,
willkg (Contributor Author) commented:

Socorro still has this TaskManager and ThreadedTaskManager, which are the last remnants of how Socorro service processes were structured a long, long time ago.

In this PR, I removed the heartbeat function from the processor app, as well as the scaffolding for heartbeats in any app that uses the task managers, figuring we don't need that anymore. (We can always re-add it if we do need it.)
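For illustration, here is a minimal sketch of wiring up a TaskManager after this change. The iterator and task function are hypothetical stand-ins, not Socorro code; note there is no longer a heartbeat_func argument:

```python
from socorro.lib.task_manager import TaskManager

def job_iterator():
    # Yield (args, kwargs) tuples; a trailing None signals "queue is empty".
    yield (("crash-id-1",), {})
    yield (("crash-id-2",), {})
    yield None

def process_job(crash_id):
    # Stand-in for the real task function (e.g. the processor's transform).
    print(f"processing {crash_id}")

manager = TaskManager(
    idle_delay=7,
    quit_on_empty_queue=True,
    job_source_iterator=job_iterator,
    task_func=process_job,
)
manager.blocking_start()
```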

task_func=default_task_func,
):
"""
@@ -88,14 +75,12 @@ def __init__(
instantiated with a config object can be iterated. The iterator must
yield a tuple consisting of a function's tuple of args and, optionally,
a mapping of kwargs. Ex: (('a', 17), {'x': 23})
:arg heartbeat_func: a function to run every second
:arg task_func: a function that will accept the args and kwargs yielded
by the job_source_iterator
"""
self.idle_delay = idle_delay
self.quit_on_empty_queue = quit_on_empty_queue
self.job_source_iterator = job_source_iterator
self.heartbeat_func = heartbeat_func
self.task_func = task_func

self._pid = os.getpid()
@@ -109,7 +94,7 @@ def _get_iterator(self):
job_source_iterator can be one of a few things:

* a class that can be instantiated and iterated over
* a function that returns an interator
* a function that returns an iterator
willkg (Contributor Author) commented:

I set up pre-commit and it's now fixing typos.

* an actual iterator/generator
* an iterable collection
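As an aside, the four accepted forms might look like this sketch (all names hypothetical), using the (('a', 17), {'x': 23}) shape from the docstring:

```python
# 1. A class that can be instantiated and iterated over
class JobSource:
    def __iter__(self):
        yield (("a", 17), {"x": 23})

# 2. A function that returns an iterator
def make_jobs():
    return iter([(("a", 17), {"x": 23})])

# 3. An actual iterator/generator
def job_generator():
    yield (("a", 17), {"x": 23})

# 4. An iterable collection
jobs = [(("a", 17), {"x": 23})]
```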

@@ -124,15 +109,15 @@ def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):
def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):
"""Responsive sleep that checks for quit flag

When there is litte work to do, the queuing thread sleeps a lot. It can't sleep
When there is little work to do, the queuing thread sleeps a lot. It can't sleep
for too long without checking for the quit flag and/or logging about why it is
sleeping.

:arg seconds: the number of seconds to sleep
:arg wait_log_interval: while sleeping, it is helpful if the thread
periodically announces itself so that we know that it is still alive.
This number is the time in seconds between log entries.
:arg wait_reason: the is for the explaination of why the thread is
:arg wait_reason: the is for the explanation of why the thread is
sleeping. This is likely to be a message like: 'there is no work to do'.

This was also partially motivated by old versions' of Python inability to
@@ -146,14 +131,10 @@ def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):

def blocking_start(self):
"""This function starts the task manager running to do tasks."""
next_heartbeat = time.time() + HEARTBEAT_INTERVAL
self.logger.debug("threadless start")
try:
# May never exhaust
for job_params in self._get_iterator():
if time.time() > next_heartbeat:
self.heartbeat_func()
next_heartbeat = time.time() + HEARTBEAT_INTERVAL
self.logger.debug("received %r", job_params)
if job_params is None:
if self.quit_on_empty_queue:
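The body of _responsive_sleep is collapsed in this diff. As a rough sketch of the pattern its docstring describes (sleep in one-second slices, checking a quit flag and periodically logging the wait reason) — an illustration under an assumed self.quit attribute, not the actual implementation:

```python
import time

def _responsive_sleep(self, seconds, wait_log_interval=0, wait_reason=""):
    for count in range(int(seconds)):
        if self.quit:  # assumed quit-flag attribute
            raise KeyboardInterrupt
        if wait_log_interval and not count % wait_log_interval:
            self.logger.info("%s: %dsec of %dsec", wait_reason, count, seconds)
        time.sleep(1.0)
```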
12 changes: 1 addition & 11 deletions socorro/lib/threaded_task_manager.py
@@ -17,7 +17,6 @@
import time

from socorro.lib.task_manager import (
default_heartbeat,
default_iterator,
default_task_func,
TaskManager,
@@ -26,8 +25,6 @@

STOP_TOKEN = (None, None)

HEARTBEAT_INTERVAL = 60


class ThreadedTaskManager(TaskManager):
"""Threaded task manager."""
@@ -39,7 +36,6 @@ def __init__(
number_of_threads=4,
maximum_queue_size=8,
job_source_iterator=default_iterator,
heartbeat_func=default_heartbeat,
task_func=default_task_func,
):
"""
@@ -54,7 +50,6 @@
instantiated with a config object can be iterated. The iterator must
yield a tuple consisting of a function's tuple of args and, optionally,
a mapping of kwargs. Ex: (('a', 17), {'x': 23})
:arg heartbeat_func: a function to run every second
:arg task_func: a function that will accept the args and kwargs yielded
by the job_source_iterator
"""
@@ -71,7 +66,6 @@ def __init__(
idle_delay=idle_delay,
quit_on_empty_queue=quit_on_empty_queue,
job_source_iterator=job_source_iterator,
heartbeat_func=heartbeat_func,
task_func=task_func,
)
self.thread_list = [] # the thread object storage
@@ -107,12 +101,8 @@ def wait_for_completion(self):
if self.queueing_thread is None:
return

next_heartbeat = time.time() + HEARTBEAT_INTERVAL
self.logger.debug("waiting to join queueing_thread")
while True:
if time.time() > next_heartbeat:
self.heartbeat_func()
next_heartbeat = time.time() + HEARTBEAT_INTERVAL
try:
self.queueing_thread.join(1.0)
if not self.queueing_thread.is_alive():
@@ -149,7 +139,7 @@ def wait_for_empty_queue(self, wait_log_interval=0, wait_reason=""):
:arg wait_log_interval: While sleeping, it is helpful if the thread periodically
announces itself so that we know that it is still alive. This number is the
time in seconds between log entries.
:arg wait_reason: The is for the explaination of why the thread is sleeping.
:arg wait_reason: The is for the explanation of why the thread is sleeping.
This is likely to be a message like: 'there is no work to do'.

"""
61 changes: 0 additions & 61 deletions socorro/processor/processor_app.py
@@ -27,7 +27,6 @@

from fillmore.libsentry import set_up_sentry
from fillmore.scrubber import Scrubber, SCRUB_RULES_DEFAULT
import psutil
import sentry_sdk
from sentry_sdk.integrations.atexit import AtexitIntegration
from sentry_sdk.integrations.dedupe import DedupeIntegration
@@ -270,73 +269,13 @@ def _set_up_task_manager(self):
manager_settings.update(
{
"job_source_iterator": self.source_iterator,
"heartbeat_func": self.heartbeat,
"task_func": self.transform,
}
)
self.task_manager = build_instance(
class_path=manager_class, kwargs=manager_settings
)

def heartbeat(self):
"""Runs once a second from the main thread.

Note: If this raises an exception, it could kill the process or put it in a
weird state.

"""
try:
processes_by_type = {}
processes_by_status = {}
open_files = 0
for proc in psutil.process_iter(["cmdline", "status", "open_files"]):
try:
# NOTE(willkg): This is all intertwined with exactly how we run the
# processor in a Docker container. If we ever make changes to that, this
# will change, too. However, even if we never update this, seeing
# "zombie" and "orphaned" as process statuses or seeing lots of
# processes as a type will be really fishy and suggestive that evil is a
# foot.
cmdline = proc.cmdline() or ["unknown"]

if cmdline[0] in ["/bin/sh", "/bin/bash"]:
proc_type = "shell"
elif cmdline[0] in ["python", "/usr/local/bin/python"]:
proc_type = "python"
elif "stackwalk" in cmdline[0]:
proc_type = "stackwalker"
else:
proc_type = "other"

open_files_count = len(proc.open_files())
proc_status = proc.status()

except psutil.Error:
# For any psutil error, we want to track that we saw a process, but
# the details don't matter
proc_type = "unknown"
proc_status = "unknown"
open_files_count = 0

processes_by_type[proc_type] = processes_by_type.get(proc_type, 0) + 1
processes_by_status[proc_status] = (
processes_by_status.get(proc_status, 0) + 1
)
open_files += open_files_count

METRICS.gauge("processor.open_files", open_files)
for proc_type, val in processes_by_type.items():
METRICS.gauge(
"processor.processes_by_type", val, tags=[f"proctype:{proc_type}"]
)
for status, val in processes_by_status.items():
METRICS.gauge(
"processor.processes_by_status", val, tags=[f"procstatus:{status}"]
)

except Exception as exc:
sentry_sdk.capture_exception(exc)
willkg (Contributor Author) commented on Oct 31, 2024:

Buh-bye!

One nice thing about removing this code: when you had the processor running in a local dev environment, it was constantly producing output to the console even when it was just sitting around, which was kind of annoying. Now it won't be anymore.
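If process metrics are ever needed again (the earlier comment notes they can always be re-added), a minimal ad-hoc sketch using psutil might look like the following — hypothetical, not part of this PR:

```python
import psutil

def snapshot_processes():
    """Count visible processes by status, tolerating races via psutil.Error."""
    by_status = {}
    for proc in psutil.process_iter(["status"]):
        try:
            status = proc.status()
        except psutil.Error:
            # Process exited or is inaccessible; still count that we saw it.
            status = "unknown"
        by_status[status] = by_status.get(status, 0) + 1
    return by_status
```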


def close(self):
"""Clean up the processor on shutdown."""
with suppress(AttributeError):
24 changes: 0 additions & 24 deletions socorro/statsd_metrics.yaml
@@ -144,30 +144,6 @@ socorro.processor.minidumpstackwalk.run:
* ``outcome``: either ``success`` or ``fail``
* ``exitcode``: the exit code of the minidump stackwalk process

socorro.processor.open_files:
type: "gauge"
description: |
Gauge of currently open files for all processes running in the container.

socorro.processor.processes_by_type:
type: "gauge"
description: |
Gauge of processes by type.

Tags:

* ``proctype``: one of ``shell``, ``python``, ``stackwalker``, or ``other``

socorro.processor.processes_by_status:
type: "gauge"
description: |
Gauge of processes by process status.

Tags:

* ``procstatus``: one of ``running``, ``sleeping``, or other process
statuses.

socorro.processor.process_crash:
type: "timing"
description: |
15 changes: 0 additions & 15 deletions socorro/tests/processor/test_processor_app.py
@@ -66,21 +66,6 @@ def test_source_iterator(self, processor_settings):
assert next(queue) is None
assert next(queue) == ((3,), {})

def test_heartbeat(self, sentry_helper):
"""Basic test to make sure it runs, captures metrics, and doesn't error out"""
with sentry_helper.reuse() as sentry_client:
with MetricsMock() as metricsmock:
app = ProcessorApp()
app.heartbeat()

# Assert it emitted some metrics
metricsmock.assert_gauge("socorro.processor.open_files")
metricsmock.assert_gauge("socorro.processor.processes_by_type")
metricsmock.assert_gauge("socorro.processor.processes_by_status")

# Assert it didn't throw an exception
assert len(sentry_client.envelopes) == 0

def test_transform_success(self, processor_settings):
app = ProcessorApp()
app._set_up_source_and_destination()