Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DECIMAL logical type in python SDK #23014

Merged
merged 5 commits into from
Oct 3, 2022

Conversation

Abacn
Copy link
Contributor

@Abacn Abacn commented Sep 2, 2022

The second part of #19817 (keep the issue open for as go support is still pending).

This change makes decimal type work for xlang transforms (such as jdbc) in python sdk. It also contains prerequisite changes for the minimum support of a portable logical type with argument in both Java and Python sdk.

Python SDK still cannot get the argument value from proto because SchemaTranslation.value_to_runner_api method does not exist yet (it is SchemaTranslation.fieldValueToProto in Java SDK). Nevertheless because the argument is just a wrapper, and the Decimal value can be recovered from the encoded data alone. This does not affect the decimal type.

Because the change is already large, value_to_runner_api and the complete support of argument values in python is let as followup.

  • migrate FixedPrecisionNumeric jdbclogicaltype to portable
    logical type

  • Support DECIMAL type in python sdk xlang jdbc transform

  • Support standard logical type with argument in java sdk

  • proto support logical type's argument type in java sdk.
    Support of logical type's argument value is still pending

  • Implement BigIntegerCoder and DecimalCoder in python sdk

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@codecov
Copy link

codecov bot commented Sep 2, 2022

Codecov Report

Merging #23014 (b75ba4b) into master (ebacef9) will decrease coverage by 0.10%.
The diff coverage is 82.40%.

@@            Coverage Diff             @@
##           master   #23014      +/-   ##
==========================================
- Coverage   73.58%   73.48%   -0.11%     
==========================================
  Files         716      718       +2     
  Lines       95311    95692     +381     
==========================================
+ Hits        70138    70316     +178     
- Misses      23877    24080     +203     
  Partials     1296     1296              
Flag Coverage Δ
python 83.20% <82.40%> (-0.21%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/python/apache_beam/io/gcp/bigquery.py 74.24% <ø> (ø)
sdks/python/apache_beam/coders/coders.py 87.36% <65.21%> (-0.65%) ⬇️
sdks/python/apache_beam/typehints/schemas.py 91.89% <76.08%> (-1.96%) ⬇️
sdks/python/apache_beam/coders/coder_impl.py 94.05% <100.00%> (+0.14%) ⬆️
...ks/python/apache_beam/coders/coders_test_common.py 98.58% <100.00%> (+0.03%) ⬆️
sdks/python/apache_beam/coders/row_coder.py 94.95% <100.00%> (+0.13%) ⬆️
...s/python/apache_beam/io/gcp/bigquery_file_loads.py 87.24% <100.00%> (-0.47%) ⬇️
sdks/python/apache_beam/portability/common_urns.py 100.00% <100.00%> (ø)
...s/interactive/dataproc/dataproc_cluster_manager.py 71.72% <0.00%> (-5.70%) ⬇️
sdks/python/apache_beam/internal/gcp/auth.py 73.33% <0.00%> (-5.34%) ⬇️
... and 27 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@Abacn
Copy link
Contributor Author

Abacn commented Sep 2, 2022

Run Java PreCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 2, 2022

Run Python_PVR_Flink PreCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 2, 2022

Run Python 3.8 PostCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 3, 2022

@Abacn
Copy link
Contributor Author

Abacn commented Sep 3, 2022

Run Java PreCommit

@Abacn Abacn marked this pull request as ready for review September 3, 2022 02:36
@github-actions
Copy link
Contributor

github-actions bot commented Sep 3, 2022

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @y1chi for label python.
R: @apilloud for label java.
R: @chamikaramj for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Abacn
Copy link
Contributor Author

Abacn commented Sep 8, 2022

R: @TheNeuralBit

@github-actions
Copy link
Contributor

github-actions bot commented Sep 8, 2022

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

* migrate FixedPrecisionNumeric jdbclogicaltype to portable
  logical type

* Support DECIMAL type in python sdk xlang jdbc transform

* Support standard logical type with argument in java sdk

* proto support logical type's argument type in java sdk.
  Support of logical type's argument value is still pending

* Implement BigIntegerCoder and DecimalCoder in python sdk
@Abacn
Copy link
Contributor Author

Abacn commented Sep 13, 2022

Run Python 3.8 PostCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 13, 2022

@Abacn
Copy link
Contributor Author

Abacn commented Sep 13, 2022

Run Java PreCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 13, 2022

Java PreCommit build stuck twice at Task :sdks:java:core:buildNeeded. Also occasionally happens on master (e.g. https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/5900/, https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/5896/console) should be irrelevant. Read to review.

@Abacn
Copy link
Contributor Author

Abacn commented Sep 14, 2022

Run Java PreCommit

Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Abacn, and sorry about the delay. I have a few suggestions

@@ -65,7 +67,7 @@
})
public class SchemaTranslation {

private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:logical_type:decimal:v1";
private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add this in schema.proto and reference that here? Then we can document the format in schema.proto as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added "beam:logical_type:decimal:v1" into schema.proto

(base == null) ? null : base.precision(),
scale,
(base == null) ? null : base.scale());
return base;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic in this class is a little bit complicated, what do you think about adding a unit test just for this file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is just moving org.apache.beam.sdk.io.jdbc.LogicalTypes.FixedPrecisionNumeric class to a separate file under schemas/logicaltypes. Yes will do add unit tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry about that, thank you

public static final String IDENTIFIER = "beam:logical_type:fixed_decimal:v1";

