Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Joining on ROWKEY #2735

Merged
merged 9 commits into from
May 1, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,16 @@ private static org.apache.avro.Schema unionWithNull(final org.apache.avro.Schema

public static String getFieldNameWithNoAlias(final Field field) {
final String name = field.name();
final int idx = name.indexOf(FIELD_NAME_DELIMITER);
return getFieldNameWithNoAlias(name);
}

public static String getFieldNameWithNoAlias(final String fieldName) {
final int idx = fieldName.indexOf(FIELD_NAME_DELIMITER);
if (idx < 0) {
return name;
return fieldName;
}

return name.substring(idx + 1);
return fieldName.substring(idx + 1);
}

public static boolean areEqualSchemas(final Schema schema1, final Schema schema2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,18 +521,46 @@ public void shouldThrowOnUnknownSchemaType() {
SchemaUtil.getSchemaTypeAsSqlType(Schema.Type.BYTES);
}

@Test
public void shouldStripAliasFromField() {
// Given:
final Field field = new Field("alias.some-field-name", 1, Schema.OPTIONAL_STRING_SCHEMA);

// When:
final String result = SchemaUtil.getFieldNameWithNoAlias(field);

// Then:
assertThat(result, is("some-field-name"));
}

@Test
public void shouldReturnFieldWithoutAliasAsIs() {
// Given:
final Field field = new Field("some-field-name", 1, Schema.OPTIONAL_STRING_SCHEMA);

// When:
final String result = SchemaUtil.getFieldNameWithNoAlias(field);

// Then:
assertThat(result, is("some-field-name"));
}

@Test
public void shouldStripAliasFromFieldName() {
final Schema schemaWithAlias = SchemaUtil.buildSchemaWithAlias(schema, "alias");
assertThat("Invalid field name",
SchemaUtil.getFieldNameWithNoAlias(schemaWithAlias.fields().get(0)),
equalTo(schema.fields().get(0).name()));
// When:
final String result = SchemaUtil.getFieldNameWithNoAlias("some-alias.some-field-name");

// Then:
assertThat(result, is("some-field-name"));
}

@Test
public void shouldReturnFieldNameWithoutAliasAsIs() {
assertThat("Invalid field name", SchemaUtil.getFieldNameWithNoAlias(schema.fields().get(0)),
equalTo(schema.fields().get(0).name()));
// When:
final String result = SchemaUtil.getFieldNameWithNoAlias("some-field-name");

// Then:
assertThat(result, is("some-field-name"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.analyzer.Analysis.Into;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KsqlTopic;
import io.confluent.ksql.metastore.model.StructuredDataSource;
import io.confluent.ksql.parser.DefaultTraversalVisitor;
Expand Down Expand Up @@ -402,20 +403,17 @@ protected Node visitJoin(final Join node, final Void context) {
final ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn
.getExpression();

final Pair<String, String> leftSide = fetchKeyFieldName(
final KeyField leftKeyField = buildKeyField(
comparisonExpression,
leftAlias,
leftDataSource.getSchema()
);
final Pair<String, String> rightSide = fetchKeyFieldName(
final KeyField rightKeyField = buildKeyField(
comparisonExpression,
rightAlias,
rightDataSource.getSchema()
);

final String leftKeyFieldName = leftSide.getRight();
final String rightKeyFieldName = rightSide.getRight();

if (comparisonExpression.getType() != ComparisonExpression.Type.EQUAL) {
throw new KsqlException("Only equality join criteria is supported.");
}
Expand All @@ -441,8 +439,8 @@ protected Node visitJoin(final Join node, final Void context) {
joinType,
leftSourceKafkaTopicNode,
rightSourceKafkaTopicNode,
leftKeyFieldName,
rightKeyFieldName,
leftKeyField,
rightKeyField,
leftAlias,
rightAlias,
node.getWithinExpression().orElse(null),
Expand Down Expand Up @@ -475,24 +473,24 @@ private JoinNode.JoinType getJoinType(final Join node) {
/**
* From the join criteria expression fetch the field corresponding to the given source alias.
*/
private Pair<String, String> fetchKeyFieldName(
private KeyField buildKeyField(
final ComparisonExpression comparisonExpression,
final String sourceAlias,
final Schema sourceSchema
) {
Pair<String, String> keyInfo = fetchFieldNameFromExpr(
KeyField keyField = buildKeyFieldFromExpr(
comparisonExpression.getLeft(),
sourceAlias,
sourceSchema
);
if (keyInfo == null) {
keyInfo = fetchFieldNameFromExpr(
if (keyField == null) {
keyField = buildKeyFieldFromExpr(
comparisonExpression.getRight(),
sourceAlias,
sourceSchema
);
}
if (keyInfo == null) {
if (keyField == null) {
throw new KsqlException(
String.format(
"%s : Invalid join criteria %s. Could not find a join criteria operand for %s. ",
Expand All @@ -501,39 +499,55 @@ private Pair<String, String> fetchKeyFieldName(
)
);
}
return keyInfo;
return keyField;
}

/**
* Given an expression and the source alias detects if the expression type is
* DereferenceExpression or QualifiedNameReference and if the variable prefix matches the source
* Alias.
*/
private Pair<String, String> fetchFieldNameFromExpr(
final Expression expression, final String sourceAlias,
private KeyField buildKeyFieldFromExpr(
final Expression expression,
final String sourceAlias,
final Schema sourceSchema
) {
if (expression instanceof DereferenceExpression) {
final DereferenceExpression dereferenceExpression =
(DereferenceExpression) expression;
final String sourceAliasVal = dereferenceExpression.getBase().toString();
if (sourceAliasVal.equalsIgnoreCase(sourceAlias)) {
final String fieldName = dereferenceExpression.getFieldName();
if (SchemaUtil.getFieldByName(sourceSchema, fieldName).isPresent()) {
return new Pair<>(sourceAliasVal, fieldName);
}
}
} else if (expression instanceof QualifiedNameReference) {
final QualifiedNameReference qualifiedNameReference =
(QualifiedNameReference) expression;
final String fieldName = qualifiedNameReference.getName().getSuffix();
if (SchemaUtil.getFieldByName(sourceSchema, fieldName).isPresent()) {
return new Pair<>(sourceAlias, fieldName);
final DereferenceExpression dereferenceExpr = (DereferenceExpression) expression;

final String sourceAliasVal = dereferenceExpr.getBase().toString();
if (!sourceAliasVal.equalsIgnoreCase(sourceAlias)) {
return null;
}

final String fieldName = dereferenceExpr.getFieldName();
return buildKeyFieldFromSource(fieldName, sourceAlias, sourceSchema);
}

if (expression instanceof QualifiedNameReference) {
final QualifiedNameReference qualifiedNameRef = (QualifiedNameReference) expression;

final String fieldName = qualifiedNameRef.getName().getSuffix();
return buildKeyFieldFromSource(fieldName, sourceAlias, sourceSchema);
}
return null;
}

private KeyField buildKeyFieldFromSource(
final String fieldName,
final String sourceAlias,
final Schema sourceSchema
) {
final Optional<Field> field = SchemaUtil.getFieldByName(sourceSchema, fieldName);
if (!field.isPresent()) {
return null;
}

final Field legacy = SchemaUtil.buildAliasedField(sourceAlias, field.get());
final String latest = SchemaUtil.buildAliasedFieldName(sourceAlias, fieldName);

return KeyField.of(latest, legacy);
}

@Override
protected Node visitAliasedRelation(final AliasedRelation node, final Void context) {
Expand Down
Loading