Enhance saveAsCustomOutput and provide access to POutput #4995
Comments
One more experiment with an even more generic API that allows encapsulating all steps of the custom IO as a single Scala-friendly transform. With a custom output like this, I could provide a composite transform that converts domain objects into the underlying storage format (for example JSON bytes) and then saves the bytes in the database using the Beam IO connector.

For the reading part I could do the same: provide a composite transform that reads bytes from the database using the Beam IO connector, deserializes the JSON, and returns domain objects. Everything is a single transform, easy to test at the job level. In the test I only need to prepare domain objects as input; the bytes representation from the Beam IO connector is fully encapsulated in the composite transform.

Alternatively, I could put my composite transform into a plain Beam PTransform and use the existing `saveAsCustomOutput`.
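To illustrate the write side described above, here is a minimal sketch (not from the issue itself): the domain type `Event`, the hand-rolled JSON encoding, and the use of `TextIO` as a stand-in for the real database connector are all assumptions.

```scala
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.transforms.{MapElements, PTransform, SimpleFunction}
import org.apache.beam.sdk.values.{PCollection, PDone}

// Hypothetical domain type; the JSON encoding is hand-rolled to keep
// the sketch self-contained.
final case class Event(id: String, value: Double)

// Composite write transform: domain objects -> JSON strings -> storage
// connector. TextIO stands in for the real database IO connector.
def saveEvents(path: String): PTransform[PCollection[Event], PDone] =
  new PTransform[PCollection[Event], PDone] {
    override def expand(input: PCollection[Event]): PDone =
      input
        .apply(
          "SerializeToJson",
          MapElements.via(new SimpleFunction[Event, String]() {
            override def apply(e: Event): String =
              s"""{"id":"${e.id}","value":${e.value}}"""
          })
        )
        .apply("WriteBytes", TextIO.write().to(path))
  }

// With the existing API, the whole thing plugs in as one step:
//   events.saveAsCustomOutput("SaveEvents", saveEvents(outputPath))
```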
Thanks for the feature request @mkuthan. Will look at this during our preparation of 0.14.
@mkuthan just a status update on this issue. We've opted for another strategy that avoids passing a pipeline transforms function to the IO. We preferred to expose the possible outputs instead. On 0.14, the testing framework will mock those as empty, but we plan to let users set custom values in the future. I hope this setup fits with your needs. Let us know otherwise.
Original issue description:

Function `saveAsCustomOutput` is an excellent extension point for implementing my own IO. It also gives me an abstraction for testing the whole pipeline using `JobTest`. I would like to thank all Scio authors for that functionality.

But for IOs like BigQuery, POutput is not a terminal step; it delivers information about errors. The API for accessing such errors is very different for each BigQuery write method:

- `getFailedInserts` or `getFailedInsertsWithErr` for Streaming Inserts
- `getFailedStorageApiInserts` for the Storage Write API

You can also access successful rows for Streaming Inserts, the Storage Write API, and Batch Loads with another three methods.
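For concreteness, this is roughly how those per-method accessors look on Beam's `WriteResult` (a sketch; `writeResult` is assumed to come from a BigQuery write configured for the matching insert method):

```scala
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult

// Each write method surfaces failures through its own accessor:
def inspectFailures(writeResult: WriteResult): Unit = {
  val failedRows    = writeResult.getFailedInserts            // Streaming Inserts
  val failedWithErr = writeResult.getFailedInsertsWithErr     // Streaming Inserts, with error details
  val failedStorage = writeResult.getFailedStorageApiInserts  // Storage Write API
}
```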
I know that I could use the `internal.apply(name, io)` method and extract errors like this:
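The original snippet is not preserved in this copy of the issue; a minimal sketch of the idea, assuming the Storage Write API and hypothetical step and table names:

```scala
import com.google.api.services.bigquery.model.TableRow
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, BigQueryStorageApiInsertError, WriteResult}
import org.apache.beam.sdk.values.PCollection

// Drop down to the underlying PCollection via `internal`, apply the
// Beam IO directly, and keep the WriteResult it returns.
def writeWithErrors(
    tableRows: SCollection[TableRow]
): PCollection[BigQueryStorageApiInsertError] = {
  val writeResult: WriteResult = tableRows.internal.apply(
    "writeToBq",
    BigQueryIO
      .writeTableRows()
      .to("project:dataset.table")
      .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
  )
  writeResult.getFailedStorageApiInserts
}
```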
But I can't use `output(CustomIO[Out](id)) { results => ... }` from `JobTest` anymore. I could hack around this limitation with `TransformOverride`, but it is not so easy to write assertions then:
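Again, the snippet is missing in this copy. The workaround would look roughly like this — a sketch using Scio's `TransformOverride`, assuming the write is a named `TableRow => TableRow` composite that emits failed rows; the job object, step name, and stub behavior are all hypothetical:

```scala
import com.google.api.services.bigquery.model.TableRow
import com.spotify.scio.testing.{JobTest, PipelineSpec, TransformOverride}

class MyJobTest extends PipelineSpec {
  "MyJob" should "let me assert on failed inserts" in {
    JobTest[MyJob.type]
      .args("--output=project:dataset.table")
      // ... input/output stubs for the job's other IOs elided ...
      // Replace the real BigQuery write step with a pass-through stub
      // that declares every row failed, to exercise error handling.
      .transformOverride(
        TransformOverride.of[TableRow, TableRow]("writeToBq", identity)
      )
      .run()
  }
}
```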
I would love to see the following enhancement for `saveAsCustomOutput`. This is only a short showcase of the overall idea, but I'm glad to hear from you about a better API or implementation:
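The proposal's code is not preserved here either; one possible shape, sketched as a user-side extension method (every name below is hypothetical):

```scala
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PCollection, POutput}

// Like saveAsCustomOutput, but hands the transform's POutput to a
// callback instead of discarding it.
implicit class CustomOutputOps[T](private val self: SCollection[T]) {
  def saveAsCustomOutputWithResult[O <: POutput](
      name: String,
      transform: PTransform[PCollection[T], O]
  )(resultFn: O => Unit): Unit =
    resultFn(self.internal.apply(name, transform))
}
```

With such an extension I'm able to do anything I need with the `WriteResult`, for example:

```scala
// `tableRows`, `bigQueryWrite`, and `handleDeadLetters` are assumed to
// exist in the pipeline; they stand in for the issue's elided example.
tableRows.saveAsCustomOutputWithResult("writeToBq", bigQueryWrite) { writeResult =>
  handleDeadLetters(writeResult.getFailedStorageApiInserts)
}
```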
What do you think about such an extension? It looks very generic and should handle all kinds of use cases where POutput delivers something valuable.