
[SPARK-43298][PYTHON][ML] predict_batch_udf with scalar input fails with batch size of one #40967

Closed
wants to merge 1 commit

Conversation

leewyang
Contributor

What changes were proposed in this pull request?

This is a followup to #39817 to handle another error condition when the input batch is a single scalar value (where the previous fix focused on a single scalar value output).

Why are the changes needed?

Using predict_batch_udf fails when the input batch size is one.

```
import numpy as np
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import DoubleType

df = spark.createDataFrame([[1.0], [2.0]], schema=["a"])

def make_predict_fn():
    def predict(inputs):
        return inputs

    return predict

identity = predict_batch_udf(make_predict_fn, return_type=DoubleType(), batch_size=1)
df.withColumn("preds", identity("a")).show()
```

fails with:

```
  File "/.../spark/python/pyspark/worker.py", line 869, in main
    process()
  File "/.../spark/python/pyspark/worker.py", line 861, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 354, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 86, in dump_stream
    for batch in iterator:
  File "/.../spark/python/pyspark/sql/pandas/serializers.py", line 347, in init_stream_yield_batches
    for series in iterator:
  File "/.../spark/python/pyspark/worker.py", line 555, in func
    for result_batch, result_type in result_iter:
  File "/.../spark/python/pyspark/ml/functions.py", line 818, in predict
    yield _validate_and_transform_prediction_result(
  File "/.../spark/python/pyspark/ml/functions.py", line 339, in _validate_and_transform_prediction_result
    if len(preds_array) != num_input_rows:
TypeError: len() of unsized object
```
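For context, here is a minimal sketch of the likely mechanism behind this error (an illustration, not code taken from the patch): with a batch of one element, squeezing the batch produces a 0-d NumPy array, and `len()` on a 0-d array raises exactly this `TypeError`. `np.atleast_1d` is one way to restore a sized 1-d array.

```python
import numpy as np

# A single-element batch squeezed down becomes a 0-d array.
batch = np.array([1.0])       # shape (1,)
squeezed = np.squeeze(batch)  # shape () -- 0-d, unsized

try:
    len(squeezed)
except TypeError as e:
    print(e)  # len() of unsized object

# np.atleast_1d promotes the 0-d array back to shape (1,), so len() works.
fixed = np.atleast_1d(squeezed)
assert len(fixed) == 1
```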

After the fix:

```
+---+-----+
|  a|preds|
+---+-----+
|1.0|  1.0|
|2.0|  2.0|
+---+-----+
```

Does this PR introduce any user-facing change?

This fixes a bug in the feature that was released in Spark 3.4.0.

How was this patch tested?

Unit test was added.

@leewyang leewyang changed the title predict_batch_udf with scalar input fails with batch size of one [SPARK-43298][PYTHON][ML] predict_batch_udf with scalar input fails with batch size of one Apr 26, 2023
@leewyang
Contributor Author

Hmm, looks like I might be stuck on this issue; any ideas? I've configured my "actions permissions" to match this comment, but I'm still getting this error:

```
Error: buildx failed with: ERROR: failed to solve: failed to push ghcr.io/leewyang/apache-spark-ci-image:master-4815979720: failed commit on ref "manifest-sha256:80eca005ea656d063e07f9059619043cd701e0c0d17029523fc167e4915405b4": unexpected status: 403 Forbidden
```

@leewyang
Contributor Author

Never mind, got past this issue... waiting on the rest of the build results (but it already failed the Kubernetes Integration Test).

@HyukjinKwon
Member

Merged to master.

@HyukjinKwon
Member

oops, I missed that the linter failed. Reverting, and reopening the PR.

@HyukjinKwon HyukjinKwon reopened this Apr 27, 2023
@zhengruifeng
Contributor

Merged to master again.

@leewyang leewyang deleted the SPARK-43298 branch April 28, 2023 15:43
LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request May 10, 2023
…ith batch size of one

Closes apache#40967 from leewyang/SPARK-43298.

Authored-by: Lee Yang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request May 10, 2023
…ith batch size of one

Closes apache#40967 from leewyang/SPARK-43298.

Authored-by: Lee Yang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>