[Improvement] Improve GraphAr spark writer performance and implement custom writer builder to bypass spark's write behavior #92
Conversation
```diff
@@ -26,5 +26,7 @@ public class GeneralParams {
     public static final String vertexChunkIndexCol = "_graphArVertexChunkIndex";
     public static final String edgeIndexCol = "_graphArEdgeIndex";
     public static final String regularSeperator = "_";
+    public static final String offsetStartChunkIndexKey = "_graphar_offste_start_chunk_index";
```
"offste" -> "offset"
```scala
var mid = 0
while (low <= high) {
  mid = (high + low) / 2;
  if (aggNums(mid) <= key && aggNums(mid + 1) > key) {
```
`aggNums(mid + 1)` may access an out-of-bounds index when `low == high == mid == aggNums.length - 1`.
Actually, in our scenario the key always lands on an index < aggNums.length - 1, but we can revise the condition to `if (aggNums(mid) <= key && (mid == aggNums.length - 1 || aggNums(mid + 1) > key))` to be safe.
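For reference, a self-contained sketch of the guarded lookup (a hypothetical standalone form of the loop above; `aggNums` is assumed to be a sorted array of aggregated counts):

```scala
// Find mid such that aggNums(mid) <= key < aggNums(mid + 1).
// The extra bound check keeps aggNums(mid + 1) in range when the
// key lands in the last element's bucket.
def binarySearch(aggNums: Array[Long], key: Long): Int = {
  var low = 0
  var high = aggNums.length - 1
  var mid = 0
  while (low <= high) {
    mid = (high + low) / 2
    if (aggNums(mid) <= key && (mid == aggNums.length - 1 || aggNums(mid + 1) > key)) {
      return mid
    } else if (aggNums(mid) > key) {
      high = mid - 1
    } else {
      low = mid + 1
    }
  }
  mid
}
```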
```scala
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
  map.asCaseSensitiveMap().asScala.toMap)
shortName() + " " + paths.map(qualifiedPathName(_, hadoopConf)).mkString(",")
}
```
The original Spark code is:

```scala
val name = shortName() + " " + paths.map(qualifiedPathName(_, hadoopConf)).mkString(",")
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
```

I'm not sure the `redact()` function in the original code is required for our case.
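For context, Spark's redaction replaces any match of the configured sensitive-value pattern (`spark.redaction.string.regex`) before a string reaches logs or the UI. Roughly (a sketch of the idea, not Spark's actual `Utils.redact` implementation):

```scala
import scala.util.matching.Regex

// Replace every match of the configured pattern with a placeholder;
// pass the text through unchanged when no pattern is configured.
def redact(pattern: Option[Regex], text: String): String = pattern match {
  case Some(r) => r.replaceAllIn(text, "*********(redacted)")
  case None    => text
}
```

Since the description string here only carries output paths, dropping the call is probably harmless unless paths can embed credentials (e.g. `s3a://key:secret@bucket/...`).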
(A resolved, outdated review thread on spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala.)
```scala
val edgeNumOfVertexChunks = sortedDfRDD.mapPartitions(iterator => {
  iterator.map(row => (row(colIndex).asInstanceOf[Long] / vertexChunkSize, 1))
}).reduceByKey(_ + _).collectAsMap()
val vertexChunkNum = edgeNumOfVertexChunks.size
```
set vertexChunkNum = Max(VertexChunkId) + 1 to handle the case when there are no edges for a vertex chunk
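Applied to the snippet above, the suggestion would look roughly like this (a sketch; it still assumes the edge set is non-empty):

```scala
// Derive the chunk count from the largest observed vertex chunk id,
// so empty chunks between populated ones are still counted.
val vertexChunkNum = (edgeNumOfVertexChunks.keys.max + 1).toInt
```

Trailing vertex chunks with no edges would still be missed, which is why passing in the vertex count (as done below) is the more robust fix.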
Resolved by passing a vertex number argument.
```scala
// generate global edge id for each record of dataframe
val parition_counts = df_rdd
val edgeSchema = edgeDf.schema
val colIndex = edgeSchema.fieldIndex(if (adjListType == AdjListType.ordered_by_source) GeneralParams.srcIndexCol else GeneralParams.dstIndexCol)
```
Is this function repartitionAndSort() only required for AdjListType.ordered_by_source & AdjListType.ordered_by_dest? It seems that it is called for all types.
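If the sort is indeed only needed for the ordered layouts, a guard along these lines would skip the shuffle for the others (a hypothetical sketch reusing the names above; `repartitionAndSort` stands in for the function under discussion):

```scala
// Only the ordered adjacency-list layouts need a global sort on the
// source/destination index column; skip the expensive step otherwise.
val needOrdering = adjListType == AdjListType.ordered_by_source ||
  adjListType == AdjListType.ordered_by_dest
val preparedDf = if (needOrdering) repartitionAndSort(edgeDf, colIndex) else edgeDf
```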
```scala
val vertexChunkNum: Int = ((vertexNumOfPrimaryVertexLabel + vertexChunkSize - 1) / vertexChunkSize).toInt // ceil

// sort by primary key and generate continue edge id for edge records
val sortedDfRDD = edgeDf.sort(GeneralParams.srcIndexCol).rdd
```
"GeneralParams.srcIndexCol" -> "colIndex"
LGTM.
Proposed changes
This PR aims to improve the GraphAr Spark writer's performance with the following changes:

- implement a custom writer builder to bypass Spark's default write behavior
- introduce GarCommitProtocol to customize the file commit strategy of the Spark write (see the sketch below)
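As a rough illustration of the commit-protocol idea (a sketch only, assuming a Spark 3.x-style `FileCommitProtocol`; the class name and naming scheme here are hypothetical, not the actual `GarCommitProtocol`):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Override task temp-file naming so each Spark partition writes a
// deterministic "chunkN" file instead of Spark's default part-<uuid> name.
class ChunkNamingCommitProtocol(jobId: String, path: String)
    extends HadoopMapReduceCommitProtocol(jobId, path) {

  override def newTaskTempFile(
      taskContext: TaskAttemptContext,
      dir: Option[String],
      ext: String): String = {
    val partitionId = taskContext.getTaskAttemptID.getTaskID.getId
    new Path(path, s"chunk$partitionId$ext").toString
  }
}
```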
Types of changes

What types of changes does your code introduce to GraphAr? Put an `x` in the boxes that apply.

Checklist

Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.

Further comments
#68