Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce RabbitMQ prefetch_count for mula and more AMQP exception handling #1332

Merged
merged 14 commits into from
Jul 11, 2023

Conversation

Donnype
Copy link
Contributor

@Donnype Donnype commented Jul 4, 2023

Changes

First check the Rabbitmq connection in Bytes before creating a new channel and set scheduler prefetch_count to 100 as a default. It is hard to estimate a sane default for this value as these issue occur sporadically, but it mostly depends on the throughput and amount of consumers. For the raw file events this will probably not be very high for most installs, but I find it hard to put a number to the scan profile mutation queue throughput (@jpbruinsslot ?).

@Darwinkel What was the consensus regarding putting default env vars in end-dist or env-default?

Issue link

Closes #1320

Proof

Here's a large list of dispatched tasks that turns into succeeded:
image

image


Code Checklist

  • All the commits in this PR are properly PGP-signed and verified;
  • This PR only contains functionality relevant to the issue; tickets have been created for newly discovered issues.
  • I have written unit tests for the changes or fixes I made.
  • For any non-trivial functionality, I have added integration and/or end-to-end tests.
  • I have performed a self-review of my code and refactored it to the best of my abilities.

Communication

  • I have informed others of any required .env changes files if required and changed the .env-dist accordingly.
  • I have made corresponding changes to the documentation, if necessary.

Checklist for code reviewers:

Copy-paste the checklist from the docs/source/templates folder into your comment.


Checklist for QA:

Copy-paste the checklist from the docs/source/templates folder into your comment.

@Donnype Donnype requested a review from a team as a code owner July 4, 2023 12:59
@Donnype Donnype changed the title Fix/bytes rabbimq channels are closed sometimes Reduce RabbitMQ prefetch_count for mula and also check connection on closed channel Jul 4, 2023
@Darwinkel
Copy link
Contributor

@Donnype there is no consensus yet. Discussion is ongoing here #1314 and here #1299.

@dekkers
Copy link
Contributor

dekkers commented Jul 4, 2023

My guess is that prefetch of 100 should be more than enough, because as far as I understand it, it would mean that the scheduler would be processing 100 messages at the same time before hitting that limit.

@Donnype
Copy link
Contributor Author

Donnype commented Jul 5, 2023

@dekkers My understanding is that it is the size of the message buffer on the client side. The longer the handling of messages takes and the more consumers we have, the lower it could/should be.

@underdarknl underdarknl mentioned this pull request Jul 5, 2023
@Donnype
Copy link
Contributor Author

Donnype commented Jul 7, 2023

I added the following to the ticket:

EDIT:

  • Looking at the discussion above again, I am not sure if it is the prefetch_count per se anymore.. I found this comment on another thread as well that shows roughly the issue we have: channels (connections?) closing after approximately 20 minutes. However, the current PR also takes better care of that.

Perhaps adding the prefetch count wouldn't hurt though since it is a tuning parameter to the system. We could also consider adding a rabbitmq config file as well to read from at one point.

@Darwinkel
Copy link
Contributor

Checklist for QA:

  • I have checked out this branch, and successfully ran a fresh make reset.
  • I confirmed that there are no unintended functional regressions in this branch:
    • I have managed to pass the onboarding flow
    • Objects and Findings are created properly
    • Tasks are created and completed properly
  • I confirmed that the PR's advertised feature or hotfix works as intended.

What works:

  • No more closed channel errors; KAT remains operational even after ~30 minutes

What doesn't work:

  • Sporadically, a Boefje fails with the following errors in Bytes:
