[Bug] [c++/python] Querying DataFrame with pyarrow array coords in multiprocessing returns invalid result #1456

Closed
atolopko-czi opened this issue Jun 6, 2023 · 4 comments · Fixed by #1573

@atolopko-czi (Member) commented Jun 6, 2023

Describe the bug

If the DataFrame.read() method is called in multiple child processes, where the coords arg for each child process is a disjoint slice of a pyarrow.Array object, then every child process returns the same result: the rows corresponding to the start of the parent array (i.e., the first process's slice), regardless of which slice was passed in. This does not occur if the slice is first converted to a Python list.
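
For illustration, a sketch of that list conversion (hedged; it mirrors the read() call in the repro below, and ids_for_read is a hypothetical name, not part of the original report):

# Materialize the pyarrow slice into a plain Python list before passing it as
# coords; this sidesteps the zero-copy slice (and its internal offset).
ids_for_read = ids.to_pylist() if isinstance(ids, pyarrow.Array) else ids
obs = exp.obs.read(column_names=['soma_joinid'], coords=[ids_for_read])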

To Reproduce

import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor
from typing import Union, List

import pyarrow
import tiledbsoma
from tiledbsoma import Experiment

import cellxgene_census


def read_obs(ids: Union[List[int], pyarrow.Array], test_case_name: str):
    log_prefix = f"[{test_case_name} {os.getpid()}]"

    with cellxgene_census.open_soma() as census:
        exp: Experiment = census['census_data']['homo_sapiens']
        obs_ids_out = exp.obs.read(column_names=['soma_joinid'],
                                   coords=[ids]).concat().to_pandas()['soma_joinid'].to_list()

        obs_ids_in = ids.tolist() if isinstance(ids, pyarrow.Array) else ids
        is_valid = set(obs_ids_out) == set(obs_ids_in)
        print(f"{log_prefix} {'VALID' if is_valid else 'INVALID'} result for test case '{test_case_name}'")
        print(f"{log_prefix} {obs_ids_in=}")
        print(f"{log_prefix} {obs_ids_out=}")


def run_mp(id_slices: List[Union[List[int], pyarrow.Array]], test_case_name: str):
    with ProcessPoolExecutor() as executor:
        for id_slice in id_slices:
            executor.submit(read_obs, id_slice, test_case_name)


if __name__ == '__main__':
    tiledbsoma.show_package_versions()

    num_processes = 3
    num_ids_per_process = 10
    all_ids = range(num_processes * num_ids_per_process)

    mp.set_start_method("spawn")

    all_ids_pyarrow = pyarrow.array(all_ids)
    slices = [all_ids_pyarrow[i * num_ids_per_process:i * num_ids_per_process + num_ids_per_process]
              for i in range(num_processes)]
    run_mp(slices, "pyarrow")

    all_ids_list = list(all_ids)
    slices = [all_ids_list[i * num_ids_per_process:i * num_ids_per_process + num_ids_per_process]
              for i in range(num_processes)]
    run_mp(slices, "list")

Output:

tiledbsoma.__version__        1.2.4
TileDB-Py tiledb.version()    (0, 21, 4)
TileDB core version           2.15.3
libtiledbsoma version()       libtiledb=2.15.2
python version                3.10.11.final.0
OS version                    Darwin 22.5.0
The "stable" release is currently 2023-05-15. Specify 'census_version="2023-05-15"' in future calls to open_soma() to ensure data consistency.
The "stable" release is currently 2023-05-15. Specify 'census_version="2023-05-15"' in future calls to open_soma() to ensure data consistency.
The "stable" release is currently 2023-05-15. Specify 'census_version="2023-05-15"' in future calls to open_soma() to ensure data consistency.
[pyarrow 69004] INVALID result for test case 'pyarrow'
[pyarrow 69004] obs_ids_in=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[pyarrow 69004] obs_ids_out=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[pyarrow 69006] INVALID result for test case 'pyarrow'
[pyarrow 69006] obs_ids_in=[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[pyarrow 69006] obs_ids_out=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[pyarrow 69005] VALID result for test case 'pyarrow'
[pyarrow 69005] obs_ids_in=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[pyarrow 69005] obs_ids_out=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
The "stable" release is currently 2023-05-15. Specify 'census_version="2023-05-15"' in future calls to open_soma() to ensure data consistency.
The "stable" release is currently 2023-05-15. Specify 'census_version="2023-05-15"' in future calls to open_soma() to ensure data consistency.
The "stable" release is currently 2023-05-15. Specify 'census_version="2023-05-15"' in future calls to open_soma() to ensure data consistency.
[list 69022] VALID result for test case 'list'
[list 69022] obs_ids_in=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[list 69022] obs_ids_out=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[list 69023] VALID result for test case 'list'
[list 69023] obs_ids_in=[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[list 69023] obs_ids_out=[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[list 69021] VALID result for test case 'list'
[list 69021] obs_ids_in=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[list 69021] obs_ids_out=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

Versions (please complete the following information):

tiledbsoma.__version__        1.2.4
TileDB-Py tiledb.version()    (0, 21, 4)
TileDB core version           2.15.3
libtiledbsoma version()       libtiledb=2.15.2
python version                3.10.11.final.0
OS version                    Darwin 22.5.0

Additional context

The issue was originally encountered in the CELLxGENE Census API PyTorch feature: chanzuckerberg/cellxgene-census#516 (see the PR discussion).

@johnkerl (Member) commented Jun 6, 2023

@atolopko-czi assigning to you for now pending the initial repro/etc writeup (we can re-assign from there)

@johnkerl johnkerl changed the title [Python] Querying DataFrame with pyarrow array coords in multiprocessing returns invalid result [python] Querying DataFrame with pyarrow array coords in multiprocessing returns invalid result Jun 8, 2023
@atolopko-czi (Member, Author) commented:

Still occurs even if we use a single child process and run the reads serially:

def run_mp(id_slices: List[Union[List[int], pyarrow.Array]], test_case_name: str):
    with ProcessPoolExecutor(max_workers=1) as executor:
        for id_slice in id_slices:
            executor.submit(read_obs, id_slice, test_case_name).result()
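
(Illustration, not from the original comment: since each slice is pickled on its way to the worker process, one quick check is whether a pickle round trip keeps the slice's non-zero offset rather than compacting the data; the exact behavior may depend on the pyarrow version.)

import pickle
import pyarrow as pa

arr = pa.array(range(30))
sl = arr[10:20]                      # zero-copy slice over the parent buffer, offset=10
rt = pickle.loads(pickle.dumps(sl))  # roughly what a spawned worker receives
print(rt.offset, rt.to_pylist())     # if offset is non-zero, downstream readers must honor it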

@thetorpedodog (Contributor) commented:

Wondering if there's something in a C++ layer that is getting an Arrow array but is forgetting to look at the offset within the array.
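
(Illustration of that hypothesis, not the actual libtiledbsoma code path: a pyarrow slice is a zero-copy view whose offset records where it starts in the parent buffer, so reading the values buffer without applying the offset yields the data at the start of the parent array.)

import pyarrow as pa

all_ids = pa.array(range(30))
second_slice = all_ids[10:20]        # zero-copy view sharing the parent's buffers
print(second_slice.offset)           # 10
print(second_slice.to_pylist())      # [10, ..., 19]

# Rebuilding an array from the slice's values buffer while dropping the offset
# reads from the start of the parent buffer -- the symptom seen in the output above.
values_buf = second_slice.buffers()[1]
no_offset = pa.Array.from_buffers(pa.int64(), 10, [None, values_buf])
print(no_offset.to_pylist())         # [0, 1, ..., 9]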

@johnkerl johnkerl assigned nguyenv and unassigned atolopko-czi Jun 21, 2023
@johnkerl johnkerl changed the title [python] Querying DataFrame with pyarrow array coords in multiprocessing returns invalid result [c++/python] Querying DataFrame with pyarrow array coords in multiprocessing returns invalid result Jun 21, 2023
@atolopko-czi atolopko-czi changed the title [c++/python] Querying DataFrame with pyarrow array coords in multiprocessing returns invalid result [Bug] [c++/python] Querying DataFrame with pyarrow array coords in multiprocessing returns invalid result Jul 5, 2023
@johnkerl (Member) commented Jul 5, 2023

Hi @nguyenv -- re sequencing -- for this coming sprint #866 is of course priority 1 -- @atolopko-czi and I believe that this issue would be priority 2, and #1256 #1257 et al. ("Create ...") thereafter
