Skip to content

Commit

Permalink
[SPARK-23547][SQL] Cleanup the .pipeout file when the Hive Session closed
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

![2018-03-07_121010](https://user-images.githubusercontent.com/24823338/37073232-922e10d2-2200-11e8-8172-6e03aa984b39.png)

When the Hive session is closed, we should also clean up the .pipeout file.

## How was this patch tested?

Added test cases.

Author: zuotingbing <[email protected]>

Closes #20702 from zuotingbing/SPARK-23547.
  • Loading branch information
zuotingbing authored and cloud-fan committed Mar 13, 2018
1 parent 9ddd1e2 commit 918fb9b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ public void close() throws HiveSQLException {
opHandleSet.clear();
// Cleanup session log directory.
cleanupSessionLogDir();
// Cleanup pipeout file.
cleanupPipeoutFile();
HiveHistory hiveHist = sessionState.getHiveHistory();
if (null != hiveHist) {
hiveHist.closeStream();
Expand All @@ -665,6 +667,22 @@ public void close() throws HiveSQLException {
}
}

/**
 * Deletes this session's .pipeout files from the local scratch directory.
 * Hive creates one "&lt;sessionID&gt;...pipeout" file per session under
 * LOCALSCRATCHDIR; without this cleanup they accumulate until the server
 * is restarted. Deletion failures are logged and ignored (best effort).
 */
private void cleanupPipeoutFile() {
  String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
  String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);

  File[] fileAry = new File(lScratchDir).listFiles(
      (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));

  // File.listFiles() returns null (not an empty array) when the directory
  // does not exist or an I/O error occurs; guard against the resulting NPE.
  if (fileAry == null) {
    LOG.error("Unable to access pipeout files in " + lScratchDir);
    return;
  }
  for (File file : fileAry) {
    try {
      FileUtils.forceDelete(file);
    } catch (Exception e) {
      // Best-effort cleanup: log and continue with the remaining files.
      LOG.error("Failed to cleanup pipeout file: " + file, e);
    }
  }
}

private void cleanupSessionLogDir() {
if (isOperationLogEnabled) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.io.{File, FilenameFilter}
import java.net.URL
import java.nio.charset.StandardCharsets
import java.sql.{Date, DriverManager, SQLException, Statement}
import java.util.UUID

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -613,6 +614,28 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
bufferSrc.close()
}
}

test("SPARK-23547 Cleanup the .pipeout file when the Hive Session closed") {
  // Returns the .pipeout files belonging to the given session in the
  // server's local scratch directory.
  def sessionPipeoutFiles(sessionID: UUID): Array[File] = {
    val pipeoutFilter = new FilenameFilter {
      override def accept(dir: File, name: String): Boolean =
        name.startsWith(sessionID.toString) && name.endsWith(".pipeout")
    }
    lScratchDir.listFiles(pipeoutFilter)
  }

  withCLIServiceClient { client =>
    val user = System.getProperty("user.name")
    val sessionHandle = client.openSession(user, "")
    val sessionID = sessionHandle.getSessionId

    // Opening a session creates exactly one .pipeout file in the scratch dir.
    assert(sessionPipeoutFiles(sessionID).length == 1)

    client.closeSession(sessionHandle)

    // Closing the session must delete that file again.
    assert(sessionPipeoutFiles(sessionID).length == 0)
  }
}
}

class SingleSessionSuite extends HiveThriftJdbcTest {
Expand Down Expand Up @@ -807,6 +830,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
private val pidDir: File = Utils.createTempDir(namePrefix = "thriftserver-pid")
protected var logPath: File = _
protected var operationLogPath: File = _
protected var lScratchDir: File = _
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]

Expand Down Expand Up @@ -844,6 +868,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
| --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
| --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
| --hiveconf $portConf=$port
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j.debug
Expand Down Expand Up @@ -873,6 +898,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
metastorePath.delete()
operationLogPath = Utils.createTempDir()
operationLogPath.delete()
lScratchDir = Utils.createTempDir()
lScratchDir.delete()
logPath = null
logTailingProcess = null

Expand Down Expand Up @@ -956,6 +983,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
operationLogPath.delete()
operationLogPath = null

lScratchDir.delete()
lScratchDir = null

Option(logPath).foreach(_.delete())
logPath = null

Expand Down

0 comments on commit 918fb9b

Please sign in to comment.