Skip to content

Commit

Permalink
VINITUS-248: add support for Cloudera Atlas Connector for Spark 3 (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
tanvn authored and GitHub Enterprise committed May 10, 2022
1 parent d0646ca commit 4158bbc
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}

case AppendData(r: DataSourceV2Relation, query, _, _, Some(write)) =>
AppendDataExec(planLater(query), refreshCache(r), write) :: Nil
AppendDataExec(planLater(query), refreshCache(r), write, r.table) :: Nil

case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, query,
_, _, Some(write)) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
Expand All @@ -232,10 +232,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}

case OverwriteByExpression(r: DataSourceV2Relation, _, query, _, _, Some(write)) =>
OverwriteByExpressionExec(planLater(query), refreshCache(r), write) :: Nil
OverwriteByExpressionExec(planLater(query), refreshCache(r), write, r.table) :: Nil

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _, _, Some(write)) =>
OverwritePartitionsDynamicExec(planLater(query), refreshCache(r), write) :: Nil
OverwritePartitionsDynamicExec(planLater(query), refreshCache(r), write, r.table) :: Nil

case DeleteFromTable(relation, condition) =>
relation match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ case class AtomicReplaceTableAsSelectExec(
case class AppendDataExec(
query: SparkPlan,
refreshCache: () => Unit,
write: Write) extends V2ExistingTableWriteExec {
write: Write,
table: Table) extends V2ExistingTableWriteExec {
override protected def withNewChildInternal(newChild: SparkPlan): AppendDataExec =
copy(query = newChild)
}
Expand All @@ -254,7 +255,8 @@ case class AppendDataExec(
case class OverwriteByExpressionExec(
query: SparkPlan,
refreshCache: () => Unit,
write: Write) extends V2ExistingTableWriteExec {
write: Write,
table: Table) extends V2ExistingTableWriteExec {
override protected def withNewChildInternal(newChild: SparkPlan): OverwriteByExpressionExec =
copy(query = newChild)
}
Expand All @@ -271,7 +273,8 @@ case class OverwriteByExpressionExec(
case class OverwritePartitionsDynamicExec(
query: SparkPlan,
refreshCache: () => Unit,
write: Write) extends V2ExistingTableWriteExec {
write: Write,
table: Table) extends V2ExistingTableWriteExec {
override protected def withNewChildInternal(newChild: SparkPlan): OverwritePartitionsDynamicExec =
copy(query = newChild)
}
Expand Down

0 comments on commit 4158bbc

Please sign in to comment.