SPARK-5068: fix bug when partition path doesn't exist apache#2
lazyman500 committed Mar 17, 2015
1 parent 41f60ce commit 04c443c
Showing 2 changed files with 97 additions and 1 deletion.
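
In short, the fix stops handing Hadoop partition directories that may have been deleted out from under the Hive metastore: it builds a glob pattern over the table's base directory (one wildcard per partition column), records which partition directories actually exist, and silently skips the rest. The standalone Scala sketch below illustrates that idea outside of Spark and Hive; the object and method names are invented, it assumes a single partition column, and it is not the committed code.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object SkipMissingPartitionsSketch {
      // Returns only those partition directories that still exist on the file system.
      def existingPartitions(
          tableDir: Path,
          partitionDirs: Seq[Path],
          conf: Configuration): Seq[Path] = {
        val fs = tableDir.getFileSystem(conf)
        // One listing for the whole table: glob one level below the base directory
        // (this sketch assumes a single partition column, e.g. /warehouse/t/ds=1).
        val matches = fs.globStatus(new Path(tableDir, "*"))
        // globStatus returns null when nothing matches the pattern.
        val existing =
          if (matches == null) Set.empty[String]
          else matches.map(_.getPath.toUri.getPath).toSet
        // Compare bare path strings so scheme/authority differences don't matter.
        partitionDirs.filter(p => existing.contains(p.toUri.getPath))
      }
    }

Globbing once per table (and, in the commit, caching each distinct pattern in pathPatternSet) keeps the number of file-system listings independent of the partition count, which is presumably why the change does not simply call fs.exists for every partition.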
@@ -142,7 +142,39 @@ class HadoopTableReader(
      partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
      filterOpt: Option[PathFilter]): RDD[Row] = {

    // SPARK-5068: get the FileStatus and do the filtering locally when the path does not exist
    val existPathSet = collection.mutable.Set[String]()
    val pathPatternSet = collection.mutable.Set[String]()

    val hivePartitionRDDs = partitionToDeserializer.filter {
      case (partition, partDeserializer) =>
        // Record every path that actually exists under the given glob pattern.
        def updateExistPathSetByPathPattern(pathPatternStr: String): Unit = {
          val pathPattern = new Path(pathPatternStr)
          val fs = pathPattern.getFileSystem(sc.hiveconf)
          val matches = fs.globStatus(pathPattern)
          if (matches != null) {
            matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
          }
        }

        // Convert a partition path such as /demo/data/year/month/day into the glob
        // pattern /demo/data/*/*/*/ (one wildcard per partition column).
        def getPathPatternByPath(parNum: Int, tpath: Path): String = {
          var path = tpath
          for (_ <- 1 to parNum) { path = path.getParent }
          val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
          path.toString + tails
        }

        val partPath = HiveShim.getDataLocationPath(partition)
        val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size()
        val pathPatternStr = getPathPatternByPath(partNum, partPath)
        // Glob each distinct pattern only once and cache the paths that exist.
        if (!pathPatternSet.contains(pathPatternStr)) {
          pathPatternSet += pathPatternStr
          updateExistPathSetByPathPattern(pathPatternStr)
        }
        // Keep the partition only if its directory actually exists.
        existPathSet.contains(partPath.toString)
    }.map { case (partition, partDeserializer) =>
      val partDesc = Utilities.getPartitionDesc(partition)
      val partPath = HiveShim.getDataLocationPath(partition)
      val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
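
To make the pattern construction above concrete, here is a tiny standalone check that re-implements getPathPatternByPath; the directory names are invented and only org.apache.hadoop.fs.Path is needed.

    import org.apache.hadoop.fs.Path

    object GlobPatternExample extends App {
      // Same logic as getPathPatternByPath in the diff above, reproduced for illustration.
      def getPathPatternByPath(parNum: Int, tpath: Path): String = {
        var path = tpath
        for (_ <- 1 to parNum) { path = path.getParent }
        val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
        path.toString + tails
      }

      // A table partitioned by year, month and day (three partition columns):
      val partitionDir = new Path("/demo/data/year=2015/month=03/day=17")
      println(getPathPatternByPath(3, partitionDir))   // prints /demo/data/*/*/*/
    }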
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import java.io.File

import com.google.common.io.Files
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.util.Utils
/* Implicits */
import org.apache.spark.sql.hive.test.TestHive._



class QueryPartitionSuite extends QueryTest {
import org.apache.spark.sql.hive.test.TestHive.implicits._

test("SPARK-5068: query data when path doesn't exists"){
val testData = TestHive.sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString))).toDF()
testData.registerTempTable("testData")

val tmpDir = Files.createTempDir()
    // Create a partitioned table for the test and load a full copy of testData
    // into each of four partitions.
    sql(s"CREATE TABLE table_with_partition(key int, value string) " +
      s"PARTITIONED BY (ds string) LOCATION '${tmpDir.toURI.toString}'")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='1') SELECT key, value FROM testData")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='2') SELECT key, value FROM testData")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='3') SELECT key, value FROM testData")
    sql("INSERT OVERWRITE TABLE table_with_partition PARTITION (ds='4') SELECT key, value FROM testData")

    // While all four partition paths exist, the query returns four copies of testData.
    checkAnswer(sql("SELECT key, value FROM table_with_partition"),
      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
        ++ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect)

    // Delete the directory of one partition directly on the file system.
val folders = tmpDir.listFiles.filter(_.isDirectory)
Utils.deleteRecursively(folders(0))

    // After the deletion, the query should still succeed and return the data
    // of the three remaining partitions.
    checkAnswer(sql("SELECT key, value FROM table_with_partition"),
      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
        ++ testData.toSchemaRDD.collect)

sql("DROP TABLE table_with_partition")
sql("DROP TABLE createAndInsertTest")
}
}
