Changes to ParquetRelation and its metadata
The Hadoop configuration is now passed to ParquetRelation (fixes SPARK-2112), and the path is no longer stored in the file's metadata (fixes SPARK-2195).
AndreSchumacher committed Jun 19, 2014
1 parent 7eceb67 commit 95c1367
Showing 5 changed files with 18 additions and 16 deletions.
@@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def parquetFile(path: String): SchemaRDD =
-    new SchemaRDD(this, parquet.ParquetRelation(path))
+    new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))

   /**
    * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
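With this change, anything set on the SparkContext's shared Hadoop configuration becomes visible when ParquetRelation reads file footers, since parquetFile now passes Some(sparkContext.hadoopConfiguration) along. A minimal usage sketch against the Spark 1.0-era API (the local master, app name, and config key are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("parquet-conf"))
val sqlContext = new SQLContext(sc)

// Settings placed here (e.g. filesystem credentials) now reach the Parquet
// metadata reader; previously a fresh default configuration was used instead
// (see the readMetaData change below).
sc.hadoopConfiguration.set("fs.defaultFS", "file:///") // illustrative key

val rows = sqlContext.parquetFile("/path/to/data.parquet")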
@@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
    */
   def parquetFile(path: String): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, ParquetRelation(path))
+    new JavaSchemaRDD(
+      sqlContext,
+      ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))

   /**
    * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
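The Java API takes the same configuration from its wrapped SQLContext. A parallel sketch (the JavaSparkContext-based constructor is assumed from the era's API):

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.api.java.JavaSQLContext

val jsc = new JavaSparkContext("local", "parquet-conf")
val javaSql = new JavaSQLContext(jsc)

// Internally resolves to sqlContext.sparkContext.hadoopConfiguration.
val schemaRdd = javaSql.parquetFile("/path/to/data.parquet")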
@@ -43,19 +43,20 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
  *
  * @param path The path to the Parquet file.
  */
-private[sql] case class ParquetRelation(val path: String)
-  extends LeafNode with MultiInstanceRelation {
+private[sql] case class ParquetRelation(
+    val path: String,
+    @transient val conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
   self: Product =>

   /** Schema derived from ParquetFile */
   def parquetSchema: MessageType =
     ParquetTypesConverter
-      .readMetaData(new Path(path))
+      .readMetaData(new Path(path), conf)
       .getFileMetaData
       .getSchema

   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path))
+  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)

   override def newInstance = ParquetRelation(path).asInstanceOf[this.type]
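Because conf defaults to None, call sites that predate this commit, including the ParquetRelation(path) inside newInstance just above, still compile unchanged. The pattern in isolation (a minimal sketch; the class name is hypothetical):

import org.apache.hadoop.conf.Configuration

case class RelationLike(path: String, @transient val conf: Option[Configuration] = None)

val legacy = RelationLike("/data/part.parquet")                            // conf = None
val tuned  = RelationLike("/data/part.parquet", Some(new Configuration())) // explicit conf

The @transient keeps the Hadoop configuration out of the serialized plan, which matters because Hadoop's Configuration is not Java-serializable.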

@@ -130,7 +131,7 @@ private[sql] object ParquetRelation {
     }
     ParquetRelation.enableLogForwarding()
     ParquetTypesConverter.writeMetaData(attributes, path, conf)
-    new ParquetRelation(path.toString) {
+    new ParquetRelation(path.toString, Some(conf)) {
       override val output = attributes
     }
   }
@@ -322,7 +322,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
       }
     }
     val extraMetadata = new java.util.HashMap[String, String]()
-    extraMetadata.put("path", path.toString)
     extraMetadata.put(
       RowReadSupport.SPARK_METADATA_KEY,
       ParquetTypesConverter.convertToString(attributes))
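Dropping the "path" entry is the substance of SPARK-2195: a file's own location is no longer baked into its footer, so Parquet files can be copied or moved without carrying a stale absolute path. A sketch of inspecting what remains in a footer (package names follow the pre-rename parquet-hadoop of this era; the path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.ParquetFileReader

val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/data/part.parquet"))
val kv = footer.getFileMetaData.getKeyValueMetaData
// After this commit, kv still carries RowReadSupport.SPARK_METADATA_KEY
// (the serialized schema) but no "path" entry.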
@@ -347,16 +346,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * in the parent directory. If so, this is used. Else we read the actual footer at the given
    * location.
    * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param configuration The Hadoop configuration to use.
    * @return The `ParquetMetadata` containing among other things the schema.
    */
-  def readMetaData(origPath: Path): ParquetMetadata = {
+  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
     if (origPath == null) {
       throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
     }
     val job = new Job()
-    // TODO: since this is called from ParquetRelation (LogicalPlan) we don't have access
-    //  to SparkContext's hadoopConfig; in principle the default FileSystem may be different(?!)
-    val conf = ContextUtil.getConfiguration(job)
+    val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
     val fs: FileSystem = origPath.getFileSystem(conf)
     if (fs == null) {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
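The getOrElse keeps the previous behavior as the fallback: with None, readMetaData still derives a configuration from a fresh Job exactly as before, and the removed TODO is resolved only when a caller actually supplies one. The Option handling in isolation (a standalone sketch; plain Configuration stands in for ContextUtil.getConfiguration):

import org.apache.hadoop.conf.Configuration

def resolveConf(supplied: Option[Configuration]): Configuration =
  supplied.getOrElse(new Configuration()) // mirrors configuration.getOrElse(ContextUtil.getConfiguration(job))

resolveConf(None)                      // old path: fresh default configuration
resolveConf(Some(new Configuration())) // new path: caller-supplied configuration wins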
@@ -390,18 +388,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * may lead to an upcast of types (e.g., {byte, short} to int).
    *
    * @param origPath The path at which we expect one (or more) Parquet files.
+   * @param conf The Hadoop configuration to use.
    * @return A list of attributes that make up the schema.
    */
-  def readSchemaFromFile(origPath: Path): Seq[Attribute] = {
+  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
     val keyValueMetadata: java.util.Map[String, String] =
-      readMetaData(origPath)
+      readMetaData(origPath, conf)
         .getFileMetaData
         .getKeyValueMetaData
     if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath).getFileMetaData.getSchema)
+        readMetaData(origPath, conf).getFileMetaData.getSchema)
       log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
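Both branches of readSchemaFromFile now consult the caller's configuration, while the precedence itself is unchanged: a schema Spark wrote under RowReadSupport.SPARK_METADATA_KEY wins, and only in its absence does the (potentially upcasting) conversion from raw Parquet types run. The decision in isolation (a simplified sketch; the key literal and String stand in for the real key and Catalyst attributes):

def schemaFor(keyValueMetadata: java.util.Map[String, String], parquetSchema: String): String = {
  val stored = keyValueMetadata.get("spark.row.metadata") // stand-in for RowReadSupport.SPARK_METADATA_KEY
  if (stored != null) stored               // schema serialized by Spark at write time
  else convertFromParquet(parquetSchema)   // fallback: may upcast byte/short to int
}

def convertFromParquet(schema: String): String = schema  // placeholder for the real conversion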
@@ -207,7 +207,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
       path,
       TestSQLContext.sparkContext.hadoopConfiguration)
     assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
-    val metaData = ParquetTypesConverter.readMetaData(path)
+    val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job)))
     assert(metaData != null)
     ParquetTestData
       .testData
