
Commit ce9daf3
Spark
acezen committed Nov 6, 2023
1 parent ea0dce2 commit ce9daf3
Showing 4 changed files with 17 additions and 13 deletions.
2 changes: 2 additions & 0 deletions README.rst
@@ -98,6 +98,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500
:align: center
:alt: vertex physical table

Note: To efficiently utilize the filter push-down of payload file formats like Parquet, the internal vertex id is stored as a column in the payload file. Since the internal vertex id is continuous, the payload file format can apply dictionary encoding to this column, which adds little storage overhead.
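
A minimal sketch of the effect (the path is hypothetical; only GeneralParams.vertexIndexCol and the standard Spark Parquet reader are assumed): a range predicate on the internal vertex id column can be pushed down to Parquet, so chunks whose id range falls outside the predicate are skipped entirely::

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import com.alibaba.graphar.GeneralParams

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Read the vertex chunks of one property group (path is illustrative).
    val df = spark.read.parquet("/tmp/graphar/vertex/person/id/")
    // The range predicate is pushed down to Parquet; row groups whose
    // internal-id statistics do not overlap [1000, 1500) are never read.
    val hits = df.filter(
      col(GeneralParams.vertexIndexCol) >= 1000L &&
        col(GeneralParams.vertexIndexCol) < 1500L
    )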

Edges in GraphAr
^^^^^^^^^^^^^^^^

2 changes: 2 additions & 0 deletions docs/user-guide/file-format.rst
@@ -57,6 +57,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500
:alt: vertex physical table


Note: To efficiently utilize the filter push-down of payload file formats like Parquet, the internal vertex id is stored as a column in the payload file. Since the internal vertex id is continuous, the payload file format can apply dictionary encoding to this column, which adds little storage overhead.

Edges in GraphAr
------------------------

13 changes: 10 additions & 3 deletions spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -16,7 +16,7 @@

package com.alibaba.graphar.writer

-import com.alibaba.graphar.util.{FileSystem, ChunkPartitioner}
+import com.alibaba.graphar.util.{FileSystem, ChunkPartitioner, IndexGenerator}
import com.alibaba.graphar.{GeneralParams, VertexInfo, PropertyGroup}

import org.apache.spark.sql.types._
@@ -33,10 +33,16 @@ object VertexWriter {
chunkSize: Long,
vertexNum: Long
): DataFrame = {
-val vertex_df_schema = vertexDf.schema
+val vertexDfWithIndex = vertexDf.schema.contains(
+  StructField(GeneralParams.vertexIndexCol, LongType)
+) match {
+  case true => vertexDf
+  case _ => IndexGenerator.generateVertexIndexColumn(vertexDf)
+}
+val vertex_df_schema = vertexDfWithIndex.schema
val index = vertex_df_schema.fieldIndex(GeneralParams.vertexIndexCol)
val partition_num = ((vertexNum + chunkSize - 1) / chunkSize).toInt
-val rdd = vertexDf.rdd.map(row => (row(index).asInstanceOf[Long], row))
+val rdd = vertexDfWithIndex.rdd.map(row => (row(index).asInstanceOf[Long], row))

// repartition
val partitioner = new ChunkPartitioner(partition_num, chunkSize)
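
As a side note, partition_num above is a ceiling division, so each chunk holds at most chunkSize rows. A quick worked example (the values are illustrative, echoing the chunk-size example in the docs, not taken from this commit):

val vertexNum = 903L
val chunkSize = 500L
// (903 + 500 - 1) / 500 = 1402 / 500 = 2 chunks: one full, one partial
val partitionNum = ((vertexNum + chunkSize - 1) / chunkSize).toInt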
@@ -109,6 +115,7 @@ class VertexWriter(
// write out the chunks
val output_prefix = prefix + vertexInfo.getPathPrefix(propertyGroup)
val property_list = ArrayBuffer[String]()
+property_list += "`" + GeneralParams.vertexIndexCol + "`"
val it = propertyGroup.getProperties().iterator
while (it.hasNext()) {
val property = it.next()
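
Taken together, the writer now tolerates input without the index column: if GeneralParams.vertexIndexCol is absent from the schema, it is generated via IndexGenerator.generateVertexIndexColumn, and the backtick-quoted column name is written out with each property group. A minimal usage sketch (a SparkSession, a prefix, and a loaded vertex_info are assumed; the CSV path and options mirror the test resources):

val vertex_df = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("gar-test/ldbc_sample/person_0_0.csv")
// No pre-generated index column is needed anymore; the constructor no
// longer rejects such a DataFrame, as the test changes below reflect.
val writer = new VertexWriter(prefix, vertex_info, vertex_df)
writer.writeVertexProperties(vertex_info.getPropertyGroup("firstName"))
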
13 changes: 3 additions & 10 deletions spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -46,7 +46,7 @@ class WriterSuite extends AnyFunSuite {

// read vertex yaml
val vertex_yaml_path = getClass.getClassLoader
.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml")
.getResource("gar-test/ldbc_sample/csv/person.vertex.yml")
.getPath
val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, spark)

@@ -74,15 +74,8 @@
vertex_num_path,
spark.sparkContext.hadoopConfiguration
)
assert(number.toInt == vertex_df.count())

assertThrows[IllegalArgumentException](
new VertexWriter(prefix, vertex_info, vertex_df)
)
val invalid_property_group = new PropertyGroup()
assertThrows[IllegalArgumentException](
writer.writeVertexProperties(invalid_property_group)
)
val property_group_2 = vertex_info.getPropertyGroup("firstName")
writer.writeVertexProperties(property_group_2)

// clean generated files and close FileSystem instance
fs.delete(new Path(prefix + "vertex"))
