Subprocess functions can't be pickled when decorated for observability #590

Closed
callumforrester opened this issue Aug 8, 2024 · 1 comment · Fixed by #584
Labels
bug Something isn't working

Comments

@callumforrester

In @keithralphs' attempt to add tracing in #586, errors occurred because the functions in interface.py are not loaded by the subprocess; they are pickled and sent over. When decorated for tracing they become unpicklable and cause an error. @keithralphs can fill in the details.
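
For illustration, here is one well-known way a decorated function can stop being picklable. It is a minimal sketch and is not confirmed to be the exact failure seen in #586. pickle serializes functions by reference (module plus qualified name), so a wrapper that claims another function's name but is not the object bound to that name is rejected. `traced`, `work`, `wrapped_work` and `can_pickle` are hypothetical names invented for this example.

```python
import functools
import pickle


def traced(func):
    """Hypothetical stand-in for a tracing decorator."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper


def work(x):
    return x * 2


# Wrapped after definition: the module attribute `work` still refers to the
# undecorated function, while the wrapper also claims the name "work".
wrapped_work = traced(work)


def can_pickle(obj) -> bool:
    """Return True if pickle.dumps accepts obj, False if it rejects it."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        return False
    return True


if __name__ == "__main__":
    # The wrapper is callable as normal but cannot be pickled by reference.
    print(wrapped_work(2), can_pickle(wrapped_work))
```

Sending only strings and plain arguments across the process boundary sidesteps this kind of lookup entirely.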

Possible Solution

We can pass a function name to the subprocess and make it import and load the functions internally. If I'm honest, that's how I thought multiprocessing worked in the first place...

This is also a step in the right direction for #504

Acceptance Criteria

@callumforrester callumforrester added the enhancement New feature or request label Aug 8, 2024
@callumforrester callumforrester added bug Something isn't working and removed enhancement New feature or request labels Aug 8, 2024
@keithralphs

As Callum says, pickling the method and sending it to the subprocess meant that the observability decorator applied to it could not be un-pickled correctly. The decorator was custom written to wrap the propagation of observability context from process to process. Its current form is:

def use_propagated_context(
    func: Callable[P, T],
) -> Callable[Concatenate[dict[str, Any], P], T]:
    """Retrieves the propagated context information from the carrier param, which is concatenated
    onto the target function, and injects that into the local observability context"""

    @functools.wraps(func)
    def wrapper(carrier: dict[str, Any] | None, *args: P.args, **kwargs: P.kwargs) -> T:
        if carrier:
            ctx = get_global_textmap().extract(carrier)
            attach(ctx)
        return func(*args, **kwargs)

    return wrapper

Ignoring the ParamSpec components, which were added later, it appeared that the wrapper function caused the issue, presumably because it makes use of objects on either side of the process boundary that are not available until runtime and so could not be serialized in advance. This was fixed using the method described above: 'boxing' the call inside an _rpc function so that this is what gets pickled. It merely forwards the target function name and parameter list to the subprocess, which then executes the identified function with the supplied parameters in the correct process, where the target observability context is available, removing the issue. Its implementation looks like this:

def _rpc(module_name: str, function_name: str, args, **kwargs) -> T:
    mod = import_module(module_name)
    function = mod.__dict__.get(function_name)
    _validate_function(function_name, function)
    return function(*args, **kwargs)
