
Merge remote-tracking branch 'upstream/master' into SPARK-20754-trunc
# Conflicts:
#	sql/core/src/test/resources/sql-tests/inputs/datetime.sql
#	sql/core/src/test/resources/sql-tests/results/datetime.sql.out
wangyum committed Oct 28, 2017
2 parents b59a2df + d28d573 commit 679ff98
Showing 20 changed files with 202 additions and 58 deletions.
@@ -564,8 +564,7 @@ valueExpression
     ;
 
 primaryExpression
-    : name=(CURRENT_DATE | CURRENT_TIMESTAMP)                                  #timeFunctionCall
-    | CASE whenClause+ (ELSE elseExpression=expression)? END                   #searchedCase
+    : CASE whenClause+ (ELSE elseExpression=expression)? END                   #searchedCase
     | CASE value=expression whenClause+ (ELSE elseExpression=expression)? END  #simpleCase
     | CAST '(' expression AS dataType ')'                                      #cast
     | STRUCT '(' (argument+=namedExpression (',' argument+=namedExpression)*)? ')' #struct
@@ -747,7 +746,7 @@ nonReserved
     | NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE
     | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN
     | UNBOUNDED | WHEN
-    | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP
+    | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT
     | DIRECTORY
     | BOTH | LEADING | TRAILING
     ;
@@ -983,8 +982,6 @@ OPTION: 'OPTION';
 ANTI: 'ANTI';
 LOCAL: 'LOCAL';
 INPATH: 'INPATH';
-CURRENT_DATE: 'CURRENT_DATE';
-CURRENT_TIMESTAMP: 'CURRENT_TIMESTAMP';
 
 STRING
     : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
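Since CURRENT_DATE and CURRENT_TIMESTAMP are no longer lexer tokens or reserved words, the parser treats them as ordinary identifiers and the analyzer decides what they mean. A rough sketch of the user-visible effect (not part of this commit; assumes a local SparkSession):

    import org.apache.spark.sql.SparkSession

    object BracelessDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("demo").getOrCreate()
        // `current_date` can now be used as a plain column name.
        spark.sql("create temporary view t as select * from values (1, 'x') as t(current_date, b)")
        spark.sql("select current_date from t").show()   // the column: 1
        spark.sql("select current_date() from t").show() // always the function: today's date
      }
    }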
@@ -786,7 +786,12 @@ class Analyzer(
     private def resolve(e: Expression, q: LogicalPlan): Expression = e match {
       case u @ UnresolvedAttribute(nameParts) =>
         // Leave unchanged if resolution fails. Hopefully will be resolved next round.
-        val result = withPosition(u) { q.resolveChildren(nameParts, resolver).getOrElse(u) }
+        val result =
+          withPosition(u) {
+            q.resolveChildren(nameParts, resolver)
+              .orElse(resolveLiteralFunction(nameParts, u, q))
+              .getOrElse(u)
+          }
         logDebug(s"Resolving $u to $result")
         result
       case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
@@ -925,6 +930,30 @@ class Analyzer(
       exprs.exists(_.find(_.isInstanceOf[UnresolvedDeserializer]).isDefined)
     }
 
+  /**
+   * Literal functions do not require the user to specify braces when calling them.
+   * When an attribute is not resolvable, we try to resolve it as a literal function.
+   */
+  private def resolveLiteralFunction(
+      nameParts: Seq[String],
+      attribute: UnresolvedAttribute,
+      plan: LogicalPlan): Option[Expression] = {
+    if (nameParts.length != 1) return None
+    val isNamedExpression = plan match {
+      case Aggregate(_, aggregateExpressions, _) => aggregateExpressions.contains(attribute)
+      case Project(projectList, _) => projectList.contains(attribute)
+      case Window(windowExpressions, _, _, _) => windowExpressions.contains(attribute)
+      case _ => false
+    }
+    val wrapper: Expression => Expression =
+      if (isNamedExpression) f => Alias(f, toPrettySQL(f))() else identity
+    // support CURRENT_DATE and CURRENT_TIMESTAMP
+    val literalFunctions = Seq(CurrentDate(), CurrentTimestamp())
+    val name = nameParts.head
+    val func = literalFunctions.find(e => resolver(e.prettyName, name))
+    func.map(wrapper)
+  }
+
   protected[sql] def resolveExpression(
       expr: Expression,
       plan: LogicalPlan,
@@ -937,7 +966,11 @@
     expr transformUp {
       case GetColumnByOrdinal(ordinal, _) => plan.output(ordinal)
       case u @ UnresolvedAttribute(nameParts) =>
-        withPosition(u) { plan.resolve(nameParts, resolver).getOrElse(u) }
+        withPosition(u) {
+          plan.resolve(nameParts, resolver)
+            .orElse(resolveLiteralFunction(nameParts, u, plan))
+            .getOrElse(u)
+        }
       case UnresolvedExtractValue(child, fieldName) if child.resolved =>
         ExtractValue(child, fieldName, resolver)
     }
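The resolution order above is the crux of the fix: real columns always win, and only a failed attribute lookup falls back to the literal functions, while the Alias wrapper keeps the result a NamedExpression when it appears in a Project, Aggregate, or Window list. A standalone sketch of the fallback (illustrative names, not Spark's API):

    object LiteralFunctionFallback {
      private val literalFunctions = Seq("current_date", "current_timestamp")

      // Attribute resolution first, then a case-insensitive match against the
      // known literal functions, mirroring orElse(resolveLiteralFunction(...)).
      def resolve(name: String, columns: Seq[String]): String =
        columns.find(_.equalsIgnoreCase(name)).map("column " + _)
          .orElse(literalFunctions.find(_.equalsIgnoreCase(name)).map("function " + _ + "()"))
          .getOrElse("unresolved: " + name)

      def main(args: Array[String]): Unit = {
        println(resolve("CURRENT_DATE", Seq("current_date", "b"))) // column current_date
        println(resolve("current_date", Seq("a", "b")))            // function current_date()
        println(resolve("no_such_name", Seq("a", "b")))            // unresolved: no_such_name
      }
    }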
@@ -438,7 +438,7 @@ case class HiveTableRelation(
 
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
-  override lazy val canonicalized: HiveTableRelation = copy(
+  override def doCanonicalize(): HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -448,7 +448,8 @@ case class HiveTableRelation(
     },
     partitionCols = partitionCols.zipWithIndex.map {
       case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
-    })
+    }
+  )
 
   override def computeStats(): Statistics = {
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
@@ -1234,19 +1234,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
-  /**
-   * Create a current timestamp/date expression. These are different from regular function because
-   * they do not require the user to specify braces when calling them.
-   */
-  override def visitTimeFunctionCall(ctx: TimeFunctionCallContext): Expression = withOrigin(ctx) {
-    ctx.name.getType match {
-      case SqlBaseParser.CURRENT_DATE =>
-        CurrentDate()
-      case SqlBaseParser.CURRENT_TIMESTAMP =>
-        CurrentTimestamp()
-    }
-  }
-
   /**
    * Create a function database (optional) and name pair.
    */
@@ -180,6 +180,15 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
 
   override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
+  /**
+   * A private mutable variable to indicate whether this plan is the result of canonicalization.
+   * This is used solely to make sure we don't execute a canonicalized plan.
+   * See [[canonicalized]] on how this is set.
+   */
+  @transient private var _isCanonicalizedPlan: Boolean = false
+
+  protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
+
   /**
    * Returns a plan where a best effort attempt has been made to transform `this` in a way
    * that preserves the result but removes cosmetic variations (case sensitivity, ordering for
@@ -188,10 +197,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
    * result.
    *
-   * Some nodes should overwrite this to provide proper canonicalize logic, but they should remove
-   * expressions cosmetic variations themselves.
+   * Plan nodes that require special canonicalization should override [[doCanonicalize()]].
+   * They should remove cosmetic variations in expressions themselves.
    */
-  lazy val canonicalized: PlanType = {
+  @transient final lazy val canonicalized: PlanType = {
+    var plan = doCanonicalize()
+    // If the plan has not been changed due to canonicalization, make a copy of it so we don't
+    // mutate the original plan's _isCanonicalizedPlan flag.
+    if (plan eq this) {
+      plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
+    }
+    plan._isCanonicalizedPlan = true
+    plan
+  }
+
+  /**
+   * Defines how the canonicalization should work for the current plan.
+   */
+  protected def doCanonicalize(): PlanType = {
     val canonicalizedChildren = children.map(_.canonicalized)
     var id = -1
     mapExpressions {
@@ -213,7 +236,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     }.withNewChildren(canonicalizedChildren)
   }
 
-
   /**
    * Returns true when the given query plan will return the same results as this query plan.
    *
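This turns canonicalization into a template method: the final lazy val owns the flag bookkeeping, and subclasses customize only doCanonicalize(). A minimal standalone sketch of the pattern (not Spark's classes; the real version additionally copies the plan via makeCopy when doCanonicalize() returns this, so the original's flag is never mutated):

    abstract class Plan {
      private var _isCanonicalizedPlan = false
      protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan

      // Final entry point: sets the flag exactly once on the canonical form.
      final lazy val canonicalized: Plan = {
        val plan = doCanonicalize()
        plan._isCanonicalizedPlan = true
        plan
      }

      // Subclasses define how their cosmetic variations are removed.
      protected def doCanonicalize(): Plan

      final def execute(): String = {
        if (isCanonicalizedPlan) {
          throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
        }
        "executed " + this
      }
    }

    // Drops the cosmetic field, like ExprId normalization in the real code.
    case class Scan(table: String, comment: String) extends Plan {
      override protected def doCanonicalize(): Plan = Scan(table, comment = "")
    }

    object CanonicalizationDemo {
      def main(args: Array[String]): Unit = {
        val a = Scan("t1", "planned for query A")
        val b = Scan("t1", "planned for query B")
        println(a.canonicalized == b.canonicalized) // true: equal once normalized
        println(a.execute())                        // fine: `a` itself is not canonicalized
      }
    }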
@@ -760,7 +760,7 @@ case class SubqueryAlias(
     child: LogicalPlan)
   extends UnaryNode {
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 
   override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
 }
@@ -41,7 +41,7 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())
 
   override def output: Seq[Attribute] = child.output
 
-  override lazy val canonicalized: LogicalPlan = child.canonicalized
+  override def doCanonicalize(): LogicalPlan = child.canonicalized
 }
 
 
@@ -592,11 +592,6 @@ class ExpressionParserSuite extends PlanTest {
     intercept("1 - f('o', o(bar)) hello * world", "mismatched input '*'")
   }
 
-  test("current date/timestamp braceless expressions") {
-    assertEqual("current_date", CurrentDate())
-    assertEqual("current_timestamp", CurrentTimestamp())
-  }
-
   test("SPARK-17364, fully qualified column name which starts with number") {
     assertEqual("123_", UnresolvedAttribute("123_"))
     assertEqual("1a.123_", UnresolvedAttribute("1a.123_"))
@@ -139,7 +139,7 @@ case class RowDataSourceScanExec(
   }
 
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
-  override lazy val canonicalized: SparkPlan =
+  override def doCanonicalize(): SparkPlan =
     copy(
       fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
       rdd = null,
@@ -522,7 +522,7 @@ case class FileSourceScanExec(
     }
   }
 
-  override lazy val canonicalized: FileSourceScanExec = {
+  override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
@@ -111,6 +111,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecute`.
    */
   final def execute(): RDD[InternalRow] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecute()
   }
 
@@ -121,6 +124,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override `doExecuteBroadcast`.
    */
   final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
+    if (isCanonicalizedPlan) {
+      throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
+    }
     doExecuteBroadcast()
   }
 
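A hypothetical check of the new guard (assumes a local SparkSession; a canonicalized plan has normalized expression IDs and nulled-out fields, so running one would misbehave silently before this change):

    import org.apache.spark.sql.SparkSession

    object CanonicalizedExecuteGuard {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("guard-demo").getOrCreate()
        val plan = spark.range(10).queryExecution.executedPlan
        // Executing the canonicalized twin now fails fast.
        try plan.canonicalized.execute()
        catch { case e: IllegalStateException => println(e.getMessage) }
      }
    }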
@@ -286,7 +286,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * Create a [[ClearCacheCommand]] logical plan.
    */
   override def visitClearCache(ctx: ClearCacheContext): LogicalPlan = withOrigin(ctx) {
-    ClearCacheCommand
+    ClearCacheCommand()
   }
 
   /**
@@ -350,7 +350,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
   }
 
@@ -66,10 +66,13 @@ case class UncacheTableCommand(
 /**
  * Clear all cached data from the in-memory cache.
  */
-case object ClearCacheCommand extends RunnableCommand {
+case class ClearCacheCommand() extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
+
+  /** [[org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy()]] does not support 0-arg ctor. */
+  override def makeCopy(newArgs: Array[AnyRef]): ClearCacheCommand = ClearCacheCommand()
 }
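The switch from a case object to a 0-arg case class matters because canonicalization can now rebuild any plan node through makeCopy (see the QueryPlan change above), and TreeNode.makeCopy reconstructs a node from its constructor arguments, of which ClearCacheCommand has none. A toy sketch of the idea (illustrative, not Spark's TreeNode):

    // A 0-arg node supplies its own copy logic instead of the generic
    // constructor-reflection path, which expects at least one argument.
    case class ClearCacheLike() {
      def makeCopy(newArgs: Array[AnyRef]): ClearCacheLike = ClearCacheLike()
    }

    object MakeCopyDemo {
      def main(args: Array[String]): Unit = {
        val original = ClearCacheLike()
        val duplicate = original.makeCopy(Array.empty)
        println(duplicate == original) // true: same value
        println(duplicate eq original) // false: a genuinely fresh node
      }
    }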
@@ -35,7 +35,7 @@ case class LogicalRelation(
   extends LeafNode with MultiInstanceRelation {
 
   // Only care about relation when canonicalizing.
-  override lazy val canonicalized: LogicalPlan = copy(
+  override def doCanonicalize(): LogicalPlan = copy(
     output = output.map(QueryPlan.normalizeExprId(_, output)),
     catalogTable = None)
 
@@ -48,7 +48,7 @@ case class BroadcastExchangeExec(
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
-  override lazy val canonicalized: SparkPlan = {
+  override def doCanonicalize(): SparkPlan = {
     BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 
@@ -50,7 +50,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   extends LeafExecNode {
 
   // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
+  override def doCanonicalize(): SparkPlan = child.canonicalized
 
   def doExecute(): RDD[InternalRow] = {
     child.execute()
17 changes: 17 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/datetime.sql
@@ -9,6 +9,23 @@ select to_timestamp(null), to_timestamp('2016-12-31 00:12:00'), to_timestamp('20
 
 select dayofweek('2007-02-03'), dayofweek('2009-07-30'), dayofweek('2017-05-27'), dayofweek(null), dayofweek('1582-10-15 13:10:15');
 
+-- [SPARK-22333]: timeFunctionCall has conflicts with columnReference
+create temporary view ttf1 as select * from values
+  (1, 2),
+  (2, 3)
+  as ttf1(current_date, current_timestamp);
+
+select current_date, current_timestamp from ttf1;
+
+create temporary view ttf2 as select * from values
+  (1, 2),
+  (2, 3)
+  as ttf2(a, b);
+
+select current_date = current_date(), current_timestamp = current_timestamp(), a, b from ttf2;
+
+select a, b from ttf2 order by a, current_date;
+
 -- trunc date
 select trunc('2015-07-22', 'yyyy'), trunc('2015-07-22', 'YYYY'),
     trunc('2015-07-22', 'year'), trunc('2015-07-22', 'YEAR'),
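The new cases pin the rules down end to end: in ttf1 the bare names resolve to the columns, while in ttf2 they fall back to the literal functions and therefore compare equal to the explicit calls, since both are bound to the same query time. A hedged way to try the ttf2 case outside the test harness (assumes a local session):

    import org.apache.spark.sql.SparkSession

    object DatetimeResolutionDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("datetime-demo").getOrCreate()
        spark.sql("create temporary view ttf2 as select * from values (1, 2), (2, 3) as ttf2(a, b)")
        // Expected: both comparisons are true on every row.
        spark.sql(
          "select current_date = current_date(), current_timestamp = current_timestamp(), a, b from ttf2"
        ).show()
      }
    }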