Commit

[Coral-Spark] [Backward Incompatible] Refactor CoralSpark API (#448)
aastha25 authored Sep 6, 2023
1 parent c3df771 commit 64ec9b5
Showing 2 changed files with 1 addition and 96 deletions.
95 changes: 0 additions & 95 deletions coral-spark/src/main/java/com/linkedin/coral/spark/CoralSpark.java
@@ -47,15 +47,6 @@ public class CoralSpark {
private final SqlNode sqlNode;
private final String sparkSql;

@Deprecated
private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList, String sparkSql) {
this.baseTables = baseTables;
this.sparkUDFInfoList = sparkUDFInfoList;
this.sparkSql = sparkSql;
this.hiveMetastoreClient = null;
this.sqlNode = null;
}

private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList, String sparkSql,
HiveMetastoreClient hmsClient, SqlNode sqlNode) {
this.baseTables = baseTables;
@@ -65,30 +56,6 @@ private CoralSpark(List<String> baseTables, List<SparkUDFInfo> sparkUDFInfoList,
this.sqlNode = sqlNode;
}

/**
* Users use this function as the main API for getting a CoralSpark instance.
*
* Internally, the IR RelNode is converted to a Spark RelNode, and the Spark RelNode to Spark SQL.
*
* It returns an instance of CoralSpark which contains
* 1) Spark SQL
* 2) Base tables
* 3) Spark UDF information objects, i.e. a List of {@link SparkUDFInfo}
*
* @param irRelNode An IR RelNode for which CoralSpark will be constructed.
*
* @return [[CoralSpark]]
*/
@Deprecated
public static CoralSpark create(RelNode irRelNode) {
SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
String sparkSQL = constructSparkSQL(sparkRelNode, sparkUDFInfos);
List<String> baseTables = constructBaseTables(sparkRelNode);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
}
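
Migration sketch: with the create(RelNode) overload above removed, callers move to the retained overload that also takes a HiveMetastoreClient, shown next in this hunk. A minimal sketch under assumptions: irRelNode, hiveMetastoreClient, and the accessor names below are illustrative and do not appear in this diff.

// Before this commit (overload removed above):
// CoralSpark coralSpark = CoralSpark.create(irRelNode);

// After this commit: also pass the HiveMetastoreClient used to resolve views and tables.
CoralSpark coralSpark = CoralSpark.create(irRelNode, hiveMetastoreClient);
String sparkSql = coralSpark.getSparkSql();           // translated Spark SQL (accessor name assumed)
List<String> baseTables = coralSpark.getBaseTables(); // "db.table" dependencies (accessor name assumed)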

/**
* Users use this function as the main API for getting a CoralSpark instance.
*
@@ -115,26 +82,6 @@ public static CoralSpark create(RelNode irRelNode, HiveMetastoreClient hmsClient
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL, hmsClient, sparkSqlNode);
}

/**
* Users use this function as the main API for getting a CoralSpark instance.
* This should be used when the user needs to align the Coral-spark translated SQL
* with the Coral-schema output schema
*
* @param irRelNode An IR RelNode for which CoralSpark will be constructed.
* @param schema Coral schema that is represented by an Avro schema
* @return [[CoralSpark]]
*/
@Deprecated
public static CoralSpark create(RelNode irRelNode, Schema schema) {
List<String> aliases = schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
SparkRelInfo sparkRelInfo = IRRelToSparkRelTransformer.transform(irRelNode);
Set<SparkUDFInfo> sparkUDFInfos = sparkRelInfo.getSparkUDFInfos();
RelNode sparkRelNode = sparkRelInfo.getSparkRelNode();
String sparkSQL = constructSparkSQLWithExplicitAlias(sparkRelNode, aliases, sparkUDFInfos);
List<String> baseTables = constructBaseTables(sparkRelNode);
return new CoralSpark(baseTables, ImmutableList.copyOf(sparkUDFInfos), sparkSQL);
}
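
Migration sketch for the schema-aligned overload removed above: the retained schema-aware create, whose Javadoc begins below but whose parameter list falls outside this hunk, presumably also accepts a HiveMetastoreClient, mirroring the other retained overload; that signature is an assumption here, as are the variable names.

// Before this commit (overload removed above): align output column names with the Avro schema fields.
// CoralSpark coralSpark = CoralSpark.create(irRelNode, avroSchema);

// After this commit (assumed signature, mirroring the metastore-aware overload):
CoralSpark coralSpark = CoralSpark.create(irRelNode, avroSchema, hiveMetastoreClient);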

/**
* Users use this function as the main API for getting a CoralSpark instance.
* This should be used when the user needs to align the Coral-spark translated SQL
@@ -178,48 +125,6 @@ public static String constructSparkSQL(SqlNode sparkSqlNode) {
return sparkSqlNode.toSqlString(SparkSqlDialect.INSTANCE).getSql();
}

/**
* This function returns a completely expanded SQL statement in Spark SQL Dialect.
*
* A SQL statement is 'completely expanded' if it doesn't depend
* on (or selects from) Hive views, but instead, just on base tables.
* This function internally calls the [[CoralRelToSqlNodeConverter]] module to
* convert the Spark RelNode to Spark SQL.
*
* Converts Spark RelNode to Spark SQL
*
* @param sparkRelNode A Spark compatible RelNode
*
* @param sparkUDFInfos A set of Spark UDF information objects
* @return SQL String in Spark SQL dialect which is 'completely expanded'
*/
private static String constructSparkSQL(RelNode sparkRelNode, Set<SparkUDFInfo> sparkUDFInfos) {
CoralRelToSqlNodeConverter rel2sql = new CoralRelToSqlNodeConverter();
SqlNode coralSqlNode = rel2sql.convert(sparkRelNode);
SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter())
.accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));
SqlNode rewrittenSparkSqlNode = sparkSqlNode.accept(new SparkSqlRewriter());
return rewrittenSparkSqlNode.toSqlString(SparkSqlDialect.INSTANCE).getSql();
}
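
With the RelNode-based private helper above removed, rendering centers on the retained public constructSparkSQL(SqlNode) named in this hunk's header, which serializes an already Spark-compatible SqlNode in the Spark dialect. A minimal usage sketch; obtaining the SqlNode from a CoralSpark instance via an accessor is an assumption not shown in this diff.

// sparkSqlNode: a Spark-compatible SqlNode, e.g. from a CoralSpark instance (accessor name assumed)
SqlNode sparkSqlNode = coralSpark.getSqlNode();
String sparkSql = CoralSpark.constructSparkSQL(sparkSqlNode);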

private static String constructSparkSQLWithExplicitAlias(RelNode sparkRelNode, List<String> aliases,
Set<SparkUDFInfo> sparkUDFInfos) {
CoralRelToSqlNodeConverter rel2sql = new CoralRelToSqlNodeConverter();
// Create intermediate SqlNode objects (coralSqlNode, sparkSqlNode, rewritten) to make debugging easier
SqlNode coralSqlNode = rel2sql.convert(sparkRelNode);
SqlNode sparkSqlNode = coralSqlNode.accept(new CoralSqlNodeToSparkSqlNodeConverter())
.accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));

SqlNode rewritten = sparkSqlNode.accept(new SparkSqlRewriter());
// Use a second pass visit to add explicit alias names,
// only do this when it's not a select star case,
// since for select star we don't need to add any explicit aliases
if (rewritten.getKind() == SqlKind.SELECT && ((SqlSelect) rewritten).getSelectList() != null) {
rewritten = rewritten.accept(new AddExplicitAlias(aliases));
}
return rewritten.toSqlString(SparkSqlDialect.INSTANCE).getSql();
}

/**
* This function returns the list of base table names, in the format
* "database_name.table_name".
2 changes: 1 addition & 1 deletion version.properties
@@ -1,3 +1,3 @@
# Version of the produced binaries.
# The version is inferred by shipkit-auto-version Gradle plugin (https://github.com/shipkit/shipkit-auto-version)
version=2.1.*
version=2.2.*
