
Commit

fix
gengliangwang committed Jan 21, 2019
1 parent 91689ac commit 54893e0
Showing 5 changed files with 57 additions and 31 deletions.
@@ -244,7 +244,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

val session = df.sparkSession
val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
if (classOf[TableProvider].isAssignableFrom(cls)) {
// SPARK-26673: In the Data Source V2 project, partitioning is still under development.
// Here we fall back to the V1 write path if output partitioning is required.
// TODO: use V2 implementations when partitioning feature is supported.
if (classOf[TableProvider].isAssignableFrom(cls) && partitioningColumns.isEmpty) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
provider, session.sessionState.conf)
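
To make the effect of this new check concrete, here is a minimal usage sketch. It assumes "orc" resolves to a TableProvider-based file source and uses example paths that are not part of this commit: a write with partitionBy now falls back to the V1 path, while an unpartitioned write still takes the V2 branch above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "p")

// partitioningColumns is non-empty, so this write now takes the V1 fallback path.
df.write.format("orc").partitionBy("p").save("/tmp/out/partitioned")

// No partitionBy: the TableProvider (V2) branch above is still used.
df.write.format("orc").save("/tmp/out/flat")
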
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.util.SerializableConfiguration

class FileBatchWriter(
    job: Job,
    description: WriteJobDescription,
    committer: FileCommitProtocol)
  extends BatchWrite {
  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    committer.commitJob(job, messages.map(_.asInstanceOf[WriteTaskResult].commitMsg))
  }

  override def useCommitCoordinator(): Boolean = false

  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    committer.abortJob(job)
  }

  override def createBatchWriterFactory(): DataWriterFactory = {
    val conf = new SerializableConfiguration(job.getConfiguration)
    FileWriterFactory(description, committer, conf)
  }
}

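For orientation, below is a simplified, hypothetical sketch of the driver-side protocol a BatchWrite such as FileBatchWriter participates in: one writer factory is created up front, each task contributes a WriterCommitMessage, and the job is committed or aborted as a whole. It is not Spark's actual scheduling code; runTasks merely stands in for the distributed execution of the write tasks.

import scala.util.control.NonFatal

import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, WriterCommitMessage}

def runBatchWrite(
    write: BatchWrite,
    runTasks: DataWriterFactory => Array[WriterCommitMessage]): Unit = {
  // A single serializable factory is created on the driver and shipped to tasks.
  val factory = write.createBatchWriterFactory()
  try {
    // In real Spark the tasks run on executors; here they are a plain function.
    val messages = runTasks(factory)
    write.commit(messages)       // FileBatchWriter delegates to FileCommitProtocol.commitJob
  } catch {
    case NonFatal(e) =>
      write.abort(Array.empty)   // FileBatchWriter delegates to FileCommitProtocol.abortJob
      throw e
  }
}
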
@@ -75,7 +75,7 @@ abstract class FileWriteBuilder(options: DataSourceOptions)

job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, new Path(pathName))
FileOutputFormat.setOutputPath(job, path)

val caseInsensitiveOptions = CaseInsensitiveMap(optionsAsScala)
// Note: prepareWrite has a side effect: it sets "job".
@@ -113,12 +113,11 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
case SaveMode.Overwrite =>
fs.delete(path, true)
committer.setupJob(job)
new FileSourceWriter(job, description, committer)
new FileBatchWriter(job, description, committer)

case _ =>
committer.setupJob(job)
new FileSourceWriter(job, description, committer)

new FileBatchWriter(job, description, committer)
}
}

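The two branches above differ only in whether existing output is removed before the job is set up. A usage sketch of the calls that reach them, assuming df is an existing DataFrame, "orc" is a file source handled by this FileWriteBuilder, and "/tmp/out" is just an example path:

import org.apache.spark.sql.SaveMode

// SaveMode.Overwrite: the existing output directory is deleted, then the job is set up.
df.write.format("orc").mode(SaveMode.Overwrite).save("/tmp/out")

// Any other mode, e.g. Append: the job is set up over whatever output already exists.
df.write.format("orc").mode(SaveMode.Append).save("/tmp/out")
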
@@ -18,37 +18,16 @@ package org.apache.spark.sql.execution.datasources.v2

import java.util.Date

import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataWriter, SingleDirectoryDataWriter, WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataWriter, SingleDirectoryDataWriter, WriteJobDescription}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.util.SerializableConfiguration

class FileSourceWriter(
job: Job,
description: WriteJobDescription,
committer: FileCommitProtocol)
extends BatchWrite {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
committer.commitJob(job, messages.map(_.asInstanceOf[WriteTaskResult].commitMsg))
}

override def useCommitCoordinator(): Boolean = false

override def abort(messages: Array[WriterCommitMessage]): Unit = {
committer.abortJob(job)
}

override def createBatchWriterFactory(): DataWriterFactory = {
val conf = new SerializableConfiguration(job.getConfiguration)
FileDataWriterFactory(description, committer, conf)
}
}

case class FileDataWriterFactory (
case class FileWriterFactory (
description: WriteJobDescription,
committer: FileCommitProtocol,
conf: SerializableConfiguration) extends DataWriterFactory {
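
The body of FileWriterFactory is elided in this hunk. Based on the imports kept above (TaskAttemptContextImpl, SparkHadoopWriterUtils, SingleDirectoryDataWriter, DynamicPartitionDataWriter), a plausible reading is that it builds a Hadoop task attempt context per task and then picks a single-directory or dynamic-partition writer depending on whether the write has partition columns. The following is only a sketch of that shape under those assumptions, not the commit's actual code; the helper name and parameters are made up.

import java.util.Date

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataWriter, SingleDirectoryDataWriter, WriteJobDescription}
import org.apache.spark.sql.sources.v2.writer.DataWriter
import org.apache.spark.util.SerializableConfiguration

// Hypothetical helper; in the real file this logic would live on FileWriterFactory.
def createFileDataWriter(
    description: WriteJobDescription,
    committer: FileCommitProtocol,
    conf: SerializableConfiguration,
    partitionId: Int): DataWriter[InternalRow] = {
  // Build a synthetic Hadoop task attempt context for this partition.
  val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
  val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
  val taskAttemptId = new TaskAttemptID(taskId, 0)
  val taskAttemptContext = new TaskAttemptContextImpl(conf.value, taskAttemptId)
  committer.setupTask(taskAttemptContext)

  // Unpartitioned output goes to a single directory; partitioned output needs
  // the dynamic-partition writer, mirroring the V1 file sources.
  if (description.partitionColumns.isEmpty) {
    new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
  } else {
    new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
  }
}
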
@@ -46,7 +46,6 @@ class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(optio
conf.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

println("v2 write...")
new OutputWriterFactory {
override def newInstance(
path: String,
