Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC #23601

Closed
wants to merge 6 commits into from

Conversation

gengliangwang
Copy link
Member

@gengliangwang gengliangwang commented Jan 21, 2019

What changes were proposed in this pull request?

Create a framework for write path of File Source V2.
Also, migrate write path of ORC to V2.

Supported:

  • Write to file as Dataframe

Not Supported:

  • Partitioning, which is still under development in the data source V2 project.
  • Bucketing, which is still under development in the data source V2 project.
  • Catalog.

How was this patch tested?

Unit test

@SparkQA
Copy link

SparkQA commented Jan 21, 2019

Test build #101471 has finished for PR 23601 at commit 91689ac.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FileSourceWriter(
  • case class FileDataWriterFactory (
  • abstract class FileWriteBuilder(options: DataSourceOptions)
  • class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(options)

@gengliangwang
Copy link
Member Author

Due to #21381, the write path is much easier to implement.

@gengliangwang gengliangwang changed the title [WIP][SPARK-26673] File source V2 write: create framework and migrate ORC to it [SPARK-26673][SQL] File source V2 write: create framework and migrate ORC to it Jan 21, 2019
@gengliangwang gengliangwang changed the title [SPARK-26673][SQL] File source V2 write: create framework and migrate ORC to it [SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC Jan 21, 2019
@SparkQA
Copy link

SparkQA commented Jan 21, 2019

Test build #101484 has finished for PR 23601 at commit 54893e0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 21, 2019

Test build #101486 has finished for PR 23601 at commit d3cd59d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 21, 2019

Test build #101488 has finished for PR 23601 at commit ebf4466.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Hi, @gengliangwang . Please check UT failure in your local environment first.

@gengliangwang
Copy link
Member Author

Hi @dongjoon-hyun ,
I ran the orc test cases before pushing the code.
After I push code, I find some comments need to be revise, so I have to push several times. That is why the test is triggered multiple times.
Sorry about that. I will try to avoid such behavior.

@gengliangwang gengliangwang changed the title [SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC [WIP][SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC Jan 22, 2019
@dongjoon-hyun
Copy link
Member

I got it~ And, thanks for the fix, @gengliangwang .

@SparkQA
Copy link

SparkQA commented Jan 22, 2019

Test build #101547 has finished for PR 23601 at commit d6b7a95.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Oh, SparkR seems to complain for some reasons.

1. Failure: Call DataFrameWriter.save() API in Java without path and check argument types (@test_sparkSQL.R#3552) 
error$message does not match "Error in orc : analysis error - path file:.*already exists".
Actual value: "Error in orc : java.lang.RuntimeException: data already exists.\n\tat 

@SparkQA
Copy link

SparkQA commented Jan 23, 2019

Test build #101575 has finished for PR 23601 at commit 2ca90a7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Jan 23, 2019

Test build #101580 has finished for PR 23601 at commit 2ca90a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Returns whether this format supports the given [[DataType]] in write path.
* By default all data types are supported.
*/
def supportDataType(dataType: DataType): Boolean = true
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to find a better solution for this. Mark this PR as WIP for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can implement the supportDataType API in another PR. This PR is ready for review.

@gengliangwang gengliangwang changed the title [WIP][SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC [SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC Jan 27, 2019
@dongjoon-hyun
Copy link
Member

Ur, #23639 seems to make conflicts. Could you resolve the conflicts?

@SparkQA
Copy link

SparkQA commented Jan 27, 2019

Test build #101732 has finished for PR 23601 at commit 5fda97e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FileBatchWrite(
  • abstract class FileWriteBuilder(options: DataSourceOptions)
  • case class FileWriterFactory (
  • class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(options)

@SparkQA
Copy link

SparkQA commented Jan 28, 2019

Test build #101761 has finished for PR 23601 at commit 9538a1b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 28, 2019

Test build #101762 has finished for PR 23601 at commit 5358ad4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FileBatchWrite(
  • abstract class FileWriteBuilder(options: DataSourceOptions)
  • case class FileWriterFactory (
  • class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(options)

committer: FileCommitProtocol)
extends BatchWrite {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
committer.commitJob(job, messages.map(_.asInstanceOf[WriteTaskResult].commitMsg))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we call FileFormatWriter.processStats here?

@SparkQA
Copy link

SparkQA commented Jan 29, 2019

Test build #101819 has finished for PR 23601 at commit 2bdd73a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

f.fallBackFileFormat
case _ => lookupCls
}
// SPARK-26673: In Data Source V2 project, partitioning is still under development.
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we remove this (SPARK-26673) since this is the current PR's JIRA?

}
// SPARK-26673: In Data Source V2 project, partitioning is still under development.
// Here we fallback to V1 if the write path if output partitioning is required.
// TODO: use V2 implementations when partitioning feature is supported.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clearly mention what JIRA ID is for this TODO?

@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
* E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
* since there is no corresponding physical plan.
* SPARK-23817: This is a temporary hack for making current data source V2 work. It should be
* removed when write path of file data source v2 is finished.
* removed when Catalog of file data source v2 is finished.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catalog of file data source v2 is finished? Does this mean catalog support of file data source v2?

@SparkQA
Copy link

SparkQA commented Jan 30, 2019

Test build #101869 has finished for PR 23601 at commit 31bc1b7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Jan 30, 2019

Test build #101883 has finished for PR 23601 at commit 31bc1b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case _ => lookupCls
}
// In Data Source V2 project, partitioning is still under development.
// Here we fallback to V1 if the write path if output partitioning is required.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we fallback to V1 if partitioning columns are specified

this
}

override def buildForBatch(): BatchWrite = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is too long, could be better if we can separate it into multiple methods

@@ -56,18 +56,25 @@ case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan)
val writerFactory = batchWrite.createBatchWriterFactory()
val useCommitCoordinator = batchWrite.useCommitCoordinator
val rdd = query.execute()
val messages = new Array[WriterCommitMessage](rdd.partitions.length)
// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
// partition rdd to make sure we at least set up one write task to write the metadata.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok for now, but we should improve it later:

  1. use a config to do it, it seems only file source need it
  2. or do it in FileBatchWrite.commit. If commit messages are empty, write a metadata file.

@cloud-fan
Copy link
Contributor

LGTM except a few minor comments

@SparkQA
Copy link

SparkQA commented Jan 31, 2019

Test build #101937 has finished for PR 23601 at commit 8a6a9b6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 31, 2019

Test build #101939 has finished for PR 23601 at commit 7bd1c09.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in df4c53e Jan 31, 2019
HyukjinKwon pushed a commit that referenced this pull request Feb 16, 2019
…ast object in FileWriterFactory

## What changes were proposed in this pull request?

This is a followup PR to fix two issues in #23601:
1.  the class `FileWriterFactory` contains `conf: SerializableConfiguration` as a member, which is duplicated with `WriteJobDescription. serializableHadoopConf `. By removing it we can reduce the broadcast task binary size by around 70KB
2. The test suite `OrcV1QuerySuite`/`OrcV1QuerySuite`/`OrcV1PartitionDiscoverySuite` didn't change the configuration `SQLConf.USE_V1_SOURCE_WRITER_LIST` to `"orc"`. We should set the conf.

## How was this patch tested?

Unit test

Closes #23800 from gengliangwang/reduceWriteTaskSize.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…e ORC

## What changes were proposed in this pull request?

Create a framework for write path of File Source V2.
Also, migrate write path of ORC to V2.

Supported:
* Write to file as Dataframe

Not Supported:
* Partitioning, which is still under development in the data source V2 project.
* Bucketing, which is still under development in the data source V2 project.
* Catalog.

## How was this patch tested?

Unit test

Closes apache#23601 from gengliangwang/orc_write.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ast object in FileWriterFactory

## What changes were proposed in this pull request?

This is a followup PR to fix two issues in apache#23601:
1.  the class `FileWriterFactory` contains `conf: SerializableConfiguration` as a member, which is duplicated with `WriteJobDescription. serializableHadoopConf `. By removing it we can reduce the broadcast task binary size by around 70KB
2. The test suite `OrcV1QuerySuite`/`OrcV1QuerySuite`/`OrcV1PartitionDiscoverySuite` didn't change the configuration `SQLConf.USE_V1_SOURCE_WRITER_LIST` to `"orc"`. We should set the conf.

## How was this patch tested?

Unit test

Closes apache#23800 from gengliangwang/reduceWriteTaskSize.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
null

case SaveMode.Overwrite =>
committer.deleteWithJob(fs, path, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened if the path does not exist? It is possible that the underlying committer's deleteWithJob might not handle this case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        if (fs.exists(path)) {
          committer.deleteWithJob(fs, path, recursive = true)
        }

Copy link
Member Author

@gengliangwang gengliangwang Feb 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile I check the source code. Actually, all the implementations (that I can see in IDE) handle the case that the file path does not exist. But in InsertIntoHadoopFsRelationCommand the deleteWithJob is used as following:

if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
        throw new IOException(s"Unable to clear partition " +
          s"directory $path prior to writing to it")
}

Should we follow it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea let's follow it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, create #23889 for this.

mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
…t path before delete it

## What changes were proposed in this pull request?
This is a followup PR to resolve comment: apache#23601 (review)

When Spark writes DataFrame with "overwrite" mode, it deletes the output path before actual writes. To safely handle the case that the output path doesn't exist,  it is suggested to follow the V1 code by checking the existence.

## How was this patch tested?

Apply apache#23836 and run unit tests

Closes apache#23889 from gengliangwang/checkFileBeforeOverwrite.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants