
[Spark] Support property filter pushdown by utilizing payload file formats #221

Merged
merged 7 commits into main from pushdown-spark on Sep 25, 2023

Conversation

@Ziy1-Tan (Contributor) commented Aug 13, 2023

This PR is part of OSPP 2023.
Issue number: #98.
You can find more detail about this feature here.

Proposed changes

We now support filter pushdown for the Spark reader.

Types of changes

What types of changes does your code introduce to GraphAr?
Put an x in the boxes that apply

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation Update (if none of the other choices apply)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.

  • I have read the CONTRIBUTING doc
  • I have signed the CLA
  • Lint and unit tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

@Ziy1-Tan changed the title from "Feat: filter pushdown for spark" to "[Spark] Support property filter pushdown by utilizing payload file formats" on Aug 13, 2023
@Ziy1-Tan marked this pull request as draft on August 13, 2023 16:45
@Ziy1-Tan force-pushed the pushdown-spark branch 2 times, most recently from ab0cd84 to ba8dbdb on August 22, 2023 17:31
@Ziy1-Tan marked this pull request as ready for review on August 22, 2023 17:31
@Ziy1-Tan (Contributor, Author)

cc @lixueclaire @acezen. Currently, filter pushdown is straightforward when reading a single property group:

    val property_group = vertex_info.getPropertyGroup("gender")

    // test reading a single property chunk
    val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
    assert(single_chunk_df.columns.length == 3)
    assert(single_chunk_df.count() == 100)
    val cond = "gender = 'female'"
    var df_pd = single_chunk_df.select("firstName", "gender").filter(cond)
    df_pd.explain()
    df_pd.show()
== Physical Plan ==
*(1) Filter (isnotnull(gender#2) AND (gender#2 = female))
+- *(1) ColumnarToRow
   +- BatchScan[firstName#0, gender#2] GarScan DataFilters: [isnotnull(gender#2), (gender#2 = female)], Format: gar, Location: InMemoryFileIndex(1 paths)[file:/home/simple/code/cpp/GraphAr/spark/src/test/resources/gar-test/l..., PartitionFilters: [], PushedFilters: [IsNotNull(gender), EqualTo(gender,female)], ReadSchema: struct<firstName:string,gender:string>, PushedFilters: [IsNotNull(gender), EqualTo(gender,female)] RuntimeFilters: []

+------------+------+
|   firstName|gender|
+------------+------+
|         Eli|female|
|      Joseph|female|
|        Jose|female|
|         Jun|female|
|       A. C.|female|
|       Karim|female|
|        Poul|female|
|       Chipo|female|
|       Dovid|female|
|       Ashin|female|
|         Cam|female|
|        Kurt|female|
|Daouda Malam|female|
|       David|female|
|      Batong|female|
|       Zheng|female|
|     Gabriel|female|
|       Boris|female|
|        Jose|female|
|    Fernando|female|
+------------+------+

But it is difficult to push filters down when reading multiple property groups:

    val vertex_df_with_index = reader.readAllVertexPropertyGroups()
    assert(vertex_df_with_index.columns.length == 5)
    assert(vertex_df_with_index.count() == 903)
    df_pd = vertex_df_with_index.filter(cond).select("firstName", "gender")
    df_pd.explain()
    df_pd.show()
== Physical Plan ==
*(1) Project [firstName#196, gender#198]
+- *(1) Filter (isnotnull(gender#198) AND (gender#198 = female))
   +- *(1) Scan ExistingRDD[_graphArVertexIndex#194L,id#195L,firstName#196,lastName#197,gender#198]

+------------+------+
|   firstName|gender|
+------------+------+
|         Eli|female|
|      Joseph|female|
|        Jose|female|
|         Jun|female|
|       A. C.|female|
|       Karim|female|
|        Poul|female|
|       Chipo|female|
|       Dovid|female|
|       Ashin|female|
|         Cam|female|
|        Kurt|female|
|Daouda Malam|female|
|       David|female|
|      Batong|female|
|       Zheng|female|
|     Gabriel|female|
|       Boris|female|
|        Jose|female|
|    Fernando|female|
+------------+------+

This is because different property groups are actually stored in different Parquet files.
What do you think about extending these APIs to support reading multiple property groups? (A possible workaround is sketched below.)
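
For illustration, here is a minimal sketch of one possible workaround under the current design: filter the group that holds the predicate column first, then join it with another group on the internal vertex index, so the filter is pushed down into that group's own Parquet scan. It assumes a per-group reader method named readVertexPropertyGroup (hypothetical here; this thread only shows readVertexPropertyChunk and readAllVertexPropertyGroups) and uses the _graphArVertexIndex column that appears in the plan above.

    // Sketch: push the filter into the scan of the property group that
    // contains the predicate column, then join with another group.
    val gender_pg = vertex_info.getPropertyGroup("gender")
    val id_pg = vertex_info.getPropertyGroup("id")

    // Assumed per-group read API (hypothetical signature):
    val gender_df = reader.readVertexPropertyGroup(gender_pg)
    val id_df = reader.readVertexPropertyGroup(id_pg)

    // Filtering before the join lets Spark push the predicate down into
    // the Parquet scan of the gender group alone.
    val filtered = gender_df.filter("gender = 'female'")
    val joined = filtered.join(id_df, "_graphArVertexIndex")
    joined.select("firstName", "gender").show()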

@lixueclaire (Contributor)

(Quoting @Ziy1-Tan's comment above.)

Hi @Ziy1-Tan, thanks for your proposal. The current behavior is OK for me. We do not intend to propose a method to support filter pushdown across different Parquet files. In GraphAr's design, properties are encouraged to be placed in the same group if they are accessed together, so pushdown is supported in that case as well.

@Ziy1-Tan (Contributor, Author)

(Quoting the exchange above.)

Got it. I'm going to test the performance improvement from Spark filter pushdown; a rough timing sketch follows the questions below.

  1. It seems that testing/ldbc and testing/modern_graph only contain metadata, not the actual chunk data.
  2. Where can I find this data? Does the performance comparison need to be incorporated into the current documentation?
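
As a rough sketch of the timing comparison I have in mind (reusing vertex_info, reader, and cond from the snippets above; readVertexPropertyGroup is assumed to exist as the per-group counterpart of readAllVertexPropertyGroups):

    // Rough benchmark sketch: a pushed-down filter on one property group
    // vs. a post-scan filter on the joined DataFrame of all groups.
    def time[T](label: String)(block: => T): T = {
      val t0 = System.nanoTime()
      val result = block
      val t1 = System.nanoTime()
      println(s"$label: ${(t1 - t0) / 1e6} ms")
      result
    }

    val pg = vertex_info.getPropertyGroup("gender")
    time("single group (pushdown)") {
      reader.readVertexPropertyGroup(pg).filter(cond).count()
    }
    time("all groups (no pushdown)") {
      reader.readAllVertexPropertyGroups().filter(cond).count()
    }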

@lixueclaire (Contributor)

(Quoting @Ziy1-Tan's questions above.)

  1. We do not provide the complete LDBC data, to keep the testing repo small. @acezen, could you please give Ziyi a guideline on how to use the LDBC data?
  2. Regarding the documentation, you can create another pull request to update it and include the performance comparison. It would be helpful to provide a brief description of your evaluation methodology and the results. You can refer to the GraphScope page as an example of how the documentation can be structured.

@acezen (Contributor) commented Aug 31, 2023

(Quoting @lixueclaire's comment above.)

We don't have a large-scale LDBC dataset yet. I can generate a copy of ldbc-sf30 and ldbc-100 and upload it to OSS for the performance test.

@Ziy1-Tan (Contributor, Author) commented Sep 5, 2023

(Quoting the exchange above.)

Got it. Can't wait to test the performance improvement.

@acezen (Contributor) commented Sep 5, 2023

(Quoting the exchange above.)

Hi @Ziy1-Tan, the large-scale LDBC dataset has been published here; you can download it to test the performance.

@acezen (Contributor) commented Sep 7, 2023

Hi @Ziy1-Tan, code formatting for Scala and Java has been merged into main; you can rebase and apply the format with mvn spotless:apply.

    assert(property_df.columns.size == 3)
    val cond = "gender = 'female'"
    var df_pd = single_chunk_df.select("firstName", "gender").filter(cond)
    df_pd.explain()
Contributor

Could you please include the resulting physical plan in the comments? This would effectively demonstrate the filter pushdown in a more intuitive manner.

Contributor Author

OK, I will include the plan in the comments.
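
For reference, a sketch of how the plan could sit next to the explain call as comments (plan text copied from the output earlier in this thread, elided where the original was truncated):

    df_pd.explain()
    // == Physical Plan ==
    // *(1) Filter (isnotnull(gender#2) AND (gender#2 = female))
    // +- *(1) ColumnarToRow
    //    +- BatchScan[firstName#0, gender#2] GarScan ...,
    //       PushedFilters: [IsNotNull(gender), EqualTo(gender,female)], ...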

@lixueclaire (Contributor) left a comment

LGTM~ This is highly appreciated and valuable, thank you for your contribution!
