Skip to content

Commit

Permalink
[SPARK-13983][SQL] Fix HiveThriftServer2 can not get "--hiveconf" and…
Browse files Browse the repository at this point in the history
… ''--hivevar" variables since 2.0

## What changes were proposed in this pull request?

`--hiveconf` and `--hivevar` variables no longer work since Spark 2.0. The `spark-sql` client has fixed by [SPARK-15730](https://issues.apache.org/jira/browse/SPARK-15730) and [SPARK-18086](https://issues.apache.org/jira/browse/SPARK-18086). but `beeline`/[`Spark SQL HiveThriftServer2`](https://github.com/apache/spark/blob/v2.1.1/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala) is still broken. This pull request fix it.

This pull request works for both `JDBC client` and `beeline`.

## How was this patch tested?

unit tests for  `JDBC client`
manual tests for `beeline`:
```
git checkout origin/pr/17886

dev/make-distribution.sh --mvn mvn  --tgz -Phive -Phive-thriftserver -Phadoop-2.6 -DskipTests

tar -zxf spark-2.3.0-SNAPSHOT-bin-2.6.5.tgz && cd spark-2.3.0-SNAPSHOT-bin-2.6.5

sbin/start-thriftserver.sh
```
```
cat <<EOF > test.sql
select '\${a}', '\${b}';
EOF

beeline -u jdbc:hive2://localhost:10000 --hiveconf a=avalue --hivevar b=bvalue -f test.sql

```

Author: Yuming Wang <[email protected]>

Closes #17886 from wangyum/SPARK-13983-dev.
  • Loading branch information
wangyum authored and gatorsmile committed Feb 1, 2018
1 parent ec63e2d commit f051f83
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.HiveVersionInfo;
Expand All @@ -71,6 +71,12 @@
import org.apache.hive.service.cli.thrift.TProtocolVersion;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;

import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.HIVEVAR_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.METACONF_PREFIX;
import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;

/**
* HiveSession
*
Expand Down Expand Up @@ -209,7 +215,7 @@ private void configureSession(Map<String, String> sessionConfMap) throws HiveSQL
String key = entry.getKey();
if (key.startsWith("set:")) {
try {
SetProcessor.setVariable(key.substring(4), entry.getValue());
setVariable(key.substring(4), entry.getValue());
} catch (Exception e) {
throw new HiveSQLException(e);
}
Expand All @@ -221,6 +227,70 @@ private void configureSession(Map<String, String> sessionConfMap) throws HiveSQL
}
}

// Copy from org.apache.hadoop.hive.ql.processors.SetProcessor, only change:
// setConf(varname, propName, varvalue, true) when varname.startsWith(HIVECONF_PREFIX)
public static int setVariable(String varname, String varvalue) throws Exception {
SessionState ss = SessionState.get();
if (varvalue.contains("\n")){
ss.err.println("Warning: Value had a \\n character in it.");
}
varname = varname.trim();
if (varname.startsWith(ENV_PREFIX)){
ss.err.println("env:* variables can not be set.");
return 1;
} else if (varname.startsWith(SYSTEM_PREFIX)){
String propName = varname.substring(SYSTEM_PREFIX.length());
System.getProperties().setProperty(propName,
new VariableSubstitution().substitute(ss.getConf(),varvalue));
} else if (varname.startsWith(HIVECONF_PREFIX)){
String propName = varname.substring(HIVECONF_PREFIX.length());
setConf(varname, propName, varvalue, true);
} else if (varname.startsWith(HIVEVAR_PREFIX)) {
String propName = varname.substring(HIVEVAR_PREFIX.length());
ss.getHiveVariables().put(propName,
new VariableSubstitution().substitute(ss.getConf(),varvalue));
} else if (varname.startsWith(METACONF_PREFIX)) {
String propName = varname.substring(METACONF_PREFIX.length());
Hive hive = Hive.get(ss.getConf());
hive.setMetaConf(propName, new VariableSubstitution().substitute(ss.getConf(), varvalue));
} else {
setConf(varname, varname, varvalue, true);
}
return 0;
}

// returns non-null string for validation fail
private static void setConf(String varname, String key, String varvalue, boolean register)
throws IllegalArgumentException {
HiveConf conf = SessionState.get().getConf();
String value = new VariableSubstitution().substitute(conf, varvalue);
if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) {
HiveConf.ConfVars confVars = HiveConf.getConfVars(key);
if (confVars != null) {
if (!confVars.isType(value)) {
StringBuilder message = new StringBuilder();
message.append("'SET ").append(varname).append('=').append(varvalue);
message.append("' FAILED because ").append(key).append(" expects ");
message.append(confVars.typeString()).append(" type value.");
throw new IllegalArgumentException(message.toString());
}
String fail = confVars.validate(value);
if (fail != null) {
StringBuilder message = new StringBuilder();
message.append("'SET ").append(varname).append('=').append(varvalue);
message.append("' FAILED in validation : ").append(fail).append('.');
throw new IllegalArgumentException(message.toString());
}
} else if (key.startsWith("hive.")) {
throw new IllegalArgumentException("hive configuration " + key + " does not exists.");
}
}
conf.verifyAndSet(key, value);
if (register) {
SessionState.get().getOverriddenConfigurations().put(key, value);
}
}

@Override
public void setOperationLogSessionDir(File operationLogRootDir) {
if (!operationLogRootDir.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}
import org.apache.spark.sql.internal.SQLConf

/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
Expand All @@ -50,6 +51,9 @@ private[thriftserver] class SparkSQLOperationManager()
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
s" initialized or had already closed.")
val conf = sqlContext.sessionState.conf
val hiveSessionState = parentSession.getSessionState
setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
setConfMap(conf, hiveSessionState.getHiveVariables)
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
runInBackground)(sqlContext, sessionToActivePool)
Expand All @@ -58,4 +62,12 @@ private[thriftserver] class SparkSQLOperationManager()
s"runInBackground=$runInBackground")
operation
}

def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
val iterator = confMap.entrySet().iterator()
while (iterator.hasNext) {
val kv = iterator.next()
conf.setConfString(kv.getKey, kv.getValue)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}

test("Support beeline --hiveconf and --hivevar") {
withJdbcStatement() { statement =>
executeTest(hiveConfList)
executeTest(hiveVarList)
def executeTest(hiveList: String): Unit = {
hiveList.split(";").foreach{ m =>
val kv = m.split("=")
// select "${a}"; ---> avalue
val resultSet = statement.executeQuery("select \"${" + kv(0) + "}\"")
resultSet.next()
assert(resultSet.getString(1) === kv(1))
}
}
}
}

test("JDBC query execution") {
withJdbcStatement("test") { statement =>
val queries = Seq(
Expand Down Expand Up @@ -740,10 +756,11 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
s"""jdbc:hive2://localhost:$serverPort/
|default?
|hive.server2.transport.mode=http;
|hive.server2.thrift.http.path=cliservice
|hive.server2.thrift.http.path=cliservice;
|${hiveConfList}#${hiveVarList}
""".stripMargin.split("\n").mkString.trim
} else {
s"jdbc:hive2://localhost:$serverPort/"
s"jdbc:hive2://localhost:$serverPort/?${hiveConfList}#${hiveVarList}"
}

def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement => Unit)*) {
Expand Down Expand Up @@ -779,6 +796,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
private var listeningPort: Int = _
protected def serverPort: Int = listeningPort

protected val hiveConfList = "a=avalue;b=bvalue"
protected val hiveVarList = "c=cvalue;d=dvalue"
protected def user = System.getProperty("user.name")

protected var warehousePath: File = _
Expand Down

0 comments on commit f051f83

Please sign in to comment.