Skip to content

Commit

Permalink
Process chunks in parallel
Browse files — browse the repository at this point in the history
  • Loading branch information
acezen committed Jan 17, 2024
1 parent b568d63 commit f7e6ae8
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ class EdgeWriter(
else edgeInfo.getDst_chunk_size()
val vertexChunkNum: Int =
((vertexNum + vertexChunkSize - 1) / vertexChunkSize).toInt
for (i <- 0 until vertexChunkNum) {
val edgeNumber = edgeDfAndOffsetDf._4(i)
val outputPath = prefix + edgeInfo.getEdgesNumFilePath(i, adjListType)
val parallelEdgeNums = edgeDfAndOffsetDf._4.par
parallelEdgeNums.foreach { case (chunkIndex, edgeNumber) =>
val outputPath = prefix + edgeInfo.getEdgesNumFilePath(chunkIndex, adjListType)
FileSystem.writeValue(
edgeNumber,
outputPath,
Expand All @@ -291,16 +291,16 @@ class EdgeWriter(
var chunkIndex: Int = 0
val fileType = edgeInfo.getAdjListFileType(adjListType)
val outputPrefix = prefix + edgeInfo.getOffsetPathPrefix(adjListType)
for (offsetChunk <- edgeDfAndOffsetDf._2) {
val parallelOffsetChunks = edgeDfAndOffsetDf._2.par
parallelOffsetChunks.zipWithIndex.foreach { case (offsetChunk, i) =>
FileSystem.writeDataFrame(
offsetChunk,
FileType.FileTypeToString(fileType),
outputPrefix,
Some(chunkIndex),
Some(i),
None
)
offsetChunk.unpersist()
chunkIndex = chunkIndex + 1
}
}

Expand Down

0 comments on commit f7e6ae8

Please sign in to comment.