
[BUG] LambdaOp doesn't work with multi-GPU cluster, multiple output workers #1626

Open
bschifferer opened this issue Jul 20, 2022 · 3 comments
Labels: bug (Something isn't working), P1

Comments

@bschifferer
Contributor

Describe the bug
I run a multi-GPU NVTabular workflow with LambdaOps. It executes the fit functionality, but when the pipeline runs transform and to_parquet, I get the following error. If I run on a single GPU, it does work.

Failed to transform operator <nvtabular.ops.lambdaop.LambdaOp object at 0x7fbb4a93f2e0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py", line 539, in _transform_partition
    f"Dtype discrepancy detected for column {col_name}: "
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/lambdaop.py", line 103, in label
    source = getsourcelines(self.f)[0][0]
  File "/usr/lib/python3.8/inspect.py", line 979, in getsourcelines
    lines, lnum = findsource(object)
  File "/usr/lib/python3.8/inspect.py", line 798, in findsource
    raise OSError('could not get source code')
OSError: could not get source code
2022-07-20 10:17:49,872 - distributed.worker - WARNING - Compute Failed
Key:       ('write-processed-2c66d3a6a6f1b9c54db536c61f1e311e-partition2c66d3a6a6f1b9c54db536c61f1e311e', "('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet')")
Function:  _write_subgraph
args:      (<merlin.io.dask.DaskSubgraph object at 0x7fbb4ab4d550>, ('part_50.parquet', 'part_51.parquet', 'part_52.parquet', 'part_53.parquet', 'part_54.parquet', 'part_55.parquet', 'part_56.parquet', 'part_57.parquet', 'part_58.parquet', 'part_59.parquet'), '/raid/moj_feed_data_v1_sample1_parquet_test/', None, <fsspec.implementations.local.LocalFileSystem object at 0x7fbb4aaee2b0>, [], [], [], 'parquet', 0, False, '')
kwargs:    {}
Exception: "OSError('could not get source code')"

Steps/Code to reproduce bug
I use the Criteo example's dask cluster initialization:

# Standard imports for this setup (the merlin utils import path is assumed here and may differ by version)
import warnings

import numba
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from merlin.core.utils import device_mem_size, pynvml_mem_size

# Dask dashboard
dashboard_port = "8787"
dask_workdir = '/raid/dask/'

# Deploy a Single-Machine Multi-GPU Cluster
protocol = "tcp"  # "tcp" or "ucx"
if numba.cuda.is_available():
    NUM_GPUS = list(range(len(numba.cuda.gpus)))
else:
    NUM_GPUS = []
visible_devices = ",".join([str(n) for n in NUM_GPUS])  # Select devices to place workers
device_limit_frac = 0.7  # Spill GPU-Worker memory to host at this limit.
device_pool_frac = 0.8
part_mem_frac = 0.15

# Use total device size to calculate args.device_limit_frac
device_size = device_mem_size(kind="total")
device_limit = int(device_limit_frac * device_size)
device_pool_size = int(device_pool_frac * device_size)
part_size = int(part_mem_frac * device_size)

# Check if any device memory is already occupied
for dev in visible_devices.split(","):
    fmem = pynvml_mem_size(kind="free", index=int(dev))
    used = (device_size - fmem) / 1e9
    if used > 1.0:
        warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")

cluster = None  # (Optional) Specify existing scheduler port
if cluster is None:
    cluster = LocalCUDACluster(
        protocol=protocol,
        n_workers=len(visible_devices.split(",")),
        CUDA_VISIBLE_DEVICES=visible_devices,
        device_memory_limit=device_limit,
        local_directory=dask_workdir,
        dashboard_address=":" + dashboard_port,
        rmm_pool_size=(device_pool_size // 256) * 256
    )

# Create the distributed client
client = Client(cluster)
client

E.g.

col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'))
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)
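
For context, a fleshed-out (hypothetical) version of that snippet - the column names, paths, and dataset construction below are placeholders for illustration, not the original code:

import nvtabular as nvt

col_cat_int8 = ["cat_0", "cat_1"]  # placeholder categorical column names
col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'))

workflow = nvt.Workflow(col_cat_int8)
dataset = nvt.Dataset("/raid/input_parquet/", engine="parquet", part_size=part_size)  # part_size from the cluster setup above
output_path = "/raid/output_parquet/"  # placeholder output directory

workflow.fit(dataset)  # fit completes on the multi-GPU cluster
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)  # this step raises the error above
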
@bschifferer added the bug and P1 labels on Jul 20, 2022
@karlhigley
Contributor

The only thing I know of that serializes Python lambdas is cloudpickle, so I guess we could serialize the Workflow with cloudpickle and then deserialize it on the workers? Not too sure how to do that, but it's the only approach I can think of.
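
A minimal sketch of that idea (hypothetical - not how NVTabular currently ships work to workers): round-trip a lambda-bearing object through cloudpickle so it can be rebuilt in another process.

import cloudpickle

# Client side: serialize an object holding a lambda (the standard pickle module can't do this).
payload = cloudpickle.dumps({"op": lambda x: x.astype("int8")})

# Worker side: rebuild it and use the function.
rebuilt = cloudpickle.loads(payload)
print(rebuilt["op"].__name__)  # '<lambda>' -- the function survives the round trip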

@benfred
Member

benfred commented Jul 23, 2022

@bschifferer, as a short-term workaround - can you try this instead to explicitly set the dtype in the LambdaOp?

col_cat_int8 = col_cat_int8 >> nvt.ops.Categorify() >> nvt.ops.LambdaOp(lambda x: x.astype('int8'), dtype="int8")
workflow.transform(dataset).to_parquet(output_path, out_files_per_proc=10)

This should at least cause your workflow to execute on multiple workers.

I think the root cause of the issue here is that the capture_dtypes functionality doesn't work in a distributed cluster environment, since the dtypes are only captured here

if capture_dtypes:
    node.output_schema.column_schemas[col_name] = output_df_schema

on the worker process, and the dtypes for each node in the graph aren't communicated back to the actual workflow.
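
To illustrate that point, here's a toy example (not NVTabular code): an attribute set on an object inside a dask task only changes the worker's deserialized copy, so the client-side original never sees it unless the value is returned explicitly.

from dask.distributed import Client, LocalCluster

class Node:
    def __init__(self):
        self.output_schema = None

def capture(node):
    # Runs on a worker against a deserialized copy of `node`.
    node.output_schema = {"col": "int8"}
    return node.output_schema

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=1))
    node = Node()
    print(client.submit(capture, node).result())  # {'col': 'int8'} -- returned explicitly
    print(node.output_schema)                     # None -- the client-side node was never updated
    client.close()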

The LambdaOp issue is a red herring - we're already using cloudpickle under the hood, both to save the workflow itself and to distribute work with dask (https://distributed.dask.org/en/stable/serialization.html#defaults). The actual problem was that raising the dtype-discrepancy exception called the LambdaOp.label functionality, which failed =(. I've fixed the LambdaOp.label call in #1634 and added a basic test using LambdaOps with dask there too.
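
For reference, the underlying inspect failure is straightforward to reproduce once the lambda's source file isn't readable in the process that inspects it (e.g. a worker that never saw the client's notebook cell); the filename below is made up for the demonstration.

import inspect

import cloudpickle

# Compile a lambda whose co_filename doesn't exist on disk, mimicking a function
# defined in a client-side notebook cell and reconstructed on a worker.
ns = {}
exec(compile("f = lambda x: x.astype('int8')", "<client-notebook-cell>", "exec"), ns)
fn = cloudpickle.loads(cloudpickle.dumps(ns["f"]))

try:
    inspect.getsourcelines(fn)  # what LambdaOp.label attempted while formatting the error message
except OSError as exc:
    print(exc)  # could not get source code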

@bschifferer
Contributor Author

Yes, that workaround works for me - thanks