// TODO(https://github.com/apache/beam/issues/19817) implement beam:logical_type:decimal:v1 as
// CoderLogicalType (once CoderLogicalType is implemented).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is CoderLogicalType? I don't a reference to it in that issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah the link in TODO is not accurate. This is a suggestion I got from @reuvenlax. The reference was here: #7865 (comment). Should we create an issue for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entered #23374

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

logical_type.representation_type())))
else:
# TODO(bhulette,yathu): Complete support for logical types that require
# arguments.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please file an issue for this and link it here. Also note that the option_to_runner_api and option_from_runner_api has logic for working with FieldValues, we should re-use that when we take this on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entered #23373

"CREATE TABLE {}(f_int INTEGER, f_timestamp TIMESTAMP)".format(
table_name))
"CREATE TABLE {}(f_int INTEGER, f_timestamp TIMESTAMP, f_decimal DECIMAL(10,2))" # pylint: disable=line-too-long
.format(table_name))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test(s) for this in standard_coders.yaml?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in standard_coders.yaml

@Abacn
Copy link
Contributor Author

Abacn commented Sep 29, 2022

Run Python 3.8 PostCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 29, 2022

Run Python 3.8 PostCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 29, 2022

Run Python 3.8 PostCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 29, 2022

PostCommit test passed. Ready for review

@TheNeuralBit
Copy link
Member

Run Java PreCommit

(base == null) ? null : base.precision(),
scale,
(base == null) ? null : base.scale());
return base;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry about that, thank you

public static final String IDENTIFIER = "beam:logical_type:fixed_decimal:v1";

// TODO(https://github.com/apache/beam/issues/19817) implement beam:logical_type:decimal:v1 as
// CoderLogicalType (once CoderLogicalType is implemented).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

row = Row.withSchema(schema).addValues(decimal).build();
assertEquals(decimal, row.getLogicalTypeValue(0, BigDecimal.class));

decimal = BigDecimal.valueOf(random.nextLong() + 100_000_000_000L, scale);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'm a little nervous that the random component can make this flaky. Can we just select some constants istead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I made a typo. These two cases were intended be (random.nextInt() + 100_000_000_000L) which guarantees to generate a positive number with at least 11 digits and beyond the precision of DECIMAL(precision=10). Yes we can just use 100_000_000_001L here


// check argument invalid case
decimal = BigDecimal.valueOf(random.nextLong() + 100_000_000_000L, scale);
assertThrows(IllegalArgumentException.class, Row.withSchema(schema).addValues(decimal)::build);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I like how concise this is :)

IllegalArgumentException.class,
() ->
FixedPrecisionNumeric.of(
Row.withSchema(invalidArgumentSchema).addValues(precision, scale).build()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think unit testing best practice would be to put each of these test cases in a different test method. On the other hand this pattern is consistent with the rest of the file, so I'm fine if you want to leave it as-is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree in general this is best practice, I observed the file LogicalTypesTest.java has each test testing one logical type and keep the new test following this. If tests become more loaded we can split test source files.


@classmethod
def _from_typing(cls, typ):
print(typ)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a debug statement that was left in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops thanks for the catch, will double check


/** Fixed precision numeric types used to represent jdbc NUMERIC and DECIMAL types. */
public class FixedPrecisionNumeric extends PassThroughLogicalType<BigDecimal> {
public static final String IDENTIFIER = "beam:logical_type:fixed_decimal:v1";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is being used cross-language can we define the URN in schema.proto and document it there as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered to put it in schema.proto as beam:logical_type:decimal:v1 but the problem is that I do not know how to generate a test case of it in standard_coders.yaml yet because the logical types with argument is currently not fully supported. I would like to leave it now and make it into schema.proto until we are confident that this implementation is standardized(i.e. stable). Open to opinions for sure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that's fine with me.

Abacn and others added 2 commits September 29, 2022 19:43
@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2022

go precommit fails due to

 FAIL	github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx	600.108s

first time see go precommit flakes

java precommit failed for another different test

org.apache.beam.runners.samza.runtime.AsyncDoFnRunnerTest.testSimplePipeline

should be irrelevant either

@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2022

Run Go PreCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2022

Run Java PreCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2022

New flakes seen on Java PreCommit and entered #23449

@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2022

Run Java PreCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2022

Run Python 3.8 PostCommit

1 similar comment
@TheNeuralBit
Copy link
Member

Run Python 3.8 PostCommit

@TheNeuralBit
Copy link
Member

Run Java PreCommit

@TheNeuralBit
Copy link
Member

Run Python 3.8 PostCommit

@Abacn
Copy link
Contributor Author

Abacn commented Sep 30, 2022

Thanks for taking care of the tests @TheNeuralBit! Test reliability has been terrible. Last two Java PreCommit fails in different flakes (no test fails for both)

#5592:
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
org.apache.beam.sdk.io.pulsar.PulsarIOTest.testReadFromSimpleTopic
#5593:
org.apache.beam.runners.samza.runtime.AsyncDoFnRunnerTest.testSimplePipeline
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2Test.testCloseVisibleToAwaitCompletionCallerAndProducer

Python 3.8 PostCommit:

#621: bigquery integration tests with BigQuery API call HttpError.

there was a success run (rare) of postcommit on the third commit: https://ci-beam.apache.org/job/beam_PostCommit_Python38_PR/618/ the last two commits of this PR are trivial and should not introduce surprise. We could wait for the running postcommit and give it one more chance :).

Update: looks like too much post commit run hit the BigQuery quota.

@Abacn
Copy link
Contributor Author

Abacn commented Oct 3, 2022

Run Python 3.8 PostCommit

@TheNeuralBit
Copy link
Member

Ok, given the previous successful run I'm going to go ahead and merge. Thanks @Abacn!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants