Skip to content

Commit

Permalink
fix test failure
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed May 22, 2018
1 parent ef45539 commit 8af4579
Showing 1 changed file with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ abstract class FileFormatDataWriter(
*/
def commit(): WriteTaskResult = {
releaseResources()
committer.commitTask(taskAttemptContext)
val summary = ExecutedWriteSummary(
updatedPartitions = updatedPartitions.toSet,
stats = statsTrackers.map(_.getFinalStats()))
Expand Down Expand Up @@ -108,6 +107,9 @@ class SingleDirectoryDataWriter(
newOutputWriter()

private def newOutputWriter(): Unit = {
recordsInFile = 0
releaseResources()

val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
val currentPath = committer.newTaskTempFile(
taskAttemptContext,
Expand All @@ -128,8 +130,6 @@ class SingleDirectoryDataWriter(
assert(fileCounter < MAX_FILE_COUNTER,
s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

recordsInFile = 0
releaseResources()
newOutputWriter()
}

Expand Down Expand Up @@ -208,6 +208,9 @@ class DynamicPartitionDataWriter(
* @param bucketId the bucket which all tuples being written by this `OutputWriter` belong to
*/
private def newOutputWriter(partitionValues: Option[InternalRow], bucketId: Option[Int]): Unit = {
recordsInFile = 0
releaseResources()

val partDir = partitionValues.map(getPartitionPath(_))
partDir.foreach(updatedPartitions.add)

Expand Down Expand Up @@ -249,21 +252,16 @@ class DynamicPartitionDataWriter(
statsTrackers.foreach(_.newBucket(currentBucketId.get))
}

recordsInFile = 0
fileCounter = 0

releaseResources()
newOutputWriter(currentPartionValues, currentBucketId)
} else if (description.maxRecordsPerFile > 0 &&
recordsInFile >= description.maxRecordsPerFile) {
// Exceeded the threshold in terms of the number of records per file.
// Create a new file by increasing the file counter.
recordsInFile = 0
fileCounter += 1
assert(fileCounter < MAX_FILE_COUNTER,
s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

releaseResources()
newOutputWriter(currentPartionValues, currentBucketId)
}
val outputRow = getOutputRow(record)
Expand Down

0 comments on commit 8af4579

Please sign in to comment.