From 54893e01bea37e5f86f19675fa179637d8176fbe Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 21 Jan 2019 22:43:06 +0800
Subject: [PATCH] fix

---
 .../apache/spark/sql/DataFrameWriter.scala    |  5 +-
 .../datasources/v2/FileBatchWriter.scala      | 46 +++++++++++++++++++
 .../datasources/v2/FileWriteBuilder.scala     |  7 ++-
 ...ceWriter.scala => FileWriterFactory.scala} | 29 ++----
 .../datasources/v2/orc/OrcWriteBuilder.scala  |  1 -
 5 files changed, 57 insertions(+), 31 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWriter.scala
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{FileSourceWriter.scala => FileWriterFactory.scala} (72%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d9404cd929925..c6b3c9b47d5ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -244,7 +244,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val session = df.sparkSession
     val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
 
-    if (classOf[TableProvider].isAssignableFrom(cls)) {
+    // SPARK-26673: In the Data Source V2 project, partitioning is still under development.
+    // Here we fall back to the V1 write path if output partitioning is required.
+    // TODO: use V2 implementations once the partitioning feature is supported.
+    if (classOf[TableProvider].isAssignableFrom(cls) && partitioningColumns.isEmpty) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, session.sessionState.conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWriter.scala
new file mode 100644
index 0000000000000..e2dd94348dc24
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWriter.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.util.SerializableConfiguration
+
+class FileBatchWriter(
+    job: Job,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol)
+  extends BatchWrite {
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    committer.commitJob(job, messages.map(_.asInstanceOf[WriteTaskResult].commitMsg))
+  }
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    committer.abortJob(job)
+  }
+
+  override def createBatchWriterFactory(): DataWriterFactory = {
+    val conf = new SerializableConfiguration(job.getConfiguration)
+    FileWriterFactory(description, committer, conf)
+  }
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 7406803de5559..332c68773fef2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -75,7 +75,7 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
 
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
-    FileOutputFormat.setOutputPath(job, new Path(pathName))
+    FileOutputFormat.setOutputPath(job, path)
 
     val caseInsensitiveOptions = CaseInsensitiveMap(optionsAsScala)
     // Note: prepareWrite has side effect. It sets "job".
@@ -113,12 +113,11 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
       case SaveMode.Overwrite =>
         fs.delete(path, true)
         committer.setupJob(job)
-        new FileSourceWriter(job, description, committer)
+        new FileBatchWriter(job, description, committer)
 
       case _ =>
         committer.setupJob(job)
-        new FileSourceWriter(job, description, committer)
-
+        new FileBatchWriter(job, description, committer)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileSourceWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
similarity index 72%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileSourceWriter.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index 2086b8105af25..55d16910c26f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileSourceWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -18,37 +18,16 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.Date
 
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataWriter, SingleDirectoryDataWriter, WriteJobDescription, WriteTaskResult}
-import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataWriter, SingleDirectoryDataWriter, WriteJobDescription}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
 import org.apache.spark.util.SerializableConfiguration
 
-class FileSourceWriter(
-    job: Job,
-    description: WriteJobDescription,
-    committer: FileCommitProtocol)
-  extends BatchWrite {
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    committer.commitJob(job, messages.map(_.asInstanceOf[WriteTaskResult].commitMsg))
-  }
-
-  override def useCommitCoordinator(): Boolean = false
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    committer.abortJob(job)
-  }
-
-  override def createBatchWriterFactory(): DataWriterFactory = {
-    val conf = new SerializableConfiguration(job.getConfiguration)
-    FileDataWriterFactory(description, committer, conf)
-  }
-}
-
-case class FileDataWriterFactory (
+case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     conf: SerializableConfiguration) extends DataWriterFactory {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
index ba0304f267f48..986a1c5f2b3c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala
@@ -46,7 +46,6 @@ class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(optio
     conf.asInstanceOf[JobConf]
       .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
 
-    println("v2 write...")
     new OutputWriterFactory {
       override def newInstance(
           path: String,
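
Note (not part of the patch): the sketch below illustrates the behavior described by the SPARK-26673 comment in the DataFrameWriter hunk above. With this change, a write that declares partitioning columns is expected to fall back to the V1 write path, while an unpartitioned write to a TableProvider-based file source can go through FileWriteBuilder and the new FileBatchWriter. The SparkSession setup and the /tmp output paths are illustrative assumptions, not part of the change.

// Illustrative sketch only; not part of the patch. Paths and session settings are
// made-up examples.
import org.apache.spark.sql.SparkSession

object PartitioningFallbackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-26673 fallback sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "part")

    // partitioningColumns is non-empty, so DataFrameWriter.save() is expected to take
    // the V1 write path even when the source class implements TableProvider.
    df.write.format("orc").partitionBy("part").mode("overwrite")
      .save("/tmp/spark-26673/partitioned")

    // No partitioning columns: a TableProvider-based file source can use the V2 path,
    // where FileWriteBuilder builds a FileBatchWriter as its BatchWrite.
    df.write.format("orc").mode("overwrite")
      .save("/tmp/spark-26673/unpartitioned")

    spark.stop()
  }
}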