[Spark] Refine the GraphWriter to automatically generate graph info and improve the Neo4j case #196

Conversation
hi, @lixueclaire, please help me review the change and give your opinion.

Overall, it looks good to me. Have you tested the modified examples and checked the results? If you decide to change the example of Neo4j2GraphAr, there may be some extra things that should be updated. Besides, since the data generated from reading Neo4j has changed, the example of GraphAr2Neo4j may not work now.
Yes, I will update the document and the Neo4j case if we are all good with the API change.
GraphWriter to automatically generate graph info base the… schema of dataset
@@ -94,7 +94,11 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession)
    val pg0: PropertyGroup = propertyGroups.get(0)
    val df0 = readVertexPropertyGroup(pg0, false)
    if (len == 1) {
      return df0
      if (addIndex) {
        return IndexGenerator.generateVertexIndexColumn(df0)
NB: this fixes the bug where no index column was generated in the result when the property group contains only one property.
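The fix can be sketched as follows (a minimal sketch based on the diff above; `addIndex` is the reader flag shown in the hunk):

```scala
// Sketch of the single-property-group branch in VertexReader:
// previously df0 was returned directly, so no index column was
// generated even when the caller asked for one.
if (len == 1) {
  if (addIndex) {
    // generate the vertex index column before returning
    return IndexGenerator.generateVertexIndexColumn(df0)
  } else {
    return df0
  }
}
```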
@@ -86,6 +86,19 @@ object IndexGenerator {
    spark.createDataFrame(rdd_with_index, schema_with_index)
  }

  def generateVertexIndexColumnAndIndexMapping(vertexDf: DataFrame, primaryKey: String = ""): (DataFrame, DataFrame) = {
NB: this method is added to generate the index column and the index mapping in one shot, avoiding regenerating the index mapping when processing edges.
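A possible shape of the new method, assuming `generateVertexIndexColumn` and `GeneralParams.vertexIndexCol` exist as in the surrounding code (the body is illustrative, not the actual implementation):

```scala
import org.apache.spark.sql.DataFrame

/** Generate the index column and the (index, primary key) mapping in one shot,
  * so the mapping does not have to be rebuilt when edges are processed. */
def generateVertexIndexColumnAndIndexMapping(
    vertexDf: DataFrame,
    primaryKey: String = ""): (DataFrame, DataFrame) = {
  // add the vertex index column once
  val dfWithIndex = generateVertexIndexColumn(vertexDf)
  // keep only (index, primaryKey) as the reusable mapping; fall back to
  // the full frame when no primary key is given
  val mapping =
    if (primaryKey.isEmpty) dfWithIndex
    else dfWithIndex.select(GeneralParams.vertexIndexCol, primaryKey)
  (dfWithIndex, mapping)
}
```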
    val df = DataFrameConcat.concat(adjList_df, properties_df)
    val property_groups = edgeInfo.getPropertyGroups(adjListType)
    val df = if (property_groups.size == 0) {
      adjList_df
NB: this fixes the bug that, when there is only an adj list and no property group, the concat in line 339 fails because the partition number is not the same between adjList_df and properties_df (properties_df is an empty DataFrame and its partition number is 0).
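The guard described above can be sketched as follows (names taken from the diff):

```scala
// Only concatenate when property groups exist: an empty properties_df
// has 0 partitions, which breaks DataFrameConcat against adjList_df.
val property_groups = edgeInfo.getPropertyGroups(adjListType)
val df = if (property_groups.size == 0) {
  adjList_df
} else {
  DataFrameConcat.concat(adjList_df, properties_df)
}
```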
Force-pushed "… schema of dataset" from 7f70ab2 to 06a8bc7
docs/applications/spark.rst (outdated)

      .save()
    })

  def main(args: Array[String]): Unit = {
fix format
docs/applications/spark.rst (outdated)

  - <id> the internal Neo4j ID
  - <labels> a list of labels for that node

  def main(args: Array[String]): Unit = {
fix format
    putVertexDataIntoNeo4j(graphInfo, vertexData, spark)
    putEdgeDataIntoNeo4j(graphInfo, vertexData, edgeData, spark)
  }

See `GraphAr2Neo4j.scala`_ for the complete example.
Can you add some descriptions about how to write in different modes (i.e., how to modify the example)?
@@ -480,6 +480,29 @@ class EdgeInfo() {
  def getConcatKey(): String = {
    return getSrc_label + GeneralParams.regularSeperator + getEdge_label + GeneralParams.regularSeperator + getDst_label
  }

  /** Dump to Json string. */
Yaml?
@@ -223,6 +223,25 @@ class VertexInfo() {
    }
    return prefix + str
  }

  /** Dump to Json string. */
Yaml
@@ -34,20 +35,20 @@ object GraphReader {
  private def readAllVertices(prefix: String, vertexInfos: Map[String, VertexInfo], spark: SparkSession): Map[String, DataFrame] = {
    val vertex_dataframes: Map[String, DataFrame] = vertexInfos.map { case (label, vertexInfo) => {
      val reader = new VertexReader(prefix, vertexInfo, spark)
      (label, reader.readAllVertexPropertyGroups(false))
      (label, reader.readAllVertexPropertyGroups(true))
Maybe adding a parameter for adding the index is required for this function.
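One way to follow this suggestion (a sketch; the extra `addIndex` parameter is hypothetical):

```scala
// Expose addIndex instead of hard-coding it, defaulting to the new behavior.
private def readAllVertices(
    prefix: String,
    vertexInfos: Map[String, VertexInfo],
    spark: SparkSession,
    addIndex: Boolean = true): Map[String, DataFrame] = {
  vertexInfos.map { case (label, vertexInfo) =>
    val reader = new VertexReader(prefix, vertexInfo, spark)
    (label, reader.readAllVertexPropertyGroups(addIndex))
  }
}
```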
  def write(path: String,
            spark: SparkSession,
            name: String = "graph",
            vertex_chunk_size: Long = 262144, // 2^18
Should the default value be added to the general parameters?
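If this suggestion is adopted, the default could live in `GeneralParams` (the field names and the edge default shown here are illustrative, not the actual values):

```scala
object GeneralParams {
  // default chunk sizes for the graph-level write API
  val defaultVertexChunkSize: Long = 262144 // 2^18, as in the current signature
}

// the write signature would then reference it:
// def write(path: String, spark: SparkSession, name: String = "graph",
//           vertex_chunk_size: Long = GeneralParams.defaultVertexChunkSize, ...)
```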
object Neo4j2GraphAr {

  def main(args: Array[String]): Unit = {
It would be better to add some comments for args.
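For example, the args could be documented like this (the meaning of args(0) and args(1) is inferred from the surrounding diff and may differ):

```scala
def main(args: Array[String]): Unit = {
  // args(0): output path for the generated GraphAr files (inferred)
  // args(1): vertex chunk size (inferred)
  // args(2): edge chunk size
  // args(3): file type of payload chunks, e.g. "csv", "orc" or "parquet"
  val outputPath: String = args(0)
  val vertexChunkSize: Long = args(1).toLong
  val edgeChunkSize: Long = args(2).toLong
  val fileType: String = args(3)
  // ...
}
```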
    val edgeChunkSize: Long = args(2).toLong
    val fileType: String = args(3)

    writer.write(outputPath, spark, "MovieGraph", vertexChunkSize, edgeChunkSize, fileType)
Can you help to update the test data?
The CI and the case no longer need to read from the gar-test repo. But if you think we still need to add a copy of the test data, I will update it.
> The CI and the case no longer need to read from the gar-test repo.

We did not need to read from gar-test previously, either. The test data in gar-test is used to showcase the data generated by your example. Currently, that data is inconsistent with the new examples, so it would be a little confusing.
object GraphAr2Neo4j {

  def main(args: Array[String]): Unit = {
Add some comments for args.
Force-pushed from 697a657 to 33d375d
Thanks a lot for the review @lixueclaire. I agree with all your comments and made the necessary changes.
LGTM~ I believe it's a prime example of GraphAr's capabilities. Thank you for making this change!
Changed the title from "GraphWriter to automatically generate graph info base the… schema of dataset" to "GraphWriter to automatically generate graph info and improve the Neo4j case".
Proposed changes

This change focuses on refining Spark's graph-level API and refactoring the Neo4j case:

- Implement GraphWriter as a class, and add PutVertexData and PutEdgeData methods to put vertex/edge DataFrames into the writer. Users no longer need to construct a mapping for the data.
- Add a dump method to GraphInfo, VertexInfo and EdgeInfo to support dumping the info to a JSON string.

There are still some things we can optimize:

- Use the .option(xxxx) style to configure the parameters of the write method, like vertex_chunk_size, edge_chunk_size, etc.

Types of changes

What types of changes does your code introduce to GraphAr?
Put an x in the boxes that apply.

Checklist

Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.

Further comments
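As a usage sketch of the refined graph-level API described in the proposed changes (method names taken from the description above, and names like "MovieGraph", vertexChunkSize, edgeChunkSize and fileType from the Neo4j2GraphAr example; the input DataFrames personDf and knowsDf are hypothetical, and exact signatures may differ):

```scala
// Put the DataFrames into the writer; no manual mapping is needed,
// and the graph info is generated automatically on write.
val writer = new GraphWriter()
writer.PutVertexData("Person", personDf)
writer.PutEdgeData(("Person", "knows", "Person"), knowsDf)
writer.write(outputPath, spark, "MovieGraph", vertexChunkSize, edgeChunkSize, fileType)
```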
Fixes #200
Fixes #201