Regression in Python multiprocessing support #2744

Closed
Tom-Newton opened this issue Aug 7, 2024 · 9 comments · Fixed by #2765
Labels
binding/python Issues for the Python package bug Something isn't working

Comments

@Tom-Newton
Contributor

Tom-Newton commented Aug 7, 2024

Environment

Delta-rs version: 0.18.2
Binding: Python
Environment:

  • Cloud provider: Impacts local filesystem
  • OS: Ubuntu 20.04
  • Other: Python 3.10

Bug

What happened: I just upgraded deltalake for the first time in a while, and we now get a deadlock when initialising a deltalake.DeltaTable object under Python multiprocessing. Sometimes it just hangs and sometimes the OS kills it.

thread 'tokio-runtime-worker' panicked at library/std/src/sys/pal/unix/thread.rs:274:13:
failed to join thread: Resource deadlock avoided (os error 35)

full_log.txt

What you expected to happen: DeltaTable should initialise successfully regardless of multiprocessing.

How to reproduce it:

from multiprocessing import Process
from deltalake import DeltaTable


def load():
    DeltaTable("crates/test/tests/data/simple_table_with_checkpoint")


def main():
    print("start normal")
    load()  # First use happens in the parent process
    print("Start multiprocessed")
    # On Linux (Python 3.10) the default start method is fork, so the child inherits the parent's state
    p = Process(target=load)
    p.start()
    p.join()
    print("DONE")


if __name__ == "__main__":
    main()

Note: to reproduce, deltalake.DeltaTable must be used once in the parent process and then again in a child Python process created with multiprocessing.

More details: I have done a git bisect and identified that #2424 is the cause.

@Tom-Newton Tom-Newton added the bug Something isn't working label Aug 7, 2024
@rtyler rtyler added the binding/python Issues for the Python package label Aug 8, 2024
@ion-elgreco
Collaborator

@wjones127 do you have any ideas why this could happen? OnceLock should be thread-safe.

@wjones127
Collaborator

Are you using fork or spawn? IIRC different platforms have different defaults. tokio doesn't support fork.

@Tom-Newton
Contributor Author

Good question. We are using fork since that is the default on Linux. multiprocessing.set_start_method("spawn") does indeed avoid the problem. I'll try to find out whether this is a viable solution for us.

@Tom-Newton
Contributor Author

Tom-Newton commented Aug 9, 2024

I would definitely feel more comfortable if the default behaviour (fork) worked with deltalake. I don't have much control over how people use this.

tokio doesn't support fork

I think this can be worked around, and previously was worked around, by creating a new runtime in each new process. What do you think of this monstrosity? It would allow plenty of forking to work... 😅

use std::process;
use std::sync::OnceLock;

use tokio::runtime::Runtime;

const NUMBER_OF_RUNTIME_SLOTS: usize = 10;
// `const` (not `static`) so these can be used in the array-repeat expressions below
const PID_LOCK: OnceLock<u32> = OnceLock::new();
const RUNTIME_LOCK: OnceLock<Runtime> = OnceLock::new();

#[inline]
pub fn rt() -> &'static Runtime {
    static RUNTIMES: [OnceLock<Runtime>; NUMBER_OF_RUNTIME_SLOTS] =
        [RUNTIME_LOCK; NUMBER_OF_RUNTIME_SLOTS];
    static PIDS: [OnceLock<u32>; NUMBER_OF_RUNTIME_SLOTS] = [PID_LOCK; NUMBER_OF_RUNTIME_SLOTS];

    let current_pid = process::id();
    let (found_pid, idx) = PIDS
        .iter()
        .enumerate()
        .find_map(|(i, lock)| match lock.get() {
            Some(pid) if pid == &current_pid => Some((true, i)), // Found the current PID
            Some(_pid) => None,                                  // Found a different PID, keep searching
            None => Some((false, i)),                            // Found an empty slot
        })
        .expect("No available slot for tokio runtime. The process was forked too many times");

    match found_pid {
        true => RUNTIMES[idx].get().expect("Failed to get tokio runtime"),
        false => {
            PIDS[idx]
                .set(current_pid)
                .expect("Failed to record PID for new tokio runtime.");
            RUNTIMES[idx].get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
        }
    }
}

@wjones127
Collaborator

wjones127 commented Aug 9, 2024

If that fixes it, I think that would be very cool. For simplicity you could probably just make it a HashMap<u32, Runtime>, although TBH I'm not sure I understand fork() in enough detail to know if that's the best way. One concern I have is whether it will garbage collect properly. Will forking recursively cause a huge number of threads to be spawned? (I guess having a fixed buffer is part of what addresses that?)
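A PID-keyed `HashMap` could look roughly like the sketch below. It is a minimal, std-only sketch under stated assumptions: `FakeRuntime` is a hypothetical stand-in for `tokio::runtime::Runtime` so the code compiles without tokio, and each runtime is leaked with `Box::leak` so that a `&'static` reference can be returned, which means nothing is ever freed. It only keys state by PID and does not make fork safe. For example, a mutex that another thread holds at the moment of fork stays locked forever in the child, because only the forking thread is copied into the child.

```rust
use std::collections::HashMap;
use std::process;
use std::sync::{Mutex, OnceLock};

/// Hypothetical stand-in for `tokio::runtime::Runtime`; it only remembers
/// which process created it.
#[derive(Debug)]
pub struct FakeRuntime {
    pub owner_pid: u32,
}

impl FakeRuntime {
    fn new() -> Self {
        FakeRuntime {
            owner_pid: process::id(),
        }
    }
}

/// Returns the runtime for the current process, creating one the first time
/// a given PID asks for it.
pub fn rt() -> &'static FakeRuntime {
    // One registry shared by all threads of the process, keyed by PID.
    static RUNTIMES: OnceLock<Mutex<HashMap<u32, &'static FakeRuntime>>> = OnceLock::new();

    let pid = process::id();
    let mut runtimes = RUNTIMES
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .expect("runtime registry mutex poisoned");

    let runtime: &'static FakeRuntime = *runtimes.entry(pid).or_insert_with(|| {
        // Leaked on purpose so a `&'static` reference can be handed out;
        // runtimes for PIDs that have exited are never freed.
        let leaked: &'static FakeRuntime = Box::leak(Box::new(FakeRuntime::new()));
        leaked
    });
    runtime
}
```

Calling `rt()` twice, or from another thread of the same process, returns the same runtime. A forked child gets a fresh entry because `process::id()` differs, but it still inherits (and never frees) a copy of every entry the parent had.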

@Tom-Newton
Contributor Author

Tom-Newton commented Aug 10, 2024

I don't really understand why, but my attempt above still deadlocks after a couple of layers of recursive forking. For my current test I think it should be effectively the same as creating a new runtime on every call, but it seems something more subtle is going on.

From reading tokio-rs/tokio#4301, it sounds like there is probably no solution that is completely safe with fork, and there are probably subtleties that cause my attempt above to break. I'm starting to think the best solution might be to just fail fast if there is a fork. My main concern currently is difficult-to-debug deadlocks, so if users get an error telling them to use spawn or forkserver instead of fork, that probably solves the problem for me.
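The fail-fast idea can be sketched as follows: remember the PID of the process that first asked for the runtime, and refuse to hand the runtime to any other PID. This is a minimal std-only sketch, not the code from #2765. `FakeRuntime` is a hypothetical stand-in for `tokio::runtime::Runtime`, and `ForkedProcessError` is a hypothetical error type; a real binding would still need to turn the error into a Python exception. Because `fork()` copies the parent's memory, the child inherits the recorded PID, and the mismatch with `std::process::id()` is what reveals the fork.

```rust
use std::error::Error;
use std::fmt;
use std::process;
use std::sync::OnceLock;

/// Hypothetical stand-in for `tokio::runtime::Runtime`.
#[derive(Debug)]
pub struct FakeRuntime;

/// Hypothetical error returned when the runtime is requested from a process
/// other than the one that created it.
#[derive(Debug, PartialEq)]
pub struct ForkedProcessError {
    pub current_pid: u32,
    pub runtime_pid: u32,
}

impl fmt::Display for ForkedProcessError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Forked process detected: current PID is {} but the runtime was created by PID {}. \
             Use the `spawn` or `forkserver` multiprocessing start method instead of `fork`.",
            self.current_pid, self.runtime_pid
        )
    }
}

impl Error for ForkedProcessError {}

/// Returns the process-wide runtime, or an error if called from a forked child.
pub fn rt() -> Result<&'static FakeRuntime, ForkedProcessError> {
    static RUNTIME: OnceLock<FakeRuntime> = OnceLock::new();
    static RUNTIME_PID: OnceLock<u32> = OnceLock::new();

    let current_pid = process::id();
    // The first caller records its PID. A forked child inherits this value,
    // so a mismatch means the runtime belongs to the parent process.
    let runtime_pid = *RUNTIME_PID.get_or_init(|| current_pid);
    if current_pid != runtime_pid {
        return Err(ForkedProcessError {
            current_pid,
            runtime_pid,
        });
    }
    Ok(RUNTIME.get_or_init(|| FakeRuntime))
}
```

Within one process `rt()` keeps returning `Ok`. In a child created by fork after the first call, it returns `Err` with a message pointing at spawn or forkserver instead of hanging; if the fork happens before the first call, the child simply records its own PID. Returning a `Result` rather than panicking is a choice made for this sketch; panicking with the same message would also fail fast.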

@Tom-Newton
Contributor Author

I implemented #2765 as my proposed solution.

@wjones127
Collaborator

Thanks for trying @Tom-Newton. 👍

@Tom-Newton
Contributor Author

Thanks for the help and the reviews everyone 🙂.


4 participants