Skip to content


refactor ExecuteWriteTask
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed May 21, 2018
1 parent e480ecc commit cbd4ce2
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 335 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.spark.sql.execution.datasources

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration

* Abstract class for writing out data in a single Spark task.
* Exceptions thrown by the implementation of this trait will automatically trigger task aborts.
abstract class FileFormatDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol
) extends DataWriter[InternalRow] {
* Max number of files a single task writes out due to file size. In most cases the number of
* files written should be very small. This is just a safe guard to protect some really bad
* settings, e.g. maxRecordsPerFile = 1.
val MAX_FILE_COUNTER = 1000 * 1000
protected val updatedPartitions = mutable.Set[String]()
protected var currentWriter: OutputWriter = _
protected var fileCounter: Int = _
protected var recordsInFile: Long = _
/** Trackers for computing various statistics on the data as it's being written out. */
val statsTrackers: Seq[WriteTaskStatsTracker] =

def releaseResources(): Unit = {
if (currentWriter != null) {
try {
} finally {
currentWriter = null

* Returns the summary of relative information which
* includes the list of partition strings written out. The list of partitions is sent back
* to the driver and used to update the catalog. Other information will be sent back to the
* driver too and used to e.g. update the metrics in UI.
override def commit(): WriteTaskResult = {
val summary = ExecutedWriteSummary(
updatedPartitions = Set.empty,
stats =
WriteTaskResult(committer.commitTask(taskAttemptContext), summary)

override def abort(): Unit = {
try {
} finally {

/** FileFormatWriteTask for empty partitions */
class EmptyDirectoryDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol
) extends FileFormatDataWriter(description, taskAttemptContext, committer) {
override def write(record: InternalRow): Unit = {}

/** Writes data to a single directory (used for non-dynamic-partition writes). */
class SingleDirectoryDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol)
extends FileFormatDataWriter(description, taskAttemptContext, committer) {
// Initialize currentWriter and statsTrackers

private def newOutputWriter(): Unit = {
val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
val currentPath = committer.newTaskTempFile(
f"-c$fileCounter%03d" + ext)

currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
dataSchema = description.dataColumns.toStructType,
context = taskAttemptContext)

override def write(record: InternalRow): Unit = {
if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
fileCounter += 1
assert(fileCounter < MAX_FILE_COUNTER,
s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

recordsInFile = 0

recordsInFile += 1

* Writes data to using dynamic partition writes, meaning this single function can write to
* multiple directories (partitions) or files (bucketing).
class DynamicPartitionDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol)
extends FileFormatDataWriter(description, taskAttemptContext, committer) {

/** Flag saying whether or not the data to be written out is partitioned. */
val isPartitioned = description.partitionColumns.nonEmpty

/** Flag saying whether or not the data to be written out is bucketed. */
val isBucketed = description.bucketIdExpression.isDefined

assert(isPartitioned || isBucketed,
s"""DynamicPartitionWriteTask should be used for writing out data that's either
|partitioned or bucketed. In this case neither is true.
|WriteJobDescription: ${description}

var currentPartionValues: Option[UnsafeRow] = None
var currentBucketId: Option[Int] = None

/** Extracts the partition values out of an input row. */
private lazy val getPartitionValues: InternalRow => UnsafeRow = {
val proj = UnsafeProjection.create(description.partitionColumns, description.allColumns)
row => proj(row)

/** Expression that given partition columns builds a path string like: col1=val/col2=val/... */
private lazy val partitionPathExpression: Expression = Concat(
description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
val partitionName = ScalaUDF(
ExternalCatalogUtils.getPartitionPathString _,
Seq(Literal(, Cast(c, StringType, Option(description.timeZoneId))))
if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)

/** Evaluates the `partitionPathExpression` above on a row of `partitionValues` and returns
* the partition string. */
private lazy val getPartitionPath: InternalRow => String = {
val proj = UnsafeProjection.create(Seq(partitionPathExpression), description.partitionColumns)
row => proj(row).getString(0)

/** Given an input row, returns the corresponding `bucketId` */
private lazy val getBucketId: InternalRow => Int = {
val proj =
UnsafeProjection.create(description.bucketIdExpression.toSeq, description.allColumns)
row => proj(row).getInt(0)

/** Returns the data columns to be written given an input row */
private val getOutputRow =
UnsafeProjection.create(description.dataColumns, description.allColumns)

* Opens a new OutputWriter given a partition key and/or a bucket id.
* If bucket id is specified, we will append it to the end of the file name, but before the
* file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
* @param partitionValues the partition which all tuples being written by this `OutputWriter`
* belong to
* @param bucketId the bucket which all tuples being written by this `OutputWriter` belong to
private def newOutputWriter(partitionValues: Option[InternalRow], bucketId: Option[Int]): Unit = {
val partDir =

val bucketIdStr ="")

// This must be in a form that matches our bucketing format. See BucketingUtils.
val ext = f"$bucketIdStr.c$fileCounter%03d" +

val customPath = partDir.flatMap { dir =>
val currentPath = if (customPath.isDefined) {
committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
} else {
committer.newTaskTempFile(taskAttemptContext, partDir, ext)

currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
dataSchema = description.dataColumns.toStructType,
context = taskAttemptContext)


override def write(record: InternalRow): Unit = {
val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None

if (currentPartionValues != nextPartitionValues || currentBucketId != nextBucketId) {
// See a new partition or bucket - write to a new partition dir (or a new bucket file).
if (isPartitioned && currentPartionValues != nextPartitionValues) {
currentPartionValues = Some(nextPartitionValues.get.copy())
if (isBucketed) {
currentBucketId = nextBucketId

recordsInFile = 0
fileCounter = 0

newOutputWriter(currentPartionValues, currentBucketId)
} else if (description.maxRecordsPerFile > 0 &&
recordsInFile >= description.maxRecordsPerFile) {
// Exceeded the threshold in terms of the number of records per file.
// Create a new file by increasing the file counter.
recordsInFile = 0
fileCounter += 1
assert(fileCounter < MAX_FILE_COUNTER,
s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

newOutputWriter(currentPartionValues, currentBucketId)
val outputRow = getOutputRow(record)
recordsInFile += 1

/** A shared job description for all the write tasks. */
class WriteJobDescription(
val uuid: String, // prevent collision between different (appending) write jobs
val serializableHadoopConf: SerializableConfiguration,
val outputWriterFactory: OutputWriterFactory,
val allColumns: Seq[Attribute],
val dataColumns: Seq[Attribute],
val partitionColumns: Seq[Attribute],
val bucketIdExpression: Option[Expression],
val path: String,
val customPartitionLocations: Map[TablePartitionSpec, String],
val maxRecordsPerFile: Long,
val timeZoneId: String,
val statsTrackers: Seq[WriteJobStatsTracker])
extends Serializable {

assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns),
|All columns: ${allColumns.mkString(", ")}
|Partition columns: ${partitionColumns.mkString(", ")}
|Data columns: ${dataColumns.mkString(", ")}

/** The result of a successful write task. */
case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)
extends WriterCommitMessage

* Wrapper class for the metrics of writing data out.
* @param updatedPartitions the partitions updated during writing data out. Only valid
* for dynamic partition.
* @param stats one `WriteTaskStats` object for every `WriteJobStatsTracker` that the job had.
case class ExecutedWriteSummary(
updatedPartitions: Set[String],
stats: Seq[WriteTaskStats])

0 comments on commit cbd4ce2

Please sign in to comment.