
fix: catch all exceptions in forked process #921

Merged
merged 1 commit into main from jjaakola-aiven-catch-all-exceptions-in-forked-process on Jul 24, 2024

Conversation

@jjaakola-aiven (Contributor) commented Jul 24, 2024

If a base exception is raised in the forked process, the communication queue will block in the calling process and cause timeouts. This is prevalent in the unit tests for protobuf serialization in GitHub Actions, caused by the protobuf compiler work directory already existing. The fix is to create the directory with pathlib.Path.mkdir(parents=True, exist_ok=True) to avoid the FileExistsError, and to change the error handling logic so that BaseExceptions are also passed back through the multiprocessing queue.
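A minimal sketch of the pattern this change describes, assuming one short-lived worker process per call (the names `_do_work`, `_worker` and `run_in_forked_process` are illustrative, not Karapace's actual functions): the child catches BaseException, not just Exception, and puts it on the queue so the parent is never left blocking on an empty queue; the parent re-raises whatever it receives.

```python
from multiprocessing import Process, Queue

def _do_work() -> dict:
    # Placeholder for the real work (e.g. protobuf (de)serialization).
    return {"ok": True}

def _worker(result_queue: "Queue[dict | BaseException]") -> None:
    try:
        result_queue.put(_do_work())
    except BaseException as exc:  # deliberately broad, see the description above
        # Forward even SystemExit/KeyboardInterrupt so the parent is unblocked.
        result_queue.put(exc)

def run_in_forked_process(timeout: float = 10.0) -> dict:
    result_queue: "Queue[dict | BaseException]" = Queue(1)
    process = Process(target=_worker, args=(result_queue,))
    process.start()
    try:
        result = result_queue.get(timeout=timeout)
    finally:
        process.join()
    if isinstance(result, BaseException):
        raise result
    return result
```

Forwarding the exception object itself preserves the original error for the caller instead of collapsing it into a generic queue timeout.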



github-actions bot commented Jul 24, 2024

Coverage report

Coverage changed in:
karapace/utils.py
karapace/kafka/common.py
karapace/protobuf/field_element.py
karapace/protobuf/io.py
karapace/protobuf/proto_file_element.py
karapace/protobuf/proto_normalizations.py
karapace/protobuf/schema.py
Project Total

This report was generated by python-coverage-comment-action

@jjaakola-aiven force-pushed the jjaakola-aiven-catch-all-exceptions-in-forked-process branch 3 times, most recently from b00cc77 to 56c4f9e on July 24, 2024 12:05
@jjaakola-aiven marked this pull request as ready for review July 24, 2024 12:33
@jjaakola-aiven requested review from a team as code owners July 24, 2024 12:33
@eliax1996 (Contributor) left a comment


Everything is fine except for the queue size; I'm not sure whether setting the max_size to 1 could decrease performance.

Comment on lines +137 to +138
# todo: This will leave residues on sys.path in case of exceptions. If really must
# mutate sys.path, we should at least wrap in try-finally.
Contributor

Can't we, as a final action, do a tree delete, marking the protobuf folder as the parent folder to delete? That way (unless the dependency paths contain ../../name) we could eventually delete everything.

Contributor

I know you didn't introduce the comment, but it could be solved that way.

@jjaakola-aiven (Contributor, Author) Jul 24, 2024

I think the protobuf files are fine to keep; if one already exists it will be reused and protoc is not called.
The mangling of sys.path and the lines below should be wrapped in a try-except-else-finally block to remove the added path if something fails, but not remove it if it was already there.
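For illustration, a minimal sketch of the try/finally wrapping suggested here (the helper name `call_with_temporary_sys_path` is hypothetical, not code from this PR): the path is removed afterwards only if this call added it.

```python
import sys
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_temporary_sys_path(path: str, func: Callable[[], T]) -> T:
    # Only remove the entry afterwards if we were the ones who added it.
    added = path not in sys.path
    if added:
        sys.path.append(path)
    try:
        return func()
    finally:
        if added and path in sys.path:
            sys.path.remove(path)
```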

@@ -233,34 +237,37 @@ def __init__(
     def read(self, bio: BytesIO) -> dict:
         if self._reader_schema is None:
             self._reader_schema = self._writer_schema
-        return reader_mp(self.config, self._writer_schema, self._reader_schema, bio)
+        return read_in_forked_process_multiprocess_process(self.config, self._writer_schema, self._reader_schema, bio)
Contributor

good rename!

Contributor Author

I could remove the extra _process_ though.

Contributor Author

Done.

@@ -168,25 +168,25 @@ def read_data(
     return class_instance


-_ReaderQueue: TypeAlias = "Queue[dict[object, object] | Exception]"
+_ReaderQueue: TypeAlias = "Queue[dict[object, object] | BaseException]"
Contributor

nice!


@@ -274,14 +281,18 @@ def writer_mp(
     # To avoid problem with enum values for basic SerDe support we
     # will isolate work with call protobuf libraries in child process.
     if __name__ == "karapace.protobuf.io":
-        queue: _WriterQueue = Queue()
-        p = Process(target=writer_process, args=(queue, config, writer_schema, message_name, datum))
+        writer_queue: _WriterQueue = Queue(1)
Contributor

why only 1 message in the queue?

Contributor

Does it make sense to process just one at a time?

Contributor Author

Nothing is ever put on the queue except the single message the caller waits for. I just decided to make the size explicit here.

Contributor

I didn't notice that the queue isn't reused and that we have one process for each message.
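For illustration only (not code from the PR), assuming one short-lived process per call: with exactly one item ever produced, a queue of maxsize 1 behaves the same as an unbounded one and simply makes the one-message contract explicit.

```python
from multiprocessing import Process, Queue

def produce(q: "Queue[str]") -> None:
    # The child puts exactly one item; with maxsize=1 this put can never block.
    q.put("only result")

if __name__ == "__main__":
    q: "Queue[str]" = Queue(1)   # explicit: at most one item in flight
    p = Process(target=produce, args=(q,))
    p.start()
    print(q.get(timeout=10))     # the caller waits for that single item
    p.join()
```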

@jjaakola-aiven force-pushed the jjaakola-aiven-catch-all-exceptions-in-forked-process branch from 56c4f9e to c945648 on July 24, 2024 13:18
If a base exception is raised, the communication queue will block
in the calling process and cause timeouts. This is prevalent
in the unit tests for protobuf serialization in GitHub Actions, caused
by the protobuf compiler work directory already existing.
The fix is to create the directory with pathlib.Path.mkdir using
parents=True and exist_ok=True to remove the FileExistsError, and to
change the error handling logic to also pass BaseExceptions back
through the multiprocessing queue.
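A hedged illustration of the directory-creation part of the fix (the helper name is hypothetical; `parents` and `exist_ok` are the pathlib.Path.mkdir keywords the commit message refers to):

```python
from pathlib import Path

def ensure_protoc_work_dir(work_dir: Path) -> Path:
    # parents=True creates missing parent directories; exist_ok=True makes a
    # pre-existing work directory a no-op instead of raising FileExistsError.
    work_dir.mkdir(parents=True, exist_ok=True)
    return work_dir
```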
@eliax1996 force-pushed the jjaakola-aiven-catch-all-exceptions-in-forked-process branch from c945648 to 8f0a350 on July 24, 2024 13:38
@keejon enabled auto-merge July 24, 2024 13:45
@keejon merged commit 8afd513 into main Jul 24, 2024
9 checks passed
@keejon deleted the jjaakola-aiven-catch-all-exceptions-in-forked-process branch July 24, 2024 14:05