From 8af4579532fa6f99f29175f1ec239057790fc081 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 22 May 2018 09:44:48 +0800 Subject: [PATCH] fix test failure --- .../datasources/FileFormatDataWriter.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index ef380e8f4e8ec..10b2bfd9fd18f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -73,7 +73,6 @@ abstract class FileFormatDataWriter( */ def commit(): WriteTaskResult = { releaseResources() - committer.commitTask(taskAttemptContext) val summary = ExecutedWriteSummary( updatedPartitions = updatedPartitions.toSet, stats = statsTrackers.map(_.getFinalStats())) @@ -108,6 +107,9 @@ class SingleDirectoryDataWriter( newOutputWriter() private def newOutputWriter(): Unit = { + recordsInFile = 0 + releaseResources() + val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) val currentPath = committer.newTaskTempFile( taskAttemptContext, @@ -128,8 +130,6 @@ class SingleDirectoryDataWriter( assert(fileCounter < MAX_FILE_COUNTER, s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") - recordsInFile = 0 - releaseResources() newOutputWriter() } @@ -208,6 +208,9 @@ class DynamicPartitionDataWriter( * @param bucketId the bucket which all tuples being written by this `OutputWriter` belong to */ private def newOutputWriter(partitionValues: Option[InternalRow], bucketId: Option[Int]): Unit = { + recordsInFile = 0 + releaseResources() + val partDir = partitionValues.map(getPartitionPath(_)) partDir.foreach(updatedPartitions.add) @@ -249,21 +252,16 @@ class DynamicPartitionDataWriter( statsTrackers.foreach(_.newBucket(currentBucketId.get)) } - recordsInFile = 0 fileCounter = 0 - - releaseResources() newOutputWriter(currentPartionValues, currentBucketId) } else if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) { // Exceeded the threshold in terms of the number of records per file. // Create a new file by increasing the file counter. - recordsInFile = 0 fileCounter += 1 assert(fileCounter < MAX_FILE_COUNTER, s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") - releaseResources() newOutputWriter(currentPartionValues, currentBucketId) } val outputRow = getOutputRow(record)