Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6701] Add logical types to schema #7865

Merged
merged 11 commits into from
Feb 19, 2019

Conversation

reuvenlax
Copy link
Contributor

Beam schemas define a limited number of fundamental types, and often users of schemas have slightly different needs for field types. An example is Beam SQL: SQL has many different date/time/timestamp types, while Beam has only the one DATETIME field type. Today SQL stuffs magic strings into the FieldType metadata to identify which type it is really using, which is quite ad hoc.

Logical types introduce a principled way of doing this. A new LogicalType class can be defined that uses one of the fundamental Beam field types as storage. The user can then add this logical type to their schema, and Beam will use the underlying field type and record the logical type in the field as well. Logical types have globally unique identifiers that are understood by the system, so this provides a much more principled way of storing custom types in schemas.

This PR adds LogicalType and converts Avro and Beam SQL over to the new framework. Follow-on PRs will add CoderLogicalType (so any existing Coder can be used as a Schema field) as well as adding logical-type support to POJOs.

@reuvenlax
Copy link
Contributor Author

R: @amaliujia

@ryan-williams
Copy link
Contributor

ryan-williams commented Feb 17, 2019

I think we are seeing the same Java_Examples_Dataflow failures: #1753 (on my #7823) and #1754 here.

> Task :beam-runners-google-cloud-dataflow-java-examples-streaming:preCommit

org.apache.beam.examples.WordCountIT > testE2EWordCount FAILED
    java.lang.RuntimeException at WordCountIT.java:69
> Task :beam-runners-google-cloud-dataflow-java-examples:preCommitLegacyWorker

org.apache.beam.examples.WindowedWordCountIT > testWindowedWordCountInStreamingStaticSharding FAILED
    java.lang.RuntimeException at WindowedWordCountIT.java:188

Build scan doesn't appear to have captured anything useful

Think it's broken independently of our PRs?

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Feb 17, 2019 via email

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Feb 17, 2019 via email

@ryan-williams
Copy link
Contributor

(cc @drieber, cf. above, any ideas?)

@ryan-williams
Copy link
Contributor

ryan-williams commented Feb 17, 2019

btw @reuvenlax where did you get this info?

It appears that an exception is being throw from StreamingDataflowWorker.java:1952.
The following line is failing with NullPointerException:
this.transformUserNameToStateFamily = ImmutableMap.copyOf(transformUserNameToStateFamily);

@drieber
Copy link
Contributor

drieber commented Feb 18, 2019

Sorry about this. My PR did not handle properly the case when the proto field is not set. I will send a fix within an hour or so.

Copy link
Contributor

@amaliujia amaliujia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation generally looks good. Can we have some tests on logical type construction and equality checking?

/** Returns a copy of the descriptor with metadata set. */
public FieldType withMetadata(@Nullable byte[] metadata) {
return toBuilder().setMetadata(metadata).build();
public FieldType withMetadata(String key, byte[] metadata) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As metadata is Map now, can we have a withMetadata(Map<String, byte[]>) as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is some changes not pushed? Not seeing withMetadata(Map) implementations, I can see comment is updated though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it got messed up. Fixed now.

* #toBaseType} and {@link #toInputType} should convert back and forth between the Java type for
* the LogicalType (InputT) and the Java type appropriate for the underlying base type (BaseT).
*
* <p>{@link #getIdentifier} must define a globally unique identifier for this LogicalType. A
Copy link
Contributor

@amaliujia amaliujia Feb 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that there is no verification on global uniqueness of identifiers. What's the effect if identifiers are not unique (seems to me that there is no any effect)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question: I originally had a registration for LogicalTypes (all had to be registered and registration checks uniqueness). I removed it from this PR because I was trying to keep the PR smaller. If you think it's better I can add it back in, or I could keep it for a future PR.

Since identifier is the identity of the logical type, Beam code is allowed to check the identifier to see what the type is; if multiple types had the same identifier, this logic will break. In fact SQL does this in several places, and if a different LogicalType had the same identifier as the ones registered in CalciteUtils, things would break in strange ways.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I didn't notice IDENTIFIERs are used in Map as key. That explains uniqueness requirement.

Leaving it to future PR definitely is ok.

}

@Override
public byte[] toInputType(byte[] base) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not quite understand the intention of toBaseType and toInputType of LogicalType, especially when I see this implantation allows a partial copy while toBaseType checks equality on input.length and byteArraySize.

Why toInputTypes allows base.length <= byteArraySize but not strictly check base.length == byteArraySize?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the main point is to actually convert types. For example, imagine if we got rid of DATETIME as a primitive schema type and replaced it with a LogicalType (which we may do). In this case the underlying Beam type would be INT64, but we want users to be able to use Joda DateType objects. In that case, the logical type would have the following signature:

class DateTimeType implements LogicalType<DateTime, Long>

But since the user is passing in DateTime objects, we need to know how to convert them back and forth to the Long object that Row expects for an INT64. So you would also override:

Long toBaseType(DateTime dateTime) {
return dateTime.toMillis();
}

DateTime toInputType(Long millis) {
return new DateTime(millis);
}

In this particular case we use toBaseType to verify the input, since the java types are the same. However we also allow passing in an array that is smaller than the fixed size, in which case we extend it (possibly we should do this in both codepaths though).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. Thanks for explanation. So this two functions are util functions for conversions.

@reuvenlax
Copy link
Contributor Author

Also added unit test coverage for schema equality in the case of logicaltype

@amaliujia
Copy link
Contributor

LGTM assuming tests pass

@reuvenlax reuvenlax merged commit b2fa119 into apache:master Feb 19, 2019
@kennknowles
Copy link
Member

Looks like this broke SQL postcommits

Expression field = Expressions.call(expression, getter, Expressions.constant(index));
if (fromType.getTypeName().isDateType()) {
if (fromType.getTypeName().isLogicalType()) {
field = Expressions.call(field, "getMillis");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't going to work on just any LogicalType only DateType. Something is broken here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be inside the following if clauses I think (or alternatively wrapped with CalciteUtils.isDateTimeType). Do we have no tests that test CHAR type?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants