Skip to content

Commit

Permalink
Format
Browse files Browse the repository at this point in the history
  • Loading branch information
acezen committed Jan 17, 2024
1 parent 470eacf commit b568d63
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
16 changes: 12 additions & 4 deletions spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,17 @@ class GraphWriter() {
vertexInfos.foreach {
case (label, vertexInfo) => {
val primaryKey = primaryKeys(label)
vertices(label).persist(GeneralParams.defaultStorageLevel) // cache the vertex DataFrame
vertices(label).persist(
GeneralParams.defaultStorageLevel
) // cache the vertex DataFrame
val df_and_mapping = IndexGenerator
.generateVertexIndexColumnAndIndexMapping(vertices(label), primaryKey)
df_and_mapping._1.persist(GeneralParams.defaultStorageLevel) // cache the vertex DataFrame with index
df_and_mapping._2.persist(GeneralParams.defaultStorageLevel) // cache the index mapping DataFrame
df_and_mapping._1.persist(
GeneralParams.defaultStorageLevel
) // cache the vertex DataFrame with index
df_and_mapping._2.persist(
GeneralParams.defaultStorageLevel
) // cache the index mapping DataFrame
vertices(label).unpersist() // unpersist the vertex DataFrame
val df_with_index = df_and_mapping._1
indexMappings += label -> df_and_mapping._2
Expand Down Expand Up @@ -121,7 +127,9 @@ class GraphWriter() {
src_vertex_index_mapping,
dst_vertex_index_mapping
)
edge_df_with_index.persist(GeneralParams.defaultStorageLevel) // cache the edge DataFrame with index
edge_df_with_index.persist(
GeneralParams.defaultStorageLevel
) // cache the edge DataFrame with index

val adj_lists = edgeInfo.getAdj_lists
val adj_list_it = adj_lists.iterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class EdgeWriter(
private val spark: SparkSession = edgeDf.sparkSession
validate()
writeVertexNum()

edgeDf.persist(GeneralParams.defaultStorageLevel)

// validate data and info
Expand Down Expand Up @@ -266,7 +266,7 @@ class EdgeWriter(
adjListType,
vertexNum
)

// write out the edge number
private def writeEdgeNum(): Unit = {
val vertexChunkSize: Long = if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ class VertexWriter(
numVertices: Long = -1
) {
private val spark = vertexDf.sparkSession
vertexDf.persist(GeneralParams.defaultStorageLevel) // cache the vertex DataFrame
vertexDf.persist(
GeneralParams.defaultStorageLevel
) // cache the vertex DataFrame
validate()
private val vertexNum: Long =
if (numVertices < 0) vertexDf.count else numVertices
Expand Down

0 comments on commit b568d63

Please sign in to comment.