
Commit

Update
acezen committed Jan 17, 2024
1 parent 442f5f9 commit 470eacf
Showing 8 changed files with 37 additions and 25 deletions.
spark/src/main/java/com/alibaba/graphar/GeneralParams.java (5 changes: 4 additions & 1 deletion)
@@ -16,6 +16,8 @@

package com.alibaba.graphar;

+import org.apache.spark.storage.StorageLevel;
+
/** General constant parameters for graphar. */
public class GeneralParams {
// column name
@@ -26,11 +28,12 @@ public class GeneralParams {
public static final String primaryCol = "_graphArPrimary";
public static final String vertexChunkIndexCol = "_graphArVertexChunkIndex";
public static final String edgeIndexCol = "_graphArEdgeIndex";
-public static final String regularSeperator = "_";
+public static final String regularSeparator = "_";
public static final String offsetStartChunkIndexKey = "_graphar_offset_start_chunk_index";
public static final String aggNumListOfEdgeChunkKey = "_graphar_agg_num_list_of_edge_chunk";
public static final Long defaultVertexChunkSize = 262144L; // 2^18
public static final Long defaultEdgeChunkSize = 4194304L; // 2^22
public static final String defaultFileType = "parquet";
public static final String defaultVersion = "v1";
+public static final StorageLevel defaultStorageLevel = StorageLevel.MEMORY_AND_DISK_SER();
}
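The new constant gives the Spark writers a single place to control how intermediate DataFrames are cached. A minimal usage sketch, assuming a caller already holds a DataFrame (the CacheExample object and helper name are hypothetical, not part of this commit):

    import com.alibaba.graphar.GeneralParams
    import org.apache.spark.sql.DataFrame

    object CacheExample {
      // Cache an intermediate DataFrame with the shared default storage level
      // (MEMORY_AND_DISK_SER) instead of hard-coding StorageLevel at each call site.
      def cacheWithDefault(df: DataFrame): DataFrame = {
        df.persist(GeneralParams.defaultStorageLevel)
        df
      }
    }

Callers release the cache with df.unpersist() once the corresponding chunks have been written, which is the pattern the writer changes below follow.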
spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala (8 changes: 4 additions & 4 deletions)
@@ -476,7 +476,7 @@ class EdgeInfo() {
val num = properties.size
for (j <- 0 to num - 1) {
if (j > 0)
-str += GeneralParams.regularSeperator
+str += GeneralParams.regularSeparator
str += properties.get(j).getName;
}
str += "/"
@@ -513,7 +513,7 @@
val num = properties.size
for (j <- 0 to num - 1) {
if (j > 0)
-str += GeneralParams.regularSeperator
+str += GeneralParams.regularSeparator
str += properties.get(j).getName;
}
str += "/"
@@ -546,7 +546,7 @@
val num = properties.size
for (j <- 0 to num - 1) {
if (j > 0)
-str += GeneralParams.regularSeperator
+str += GeneralParams.regularSeparator
str += properties.get(j).getName;
}
str += "/"
@@ -556,7 +556,7 @@
}

def getConcatKey(): String = {
-return getSrc_label + GeneralParams.regularSeperator + getEdge_label + GeneralParams.regularSeperator + getDst_label
+return getSrc_label + GeneralParams.regularSeparator + getEdge_label + GeneralParams.regularSeparator + getDst_label
}

/** Dump to Yaml string. */
spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala (2 changes: 1 addition & 1 deletion)
@@ -319,7 +319,7 @@ class GraphInfo() {
dstLabel: String
): EdgeInfo = {
val key =
-srcLabel + GeneralParams.regularSeperator + edgeLabel + GeneralParams.regularSeperator + dstLabel
+srcLabel + GeneralParams.regularSeparator + edgeLabel + GeneralParams.regularSeparator + dstLabel
edgeInfos(key)
}

spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala (4 changes: 2 additions & 2 deletions)
@@ -214,7 +214,7 @@ class VertexInfo() {
val num = properties.size
for (j <- 0 to num - 1) {
if (j > 0) {
-str += GeneralParams.regularSeperator
+str += GeneralParams.regularSeparator
}
str += properties.get(j).getName;
}
@@ -243,7 +243,7 @@
val num = properties.size
for (j <- 0 to num - 1) {
if (j > 0)
-str += GeneralParams.regularSeperator
+str += GeneralParams.regularSeparator
str += properties.get(j).getName;
}
str += "/"
spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala (10 changes: 5 additions & 5 deletions)
@@ -22,7 +22,6 @@ import com.alibaba.graphar.util.IndexGenerator
import com.alibaba.graphar.util.Utils

import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types._

import org.apache.hadoop.fs.{FileSystem, Path}
@@ -88,18 +87,19 @@ class GraphWriter() {
vertexInfos.foreach {
case (label, vertexInfo) => {
val primaryKey = primaryKeys(label)
-vertices(label).persist(StorageLevel.MEMORY_AND_DISK_SER) // cache the vertex DataFrame
+vertices(label).persist(GeneralParams.defaultStorageLevel) // cache the vertex DataFrame
val df_and_mapping = IndexGenerator
.generateVertexIndexColumnAndIndexMapping(vertices(label), primaryKey)
-df_and_mapping._1.persist(StorageLevel.MEMORY_AND_DISK_SER) // cache the vertex DataFrame with index
-df_and_mapping._2.persist(StorageLevel.MEMORY_AND_DISK_SER) // cache the index mapping DataFrame
+df_and_mapping._1.persist(GeneralParams.defaultStorageLevel) // cache the vertex DataFrame with index
+df_and_mapping._2.persist(GeneralParams.defaultStorageLevel) // cache the index mapping DataFrame
vertices(label).unpersist() // unpersist the vertex DataFrame
val df_with_index = df_and_mapping._1
indexMappings += label -> df_and_mapping._2
val writer =
new VertexWriter(prefix, vertexInfo, df_with_index)
vertexNums += label -> writer.getVertexNum()
writer.writeVertexProperties()
+df_with_index.unpersist()
}
}

@@ -121,7 +121,7 @@
src_vertex_index_mapping,
dst_vertex_index_mapping
)
-edge_df_with_index.persist(StorageLevel.MEMORY_AND_DISK_SER) // cache the edge DataFrame with index
+edge_df_with_index.persist(GeneralParams.defaultStorageLevel) // cache the edge DataFrame with index

val adj_lists = edgeInfo.getAdj_lists
val adj_list_it = adj_lists.iterator
@@ -55,8 +55,8 @@ object FileSystem {
// TODO: Make the hard-code setting to configurable
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
spark.conf.set("parquet.enable.summary-metadata", "false")
spark.conf.set("spark.sql.orc.compression.codec", "snappy")
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
spark.conf.set("spark.sql.orc.compression.codec", "zstd")
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
// first check the outputPrefix exists, if not, create it
val path = new Path(outputPrefix)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
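The codecs above remain hard-coded, now to zstd. One possible shape for the TODO in this method, sketched as an assumption (the graphar.compression.codec key is hypothetical and not introduced by this commit), is to let the Spark session configuration override the default:

    // Hypothetical sketch: allow callers to override the codec via Spark conf,
    // falling back to the zstd default set above.
    val codec = spark.conf.get("graphar.compression.codec", "zstd")
    spark.conf.set("spark.sql.orc.compression.codec", codec)
    spark.conf.set("spark.sql.parquet.compression.codec", codec)

This assumes the same spark session value that is already in scope in this write path.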
spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala (19 changes: 12 additions & 7 deletions)
@@ -35,7 +35,6 @@ import org.apache.spark.sql.types.{
StructField
}
import org.apache.spark.sql.functions._
-import org.apache.spark.storage.StorageLevel

import scala.collection.SortedMap
import scala.collection.mutable.ArrayBuffer
@@ -65,7 +64,7 @@

// sort by primary key and generate continue edge id for edge records
val sortedDfRDD = edgeDf.sort(colName).rdd
-sortedDfRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
+sortedDfRDD.persist(GeneralParams.defaultStorageLevel)
// generate continue edge id for every edge
val partitionCounts = sortedDfRDD
.mapPartitionsWithIndex(
@@ -84,7 +83,7 @@
val start = broadcastedPartitionCounts.value(i)
for { (row, j) <- ps.zipWithIndex } yield (start + j, row)
})
-rddWithEid.persist(StorageLevel.MEMORY_AND_DISK_SER)
+rddWithEid.persist(GeneralParams.defaultStorageLevel)

// Construct partitioner for edge chunk
// get edge num of every vertex chunk
@@ -136,7 +135,7 @@
rddWithEid.repartitionAndSortWithinPartitions(partitioner).values
val partitionEdgeDf = spark.createDataFrame(partitionRDD, edgeSchema)
rddWithEid.unpersist() // unpersist the rddWithEid
-partitionEdgeDf.persist(StorageLevel.MEMORY_AND_DISK_SER)
+partitionEdgeDf.persist(GeneralParams.defaultStorageLevel)

// generate offset DataFrames
if (
@@ -147,7 +146,7 @@
iterator.map(row => (row(colIndex).asInstanceOf[Long], 1))
})
.reduceByKey(_ + _)
-edgeCountsByPrimaryKey.persist(StorageLevel.MEMORY_AND_DISK_SER)
+edgeCountsByPrimaryKey.persist(GeneralParams.defaultStorageLevel)
val offsetDfSchema = StructType(
Seq(StructField(GeneralParams.offsetCol, IntegerType))
)
@@ -175,10 +174,11 @@
})
.map { case (k, v) => Row(v) }
val offsetChunk = spark.createDataFrame(offsetRDD, offsetDfSchema)
-offsetChunk.persist(StorageLevel.MEMORY_AND_DISK_SER)
+offsetChunk.persist(GeneralParams.defaultStorageLevel)
offsetChunk
}
}
+edgeCountsByPrimaryKey.unpersist() // unpersist the edgeCountsByPrimaryKey
return (
partitionEdgeDf,
offsetDfArray,
@@ -223,7 +223,7 @@ class EdgeWriter(
validate()
writeVertexNum()

-edgeDf.persist(StorageLevel.MEMORY_AND_DISK_SER)
+edgeDf.persist(GeneralParams.defaultStorageLevel)

// validate data and info
private def validate(): Unit = {
@@ -299,6 +299,7 @@ class EdgeWriter(
Some(chunkIndex),
None
)
+offsetChunk.unpersist()
chunkIndex = chunkIndex + 1
}
}
@@ -375,4 +376,8 @@
writeAdjList()
writeEdgeProperties()
}

+override def finalize(): Unit = {
+edgeDfAndOffsetDf._1.unpersist()
+}
}
@@ -23,7 +23,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField}
-import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

@@ -75,7 +74,7 @@
numVertices: Long = -1
) {
private val spark = vertexDf.sparkSession
-vertexDf.persist(StorageLevel.MEMORY_AND_DISK_SER) // cache the vertex DataFrame
+vertexDf.persist(GeneralParams.defaultStorageLevel) // cache the vertex DataFrame
validate()
private val vertexNum: Long =
if (numVertices < 0) vertexDf.count else numVertices
@@ -87,7 +86,7 @@
vertexNum
)
vertexDf.unpersist() // unpersist the vertex DataFrame
-chunks.persist(StorageLevel.MEMORY_AND_DISK_SER)
+chunks.persist(GeneralParams.defaultStorageLevel)

private def validate(): Unit = {
// check if vertex DataFrame contains the index_filed
@@ -152,4 +151,9 @@
writeVertexProperties(property_group)
}
}

+override def finalize(): Unit = {
+chunks.unpersist()
+}

}
