
[SPARK-8813][SQL]Combine splits by size #9097

Closed
wants to merge 7 commits

Conversation

zhichao-li
Contributor

The idea is simple: try to solve this problem by combining, by size, the splits that have been generated by the underlying InputFormat, so in theory it supports every InputFormat. The combine size can be specified by spark.sql.mapper.splitCombineSize; the default value is -1, which turns the combining logic off.
i.e. partition -> splits -> [combineSplit, combineSplit, ...] -> RDD
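
For context, a minimal sketch of the combining idea in Java (not the patch itself): greedily pack the splits returned by the underlying InputFormat into groups until each group reaches the configured combine size. The class and method names below are illustrative assumptions.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapred.InputSplit;

// Illustrative sketch only: greedy packing of raw splits by size.
public class SplitCombiner {

  // Packs raw splits into groups of roughly combineSize bytes.
  // combineSize <= 0 (the patch's default of -1) disables combining.
  public static List<List<InputSplit>> combineBySize(InputSplit[] splits, long combineSize)
      throws IOException {
    List<List<InputSplit>> combined = new ArrayList<>();
    List<InputSplit> current = new ArrayList<>();
    long currentLen = 0;
    for (InputSplit split : splits) {
      current.add(split);
      if (combineSize > 0) {
        currentLen += split.getLength();
      }
      // With combining disabled, every split becomes its own group.
      if (combineSize <= 0 || currentLen >= combineSize) {
        combined.add(current);
        current = new ArrayList<>();
        currentLen = 0;
      }
    }
    if (!current.isEmpty()) {
      combined.add(current);
    }
    return combined;
  }
}

A HadoopCombineRDD-style wrapper (one of the classes this patch adds) would then create one task per group instead of one per raw split.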

@zhichao-li zhichao-li changed the title [WIP]Combine splits by size [SPARK-8813][SQL][WIP]Combine splits by size Oct 13, 2015
@SparkQA

SparkQA commented Oct 13, 2015

Test build #43641 has finished for PR 9097 at commit f9392c3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class CombineSplit implements InputSplit
    • public class CombineSplitInputFormat<K, V> implements InputFormat<K, V>
    • public class CombineSplitRecordReader<K, V> implements RecordReader<K, V>
    • class HadoopCombineRDD[K, V](

@zhichao-li
Contributor Author

cc @chenghao-intel

@zhichao-li
Contributor Author

retest this please

out.writeUTF(location);
}
out.writeInt(splits.length);
out.writeUTF(splits[0].getClass().getCanonicalName());
Contributor


Can you add a comment saying that we only combine splits within a single table partition, so the class names of all the splits should be exactly identical?

Nit: write the split class name at the very beginning, instead of after all of the location info (sketched below).
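
For illustration, a hedged sketch of that suggested ordering, reusing the splits and locations fields from the snippet above; the enclosing CombineSplit class and its imports are assumed.

// Inside CombineSplit: write the shared split class name first, so that
// readFields can instantiate the splits before reading anything else.
public void write(DataOutput out) throws IOException {
  // All splits come from a single table partition, so they share one class.
  out.writeUTF(splits[0].getClass().getCanonicalName());
  out.writeInt(locations.length);
  for (String location : locations) {
    out.writeUTF(location);
  }
  out.writeInt(splits.length);
  for (InputSplit split : splits) {
    split.write(out);
  }
}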

@chenghao-intel
Contributor

It looks good in general. Can you also attach the benchmark results?

@zhichao-li
Contributor Author

@chenghao-intel Just tested with data consisting of 150,000 small files across 1000 partitions.

  1. SQL (select count(*) from test): only a small improvement. I guess task scheduling is not the bottleneck here, so reducing the number of tasks does not have much effect.
  2. SQL (select count(*) from test group by a): performance improves by about 3x; reducing the number of tasks largely improves shuffle performance (see the sketch below).
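
For reproducibility, a hedged sketch of how such a benchmark could be driven with the Spark 1.x Java API; note that spark.sql.mapper.splitCombineSize is the config proposed by this patch and does not exist in stock Spark, and the table/column names are placeholders.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

public class CombineBenchmark {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("split-combine-benchmark");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    HiveContext sqlContext = new HiveContext(jsc.sc());

    // 128 MB target per combined split; -1 (the default) disables combining.
    sqlContext.setConf("spark.sql.mapper.splitCombineSize",
        String.valueOf(128L * 1024 * 1024));

    // Scan-only query: mostly I/O bound, so fewer tasks help only a little.
    sqlContext.sql("SELECT COUNT(*) FROM test").show();
    // Group-by query: fewer map tasks mean far fewer shuffle files.
    sqlContext.sql("SELECT COUNT(*) FROM test GROUP BY a").show();
  }
}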

@SparkQA

SparkQA commented Nov 5, 2015

Test build #45090 has finished for PR 9097 at commit 5793af1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class CombineSplit implements InputSplit
    • public class CombineSplitInputFormat<K, V> implements InputFormat<K, V>
    • public class CombineSplitRecordReader<K, V> implements RecordReader<K, V>
    • class HadoopCombineRDD[K, V](

@zhichao-li
Contributor Author

retest this please.

@chenghao-intel
Contributor

cc/ @scwf @Sephiroth-Lin

@zhichao-li has posted the benchmark results we've done, but they're based on fake data. I know you guys have requirements for this improvement too; can you please test it with some real-world cases?

@SparkQA

SparkQA commented Nov 5, 2015

Test build #45104 has finished for PR 9097 at commit 5793af1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class CombineSplit implements InputSplit
    • public class CombineSplitInputFormat<K, V> implements InputFormat<K, V>
    • public class CombineSplitRecordReader<K, V> implements RecordReader<K, V>
    • class HadoopCombineRDD[K, V](

@watermen
Contributor

watermen commented Nov 9, 2015

@zhichao-li Can this patch support all formats (Text/ORC/Parquet)?

@zhichao-li
Contributor Author

@watermen Yes, it should support all formats in theory, since it combines at the InputSplit level, i.e. on the result of inputformat.getSplits; in other words, the combining is transparent to the InputFormat. I've tested it with SequenceFile, ORC and LZO, but this patch may not always be suitable for Parquet, since Parquet does not always go through TableReader.

@zhichao-li
Contributor Author

CombineHiveInputFormat and CombineFileInputFormat have the restriction that they always assume the combined input format is a subclass of FileInputFormat; that restriction goes away if we combine at the InputSplit level.

public class CombineSplit implements InputSplit {
  private InputSplit[] splits;
  private long totalLen;
  private String[] locations;

VS

public class CombineFileSplit extends InputSplit implements Writable {

 private Path[] paths;
 private long[] startoffset;
 private long[] lengths;
 private String[] locations;
 private long totLength;
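
To illustrate why the InputSplit-level approach avoids that restriction, a hedged sketch: the wrapper below only needs the generic InputFormat contract and never touches a Path or file offset. The class name AnyFormatCombiner is hypothetical, not the patch's CombineSplitInputFormat.

import java.io.IOException;

import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class AnyFormatCombiner<K, V> {
  private final InputFormat<K, V> underlying; // any InputFormat, not only FileInputFormat

  public AnyFormatCombiner(InputFormat<K, V> underlying) {
    this.underlying = underlying;
  }

  // Returns the raw splits, ready to be packed by size (see the earlier sketch).
  public InputSplit[] rawSplits(JobConf job, int numSplits) throws IOException {
    // CombineFileInputFormat would instead inspect file paths and blocks here;
    // operating on opaque InputSplits avoids any dependency on FileInputFormat.
    return underlying.getSplits(job, numSplits);
  }
}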

@zhichao-li zhichao-li changed the title [SPARK-8813][SQL][WIP]Combine splits by size [SPARK-8813][SQL]Combine splits by size Nov 9, 2015
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapred.InputSplit;

public class CombineSplit implements InputSplit {
Contributor


Please add a comment here to point out which version of Hive/Hadoop this implementation is based on.

@zhichao-li
Contributor Author

retest this please.

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51839 has finished for PR 9097 at commit 701700b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51845 has finished for PR 9097 at commit 085ce5f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhichao-li
Contributor Author

retest this please.

Seems like it's not related to this PR: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.JoinedRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow

@zhichao-li
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51929 has finished for PR 9097 at commit 085ce5f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Jun 15, 2016

I believe this has been fixed in Spark SQL in 2.0.0. Going to close this. Thanks!

@asfgit asfgit closed this in 1a33f2e Jun 15, 2016
@KevinZwx
Contributor

This issue was marked as fixed in Spark 2.0.0, but "spark.sql.mapper.splitCombineSize" doesn't show up in the list of SQL configurations when I run spark.sql("SET -v").show(numRows = 200, truncate = false) in a spark-sql session. Am I doing something wrong?
