diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index f928e0156..cfb01b59e 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -47,6 +47,18 @@ jobs: make doc popd + - name: preview + uses: afc163/surge-preview@v1 + id: preview_step + with: + surge_token: ${{ secrets.SURGE_TOKEN }} + github_token: ${{ secrets.GITHUB_TOKEN }} + dist: docs/_build/html + build: echo "done" + + - name: Get the preview_url + run: echo "url => ${{ steps.preview_step.outputs.preview_url }}" + - name: Commit Doc if: ${{ github.ref == 'refs/heads/main' && github.repository == 'alibaba/GraphAr' }} run: | diff --git a/CMakeLists.txt b/CMakeLists.txt index 0a11dc989..9ddcd217e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -194,7 +194,7 @@ build_gar() # ------------------------------------------------------------------------------ # build example -# ------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------ if (BUILD_EXAMPLES) find_package(Boost REQUIRED COMPONENTS graph) @@ -305,6 +305,8 @@ if(doxygen_EXECUTABLE AND sphinx_build_EXECUTABLE) COMMAND ${CMAKE_COMMAND} -E make_directory _build COMMAND ${doxygen_EXECUTABLE} COMMAND ${sphinx_build_EXECUTABLE} . _build/html + COMMAND cd ../spark && mvn scala:doc && rm -fr ${PROJECT_SOURCE_DIR}/docs/_build/html/reference/spark-api && + cp -fr target/site/scaladocs ${PROJECT_SOURCE_DIR}/docs/_build/html/reference/spark-api WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/docs VERBATIM ) diff --git a/docs/index.rst b/docs/index.rst index b0d849898..b9a5c689b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -34,6 +34,6 @@ .. toctree:: :maxdepth: 1 :caption: API Reference - :hidden: - api-reference.rst + reference/api-reference-cpp.rst + Spark API Reference <reference/spark-api/index> diff --git a/docs/api-reference.rst b/docs/reference/api-reference-cpp.rst similarity index 99% rename from docs/api-reference.rst rename to docs/reference/api-reference-cpp.rst index dbe83f947..5b11dc4f6 100644 --- a/docs/api-reference.rst +++ b/docs/reference/api-reference-cpp.rst @@ -1,4 +1,4 @@ -API Reference -============== +C++ API Reference +================= .. _cpp-api: diff --git a/docs/reference/spark-api/index.rst b/docs/reference/spark-api/index.rst new file mode 100644 index 000000000..34ef83117 --- /dev/null +++ b/docs/reference/spark-api/index.rst @@ -0,0 +1,4 @@ +Spark API Reference (Scaladoc) +============================== + +Stub page for the Scala reference docs; the actual content is generated into the spark-api/ directory. \ No newline at end of file diff --git a/include/gar/graph_info.h b/include/gar/graph_info.h index 8683809c8..476f98831 100644 --- a/include/gar/graph_info.h +++ b/include/gar/graph_info.h @@ -504,7 +504,7 @@ class EdgeInfo { } /** - * @brief Return property group that contains certain property and with adj + * @brief Return the property group that contains the property under the given adj * list type * * @param property property name diff --git a/include/gar/reader/arrow_chunk_reader.h b/include/gar/reader/arrow_chunk_reader.h index ce69447b0..914e65d54 100644 --- a/include/gar/reader/arrow_chunk_reader.h +++ b/include/gar/reader/arrow_chunk_reader.h @@ -141,7 +141,7 @@ class AdjListArrowChunkReader { * @brief Initialize the AdjListArrowChunkReader. * * @param edge_info The edge info that describes the edge type. - * @param adj_list_type The adj list type for the edges. + * @param adj_list_type The adj list type for the edge. * @param prefix The absolute prefix.
* @param vertex_chunk_index The vertex chunk index, default is 0. */ diff --git a/include/gar/utils/data_type.h b/include/gar/utils/data_type.h index 4ee9b5dcb..9cf7cb987 100644 --- a/include/gar/utils/data_type.h +++ b/include/gar/utils/data_type.h @@ -32,13 +32,13 @@ namespace GAR_NAMESPACE_INTERNAL { /// \brief Main data type enumeration enum class Type { - /// Boolean as 1 bit, LSB bit-packed ordering + /// Boolean BOOL = 0, - /// Signed 32-bit little-endian integer + /// Signed 32-bit integer INT32, - /// Signed 64-bit little-endian integer + /// Signed 64-bit integer INT64, /// 4-byte floating point value @@ -47,7 +47,7 @@ enum class Type { /// 8-byte floating point value DOUBLE, - /// UTF8 variable-length string as List + /// UTF8 variable-length string STRING, /// User-defined data type diff --git a/spark/pom.xml b/spark/pom.xml index 8d69d3a15..d350582a8 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -189,6 +189,17 @@ + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>4.8.0</version> + <configuration> + <jvmArgs> + <jvmArg>-Xms64m</jvmArg> + <jvmArg>-Xmx1024m</jvmArg> + </jvmArgs> + </configuration> + </plugin> jar diff --git a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java index 6c8868199..0ba691e2a 100644 --- a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java +++ b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java @@ -15,6 +15,7 @@ package com.alibaba.graphar; +/** General constant parameters for graphar. */ public class GeneralParams { // column name public static final String vertexIndexCol = "_graphArVertexIndex"; diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala index bddd4ee09..bb48a30ea 100644 --- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala @@ -20,6 +20,7 @@ import org.yaml.snakeyaml.Yaml import org.yaml.snakeyaml.constructor.Constructor import scala.beans.BeanProperty +/** Edge info is a class to store the edge meta information. */ class EdgeInfo() { @BeanProperty var src_label: String = "" @BeanProperty var edge_label: String = "" @@ -32,6 +33,11 @@ class EdgeInfo() { @BeanProperty var adj_lists = new java.util.ArrayList[AdjList]() @BeanProperty var version: String = "" + /** Check if the edge info supports the adj list type. + * + * @param adj_list_type adjList type in gar to check. + * @return true if the edge info supports the adj list type, otherwise false. + */ def containAdjList(adj_list_type: AdjListType.Value): Boolean = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -42,6 +48,12 @@ class EdgeInfo() { return false } + /** Get the path prefix of the adj list type. + * + * @param adj_list_type The input adj list type in gar. + * @return path prefix of the adj list type; if the edge info does not support the adj list type, + * an IllegalArgumentException is raised. + */ def getAdjListPrefix(adj_list_type: AdjListType.Value): String = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -57,6 +69,12 @@ class EdgeInfo() { throw new IllegalArgumentException } + /** Get the adj list topology chunk file type of the adj list type. + * + * @param adj_list_type the input adj list type. + * @return file format type in gar of the adj list type; if the edge info does not support the adj list type, + * an IllegalArgumentException is raised.
+ */ def getAdjListFileType(adj_list_type: AdjListType.Value): FileType.Value = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -68,6 +86,12 @@ class EdgeInfo() { throw new IllegalArgumentException } + /** Get the property groups of the adj list type. + * + * @param adj_list_type the input adj list type. + * @return property groups of the input adj list type; if the edge info does not support the adj list type, + * an IllegalArgumentException is raised. + */ def getPropertyGroups(adj_list_type: AdjListType.Value): java.util.ArrayList[PropertyGroup] = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -79,6 +103,14 @@ class EdgeInfo() { throw new IllegalArgumentException } + /** Check if the edge info contains the property group in a certain adj list structure. + * + * @param property_group the property group to check. + * @param adj_list_type the type of adj list structure. + * @return true if the edge info contains the property group in the given adj list structure; + * false if the edge info does not support the given adj list type or the adj list structure + * does not contain the property group. + */ def containPropertyGroup(property_group: PropertyGroup, adj_list_type: AdjListType.Value): Boolean = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -97,6 +129,11 @@ class EdgeInfo() { return false } + /** Check if the edge info contains the property. + * + * @param property_name name of the property. + * @return true if the edge info contains the property, otherwise false. + */ def containProperty(property_name: String): Boolean = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -117,6 +154,13 @@ class EdgeInfo() { return false } + /** Get the property group that contains the property, under the given adj list type. + * + * @param property_name name of the property. + * @param adj_list_type the type of adj list structure. + * @return property group that contains the property. If the edge info does not support the adj list type, + * or no property group contains the property, an IllegalArgumentException is raised. + */ def getPropertyGroup(property_name: String, adj_list_type: AdjListType.Value): PropertyGroup = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -139,6 +183,12 @@ class EdgeInfo() { throw new IllegalArgumentException } + /** Get the data type of the property. + * + * @param property_name name of the property. + * @return data type in gar of the property. If the edge info does not contain the property, + * an IllegalArgumentException is raised. + */ def getPropertyType(property_name: String): GarType.Value = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -159,6 +209,12 @@ class EdgeInfo() { throw new IllegalArgumentException } + /** Check if the property is the primary key of the edge info. + * + * @param property_name name of the property. + * @return true if the property is the primary key of the edge info, false if not. + * If the edge info does not contain the property, an IllegalArgumentException is raised. + */ def isPrimaryKey(property_name: String): Boolean = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -179,6 +235,7 @@ class EdgeInfo() { throw new IllegalArgumentException } + /** Get the primary key of the edge info. */ def getPrimaryKey(): String = { val tot: Int = adj_lists.size for ( k <- 0 to tot - 1 ) { @@ -199,6 +256,7 @@ class EdgeInfo() { return "" } + /** Check if the edge info is validated.
*/ def isValidated(): Boolean = { if (src_label == "" || edge_label == "" || dst_label == "") return false @@ -222,6 +280,14 @@ class EdgeInfo() { return true } + /** Get the adj list offset chunk file path of the vertex chunk; + * the offset chunks are aligned with the vertex chunks. + * + * @param chunk_index index of vertex chunk. + * @param adj_list_type type of adj list structure. + * @return the offset chunk file path. If the edge info does not support the adj list type, + * an IllegalArgumentException is raised. + */ def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = { if (containAdjList(adj_list_type) == false) throw new IllegalArgumentException @@ -230,28 +296,63 @@ class EdgeInfo() { return str } + /** Get the adj list offset chunk file directory path of the adj list type. + * + * @param adj_list_type type of adj list structure. + * @return the offset directory. If the edge info does not support the adj list type, + * an IllegalArgumentException is raised. + */ def getAdjListOffsetDirPath(adj_list_type: AdjListType.Value) : String = { if (containAdjList(adj_list_type) == false) throw new IllegalArgumentException return prefix + getAdjListPrefix(adj_list_type) + "offset/" } + /** Get the file path of the adj list topology chunk. + * + * @param vertex_chunk_index index of vertex chunk. + * @param chunk_index index of edge chunk. + * @param adj_list_type type of adj list structure. + * @return adj list chunk file path. + */ def getAdjListFilePath(vertex_chunk_index: Long, chunk_index: Long, adj_list_type: AdjListType.Value) : String = { var str: String = prefix + getAdjListPrefix(adj_list_type) + "adj_list/part" + vertex_chunk_index.toString() + "/chunk" + chunk_index.toString() return str } + /** Get the path of the adj list topology chunks of a certain vertex chunk. + * + * @param vertex_chunk_index index of vertex chunk. + * @param adj_list_type type of adj list structure. + * @return path prefix of the edge chunks of the given vertex chunk. + */ def getAdjListFilePath(vertex_chunk_index: Long, adj_list_type: AdjListType.Value) : String = { var str: String = prefix + getAdjListPrefix(adj_list_type) + "adj_list/part" + vertex_chunk_index.toString() + "/" return str } + /** Get the adj list topology chunk file directory path of the adj list type. + * + * @param adj_list_type type of adj list structure. + * @return directory path of the adj list type. + */ def getAdjListDirPath(adj_list_type: AdjListType.Value) : String = { return prefix + getAdjListPrefix(adj_list_type) + "adj_list/" } + /** Get the chunk file path of the adj list property group; + * the property group chunks are aligned with the adj list topology chunks. + * + * @param property_group property group. + * @param adj_list_type type of adj list structure. + * @param vertex_chunk_index index of vertex chunk. + * @param chunk_index index of edge chunk. + * + * @return property group chunk file path. If the edge info does not contain the property group, + * an IllegalArgumentException is raised. + */ def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long, chunk_index: Long) : String = { if (containPropertyGroup(property_group, adj_list_type) == false) throw new IllegalArgumentException @@ -271,6 +372,15 @@ class EdgeInfo() { return str } + /** Get the path of the adj list property group of a certain vertex chunk. + * + * @param property_group property group. + * @param adj_list_type type of adj list structure. + * @param vertex_chunk_index index of vertex chunk.
+ * @return path prefix of the property group chunks of the given vertex chunk. + * If the edge info does not contain the property group, + * an IllegalArgumentException is raised. + */ def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long) : String = { if (containPropertyGroup(property_group, adj_list_type) == false) throw new IllegalArgumentException @@ -290,6 +400,13 @@ class EdgeInfo() { return str } + /** Get the property group chunk file directory path of the adj list type. + * + * @param property_group property group. + * @param adj_list_type type of adj list structure. + * @return directory path of the property group chunks. If the edge info does not contain the property group, + * an IllegalArgumentException is raised. + */ def getPropertyDirPath(property_group: PropertyGroup, adj_list_type: AdjListType.Value) : String = { if (containPropertyGroup(property_group, adj_list_type) == false) throw new IllegalArgumentException diff --git a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala index 52ad38c66..906086f6c 100644 --- a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala @@ -20,15 +20,28 @@ import org.yaml.snakeyaml.Yaml import org.yaml.snakeyaml.constructor.Constructor import scala.beans.BeanProperty +/** Main data type enumeration in gar. */ object GarType extends Enumeration{ type GarType = Value + /** Boolean type */ val BOOL = Value(0) + /** Signed 32-bit integer */ val INT32 = Value(2) + /** Signed 64-bit integer */ val INT64 = Value(3) + /** 4-byte floating point value */ val FLOAT = Value(4) + /** 8-byte floating point value */ val DOUBLE = Value(5) + /** UTF8 variable-length string */ val STRING = Value(6) + /** Data type in gar to string. + * + * @param gar_type gar data type. + * @return string name of the gar data type. + * + */ def GarTypeToString(gar_type: GarType.Value): String = gar_type match { case GarType.BOOL => "bool" case GarType.INT32 => "int32" @@ -39,6 +52,12 @@ object GarType extends Enumeration{ case _ => throw new IllegalArgumentException } + /** String name to data type in gar. + * + * @param str the string name of the gar data type. + * @return gar data type. + * + */ def StringToGarType(str: String): GarType.Value = str match { case "bool" => GarType.BOOL case "int32" => GarType.INT32 @@ -50,12 +69,19 @@ } } +/** Type of file format. */ object FileType extends Enumeration { type FileType = Value val CSV = Value(0) val PARQUET = Value(1) val ORC = Value(2) + /** File type to string. + * + * @param file_type file type in gar. + * @return string name of the file type. + * + */ def FileTypeToString(file_type: FileType.Value): String = file_type match { case FileType.CSV => "csv" case FileType.PARQUET => "parquet" @@ -63,6 +89,12 @@ object FileType extends Enumeration { case _ => throw new IllegalArgumentException } + /** String to file type. + * + * @param str the string name of the file type. + * @return file type in gar. + * + */ def StringToFileType(str: String): FileType.Value = str match { case "csv" => FileType.CSV case "parquet" => FileType.PARQUET @@ -72,13 +104,19 @@ } +/** Adj list type enumeration for the adjacency list of graph.
*/ object AdjListType extends Enumeration { type AdjListType = Value + /** collection of edges by source, but unordered, can represent COO format */ val unordered_by_source = Value(0) + /** collection of edges by destination, but unordered, can represent COO format */ val unordered_by_dest = Value(1) + /** collection of edges by source, ordered by source, can represent CSR format */ val ordered_by_source = Value(2) + /** collection of edges by destination, ordered by destination, can represent CSC format */ val ordered_by_dest = Value(3) + /** AdjList type in gar to string. */ def AdjListTypeToString(adjList_type: AdjListType.Value): String = adjList_type match { case AdjListType.unordered_by_source => "unordered_by_source" case AdjListType.unordered_by_dest => "unordered_by_dest" @@ -87,6 +125,7 @@ object AdjListType extends Enumeration { case _ => throw new IllegalArgumentException } + /** String to adjList type in gar. */ def StringToAdjListType(str: String): AdjListType.Value = str match { case "unordered_by_source" => AdjListType.unordered_by_source case "unordered_by_dest" => AdjListType.unordered_by_dest @@ -96,26 +135,31 @@ object AdjListType extends Enumeration { } } +/** The property information of vertex or edge. */ class Property () { @BeanProperty var name: String = "" @BeanProperty var data_type: String = "" @BeanProperty var is_primary: Boolean = false + /** Get the data type in gar of the property. */ def getData_type_in_gar: GarType.Value = { GarType.StringToGarType(data_type) } } +/** PropertyGroup is a class to store the property group information. */ class PropertyGroup () { @BeanProperty var prefix: String = "" @BeanProperty var file_type: String = "" @BeanProperty var properties = new java.util.ArrayList[Property]() + /** Get the file type in gar of the property group. */ def getFile_type_in_gar: FileType.Value = { FileType.StringToFileType(file_type) } } +/** AdjList is a class to store the adj list information of the edge. */ class AdjList () { @BeanProperty var ordered: Boolean = false @BeanProperty var aligned_by: String = "src" @@ -123,10 +167,12 @@ class AdjList () { @BeanProperty var file_type: String = "" @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]() + /** Get the file type in gar of the adj list. */ def getFile_type_in_gar: FileType.Value = { FileType.StringToFileType(file_type) } + /** Get the adj list type in string. */ def getAdjList_type: String = { var str: String = "" if (ordered) { @@ -142,11 +188,13 @@ class AdjList () { return str } + /** Get the adj list type in gar. */ def getAdjList_type_in_gar: AdjListType.Value = { AdjListType.StringToAdjListType(getAdjList_type) } } +/** GraphInfo is a class to store the graph meta information. */ class GraphInfo() { @BeanProperty var name: String = "" @BeanProperty var prefix: String = "" diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala index 225f4999e..ecf9d80b1 100644 --- a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala @@ -20,6 +20,7 @@ import org.yaml.snakeyaml.Yaml import org.yaml.snakeyaml.constructor.Constructor import scala.beans.BeanProperty +/** VertexInfo is a class to store the vertex meta information.
*/ class VertexInfo() { @BeanProperty var label: String = "" @BeanProperty var chunk_size: Long = 0 @@ -27,6 +28,11 @@ class VertexInfo() { @BeanProperty var property_groups = new java.util.ArrayList[PropertyGroup]() @BeanProperty var version: String = "" + /** Check if the vertex info contains the property group. + * + * @param property_group the property group to check. + * @return true if the vertex info contains the property group, otherwise false. + */ def containPropertyGroup(property_group: PropertyGroup): Boolean = { val len: Int = property_groups.size for ( i <- 0 to len - 1 ) { @@ -38,6 +44,11 @@ class VertexInfo() { return false } + /** Check if the vertex info contains a certain property. + * + * @param property_name name of the property. + * @return true if the vertex info contains the property, otherwise false. + */ def containProperty(property_name: String): Boolean = { val len: Int = property_groups.size for ( i <- 0 to len - 1 ) { @@ -53,6 +64,11 @@ class VertexInfo() { return false } + /** Get the property group that contains the property. + * + * @param property_name name of the property. + * @return property group that contains the property; if no property group contains it, an IllegalArgumentException is raised. + */ def getPropertyGroup(property_name: String): PropertyGroup = { val len: Int = property_groups.size for ( i <- 0 to len - 1 ) { @@ -68,6 +84,12 @@ class VertexInfo() { throw new IllegalArgumentException } + /** Get the data type of the property. + * + * @param property_name name of the property. + * @return the data type in gar of the property. If the vertex info does not contain the property, + * an IllegalArgumentException is raised. + */ def getPropertyType(property_name: String): GarType.Value = { val len: Int = property_groups.size for ( i <- 0 to len - 1 ) { @@ -83,6 +105,11 @@ class VertexInfo() { throw new IllegalArgumentException } + /** Check if the property is the primary key. + * + * @param property_name name of the property to check. + * @return true if the property is the primary key of the vertex info, otherwise false. + */ def isPrimaryKey(property_name: String): Boolean = { val len: Int = property_groups.size for ( i <- 0 to len - 1 ) { @@ -98,6 +125,10 @@ class VertexInfo() { throw new IllegalArgumentException } + /** Get the primary key of the vertex info. + * + * @return name of the primary key. + */ def getPrimaryKey(): String = { val len: Int = property_groups.size for ( i <- 0 to len - 1 ) { @@ -113,6 +144,10 @@ class VertexInfo() { return "" } + /** Check if the vertex info is validated. + * + * @return true if the vertex info is validated, otherwise false. + */ def isValidated(): Boolean = { if (label == "" || chunk_size <= 0) return false @@ -128,10 +163,17 @@ class VertexInfo() { return true } + /** Get the vertex num file path of the vertex info. */ def getVerticesNumFilePath(): String = { return prefix + "vertex_count" } + /** Get the chunk file path of the property group of the vertex chunk. + * + * @param property_group the property group. + * @param chunk_index the index of the vertex chunk. + * @return chunk file path. + */ def getFilePath(property_group: PropertyGroup, chunk_index: Long): String = { if (containPropertyGroup(property_group) == false) throw new IllegalArgumentException @@ -151,6 +193,11 @@ class VertexInfo() { return prefix + str + "chunk" + chunk_index.toString() } + /** Get the chunk files directory path of the property group. + * + * @param property_group the property group. + * @return the directory path that stores the chunk files of the property group.
+ */ def getDirPath(property_group: PropertyGroup): String = { if (containPropertyGroup(property_group) == false) throw new IllegalArgumentException diff --git a/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala b/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala index 4e3b2dcde..8a9ce0b62 100644 --- a/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala +++ b/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala @@ -23,11 +23,25 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ -class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.Value,spark:SparkSession) { - // load a single offset chunk as a DataFrame +/** Reader for edge chunks. + * + * @constructor create a new edge reader with edge info and AdjList type. + * @param prefix the absolute prefix. + * @param edgeInfo the edge info that describes the edge type. + * @param adjListType the adj list type for the edge. + * @param spark spark session for the reader to read chunks as Spark DataFrame. + */ +class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.Value, spark: SparkSession) { + if (edgeInfo.containAdjList(adjListType) == false) { + throw new IllegalArgumentException + } + + /** Load a single offset chunk as a DataFrame. + * + * @param chunk_index index of the offset chunk. + * @return offset chunk DataFrame. + */ def readOffset(chunk_index: Long): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException if (adjListType != AdjListType.ordered_by_source && adjListType != AdjListType.ordered_by_dest) throw new IllegalArgumentException val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType) @@ -37,10 +51,13 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load a single AdjList chunk as a DataFrame + /** Load a single AdjList chunk as a DataFrame. + * + * @param vertex_chunk_index index of the vertex chunk. + * @param chunk_index index of the AdjList chunk. + * @return AdjList chunk DataFrame. + */ def readAdjListChunk(vertex_chunk_index: Long, chunk_index: Long): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType) val file_type = FileType.FileTypeToString(file_type_in_gar) val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, chunk_index, adjListType) @@ -48,10 +65,13 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load all AdjList chunks for a vertex chunk as a DataFrame + /** Load all AdjList chunks for a vertex chunk as a DataFrame. + * + * @param vertex_chunk_index index of the vertex chunk. + * @param addIndex whether to add an edge index column to the final DataFrame. + * @return DataFrame of all AdjList chunks of vertices in the given vertex chunk.
+ */ def readAdjListForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, adjListType) val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration) val path_pattern = new Path(file_path + "chunk*") @@ -69,10 +89,12 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load all AdjList chunks for this edge type as a DataFrame + /** Load all AdjList chunks for this edge type as a DataFrame. + * + * @param addIndex whether to add an edge index column to the final DataFrame. + * @return DataFrame of all AdjList chunks. + */ def readAllAdjList(addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException val file_path = prefix + "/" + edgeInfo.getAdjListDirPath(adjListType) val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration) val path_pattern = new Path(file_path + "part*") @@ -90,10 +112,15 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load a single edge property chunk as a DataFrame + /** Load a single edge property chunk as a DataFrame. + * + * @param propertyGroup property group. + * @param vertex_chunk_index index of the vertex chunk. + * @param chunk_index index of the property group chunk. + * @return property group chunk DataFrame. If the edge info does not contain the property group, + * an IllegalArgumentException is raised. + */ def readEdgePropertyChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, chunk_index: Long): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) throw new IllegalArgumentException val file_type = propertyGroup.getFile_type(); @@ -102,10 +129,15 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load the chunks for a property group of a vertex chunk as a DataFrame + /** Load the chunks for a property group of a vertex chunk as a DataFrame. + * + * @param propertyGroup property group. + * @param vertex_chunk_index index of the vertex chunk. + * @param addIndex whether to add an edge index column to the final DataFrame. + * @return DataFrame that contains all property group chunks of vertices in the given vertex chunk. + * If the edge info does not contain the property group, an IllegalArgumentException is raised. + */ def readEdgePropertiesForVertexChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) throw new IllegalArgumentException val file_path = prefix + "/" + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index) @@ -125,10 +157,14 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load all chunks for a property group as a DataFrame + /** Load all chunks for a property group as a DataFrame. + * + * @param propertyGroup property group. + * @param addIndex whether to add an edge index column to the final DataFrame.
+ * @return DataFrame that contains all chunks of the property group. + * If the edge info does not contain the property group, an IllegalArgumentException is raised. + */ def readEdgeProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) throw new IllegalArgumentException val file_path = prefix + "/" + edgeInfo.getPropertyDirPath(propertyGroup, adjListType) @@ -148,10 +184,13 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load the chunks for all property groups of a vertex chunk as a DataFrame + /** Load the chunks for all property groups of a vertex chunk as a DataFrame. + * + * @param vertex_chunk_index index of the vertex chunk. + * @param addIndex whether to add an edge index column to the final DataFrame. + * @return DataFrame that contains all property group chunks of a vertex chunk. + */ def readAllEdgePropertiesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException var df = spark.emptyDataFrame val property_groups = edgeInfo.getPropertyGroups(adjListType) val len: Int = property_groups.size @@ -168,11 +207,13 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V df = df.drop(GeneralParams.edgeIndexCol) return df } - - // load the chunks for all property groups as a DataFrame + + /** Load the chunks for all property groups as a DataFrame. + * + * @param addIndex whether to add an edge index column to the final DataFrame. + * @return DataFrame that contains all property group chunks of the edge. + */ def readAllEdgeProperties(addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException var df = spark.emptyDataFrame val property_groups = edgeInfo.getPropertyGroups(adjListType) val len: Int = property_groups.size @@ -190,10 +231,13 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load the chunks for the AdjList and all property groups for a vertex chunk as a DataFrame + /** Load the chunks for the AdjList and all property groups for a vertex chunk as a DataFrame. + * + * @param vertex_chunk_index index of the vertex chunk. + * @param addIndex whether to add an edge index column to the final DataFrame. + * @return DataFrame that contains all chunks of the AdjList and property groups of vertices in the given vertex chunk. + */ def readEdgesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException val adjList_df = readAdjListForVertexChunk(vertex_chunk_index, true) val properties_df = readAllEdgePropertiesForVertexChunk(vertex_chunk_index, true) var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol) @@ -202,10 +246,12 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V return df } - // load the chunks for the AdjList and all property groups as a DataFrame + /** Load the chunks for the AdjList and all property groups as a DataFrame. + * + * @param addIndex whether to add an edge index column to the final DataFrame. + * @return DataFrame that contains all chunks of the AdjList and property groups of the edge.
+ */ def readEdges(addIndex: Boolean = false): DataFrame = { - if (edgeInfo.containAdjList(adjListType) == false) - throw new IllegalArgumentException val adjList_df = readAllAdjList(true) val properties_df = readAllEdgeProperties(true) var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol); diff --git a/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala index 7032c1a22..15e2d0c96 100644 --- a/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala +++ b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala @@ -23,6 +23,13 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ +/** Reader for vertex chunks. + * + * @constructor create a new vertex reader with vertex info. + * @param prefix the absolute prefix. + * @param vertexInfo the vertex info that describes the vertex type. + * @param spark spark session for the reader to read chunks as Spark DataFrame. + */ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) { private val vertices_number = readVerticesNumber() private val chunk_size = vertexInfo.getChunk_size() @@ -30,7 +37,7 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) if (vertices_number % chunk_size != 0) chunk_number = chunk_number + 1 - // load the total number of vertices for this vertex type + /** Load the total number of vertices for this vertex type. */ def readVerticesNumber(): Long = { val file_path = prefix + "/" + vertexInfo.getVerticesNumFilePath() val path = new Path(file_path) @@ -41,7 +48,12 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) return number } - // load a single vertex property chunk as a DataFrame + /** Load a single vertex property chunk as a DataFrame. + * + * @param propertyGroup property group. + * @param chunk_index index of the vertex chunk. + * @return vertex property chunk DataFrame. + */ def readVertexPropertyChunk(propertyGroup: PropertyGroup, chunk_index: Long): DataFrame = { if (vertexInfo.containPropertyGroup(propertyGroup) == false) throw new IllegalArgumentException @@ -51,7 +63,12 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) return df } - // load all chunks for a property group as a DataFrame + /** Load all chunks for a property group as a DataFrame. + * + * @param propertyGroup property group. + * @param addIndex whether to add a vertex index column to the final DataFrame. + * @return DataFrame that contains all chunks of the property group. + */ def readVertexProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = { if (vertexInfo.containPropertyGroup(propertyGroup) == false) throw new IllegalArgumentException @@ -68,7 +85,11 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) return df } - // load the chunks for all property groups as a DataFrame + /** Load the chunks for all property groups as a DataFrame. + * + * @param addIndex whether to add a vertex index column to the final DataFrame. + * @return DataFrame that contains all property group chunks of the vertex.
+ */ def readAllVertexProperties(addIndex: Boolean = false): DataFrame = { var df = spark.emptyDataFrame val property_groups = vertexInfo.getProperty_groups() diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala index 2ce28a7d2..45d46961b 100644 --- a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala +++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.DataFrame import org.apache.hadoop.fs +/** Helper object to write a DataFrame to chunk files. */ object FileSystem { private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = { val sc = spark.sparkContext @@ -33,6 +34,12 @@ object FileSystem { } } + /** Write the input DataFrame to the output path with a certain file format. + * + * @param dataFrame DataFrame to write out. + * @param fileType output file format type; the value could be csv|parquet|orc. + * @param outputPrefix output path prefix. + */ def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = { val spark = dataFrame.sparkSession spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala index 2e6f594f9..f611794ec 100644 --- a/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala +++ b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala @@ -27,11 +27,17 @@ import org.apache.spark.rdd.RDD import scala.collection.SortedMap import scala.collection.mutable.ArrayBuffer +/** IndexGenerator is an object that helps to generate indices for vertex/edge DataFrames. */ object IndexGenerator { // index helper for the vertex DataFrame - // return a DataFrame contains two columns: vertex index & primary key + /** Generate a vertex index mapping from the primary key; the resulting DataFrame contains two columns: vertex index & primary key. + * + * @param vertexDf input vertex DataFrame. + * @param primaryKey the primary key of the vertex. + * @return a DataFrame that contains two columns: vertex index & primary key. + */ def constructVertexIndexMapping(vertexDf: DataFrame, primaryKey: String): DataFrame = { val spark = vertexDf.sparkSession val schema = vertexDf.schema @@ -54,7 +60,11 @@ object IndexGenerator { spark.createDataFrame(mapping, mapping_schema).withColumnRenamed(primaryKey, GeneralParams.primaryCol) } - //add a column contains vertex index + /** Add a column containing the vertex index to the DataFrame. + * + * @param vertexDf the input vertex DataFrame. + * @return DataFrame that contains a new vertex index column. + */ def generateVertexIndexColumn(vertexDf: DataFrame): DataFrame = { val spark = vertexDf.sparkSession val schema = vertexDf.schema @@ -76,9 +86,9 @@ object IndexGenerator { spark.createDataFrame(rdd_with_index, schema_with_index) } - //index helper for the Edge DataFrame + // index helper for the Edge DataFrame - //add a column contains edge index + /** Add a column containing the edge index to the input edge DataFrame.
*/ def generateEdgeIndexColumn(edgeDf: DataFrame): DataFrame = { val spark = edgeDf.sparkSession val schema = edgeDf.schema @@ -100,7 +110,7 @@ object IndexGenerator { spark.createDataFrame(rdd_with_index, schema_with_index) } - // join the edge table with the vertex index mapping for source column + /** Join the edge table with the vertex index mapping for source column. */ def generateSrcIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, srcIndexMapping: DataFrame): DataFrame = { val spark = edgeDf.sparkSession srcIndexMapping.createOrReplaceTempView("src_vertex") @@ -113,7 +123,7 @@ object IndexGenerator { trans_df.drop(srcColumnName) } - // join the edge table with the vertex index mapping for destination column + /** Join the edge table with the vertex index mapping for destination column. */ def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = { val spark = edgeDf.sparkSession dstIndexMapping.createOrReplaceTempView("dst_vertex") @@ -126,27 +136,27 @@ object IndexGenerator { trans_df.drop(dstColumnName) } - // join the edge table with the vertex index mapping for source & destination columns + /** Join the edge table with the vertex index mapping for source & destination columns. */ def generateSrcAndDstIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, dstColumnName: String, srcIndexMapping: DataFrame, dstIndexMapping: DataFrame): DataFrame = { val df_with_src_index = generateSrcIndexForEdgesFromMapping(edgeDf, srcColumnName, srcIndexMapping) generateDstIndexForEdgesFromMapping(df_with_src_index, dstColumnName, dstIndexMapping) } - // construct vertex index for source column + /** Construct vertex index for source column. */ def generateSrcIndexForEdges(edgeDf: DataFrame, srcColumnName: String): DataFrame = { val srcDf = edgeDf.select(srcColumnName).distinct() val srcIndexMapping = constructVertexIndexMapping(srcDf, srcColumnName) generateSrcIndexForEdgesFromMapping(edgeDf, srcColumnName, srcIndexMapping) } - // construct vertex index for destination column + /** Construct vertex index for destination column. */ def generateDstIndexForEdges(edgeDf: DataFrame, dstColumnName: String): DataFrame = { val dstDf = edgeDf.select(dstColumnName).distinct() val dstIndexMapping = constructVertexIndexMapping(dstDf, dstColumnName) generateDstIndexForEdgesFromMapping(edgeDf, dstColumnName, dstIndexMapping) } - // union and construct vertex index for source & destination columns + /** Union and construct vertex index for source & destination columns. */ def generateSrcAndDstIndexUnitedlyForEdges(edgeDf: DataFrame, srcColumnName: String, dstColumnName: String): DataFrame = { val srcDf = edgeDf.select(srcColumnName) val dstDf = edgeDf.select(dstColumnName) diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala b/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala index 7d68a7ef0..614289efc 100644 --- a/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala +++ b/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala @@ -18,7 +18,13 @@ package com.alibaba.graphar.utils import org.apache.spark.sql.types._ import org.apache.spark.Partitioner - +/** Partitioner for the vertex/edge DataFrame to partition by chunk size. + * + * @constructor create a new chunk partitioner + * @param partitions number of partitions. + * @param chunk_size size of the vertex or edge chunk.
+ * + */ class ChunkPartitioner(partitions: Int, chunk_size: Long) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala index f3c49bb6a..b14f2f20c 100644 --- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala +++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.functions._ import scala.collection.SortedMap import scala.collection.mutable.ArrayBuffer +/** Helper object for EdgeWriter class. */ object EdgeWriter { // split the whole edge dataframe into chunk dataframes by vertex chunk size. private def split(edgeDf: DataFrame, keyColumnName: String, vertexChunkSize: Long): Seq[DataFrame] = { @@ -102,6 +103,14 @@ } } +/** Writer for the edge DataFrame. + * + * @constructor create a new writer for the edge DataFrame with edge info. + * @param prefix the absolute prefix. + * @param edgeInfo the edge info that describes the edge type. + * @param adjListType the adj list type for the edge. + * @param edgeDf the input edge DataFrame. + */ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.Value, edgeDf: DataFrame) { private var chunks: Seq[DataFrame] = preprocess() @@ -156,7 +165,7 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V } } - // generate the chunks of AdjList from edge dataframe for this edge type + /** Generate the chunks of AdjList from the edge DataFrame for this edge type. */ def writeAdjList(): Unit = { val file_type = edgeInfo.getAdjListFileType(adjListType) var chunk_index: Long = 0 @@ -172,7 +181,10 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V } } - // generate the chunks of the property group from edge dataframe + /** Generate the chunks of the property group from the edge DataFrame. + * + * @param propertyGroup property group. + */ def writeEdgeProperties(propertyGroup: PropertyGroup): Unit = { if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) { throw new IllegalArgumentException @@ -193,7 +205,7 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V } } - // generate the chunks of all property groups from edge dataframe + /** Generate the chunks of all property groups from the edge DataFrame. */ def writeEdgeProperties(): Unit = { val property_groups = edgeInfo.getPropertyGroups(adjListType) val it = property_groups.iterator @@ -203,7 +215,7 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V } } - // generate the chunks for the AdjList and all property groups from edge dataframe + /** Generate the chunks for the AdjList and all property groups from the edge DataFrame. */ def writeEdges(): Unit = { writeAdjList() writeEdgeProperties() diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala index 41780114c..4fb7b8670 100644 --- a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala +++ b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala @@ -31,6 +31,7 @@ import scala.collection.SortedMap import scala.collection.mutable.ArrayBuffer +/** Helper object for VertexWriter class.
*/ object VertexWriter { private def repartitionAndSort(vertexDf: DataFrame, chunkSize: Long): DataFrame = { val vertex_df_schema = vertexDf.schema @@ -45,6 +46,13 @@ object VertexWriter { } } +/** Writer for the vertex DataFrame. + * + * @constructor create a new writer for the vertex DataFrame with vertex info. + * @param prefix the absolute prefix. + * @param vertexInfo the vertex info that describes the vertex type. + * @param vertexDf the input vertex DataFrame. + */ class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame) { private var chunks:DataFrame = preprocess() private val spark = vertexDf.sparkSession @@ -58,7 +66,10 @@ class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame) return VertexWriter.repartitionAndSort(vertexDf, vertexInfo.getChunk_size()) } - // generate chunks of the property group for vertex dataframe + /** Generate chunks of the property group for the vertex DataFrame. + * + * @param propertyGroup property group. + */ def writeVertexProperties(propertyGroup: PropertyGroup): Unit = { // check if contains the property group if (vertexInfo.containPropertyGroup(propertyGroup) == false) { throw new IllegalArgumentException @@ -77,7 +88,7 @@ class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame) FileSystem.writeDataFrame(pg_df, propertyGroup.getFile_type(), output_prefix) } - // generate chunks of all property groups for vertex dataframe + /** Generate chunks of all property groups for the vertex DataFrame. */ def writeVertexProperties(): Unit = { val property_groups = vertexInfo.getProperty_groups() val it = property_groups.iterator
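For reviewers who want to exercise the API documented in this patch, here is a minimal sketch (not part of the patch) of how the reader classes are meant to be driven. The GraphAr prefix and the `person.vertex.yml` file name are hypothetical, and the YAML loading simply follows the snakeyaml bean pattern that the info classes declare via `@BeanProperty`:

```scala
import com.alibaba.graphar.VertexInfo
import com.alibaba.graphar.reader.VertexReader
import org.apache.spark.sql.SparkSession
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import java.io.FileInputStream

object ReaderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("graphar-reader-example")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical GraphAr root; it must contain the vertex_count file
    // and the property group chunk directories.
    val prefix = "/tmp/graphar/ldbc_sample"

    // Deserialize the vertex meta information the same way the library does:
    // snakeyaml populates the @BeanProperty fields of VertexInfo.
    val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
    val vertexInfo = yaml.load(new FileInputStream(prefix + "/person.vertex.yml"))
      .asInstanceOf[VertexInfo]

    // Read every property group of this vertex type into one DataFrame,
    // keeping the generated vertex index column.
    val reader = new VertexReader(prefix, vertexInfo, spark)
    val df = reader.readAllVertexProperties(addIndex = true)
    df.show(5)

    spark.stop()
  }
}
```

An EdgeReader is constructed the same way, from an EdgeInfo plus one of the AdjListType values, and its readEdges() then joins the AdjList chunks and all property group chunks into a single DataFrame.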