[2023-07-10 07:59:41 +0000] [9] [INFO] [sql_meta_repository] Added boefje meta [id=0622831f962543a58298f6f3e187d405]
2023-07-10T07:59:41.892076762Z [2023-07-10 07:59:41 +0000] [9] [INFO] [h11_impl] 172.18.0.10:60030 - "POST /bytes/boefje_meta HTTP/1.1" 201
2023-07-10T07:59:41.939506870Z [2023-07-10 07:59:41 +0000] [9] [INFO] [sql_meta_repository] Added hash sha512:26cbf13f220f41aa835645d9c4a432c83776ac0d0939d218dba7a7364d28b657d81ca6b07e7cffc342fc2d5fa71de1b3d8e111df9c72218ecde5429c93cf37a7 and link 94bb73a9-0ce2-4cb4-898e-fcf117e438ad to data
2023-07-10T07:59:41.939709007Z [2023-07-10 07:59:41 +0000] [9] [INFO] [file_raw_repository] Writing raw data with id 3de0eb50-dea4-4cc3-8bc6-40d92bed6bf5 to disk
2023-07-10T07:59:41.939822906Z [2023-07-10 07:59:41 +0000] [9] [INFO] [sql_meta_repository] Added raw data [id=3de0eb50-dea4-4cc3-8bc6-40d92bed6bf5]
2023-07-10T07:59:41.943308734Z [2023-07-10 07:59:41 +0000] [9] [ERROR] [io_services_utils] _AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=16, family=2, type=1, proto=6, laddr=('172.18.0.4', 52050)>; Caller's stack:
2023-07-10T07:59:41.943331077Z Traceback (most recent call last):
2023-07-10T07:59:41.943333105Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 1103, in _on_socket_writable
2023-07-10T07:59:41.943334603Z     self._produce()
2023-07-10T07:59:41.943335751Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 819, in _produce
2023-07-10T07:59:41.943337016Z     num_bytes_sent = self._sigint_safe_send(self._sock,
2023-07-10T07:59:41.943338212Z                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-07-10T07:59:41.943339515Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
2023-07-10T07:59:41.943340792Z     return func(*args, **kwargs)
2023-07-10T07:59:41.943341921Z            ^^^^^^^^^^^^^^^^^^^^^
2023-07-10T07:59:41.943343025Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
2023-07-10T07:59:41.943344265Z     return sock.send(data)
2023-07-10T07:59:41.943345357Z            ^^^^^^^^^^^^^^^
2023-07-10T07:59:41.943346513Z ConnectionResetError: [Errno 104] Connection reset by peer
2023-07-10T07:59:41.943347688Z Traceback (most recent call last):
2023-07-10T07:59:41.943348813Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 1103, in _on_socket_writable
2023-07-10T07:59:41.943350075Z     self._produce()
2023-07-10T07:59:41.943351150Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 819, in _produce
2023-07-10T07:59:41.943352442Z     num_bytes_sent = self._sigint_safe_send(self._sock,
2023-07-10T07:59:41.943353560Z                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-07-10T07:59:41.943354711Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
2023-07-10T07:59:41.943355939Z     return func(*args, **kwargs)
2023-07-10T07:59:41.943357034Z            ^^^^^^^^^^^^^^^^^^^^^
2023-07-10T07:59:41.943358177Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
2023-07-10T07:59:41.943359473Z     return sock.send(data)
2023-07-10T07:59:41.943360614Z            ^^^^^^^^^^^^^^^
2023-07-10T07:59:41.943361848Z ConnectionResetError: [Errno 104] Connection reset by peer
2023-07-10T07:59:41.943364323Z [2023-07-10 07:59:41 +0000] [9] [INFO] [io_services_utils] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=16, family=2, type=1, proto=6, laddr=('172.18.0.4', 52050)>
2023-07-10T07:59:41.943368641Z [2023-07-10 07:59:41 +0000] [9] [INFO] [io_services_utils] Deactivating transport: state=1; <socket.socket fd=16, family=2, type=1, proto=6, laddr=('172.18.0.4', 52050)>
2023-07-10T07:59:41.943753631Z [2023-07-10 07:59:41 +0000] [9] [ERROR] [base_connection] connection_lost: StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
2023-07-10T07:59:41.943788376Z [2023-07-10 07:59:41 +0000] [9] [INFO] [connection] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
2023-07-10T07:59:41.943940822Z [2023-07-10 07:59:41 +0000] [9] [INFO] [connection] Stack terminated due to StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
2023-07-10T07:59:41.944275799Z [2023-07-10 07:59:41 +0000] [9] [INFO] [io_services_utils] Closing transport socket and unlinking: state=2; <socket.socket fd=16, family=2, type=1, proto=6, laddr=('172.18.0.4', 52050)>
2023-07-10T07:59:41.944507090Z [2023-07-10 07:59:41 +0000] [9] [ERROR] [blocking_connection] Unexpected connection close detected: StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
2023-07-10T07:59:41.945162796Z [2023-07-10 07:59:41 +0000] [9] [ERROR] [router] Error saving raw data
2023-07-10T07:59:41.945201633Z Traceback (most recent call last):
2023-07-10T07:59:41.945211978Z   File "/app/bytes/bytes/api/router.py", line 174, in create_raw
2023-07-10T07:59:41.945215338Z     event_manager.publish(event)
2023-07-10T07:59:41.945217821Z   File "/app/bytes/bytes/rabbitmq.py", line 29, in publish
2023-07-10T07:59:41.945220107Z     self.channel.queue_declare(queue_name, durable=True)
2023-07-10T07:59:41.945222272Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2524, in queue_declare
2023-07-10T07:59:41.945224621Z     self._flush_output(declare_ok_result.is_ready)
2023-07-10T07:59:41.945226602Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1353, in _flush_output
2023-07-10T07:59:41.945228975Z     self._connection._flush_output(lambda: self.is_closed, *waiters)
2023-07-10T07:59:41.945231148Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
2023-07-10T07:59:41.945233610Z     raise self._closed_result.value.error
2023-07-10T07:59:41.945236040Z pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
2023-07-10T07:59:41.945838405Z [2023-07-10 07:59:41 +0000] [9] [ERROR] [sql_meta_repository] An error occurred during the session.
2023-07-10T07:59:41.945852068Z Traceback (most recent call last):
2023-07-10T07:59:41.945853894Z   File "/app/bytes/bytes/api/router.py", line 174, in create_raw
2023-07-10T07:59:41.945855413Z     event_manager.publish(event)
2023-07-10T07:59:41.945856656Z   File "/app/bytes/bytes/rabbitmq.py", line 29, in publish
2023-07-10T07:59:41.945864086Z     self.channel.queue_declare(queue_name, durable=True)
2023-07-10T07:59:41.945865788Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2524, in queue_declare
2023-07-10T07:59:41.945868789Z     self._flush_output(declare_ok_result.is_ready)
2023-07-10T07:59:41.945888393Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1353, in _flush_output
2023-07-10T07:59:41.946093203Z     self._connection._flush_output(lambda: self.is_closed, *waiters)
2023-07-10T07:59:41.946626244Z   File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
2023-07-10T07:59:41.946938744Z     raise self._closed_result.value.error
2023-07-10T07:59:41.947245510Z pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
2023-07-10T07:59:41.947563376Z 
2023-07-10T07:59:41.947831116Z The above exception was the direct cause of the following exception:
2023-07-10T07:59:41.948136041Z 
2023-07-10T07:59:41.948432731Z Traceback (most recent call last):
2023-07-10T07:59:41.948641095Z   File "/app/bytes/bytes/database/sql_meta_repository.py", line 224, in create_meta_data_repository
2023-07-10T07:59:41.948868900Z     yield repository
2023-07-10T07:59:41.949161956Z fastapi.exceptions.HTTPException
2023-07-10T07:59:41.949387907Z [2023-07-10 07:59:41 +0000] [9] [WARNING] [sql_meta_repository] Rolled back session.

@Donnype
Copy link
Contributor Author

Donnype commented Jul 10, 2023

@Darwinkel Possible fix pushed!

@Darwinkel
Copy link
Contributor

Nice, I can't reproduce the issue anymore which is good. Could somebody else verify?

@Donnype Donnype changed the title Reduce RabbitMQ prefetch_count for mula and also check connection on closed channel Reduce RabbitMQ prefetch_count for mula and more exception handling Jul 10, 2023
@Donnype Donnype changed the title Reduce RabbitMQ prefetch_count for mula and more exception handling Reduce RabbitMQ prefetch_count for mula and more AMQP exception handling Jul 10, 2023
@Donnype Donnype changed the title Reduce RabbitMQ prefetch_count for mula and more AMQP exception handling Reduce RabbitMQ prefetch_count for mula and more AMQP exception handling Jul 10, 2023
@jpbruinsslot
Copy link
Contributor

Nice, I can't reproduce the issue anymore which is good. Could somebody else verify?

Will try as well

@Donnype Donnype self-assigned this Jul 11, 2023
@jpbruinsslot
Copy link
Contributor

Haven't seen any errors on my part and seems to be working as intended. I'll keep it running for a while.

@underdarknl underdarknl merged commit ee43031 into main Jul 11, 2023
@underdarknl underdarknl deleted the fix/bytes-rabbimq-channels-are-closed-sometimes branch July 11, 2023 13:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Bytes RabbitMQ channels are sometimes closed
5 participants