
Support Spark Column Stats #10659

Merged
merged 9 commits into apache:main on Jul 31, 2024

Conversation

huaxingao
Contributor

Co-authored-by: Huaxin Gao
Co-authored-by: Karuppayya Rajendran

This PR adds column stats support, so Iceberg can report column statistics to the Spark engine for CBO (cost-based optimization).

Co-authored-by: Karuppayya Rajendran <[email protected]>
@github-actions github-actions bot added the spark label Jul 8, 2024
@huaxingao
Contributor Author

cc @szehon-ho @karuppayya

@jeesou
Contributor

jeesou commented Jul 10, 2024

Hi @huaxingao, @karuppayya, just to clarify a few aspects:
This PR would be a continuation of #10288.

On top of the above-mentioned PR, we would need to create a procedure to trigger the Analyze action.

But would this PR's changes mean that we would be able to see other column statistics like min, max, etc. along with NDV in the .stats file?

Could you please help us understand how "Iceberg can report column stats to Spark engine for CBO"?

Because even with this PR's changes along with #10288, the statistics section looks like:
"blob-metadata" : [ {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6563878496533098178,
"sequence-number" : 1,
"fields" : [ 3 ],
"properties" : {
"ndv" : "3"
}
}]

Other statistics are not produced right now, which is expected as per the code.
Please help us understand how the other statistics will be generated and how Spark will access them.

@karuppayya
Contributor

karuppayya commented Jul 10, 2024

@jeesou I will create a PR for the Analyze action procedure once #10288 is merged.
Currently the spec supports only NDV. For more stats we will need to make spec changes and other corresponding changes.

This change helps report stats to Spark using the DSv2 APIs, which are subsequently used in join estimation, dynamic partition pruning, etc.
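
For context, the reporting goes through Spark's DSv2 statistics interfaces. A minimal sketch of that integration, assuming the Spark 3.4+ connector APIs; the class name ExampleScan and the compute* helpers are hypothetical placeholders, and the actual implementation lives in Iceberg's SparkScan:

import java.util.Map;
import java.util.OptionalLong;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;

// Hypothetical scan that reports table-level and per-column stats through DSv2.
abstract class ExampleScan implements SupportsReportStatistics {

  @Override
  public Statistics estimateStatistics() {
    Map<NamedReference, ColumnStatistics> tableColStats = computeColumnStats();
    long tableRowCount = computeRowCount();
    long tableSizeInBytes = computeSizeInBytes();

    return new Statistics() {
      @Override
      public OptionalLong sizeInBytes() {
        return OptionalLong.of(tableSizeInBytes);
      }

      @Override
      public OptionalLong numRows() {
        return OptionalLong.of(tableRowCount);
      }

      @Override
      public Map<NamedReference, ColumnStatistics> columnStats() {
        // consumed by Spark's CBO for join estimation, dynamic partition pruning, etc.
        return tableColStats;
      }
    };
  }

  // How these are computed is Iceberg-specific (snapshot summary, Puffin NDV blobs, ...).
  abstract Map<NamedReference, ColumnStatistics> computeColumnStats();

  abstract long computeRowCount();

  abstract long computeSizeInBytes();
}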

Contributor

@singhpk234 singhpk234 left a comment

@huaxingao @karuppayya want to know your thoughts on:
1/ Spark's stats estimation for non-CBO cases is quite poor for v2 sources: additional info that sources like Iceberg can provide, e.g. rowCount, simply gets ignored. Do you have anything in mind in that context? I attempted this via:

2/ Do you have perf numbers on how this complements CBO performance, considering CBO is disabled by default in OSS Spark?

3/ Do we want to give users an option to extract extra stats from the sketches rather than relying only on the precomputed NDV?


Map<NamedReference, ColumnStatistics> map = Maps.newHashMap();

if (readConf.enableColumnStats()) {
Contributor

[Q] NDV can't be used for non-CBO cases. Do we need to couple this flag with a check that CBO is enabled (spark.sql.cbo.enabled) as well? Otherwise SizeInBytesOnlyStatsPlanVisitor will be used.

Contributor Author

Added the check for CBO. Thanks!

Collaborator

@szehon-ho szehon-ho left a comment

Left a first round of comments

import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
import org.apache.spark.sql.connector.read.colstats.Histogram;

class ColStats implements ColumnStatistics {
Collaborator

Nit: this is a bit confusing; ColStats just abbreviates the Spark class name. How about SparkColumnStatistics? (It looks like the package has a lot of classes named SparkXXX for this situation.)

Contributor Author

Changed to SparkColumnStatistics. Thanks
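
For reference, a rough sketch of what a class like SparkColumnStatistics could look like against Spark's ColumnStatistics interface; the seven-argument constructor mirrors the calls quoted later in this thread, but the argument order and null handling here are assumptions, not the merged code:

import java.util.Optional;
import java.util.OptionalLong;
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
import org.apache.spark.sql.connector.read.colstats.Histogram;

// Illustrative value class: wraps the per-column stats Iceberg reports to Spark.
class SparkColumnStatistics implements ColumnStatistics {
  private final Long distinctCount; // null when no NDV blob is available
  private final Object min;
  private final Object max;
  private final Long nullCount;
  private final Long avgLen;
  private final Long maxLen;
  private final Histogram histogram;

  SparkColumnStatistics(
      Long distinctCount, Object min, Object max,
      Long nullCount, Long avgLen, Long maxLen, Histogram histogram) {
    this.distinctCount = distinctCount;
    this.min = min;
    this.max = max;
    this.nullCount = nullCount;
    this.avgLen = avgLen;
    this.maxLen = maxLen;
    this.histogram = histogram;
  }

  @Override
  public OptionalLong distinctCount() {
    return distinctCount == null ? OptionalLong.empty() : OptionalLong.of(distinctCount);
  }

  @Override
  public Optional<Object> min() {
    return Optional.ofNullable(min);
  }

  @Override
  public Optional<Object> max() {
    return Optional.ofNullable(max);
  }

  @Override
  public OptionalLong nullCount() {
    return nullCount == null ? OptionalLong.empty() : OptionalLong.of(nullCount);
  }

  @Override
  public OptionalLong avgLen() {
    return avgLen == null ? OptionalLong.empty() : OptionalLong.of(avgLen);
  }

  @Override
  public OptionalLong maxLen() {
    return maxLen == null ? OptionalLong.empty() : OptionalLong.of(maxLen);
  }

  @Override
  public Optional<Histogram> histogram() {
    return Optional.ofNullable(histogram);
  }
}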

return new Stats(0L, 0L, Maps.newHashMap());
}

Map<NamedReference, ColumnStatistics> map = Maps.newHashMap();
Collaborator

Can we have a more descriptive name?

Member

Also can we avoid the allocation if enableColumnStats and CBO aren't enabled?

Contributor Author

Changed to colStatsMap and also only allocate if enableColumnStats and CBO are enabled.


for (BlobMetadata blobMetadata : metadataList) {
long ndv = Long.parseLong(blobMetadata.properties().get("ndv"));
ColumnStatistics colStats = new ColStats(ndv, null, null, 0L, 0L, 0L, null);
Collaborator

Should we fill the other values from the manifest file?

Contributor Author

In aggregate push down, I have some code to get min/max/nulls from the manifest file. I will extract the common code and reuse it; that will be a separate PR after this one is merged. I have put a // TODO: Fill min, max and null from the manifest file

List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();

for (BlobMetadata blobMetadata : metadataList) {
long ndv = Long.parseLong(blobMetadata.properties().get("ndv"));
Collaborator

I think we should be more defensive here: what if there is no value, or a NumberFormatException?

Contributor

Also check org.apache.iceberg.BlobMetadata#type?

Contributor Author

I have added code to check for null and empty values. It's probably too strict to throw an exception if the value is null or an empty string, so I log a DEBUG message instead.

Is there an agreed type for org.apache.iceberg.BlobMetadata#type? If yes, I will add a check.

Contributor

org.apache.iceberg.puffin.StandardBlobTypes has the supported types, but currently there is just one.
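
Putting the suggestions above together, the defensive read could look roughly like this; a sketch only, with Iceberg's relocated-Guava import paths assumed and the class name NdvBlobReader hypothetical:

import java.util.List;
import java.util.Map;
import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class NdvBlobReader {
  // Returns NDV keyed by source field id, skipping blobs that can't be read safely.
  Map<Integer, Long> collectNdvs(List<BlobMetadata> metadataList) {
    Map<Integer, Long> ndvByFieldId = Maps.newHashMap();
    for (BlobMetadata blobMetadata : metadataList) {
      if (!StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1.equals(blobMetadata.type())) {
        continue; // unknown blob type: ignore rather than fail
      }

      String ndvStr = blobMetadata.properties().get("ndv");
      if (Strings.isNullOrEmpty(ndvStr)) {
        continue; // log at DEBUG and move on, as discussed above, instead of throwing
      }

      try {
        ndvByFieldId.put(blobMetadata.fields().get(0), Long.parseLong(ndvStr));
      } catch (NumberFormatException e) {
        // malformed value: skip this blob
      }
    }
    return ndvByFieldId;
  }
}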

@@ -97,6 +117,36 @@ public static Object[][] parameters() {
};
}

@BeforeAll
Collaborator

Why is it needed?

Member

I don't think this is required; for setting new properties you should probably use

  protected void withSQLConf(Map<String, String> conf, Action action) {

Contributor Author

Removed and added withSQLConf
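
For illustration, a test could then toggle the flags like this; a sketch that assumes the surrounding test class provides the withSQLConf helper shown above, and uses the property keys quoted elsewhere in this thread:

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.Test;

// Sketch of a test method; the SparkScan construction and assertions are elided.
@Test
public void testColStatsReportedOnlyWhenEnabled() {
  Map<String, String> reportColStatsEnabled =
      ImmutableMap.of(
          SQLConf.CBO_ENABLED().key(), "true",
          "spark.sql.iceberg.enable-column-stats", "true");

  withSQLConf(
      reportColStatsEnabled,
      () -> {
        // build a SparkScan and assert that estimateStatistics() reports the expected column stats
      });
}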

@@ -90,4 +90,8 @@ private SparkSQLProperties() {}
public static final String EXECUTOR_CACHE_LOCALITY_ENABLED =
"spark.sql.iceberg.executor-cache.locality.enabled";
public static final boolean EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT = false;

// Controls whether column statistics are enabled when estimating statistics
Collaborator

The comment doesn't seem meaningful at first read. Can we be more precise?

Contributor Author

Changed to // Controls whether to calculate column statistics and report them to Spark
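
So the property definition presumably ends up along these lines; the constant names and the default value below are assumptions, only the key string and the comment come from this thread:

// Controls whether to calculate column statistics and report them to Spark
public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.enable-column-stats";
public static final boolean REPORT_COLUMN_STATS_DEFAULT = false; // assumed default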

List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();

for (BlobMetadata blobMetadata : metadataList) {
long ndv = Long.parseLong(blobMetadata.properties().get("ndv"));
Contributor

Also check org.apache.iceberg.BlobMetadata#type?

@jeesou
Contributor

jeesou commented Jul 15, 2024

(Quoting @karuppayya's reply above.)

Hi @karuppayya, could you please let us know where we can find the spec that would be changed to accommodate the other statistics?

And could you share some guidance on how to exercise the DSv2 APIs while using PySpark? The documentation doesn't have much clarity on this.

List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();

for (BlobMetadata blobMetadata : metadataList) {
int id = blobMetadata.fields().get(0);
Contributor

Should we check whether there is more than one field here (for example, NDV stats collected for, say, field1 and field2) and not propagate the stats if so?

Contributor Author

I can't find detailed documentation for BlobMetadata in the spec. My understanding is that for the blob type apache-datasketches-theta-v1, ndv is required. I assume that ndv is set in the properties as shown in the following example. If my assumption is correct, it seems we can only have one field in fields, establishing a one-to-one relationship with the ndv in the properties.

new GenericBlobMetadata(
    APACHE_DATASKETCHES_THETA_V1,
    snapshotId,
    1,
    ImmutableList.of(1),
    ImmutableMap.of("ndv", "4"))

Member

Correct, the apache-datasketches-theta-v1 sketch should be calculated on one field.
And yes, there should be an ndv property set. The property may seem somewhat redundant within the Puffin file, but it allows faster access to the information at SELECT time. More importantly, the properties are propagated to the table metadata, so a query planner can access the NDV information without opening the Puffin file at all.
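
In other words, the planner-side read can stay entirely in table metadata. A rough sketch of that access path, using only the accessors that appear elsewhere in this PR (error handling omitted, class name hypothetical):

import java.util.List;
import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;

// Sketch: read NDV values from the statistics files registered in table metadata,
// without opening the Puffin file itself.
class NdvFromTableMetadata {
  static void printNdvs(Table table) {
    List<StatisticsFile> files = table.statisticsFiles();
    if (files.isEmpty()) {
      return;
    }
    for (BlobMetadata blobMetadata : files.get(0).blobMetadata()) {
      String ndv = blobMetadata.properties().get("ndv");
      System.out.println("field ids " + blobMetadata.fields() + " -> ndv " + ndv);
    }
  }
}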

}

// TODO: Fill min, max and null from the manifest file
ColumnStatistics colStats = new SparkColumnStatistics(ndv, null, null, 0L, 0L, 0L, null);
Contributor

When ndv is not set, we are sending in 0. Is that intended?

Member

Yeah I didn't see notes in the Spark docs about what 0 would mean in this context.

Contributor Author

Checked the Spark code; it should be None if ndv is not available. I changed the code accordingly.

Member

Looks like it is still 0?

Contributor Author

Changed to null.

@@ -189,9 +192,8 @@ protected Statistics estimateStatistics(Snapshot snapshot) {

boolean cboEnabled =
Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
Map<NamedReference, ColumnStatistics> colStatsMap = null;
Map<NamedReference, ColumnStatistics> colStatsMap = Maps.newHashMap();
Contributor Author

On the Spark side there is a columnStats().isEmpty check, so I changed back to Maps.newHashMap() to avoid an NPE.

Member

What I meant here was to have it be Collections.emptyMap() unless CBO is enabled; probably a premature optimization, so feel free to ignore.

I.e.:

Map<NamedReference, ColumnStatistics> colStatsMap;
if (readConf.enableColumnStats() && cboEnabled) {
  colStatsMap = Maps.newHashMap();
} else {
  colStatsMap = Collections.emptyMap();
}

Contributor Author

Got it. Changed!

@jeesou
Contributor

jeesou commented Jul 26, 2024

Hi @huaxingao, @karuppayya
cc: @RussellSpitzer
We were running some tests on Spark with the latest code.
We took the changes of the previous PR #10288 along with this PR's changes, generated the .stats file, and we can see the ndv values in the metadata.

So to verify the performance enhancement we ran the TPC-H queries. When running them at the 1000 G scale we are facing issues on certain queries (query numbers 5, 7, 8, 9, 10), where some error occurred while performing the broadcast join.

I am sharing the log for query number 8 so you can check.
I am sharing the logical plan as well.

[Screenshot: query 8 error log, 2024-07-26]
[Screenshot: query 8 logical plan, 2024-07-26]

Sharing the error log for query 5 as well:
[Screenshot: query 5 error log, 2024-07-26]

Sharing the config we used for reference -

"spark.executor.cores": "6",
"spark.executor.memory": "24G",
"spark.driver.cores": "6",
"spark.driver.memory": "24G",
"spark.driver.maxResultSize":"0",
"spark.sql.iceberg.enable-column-stats": "true",
"spark.sql.cbo.enabled": "true",
"spark.sql.cbo.joinReorder.enabled": "true",

We have tried upscaling the executor and driver cores and memory to a 12 / 48 G scale, but received the same issue.

Kindly help us understand if we are missing anything, or whether this is an issue.

@saitharun15
Contributor

saitharun15 commented Jul 26, 2024

(Quoting @jeesou's comment above in full.)

We saw a performance enhancement in a couple of queries (1, 2, 19), where Spark was able to perform a broadcast join on larger tables; previously, without NDV stats, the plans were not as good. Also, for query 5: when run with Hive with statistics and CBO enabled, it did not fail, because Spark used a sort-merge join instead of a broadcast join for the larger tables. In the case of Iceberg with NDV stats, Spark is using only broadcast joins and failing with these errors:

"org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 9.0 GiB. org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBroadcastTableOverMaxTableBytesError" and

"org.apache.spark.SparkUnsupportedOperationException: Can not build a HashedRelation that is larger than 8G.
org.apache.spark.sql.errors.QueryExecutionErrors$.cannotBuildHashedRelationLargerThan8GError"

new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
SparkScan scan = (SparkScan) scanBuilder.build();

Map<String, String> sqlConf1 =
Member

@RussellSpitzer RussellSpitzer Jul 26, 2024

sqlConf1 => reportStatsDisabled
sqlConf2 => reportStatsEnabled

Contributor Author

Changed. Thanks

@@ -734,6 +801,21 @@ private Expression[] expressions(Expression... expressions) {
return expressions;
}

private void checkStatistics(SparkScan scan, long expectedRowCount, boolean expectedColumnStats) {
Member

@RussellSpitzer RussellSpitzer Jul 26, 2024

This may be over-parameterized: we only use this function twice, and we use it for relatively different things, so I'm not sure it makes things clearer. It's also missing a bit to be completely parameterized.

So if it were completely parameterized, we would probably want two functions like

checkStatisticsNotReported(scan)
checkReportedStatistics(scan, rowCount, map<String, Int> distinctValueCounts)

As it is, we hard-code the column name "id" and the distinct value count "4L" but parameterize the expectedRowCount, so we couldn't really reuse this function for anything else.

Now since we only actually use this function for one example above, it may be OK to not parameterize it at all. So I think there are two ways to go here:

Find more uses for the function and fully parameterize
Deparameterize and inline.

I think we have a few more use cases we should probably test out just for fun so maybe route 1 is better?

For example

Test where 1 column has NDV and the other does not.
Test where stats reporting is enabled, but the table does not have stats
Test with different distinct values for columns in the table
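
A rough sketch of what those two helpers might look like as methods of the test class; the names follow the suggestion above and the merged test's checkColStatisticsReported, while the assertion details here are assumptions:

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;

// Sketch: one helper asserts no column stats are reported, the other checks
// row count plus per-column distinct counts.
private void checkColStatisticsNotReported(SparkScan scan, long expectedRowCount) {
  Statistics stats = scan.estimateStatistics();
  assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);
  assertThat(stats.columnStats()).isEmpty();
}

private void checkColStatisticsReported(
    SparkScan scan, long expectedRowCount, Map<String, Long> expectedNDVs) {
  Statistics stats = scan.estimateStatistics();
  assertThat(stats.numRows().getAsLong()).isEqualTo(expectedRowCount);

  Map<NamedReference, ColumnStatistics> colStats = stats.columnStats();
  expectedNDVs.forEach(
      (columnName, expectedNdv) -> {
        long actualNdv =
            colStats.entrySet().stream()
                .filter(entry -> String.join(".", entry.getKey().fieldNames()).equals(columnName))
                .map(entry -> entry.getValue().distinctCount().getAsLong())
                .findFirst()
                .orElseThrow(() -> new AssertionError("no stats reported for " + columnName));
        assertThat(actualNdv).isEqualTo(expectedNdv.longValue());
      });
}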

Contributor Author

Thanks for the suggestion! Changed to route 1

@huaxingao
Contributor Author

@jeesou @saitharun15
Thanks for testing this out! The column stats are not yet accurate because I still need to retrieve the numOfNulls, min, and max from the manifest files. I will submit a follow-up PR to address this. Please wait for the follow-up PR and retest to see if the problem is resolved.

LOG.debug("DataSketch blob is not available for column {}", colName);
}

// TODO: Fill min, max and null from the manifest file
Member

Can we remove the TODO and make an Issue?

Contributor Author

Created issue

expectedOneNDV.put("id", 4L);
withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));

statisticsFile =
Member

I'd split this one out into another test, minor nit

Contributor Author

Done

Map<String, String> reportColStatsEnabled =
ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");

// Test table does not have col stats
Member

I would put this one in another test too

Contributor Author

Done

Member

@RussellSpitzer RussellSpitzer left a comment

I have a few remaining test nits, but I think we are good here

.type()
.equals(org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) {
String ndvStr = blobMetadata.properties().get(NDV_KEY);
if (ndvStr != null && !ndvStr.isEmpty()) {


We usually use the commons "!Strings.isNullOrEmpty" for these

Contributor Author

Done. Thanks!


boolean cboEnabled =
Boolean.parseBoolean(spark.conf().get(SQLConf.CBO_ENABLED().key(), "false"));
Map<NamedReference, ColumnStatistics> colStatsMap = null;


Thinking back to this change, it may be cleaner to just set this to an empty map and then reassign it to Maps.newHashMap() on line 195 as is:

colStatsMap = Collections.emptyMap();
if (report && cbo) {
  colStatsMap = Maps.newHashMap();
}

Contributor Author

Fixed. Thanks!

@RussellSpitzer RussellSpitzer merged commit 506fee4 into apache:main Jul 31, 2024
42 checks passed
@RussellSpitzer
Member

Thanks @huaxingao and @karuppayya! This is a great addition to the Spark capabilities.

@huaxingao
Contributor Author

Thanks a lot @RussellSpitzer! Also thanks to @szehon-ho @karuppayya @findepi @singhpk234 for helping with this PR!
