[SPARK-LLAP-70] Implement against Apache Spark 2.2 #118

Closed

Changes from 5 commits
4 changes: 2 additions & 2 deletions build.sbt
@@ -1,11 +1,11 @@

name := "spark-llap"
version := "1.1.1-2.1"
version := "1.1.1-2.2"
Contributor

You need to change the versions in assembly together.

Contributor

Did you change assembly versions?

organization := "com.hortonworks.spark"
scalaVersion := "2.11.8"
val scalatestVersion = "2.2.6"

sparkVersion := sys.props.getOrElse("spark.version", "2.1.0.2.6.0.3-8")
sparkVersion := sys.props.getOrElse("spark.version", "2.2.0-SNAPSHOT")

val hadoopVersion = sys.props.getOrElse("hadoop.version", "2.7.3")
val hiveVersion = sys.props.getOrElse("hive.version", "2.1.0.2.6.0.3-8")
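The two reviewer comments above concern keeping the sbt-assembly output in step with the new `1.1.1-2.2` version. As a minimal, illustrative sketch, assuming the project names its fat jar via sbt-assembly's `assemblyJarName` key (an assumption about this repository, not something shown in the diff), deriving the jar name from the single `version` setting keeps the two in sync automatically:

```scala
// build.sbt (hypothetical excerpt, requires the sbt-assembly plugin):
// tie the assembly jar name to the one `version` setting so bumping
// 1.1.1-2.1 -> 1.1.1-2.2 also renames the assembly artifact.
assemblyJarName in assembly := s"${name.value}-assembly-${version.value}.jar"
```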
529 changes: 529 additions & 0 deletions patch/0001-SPARK-LLAP-RANGER-Integration_2.2.0.patch

Large diffs are not rendered by default.

@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.llap

import com.hortonworks.spark.sql.hive.llap.LlapRelation
import com.hortonworks.spark.sql.hive.llap.{DefaultJDBCWrapper, LlapRelation}

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
@@ -27,13 +27,16 @@ class DefaultSource extends RelationProvider {

override def createRelation(sqlContext: SQLContext, parameters: Map[String, String])
: BaseRelation = {
val sessionState = sqlContext.sparkSession.sessionState.asInstanceOf[LlapSessionState]
val connectionUrl =
sqlContext.sparkSession.sessionState.getConnectionUrl(sqlContext.sparkSession)
val user = sqlContext.sparkSession.sessionState.getUserString()
val connection = DefaultJDBCWrapper.getConnector(None, connectionUrl, user)
val params = parameters +
("user.name" -> sessionState.getUserString()) +
("user.name" -> user) +
("user.password" -> "password")
LlapRelation(
sqlContext,
params,
sessionState.connection)
connection)
}
}
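For context on how this `RelationProvider` is consumed, here is a hedged usage sketch from the Spark side; the `table` option name and the sample table are illustrative assumptions rather than options confirmed by this diff:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical read path: Spark resolves the package name below to the
// DefaultSource shown above, which now opens its own JDBC connection via
// DefaultJDBCWrapper instead of reusing a LlapSessionState connection.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val df = spark.read
  .format("org.apache.spark.sql.hive.llap")
  .option("table", "default.sample_07") // option name is an illustrative assumption
  .load()
df.show()
```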
@@ -17,10 +17,10 @@

package org.apache.spark.sql.hive.llap

import com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper
import java.sql.Connection
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
@@ -65,6 +65,18 @@ private[spark] class LlapExternalCatalog(
found
}

/**
* @return a JDBC connection to Hive LLAP
*/
private def createConnection(): Connection = {
val sparkSession = SparkSession.getActiveSession.get.sqlContext.sparkSession
val sessionState = SparkSession.getActiveSession.get.sessionState
val connectionUrl = sessionState.getConnectionUrl(sparkSession)
val user = sessionState.getUserString()
val connection = DefaultJDBCWrapper.getConnector(None, connectionUrl, user)
connection
}
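Every catalog operation below funnels its JDBC work through `tryWithResource(createConnection()) { ... }`. The `tryWithResource` helper itself lies outside the hunks shown here; a minimal sketch of the loan-pattern shape such a helper usually has (an illustration, not necessarily the project's actual definition):

```scala
// Illustrative loan-pattern helper matching the call sites in this file:
// run `f` against the resource and close it even when `f` throws.
// java.sql.Connection, Statement and ResultSet are all AutoCloseable.
private def tryWithResource[R <: AutoCloseable, T](resource: R)(f: R => T): T = {
  try f(resource) finally resource.close()
}
```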

/**
* Run some code involving `client` in a [[synchronized]] block and wrap certain
* exceptions thrown in the process in [[AnalysisException]].
@@ -87,25 +99,23 @@
}
}

override def createDatabase(
override def doCreateDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withClient {
val sessionState = SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.createStatement()) { stmt =>
val ifNotExistsString = if (ignoreIfExists) "IF NOT EXISTS" else ""
stmt.executeUpdate(s"CREATE DATABASE $ifNotExistsString `${dbDefinition.name}`")
}
}
}
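The switch from overriding `createDatabase`, `dropDatabase`, `createTable`, `dropTable`, and `renameTable` to their `do*`-prefixed counterparts tracks the Spark 2.2 `ExternalCatalog` refactor, in which the public methods became final wrappers that fire catalog events and delegate to protected `do*` hooks. A simplified sketch of that shape follows; it is reconstructed from memory of the upstream trait, so treat the exact signatures as assumptions:

```scala
// Rough shape of the Spark 2.2-era ExternalCatalog template methods (simplified;
// the real trait lives in org.apache.spark.sql.catalyst.catalog, takes richer
// argument types such as CatalogDatabase, and posts pre/post events around each hook).
abstract class ExternalCatalogSketch {
  // Public entry point is final, so subclasses can no longer override it ...
  final def createDatabase(db: String, ignoreIfExists: Boolean): Unit = {
    // postToAll(CreateDatabasePreEvent(db))  // event wiring elided
    doCreateDatabase(db, ignoreIfExists)
    // postToAll(CreateDatabaseEvent(db))
  }
  // ... which is why LlapExternalCatalog now overrides the protected hook instead.
  protected def doCreateDatabase(db: String, ignoreIfExists: Boolean): Unit
}
```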

override def dropDatabase(
override def doDropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = withClient {
requireDbExists(db)
val sessionState = SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.createStatement()) { stmt =>
val ifExistsString = if (ignoreIfNotExists) "IF EXISTS" else ""
val cascadeString = if (cascade) "CASCADE" else ""
@@ -117,9 +127,8 @@ private[spark] class LlapExternalCatalog(
override def databaseExists(db: String): Boolean = {
val sparkSession = SparkSession.getActiveSession
if (sparkSession.isDefined) {
val sessionState = sparkSession.get.sessionState.asInstanceOf[LlapSessionState]
var isExist = false
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.createStatement()) { stmt =>
tryWithResource(stmt.executeQuery(s"SHOW DATABASES LIKE '$db'")) { rs =>
isExist = rs.next()
@@ -144,9 +153,8 @@ private[spark] class LlapExternalCatalog(
}

override def listDatabases(pattern: String): Seq[String] = withClient {
val sessionState = SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
val databases = new ArrayBuffer[String]()
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.createStatement()) { stmt =>
tryWithResource(stmt.executeQuery(s"SHOW DATABASES LIKE '$pattern'")) { rs =>
while (rs.next()) {
@@ -158,7 +166,7 @@ private[spark] class LlapExternalCatalog(
databases
}

override def createTable(
override def doCreateTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = {
logInfo(tableDefinition.toString)
@@ -180,7 +188,7 @@ private[spark] class LlapExternalCatalog(
}
}

override def dropTable(
override def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
@@ -190,9 +198,7 @@ private[spark] class LlapExternalCatalog(
super.dropTable(db, table, ignoreIfNotExists, purge)
} else {
requireDbExists(db)
val sessionState =
SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.createStatement()) { stmt =>
val ifExistsString = if (ignoreIfNotExists) "IF EXISTS" else ""
val purgeString = if (purge) "PURGE" else ""
@@ -209,9 +215,7 @@ private[spark] class LlapExternalCatalog(
} catch {
case NonFatal(_) =>
// Try to create a dummy table. This table cannot be used for ALTER TABLE.
val sessionState =
SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.getMetaData.getColumns(null, db, table, null)) { rs =>
val schema = new StructType()
while (rs.next()) {
@@ -243,8 +247,7 @@ private[spark] class LlapExternalCatalog(
}

override def tableExists(db: String, table: String): Boolean = withClient {
val sessionState = SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.getMetaData.getTables(null, db, table, null)) { rs =>
rs.next()
}
@@ -254,9 +257,8 @@ private[spark] class LlapExternalCatalog(
override def listTables(db: String): Seq[String] = listTables(db, "*")

override def listTables(db: String, pattern: String): Seq[String] = withClient {
val sessionState = SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
var tableList: List[String] = Nil
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.getMetaData.getTables(null, db, pattern, null)) { rs =>
while (rs.next()) {
tableList = rs.getString(3) :: tableList
@@ -266,7 +268,7 @@ private[spark] class LlapExternalCatalog(
tableList.reverse
}

override def renameTable(db: String, oldName: String, newName: String): Unit = {
override def doRenameTable(db: String, oldName: String, newName: String): Unit = {
requireDbExists(db)
executeUpdate(s"ALTER TABLE $db.$oldName RENAME TO $db.$newName")
}
@@ -319,8 +321,7 @@ private[spark] class LlapExternalCatalog(
}

private def executeUpdate(sql: String): Unit = {
val sessionState = SparkSession.getActiveSession.get.sessionState.asInstanceOf[LlapSessionState]
tryWithResource(sessionState.connection) { conn =>
tryWithResource(createConnection()) { conn =>
tryWithResource(conn.createStatement()) { stmt =>
stmt.executeUpdate(sql)
}

This file was deleted.

@@ -17,73 +17,56 @@

package org.apache.spark.sql.hive.llap

import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.hive.{HiveGenericUDF, HiveGenericUDTF, HiveSessionCatalog, HiveSimpleUDF, HiveUDAFFunction}
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hive._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, DoubleType}


private[sql] class LlapSessionCatalog(
externalCatalog: LlapExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
metastoreCatalog: HiveMetastoreCatalog,
sparkSession: SparkSession,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: SQLConf,
parser: ParserInterface,
hadoopConf: Configuration)
extends HiveSessionCatalog(
externalCatalog,
globalTempViewManager,
sparkSession,
functionResourceLoader,
functionRegistry,
conf,
hadoopConf) with Logging {

private val metastoreCatalog = new LlapMetastoreCatalog(sparkSession)
externalCatalog,
globalTempViewManager,
metastoreCatalog,
functionRegistry,
conf,
hadoopConf,
parser,
functionResourceLoader) with Logging {

override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
val table = formatTableName(name.table)
if (name.database.isDefined || !tempTables.contains(table)) {
val database = name.database.map(formatDatabaseName)
val newName = name.copy(database = database, table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation, None)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
}
}

/**
* Retrieve the metadata of an existing permanent table/view. If no database is specified,
* assume the table/view is in the current database. If the specified table/view is not found
* in the database then a [[NoSuchTableException]] is thrown.
* in the database then a
* [[org.apache.spark.sql.catalyst.analysis.NoSuchTableException]] is thrown.
*/
override def getTableMetadata(name: TableIdentifier): CatalogTable = {
if (Thread.currentThread().getStackTrace()(2).toString().contains("DescribeTableCommand")) {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
val sessionState = sparkSession.sessionState.asInstanceOf[LlapSessionState]
val stmt = sessionState.connection.createStatement()
val sparkSession = SparkSession.getActiveSession.get.sqlContext.sparkSession
val sessionState = SparkSession.getActiveSession.get.sessionState
val connectionUrl = sessionState.getConnectionUrl(sparkSession)
val user = sessionState.getUserString()
val connection = DefaultJDBCWrapper.getConnector(None, connectionUrl, user)
val stmt = connection.createStatement()
stmt.executeUpdate(s"DESC `$db`.`$table`")
}
