[Data] Can't specify schema for write_parquet #48630

Closed
bveeramani opened this issue Nov 7, 2024 · 0 comments · Fixed by #48631
Labels
bug: Something that is supposed to be working; but isn't
data: Ray Data-related issues
P1: Issue that should be fixed within a few weeks

Comments

@bveeramani (Member) commented Nov 7, 2024

What happened + What you expected to happen

See the reproduction script below: calling write_parquet with an explicit schema raises a TypeError instead of writing the dataset with that schema.

Versions / Dependencies

a1bb4a4

Reproduction script

    import ray
    import pyarrow as pa
    import pyarrow.parquet as pq

    # tmp_path comes from pytest's tmp_path fixture; any empty directory works.
    ds = ray.data.range(1)
    schema = pa.schema({"id": pa.float32()})

    ds.write_parquet(tmp_path, schema=schema)

    assert pq.read_table(tmp_path).schema == schema
2024-11-07 10:23:46,280 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-07_10-23-44_776331_71856/logs/ray-data
2024-11-07 10:23:46,280 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[Write]
Running 0: 0.00 row [00:00, ? row/s]
2024-11-07 10:23:46,949 ERROR streaming_executor_state.py:479 -- An exception was raised from a task of operator "Write". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
⚠️  Dataset execution failed: : 0.00 row [00:00, ? row/s] 
- ReadRange->SplitBlocks(2): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 8.0B object store: : 1.00 row [00:00, 1.51 row/s]
- Write: Tasks: 1 [backpressured]; Queued blocks: 1; Resources: 1.0 CPU, 256.0MB object store: : 0.00 row [00:00, ? row/s]    
2024-11-07 10:23:46,953 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2024-11-07 10:23:46,954 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/Users/balaji/ray/python/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/Users/balaji/ray/python/ray/data/_internal/plan.py", line 498, in execute
    blocks = execute_to_legacy_block_list(
  File "/Users/balaji/ray/python/ray/data/_internal/execution/legacy_compat.py", line 123, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/legacy_compat.py", line 169, in _bundles_to_block_list
    for ref_bundle in bundles:
  File "/Users/balaji/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 153, in get_next
    item = self._outer._output_node.get_output_blocking(
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 306, in get_output_blocking
    raise self._exception
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 232, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 287, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 480, in process_completed_tasks
    raise e from None
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 447, in process_completed_tasks
    bytes_read = task.on_data_ready(
  File "/Users/balaji/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
    raise ex from None
  File "/Users/balaji/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
    ray.get(block_ref)
  File "/Users/balaji/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/balaji/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/Users/balaji/ray/python/ray/_private/worker.py", line 2755, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/Users/balaji/ray/python/ray/_private/worker.py", line 906, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::Write() (pid=71881, ip=127.0.0.1)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "/Users/balaji/ray/python/ray/data/_internal/planner/plan_write_op.py", line 45, in fn
    block_accessors = [BlockAccessor.for_block(block) for block in blocks]
  File "/Users/balaji/ray/python/ray/data/_internal/planner/plan_write_op.py", line 45, in <listcomp>
    block_accessors = [BlockAccessor.for_block(block) for block in blocks]
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "/Users/balaji/ray/python/ray/data/_internal/planner/plan_write_op.py", line 28, in fn
    datasink_or_legacy_datasource.write(it1, ctx)
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasink.py", line 85, in write
    call_with_retry(
  File "/Users/balaji/ray/python/ray/data/_internal/util.py", line 985, in call_with_retry
    raise e from None
  File "/Users/balaji/ray/python/ray/data/_internal/util.py", line 972, in call_with_retry
    return f()
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasink.py", line 79, in write_blocks_to_path
    with pq.ParquetWriter(file, schema, **write_kwargs) as writer:
TypeError: __init__() got multiple values for argument 'schema'
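
The last frame points at the root cause: parquet_datasink.py passes the inferred schema to pq.ParquetWriter positionally while also forwarding the user's keyword arguments, so a schema handed to write_parquet (presumably via its **arrow_parquet_args) reaches the writer a second time through **write_kwargs. A minimal standalone sketch of the collision, with an illustrative file path:

    import pyarrow as pa
    import pyarrow.parquet as pq

    schema = pa.schema({"id": pa.float32()})

    # The user's schema lands in the kwargs that the datasink forwards...
    write_kwargs = {"schema": schema}

    try:
        # ...while the sink also passes its own schema positionally,
        # mirroring parquet_datasink.py line 79 in the traceback above.
        pq.ParquetWriter("example.parquet", schema, **write_kwargs)
    except TypeError as e:
        print(e)  # __init__() got multiple values for argument 'schema'

Python rejects the duplicate argument before ParquetWriter's __init__ even runs, which is why the failure is independent of the data being written.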

Issue Severity

None
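
The fix landed in #48631. One illustrative direction (a sketch, not necessarily the PR's actual change) is for the sink to pop the user-supplied schema out of the forwarded kwargs so ParquetWriter receives it exactly once; write_blocks_to_path here is a hypothetical stand-in for the datasink helper named in the traceback:

    import pyarrow as pa
    import pyarrow.parquet as pq

    def write_blocks_to_path(file, blocks, inferred_schema, **write_kwargs):
        # Prefer an explicit user schema over the inferred one, and remove
        # it from the kwargs so it is not passed to the writer twice.
        schema = write_kwargs.pop("schema", None) or inferred_schema
        with pq.ParquetWriter(file, schema, **write_kwargs) as writer:
            for block in blocks:
                writer.write_table(block)

    schema = pa.schema({"id": pa.float32()})
    block = pa.table({"id": [0]}).cast(schema)  # cast so the block matches the schema
    write_blocks_to_path("example.parquet", [block], block.schema, schema=schema)
    assert pq.read_table("example.parquet").schema == schema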

bveeramani added the bug, data, and P1 labels and removed the triage label on Nov 7, 2024
JP-sDEV pushed a commit to JP-sDEV/ray that referenced this issue Nov 14, 2024
mohitjain2504 pushed a commit to mohitjain2504/ray that referenced this issue Nov 15, 2024