diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 00000000000..c0bab04941d --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.2.8 diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 9ac31c648c2..2faee1c66f2 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -7,5 +7,4 @@ if (NOT SKIP_JAVA_CLIENT) add_subdirectory(importer) else() message(STATUS "Skip building the importer") -endif() - +endif() \ No newline at end of file diff --git a/src/tools/native-client/CMakeLists.txt b/src/tools/native-client/CMakeLists.txt index 0d41b8bf7bb..b99422febc9 100644 --- a/src/tools/native-client/CMakeLists.txt +++ b/src/tools/native-client/CMakeLists.txt @@ -1,7 +1,8 @@ +# locate jni header include_directories($ENV{JAVA_HOME}/include $ENV{JAVA_HOME}/include/linux) -add_library(nebula_native_client SHARED +add_library(nebula_native_client SHARED $ $ $ diff --git a/src/tools/native-client/build.sh b/src/tools/native-client/build.sh index 9a1f4c52a99..44b42546ba9 100755 --- a/src/tools/native-client/build.sh +++ b/src/tools/native-client/build.sh @@ -1,4 +1,3 @@ #!/bin/bash -mvn clean compile package -DskipTests -mvn test +mvn clean package -X diff --git a/src/tools/native-client/pom.xml b/src/tools/native-client/pom.xml index 0af3a84b1f8..30b0c3ee00d 100644 --- a/src/tools/native-client/pom.xml +++ b/src/tools/native-client/pom.xml @@ -12,18 +12,13 @@ com.vesoft native-client - 0.0.1 + 0.1.0 ../../.. - - org.rocksdb - rocksdbjni - 5.15.10 - org.slf4j slf4j-api @@ -76,10 +71,14 @@ javah - ${basedir}/src/main/cpp - com_vesoft_client_NativeClient.h + ${basedir}/src/main/cpp + + + com_vesoft_client_NativeClient.h + - com.vesoft.client.NativeClient + com.vesoft.client.NativeClient + @@ -98,12 +97,14 @@ UTF-8 - ${basedir}/src/main/resources/ + ${basedir}/src/main/resources/ + _build - libnebula_native_client.so + libnebula_native_client.so + false @@ -117,11 +118,14 @@ maven-checkstyle-plugin 3.0.0 - ${project.root.dir}/.linters/java/nebula_java_style_checks.xml + + ${project.root.dir}/.linters/java/nebula_java_style_checks.xml + UTF-8 true false - true + true + 0 warning @@ -142,6 +146,15 @@ + + org.apache.maven.plugins + maven-surefire-plugin + 2.10 + + + -Djava.library.path=_build/ + + diff --git a/src/tools/native-client/src/main/java/com/vesoft/client/NativeClient.java b/src/tools/native-client/src/main/java/com/vesoft/client/NativeClient.java index 3584fcde4b9..8f5a1a9f812 100644 --- a/src/tools/native-client/src/main/java/com/vesoft/client/NativeClient.java +++ b/src/tools/native-client/src/main/java/com/vesoft/client/NativeClient.java @@ -12,16 +12,14 @@ import java.util.Map; import java.util.Objects; -import org.rocksdb.EnvOptions; -import org.rocksdb.Options; -import org.rocksdb.RocksDBException; -import org.rocksdb.SstFileWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NativeClient implements AutoCloseable { +public class NativeClient { + private static final Logger LOGGER = LoggerFactory.getLogger(NativeClient.class.getName()); + static { - NativeClientResourceLoader.resourceLoader(); + System.loadLibrary("nebula_native_client"); } public static class Pair { @@ -47,8 +45,6 @@ public String toString() { } } - private static final Logger LOGGER = LoggerFactory.getLogger(NativeClient.class.getName()); - private static final int PARTITION_ID = 4; private static final int VERTEX_ID = 8; private static final int TAG_ID = 4; @@ -59,80 +55,6 @@ public String toString() { 
private static final int VERTEX_SIZE = PARTITION_ID + VERTEX_ID + TAG_ID + TAG_VERSION; private static final int EDGE_SIZE = PARTITION_ID + VERTEX_ID + EDGE_TYPE + EDGE_RANKING + VERTEX_ID + EDGE_VERSION; - private SstFileWriter writer; - - public NativeClient(String path) { - EnvOptions env = new EnvOptions(); - Options options = new Options(); - options.setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); - new NativeClient(path, env, options); - } - - public NativeClient(String path, EnvOptions env, Options options) { - if (path == null || path.trim().length() == 0) { - throw new IllegalArgumentException("File Path should not be null and empty"); - } - writer = new SstFileWriter(env, options); - try { - writer.open(path); - } catch (RocksDBException e) { - LOGGER.error("SstFileWriter Open Failed {}", e.getMessage()); - } - } - - public boolean addVertex(String key, Object[] values) { - if (checkKey(key) || checkValues(values)) { - throw new IllegalArgumentException("Add Vertex key and value should not null"); - } - - byte[] value = encode(values); - try { - writer.put(key.getBytes(), value); - return true; - } catch (RocksDBException e) { - LOGGER.error("AddVertex Failed {}", e.getMessage()); - return false; - } - } - - public boolean addEdge(String key, Object[] values) { - if (checkKey(key) || checkValues(values)) { - throw new IllegalArgumentException("Add Vertex key and value should not null"); - } - - byte[] value = encode(values); - try { - writer.put(key.getBytes(), value); - return true; - } catch (RocksDBException e) { - LOGGER.error("AddEdge Failed {}", e.getMessage()); - return false; - } - } - - public boolean deleteVertex(String key) { - return delete(key); - } - - public boolean deleteEdge(String key) { - return delete(key); - } - - private boolean delete(String key) { - if (checkKey(key)) { - throw new IllegalArgumentException("Add Vertex key and value should not null"); - } - - try { - writer.delete(key.getBytes()); - return true; - } catch (RocksDBException e) { - LOGGER.error("Delete Failed {}", e.getMessage()); - return false; - } - } - public static byte[] createEdgeKey(int partitionId, long srcId, int edgeType, long edgeRank, long dstId, long edgeVersion) { ByteBuffer buffer = ByteBuffer.allocate(EDGE_SIZE); @@ -157,17 +79,9 @@ public static byte[] createVertexKey(int partitionId, long vertexId, return buffer.array(); } - private static native byte[] encode(Object[] values); + public static native byte[] encode(Object[] values); - public static byte[] encoded(Object[] values) { - return encode(values); - } - - private static native Map decode(byte[] encoded, Pair[] fields); - - public static Map decoded(byte[] encoded, Pair[] fields) { - return decode(encoded, fields); - } + public static native Map decode(byte[] encoded, Pair[] fields); private boolean checkKey(String key) { return Objects.isNull(key) || key.length() == 0; @@ -177,11 +91,4 @@ private boolean checkValues(Object[] values) { return Objects.isNull(values) || values.length == 0 || Arrays.asList(values).contains(null); } - - @Override - public void close() throws Exception { - writer.finish(); - writer.close(); - } - } diff --git a/src/tools/native-client/src/test/java/com/vesoft/client/NativeClientTest.java b/src/tools/native-client/src/test/java/com/vesoft/client/NativeClientTest.java index c06205408bb..86efdf70532 100644 --- a/src/tools/native-client/src/test/java/com/vesoft/client/NativeClientTest.java +++ b/src/tools/native-client/src/test/java/com/vesoft/client/NativeClientTest.java @@ 
-6,8 +6,6 @@ package com.vesoft.client; -import static com.vesoft.client.NativeClient.decoded; -import static com.vesoft.client.NativeClient.encoded; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -35,7 +33,7 @@ public void testDecoded() { 0.618, "Hello".getBytes() }; - byte[] result = encoded(values); + byte[] result = NativeClient.encode(values); NativeClient.Pair[] pairs = new NativeClient.Pair[]{ new NativeClient.Pair("b_field", Boolean.class.getName()), @@ -45,7 +43,7 @@ public void testDecoded() { new NativeClient.Pair("s_field", byte[].class.getName()) }; - Map decodedResult = decoded(result, pairs); + Map decodedResult = NativeClient.decode(result, pairs); byte byteValue = decodedResult.get("b_field")[0]; boolean boolValue = (byteValue == 0x00) ? false : true;
diff --git a/src/tools/spark-sstfile-generator/README.md b/src/tools/spark-sstfile-generator/README.md
new file mode 100644
index 00000000000..71477126f17
--- /dev/null
+++ b/src/tools/spark-sstfile-generator/README.md
@@ -0,0 +1,75 @@
+Generate sst files from a Hive table data source, guided by a mapping file which maps Hive tables to vertexes and edges.
+Multiple vertexes or edges may map to a single Hive table, in which case a partition column is used to distinguish the
+different vertexes or edges.
+The Hive tables may be regenerated periodically by an upstream system to reflect the latest data, and may be
+partitioned by a time column that indicates when the data were generated.
+The *$HADOOP_HOME* environment variable needs to be set to run this job.
+
+# Environment
+component|version
+---|---
+os|centos6.5 final(kernel 2.6.32-431.el6.x86_64)
+spark|1.6.2
+hadoop|2.7.4
+jdk|1.8+
+scala|2.10.5
+sbt|1.2.8
+
+
+# Spark-submit command line reference
+This is what we use in our production environment:
+```bash
+${SPARK_HOME}/bin/spark-submit --master yarn --queue fmprod --conf spark.executor.instances=24 --conf spark.executor.memory=90g --conf spark.executor.cores=2 --conf spark.executorEnv.LD_LIBRARY_PATH='/soft/server/nebula_native_client:/usr/local/lib:/usr/local/lib64' --conf spark.driver.extraJavaOptions='-Djava.library.path=/soft/server/nebula_native_client/:/usr/local/lib64:/usr/local/lib' --class com.vesoft.tools.SparkSstFileGenerator --files mapping.json nebula-spark-sstfile-generator.jar -di "2019-05-13" -mi mapping.json -pi dt -so file://home/hdp/nebula_output
+```
+The application options are described below.
+
+# Spark application command line reference
+We keep a naming convention for the options: those suffixed with _i_ are INPUT options, while those suffixed with _o_ are OUTPUT options.
+
+```bash
+usage: nebula spark sst file generator
+ -ci,--default_column_mapping_policy   If omitted, what policy to use when mapping columns to properties; all columns except the primary_key's column will be mapped to tag properties with the same name by default
+ -di,--latest_date_input               Latest date to query, date format YYYY-MM-dd
+ -hi,--string_value_charset_input      When the value is of type String, what charset is used when encoding; defaults to UTF-8
+ -ho,--hdfs_sst_file_output            Which hdfs directory those sst files will be put in, should not start with file:///
+ -li,--limit_input                     Return at most this number of edges/vertexes; usually used in the POC stage. When omitted, fetch all data.
+ -mi,--mapping_file_input              Hive tables to nebula graph schema mapping file
+ -pi,--date_partition_input            A partition column of type String in the hive table which represents a date, in the format YYYY-MM-dd
+ -ri,--repartition_number_input        Repartition number. An optimization trick to improve generation speed and data skew; needs tuning to suit your data.
+ -so,--local_sst_file_output           Which local directory the generated sst files will be put in, should start with file:///
+ -ti,--datasource_type_input           Data source types supported, must be among [hive|hbase|csv] for now, default=hive
+```
+
+# Mapping file schema
+
+The mapping file is in JSON format. Its schema is provided as [mapping-schema.json](mapping-schema.json), following the [Json Schema Standard](http://json-schema.org). We provide an example mapping file: [mapping.json](mapping.json).
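+
+A trimmed-down version of that example, keeping a single tag and no edges, looks like the following; the graphspace name `fma` and the hive table/column names are just the sample values used in [mapping.json](mapping.json):
+
+```json
+{
+  "fma": {
+    "key_policy": "hash_primary_key",
+    "partitions": 3,
+    "tags": [
+      {
+        "table_name": "dmt_risk_graph_idmp_node_s_d",
+        "tag_name": "mac",
+        "date_partition_key": "dt",
+        "type_partition_key": "flag",
+        "primary_key": "node",
+        "mappings": [
+          {
+            "src_pri_value": {
+              "name": "src_pri_value",
+              "type": "string"
+            }
+          }
+        ]
+      }
+    ],
+    "edges": []
+  }
+}
+```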
+
+# FAQ
+## How to use libnebula-native-client.so under CentOS6.5(2.6.32-431 x86-64)
+
+1. Don't use the officially distributed librocksdbjni-linux64.so; build it natively on CentOS6.5:
+
+```bash
+DEBUG_LEVEL=0 make shared_lib
+DEBUG_LEVEL=0 make rocksdbjava
+```
+_Make sure to use the same DEBUG_LEVEL for both builds, or there will be link errors like `symbol not found`._
+2. Run `sbt assembly` to package this project into a spark job jar, which by default is named `nebula-spark-sstfile-generator.jar`.
+3. Run `jar uvf nebula-spark-sstfile-generator.jar librocksdbjni-linux64.so libnebula_native_client.so` to replace the `*.so` files packaged inside the dependency org.rocksdb:rocksdbjni:5.17.2, or an error like the following will occur during spark-submit:
+
+```
+*** glibc detected *** /soft/java/bin/java: free(): invalid pointer: 0x00007f7985b9f0a0 ***
+======= Backtrace: =========
+/lib64/libc.so.6(+0x75f4e)[0x7f7c7d5e6f4e]
+/lib64/libc.so.6(+0x78c5d)[0x7f7c7d5e9c5d]
+/tmp/librocksdbjni3419235685305324910.so(_ZN7rocksdb10EnvOptionsC1Ev+0x578)[0x7f79431ff908]
+/tmp/librocksdbjni3419235685305324910.so(Java_org_rocksdb_EnvOptions_newEnvOptions+0x1c)[0x7f7943044dbc]
+[0x7f7c689c1747]
+```
+
+# TODO
+1. Add a database_name property at the graphspace level and the tag/edge level, where the latter overrides the former when both are provided
+2. The order of schema column definitions is important; preserve it when parsing the mapping file and when encoding
+3. Integrate the build with maven or cmake, so that this spark assembly is built after the nebula native client
+4. Handle the following situation: different tables share a common Tag, e.g. a tag with properties (start_time, end_time)
+
diff --git a/src/tools/spark-sstfile-generator/build.sbt b/src/tools/spark-sstfile-generator/build.sbt
new file mode 100644
index 00000000000..ab69f52094a
--- /dev/null
+++ b/src/tools/spark-sstfile-generator/build.sbt
@@ -0,0 +1,86 @@
+import Commons._
+
+organization := "com.vesoft"
+// compatible with spark 1.6.0
+scalaVersion := "2.10.5"
+
+// compatible with spark 1.6.2
+// scalaVersion := "2.11.8"
+
+name := "nebula-spark-sstfile-generator"
+version := "0.1.0"
+
+test in assembly in ThisBuild := {}
+
+// do not include src and doc in the dist
+sources in(Compile, doc) in ThisBuild := Seq.empty
+publishArtifact in(Compile, packageDoc) in ThisBuild := false
+sources in (packageSrc) := Seq.empty
+
+libraryDependencies ++= commonDependencies
+libraryDependencies ++= Seq(
+  // compatible with spark 1.6.2
+  // "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
+  // "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided",
+  // "org.apache.spark" %% "spark-yarn" % "1.6.2" % "provided",
+  // "com.databricks" %% "spark-csv" % "1.5.0" % "provided",
+  // "org.apache.spark" %% "spark-hive" % "1.6.2" % "provided",
+
+  // compatible with spark 1.6.0 & hadoop 2.6.0
+  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
+    exclude("org.apache.hadoop", "hadoop-client"),
+
+  "org.apache.spark" %% "spark-sql" % "1.6.0" % "provided",
+  "org.apache.spark" %% "spark-yarn" % "1.6.0" % "provided"
+    exclude("org.apache.hadoop", "hadoop-yarn-api")
+    exclude("org.apache.hadoop", "hadoop-yarn-common")
+    exclude("org.apache.hadoop", "hadoop-client"),
+
+  "com.databricks" %% "spark-csv" % "1.5.0" % "provided",
+  "org.apache.spark" %% "spark-hive" % "1.6.0" % "provided",
+
+  "org.apache.hadoop" % "hadoop-common" % "2.6.0" % "provided",
+  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided",
+  "org.apache.hadoop" % "hadoop-yarn-api" % "2.6.0" % "provided",
+  "org.apache.hadoop" % "hadoop-yarn-common" % "2.6.0" % "provided",
+
+  //cmd line parsing
+  "commons-cli" % "commons-cli" % "1.4",
+  "org.scalatest" %% "scalatest" % "3.0.4" % Test,
+
+  // json parsing
+  "com.typesafe.play" %% "play-json" % "2.6.0",
+  "joda-time" % "joda-time" % "2.10.1",
+  // to eliminate the annoying '[warn] Class org.joda.convert.FromString not found - continuing with a stub.'
message + "org.joda" % "joda-convert" % "2.2.0" % "provided", + + //need nebula native client for encoding, need to run mvn install to deploy to local repo before used + "org.rocksdb" % "rocksdbjni" % "5.17.2", + "com.vesoft" % "native-client" % "0.1.0" +) + +//CAUTION: when dependency with version of X-SNAPSHOT is updated, you should comment out the following line, and run sbt update +updateOptions := updateOptions.value.withLatestSnapshots(false) + +assemblyMergeStrategy in assembly := { + case PathList("org", "slf4j", xs@_*) => MergeStrategy.discard + case PathList("org", "apache", "logging", xs@_*) => MergeStrategy.discard + case PathList("ch", "qos", "logback", xs@_*) => MergeStrategy.discard + case PathList("org", "scalactic", xs@_*) => MergeStrategy.discard + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) +} + +assemblyShadeRules in assembly := Seq( + ShadeRule.rename("org.apache.commons.cli.**" -> "shadecli.org.apache.commons.cli.@1").inAll +) + +assemblyJarName in assembly := "nebula-spark-sstfile-generator.jar" + +test in assembly := {} +// should not include scala runtime when submitting spark job +assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) + + + diff --git a/src/tools/spark-sstfile-generator/mapping-schema.json b/src/tools/spark-sstfile-generator/mapping-schema.json new file mode 100644 index 00000000000..f6069bfd822 --- /dev/null +++ b/src/tools/spark-sstfile-generator/mapping-schema.json @@ -0,0 +1,160 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://github.com/vesoft-inc/nebula/src/tools/spark-sstfile-generator/mapping-schema.json", + "title": "Nebula spark sst file generator mapping file schema", + "description": "A mapping file which map hive tables to nebula's vertexes and edges", + "type": "object", + "required": [ + "database" + ], + "properties": { + "database": { + "description": "The hive database name where source data comes from", + "type": "object", + "required": [ + "partitions" + ], + "properties": { + "key_policy": { + "description": "Encoding algorithm used when convert business key to vertex key, only support `hash_primary_key` in so far(case-insensitive), could be omitted, or used as it's", + "type": "string" + }, + "partitions": { + "description": "Graphspace's partition number,should be consistent with Nebula server side's", + "type": "number", + "minimum": 1 + } + }, + "tags": { + "description": "Vertex tags' mapping", + "type": "array", + "items": { + "type": "object", + "properties": { + "table_name": { + "description": "Tag's datasource table name", + "type": "string" + }, + "tag_name": { + "description": "Tag's name", + "type": "string" + }, + "date_partition_key": { + "description": "Date partition key used by datasource table", + "type": "string" + }, + "type_partition_key": { + "description": "if one datasource table maps to multiple tags, which column would be used as discrimination columns", + "type": "string" + }, + "primary_key": { + "description": "tag's datasource table's primary key column ", + "type": "string" + }, + "mappings": { + "description": "tag's property mappings, not all columns in source table will be used as properties. 
When omitted, all columns will be used as its properties except those specified by primary_key,date_partition_key and type_partition_key", + "type": "object", + "items": { + "tag_property_name": { + "name": { + "description": "Datasource table column's name, which hold this property's value", + "type": "string" + }, + "\"type\"": { + "description": "Datasource table column's type, will be used to do type conversion to graph's native data type, default to string. The charset used default to UTF-8, could be changed through command line option '--string_value_charset_input','-hi' for short", + "type": "string" + } + } + }, + "required": [ + "tag_property_name" + ] + } + }, + "required": [ + "table_name", + "tag_name", + "date_partition_key", + "primary_key" + ] + }, + "minItems": 1, + "uniqueItems": true + }, + "edges": { + "description": "Edges' mapping", + "type": "array", + "items": { + "type": "object", + "properties": { + "table_name": { + "description": "edge's datasource table name", + "type": "string" + }, + "edge_name": { + "description": "edge's name", + "type": "string" + }, + "date_partition_key": { + "description": "date partition key used by datasource table", + "type": "string" + }, + "type_partition_key": { + "description": "if one datasource table maps to multiple tags, which column would be used as discrimination columns", + "type": "string" + }, + "from_foreign_key_column": { + "description": "edge's FROM column", + "type": "string" + }, + "from_tag": { + "description": "what edge's FROM column represent, would be used as a cross reference", + "type": "string" + }, + "to_foreign_key_column": { + "description": "edge's TO column", + "type": "string" + }, + "to_tag": { + "description": "what edge's TO column represent, would be used as a cross reference", + "type": "string" + }, + "mappings": { + "description": "edge's property mappings,not all columns in source table will be used as properties. When omitted, all columns will be used as its properties except those specified by from_foreign_key_column,to_foreign_key_column、date_partition_key and type_partition_key", + "type": "array", + "items": { + "type": "object", + "properties": { + "edge_property_name": { + "name": { + "description": "Datasource table column's name, which hold this property's value", + "type": "string" + }, + "\"type\"": { + "description": "Datasource table column's type, will be used to do type conversion to graph's native data type, default to string. 
The charset used default to UTF-8, could be changed through command line option '--string_value_charset_input','-hi' for short", + "type": "string" + } + } + }, + "minItems": 1, + "uniqueItems": true + } + }, + "required": [ + "table_name", + "edge_name", + "date_partition_key", + "from_foreign_key_column", + "from_tag", + "to_foreign_key_column", + "to_tag" + ] + }, + "uniqueItems": true + } + } + } + } +} + + diff --git a/src/tools/spark-sstfile-generator/mapping.json b/src/tools/spark-sstfile-generator/mapping.json new file mode 100644 index 00000000000..56a94f70032 --- /dev/null +++ b/src/tools/spark-sstfile-generator/mapping.json @@ -0,0 +1,58 @@ +{ + "fma": { + "key_policy": "hash_primary_key", + "partitions": 3, + "tags": [ + { + "table_name": "dmt_risk_graph_idmp_node_s_d", + "tag_name": "mac", + "date_partition_key": "dt", + "type_partition_key": "flag", + "primary_key": "node", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + }, + { + "table_name": "dmt_risk_graph_idmp_node_s_d", + "tag_name": "user_pin", + "date_partition_key": "dt", + "type_partition_key": "flag", + "primary_key": "node", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + } + ], + "edges": [ + { + "table_name": "dmt_risk_graph_idmp_edge_s_d", + "edge_name": "pin2mac", + "date_partition_key": "dt", + "type_partition_key": "flag", + "from_foreign_key_column": "from_node", + "from_tag": "user_pin", + "to_foreign_key_column": "to_node", + "to_tag": "mac", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + } + ] + } +} \ No newline at end of file diff --git a/src/tools/spark-sstfile-generator/project/Commons.scala b/src/tools/spark-sstfile-generator/project/Commons.scala new file mode 100644 index 00000000000..d794ff97e40 --- /dev/null +++ b/src/tools/spark-sstfile-generator/project/Commons.scala @@ -0,0 +1,18 @@ +import sbt.Keys._ +import sbt._ + + +object Commons { + + lazy val commonDependencies = Seq( + "ch.qos.logback" % "logback-classic" % "1.2.3", + "org.slf4j" % "slf4j-api" % "1.7.25", + "org.scalactic" %% "scalactic" % "3.0.4", + "org.scalatest" %% "scalatest" % "3.0.4" % Test + ) + + scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint") + + scalacOptions in Test ++= Seq("-Yrangepos") +} + diff --git a/src/tools/spark-sstfile-generator/project/assembly.sbt b/src/tools/spark-sstfile-generator/project/assembly.sbt new file mode 100644 index 00000000000..813ce170210 --- /dev/null +++ b/src/tools/spark-sstfile-generator/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9") \ No newline at end of file diff --git a/src/tools/spark-sstfile-generator/project/build.properties b/src/tools/spark-sstfile-generator/project/build.properties new file mode 100644 index 00000000000..e2820dd8ca2 --- /dev/null +++ b/src/tools/spark-sstfile-generator/project/build.properties @@ -0,0 +1,2 @@ +sbt.version=1.2.8 + diff --git a/src/tools/spark-sstfile-generator/project/plugins.sbt b/src/tools/spark-sstfile-generator/project/plugins.sbt new file mode 100644 index 00000000000..1e23bf2ae63 --- /dev/null +++ b/src/tools/spark-sstfile-generator/project/plugins.sbt @@ -0,0 +1 @@ +resolvers += Resolver.typesafeRepo("releases") \ No newline at end of file diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/DataTypeCompatibility.scala 
b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/DataTypeCompatibility.scala new file mode 100644 index 00000000000..5cba88c2a5a --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/DataTypeCompatibility.scala @@ -0,0 +1,33 @@ +package com.vesoft.tools + +object DataTypeCompatibility { + + /** + * nebula data type --> Hive data type mapping + */ + val compatibilityMatrix: Map[String, Set[String]] = Map( + "INTEGER" -> Set("TINYINT", "SMALLINT", "INT", "BIGINT"), + "DOUBLE" -> Set("DOUBLE", "DECIMAL"), + "FLOAT" -> Set("FLOAT", "DECIMAL"), + //TODO: varchar/char? + "STRING" -> Set("VARCHAR", "CHAR", "STRING"), + "BOOL" -> Set("BOOLEAN"), + "DATE" -> Set("DATE", "TIMESTAMP"), + "DATETIME" -> Set("DATE", "TIMESTAMP"), + "YEARMONTH" -> Set("DATE") + + //TODO "binary" -> ? + ) + + /** + * check whether nebula data type is compatible with hive data type + */ + def isCompatible(nebulaType: String, hiveType: String): Boolean = { + // all type can be converted to nebula's string + if (nebulaType.equalsIgnoreCase("string")) { + true + } else { + compatibilityMatrix.get(nebulaType.toUpperCase).map(_.contains(hiveType.toUpperCase)).getOrElse(false) + } + } +} diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/FNVHash.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/FNVHash.scala new file mode 100644 index 00000000000..29ae3e485bc --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/FNVHash.scala @@ -0,0 +1,47 @@ +package com.vesoft.tools + +/** + * FNV hash util + * + * @link http://www.isthe.com/chongo/tech/comp/fnv/index.html + */ +object FNVHash { + //some prime number + private val FNV_64_INIT = 0xcbf29ce484222325L + private val FNV_64_PRIME = 0x100000001b3L + private val FNV_32_INIT = 0x811c9dc5 + private val FNV_32_PRIME = 0x01000193 + + /** + * hash to int32 + */ + def hash32(value: String): Int = { + var rv = FNV_32_INIT + val len = value.length + var i = 0 + while (i < len) { + rv ^= value.charAt(i) + rv *= FNV_32_PRIME + i += 1 + } + + rv + } + + /** + * hash to int64 + */ + def hash64(value: String): Long = { + var rv = FNV_64_INIT + val len = value.length + var i = 0 + while (i < len) { + rv ^= value.charAt(i) + rv *= FNV_64_PRIME + + i += 1 + } + + rv + } +} diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/MappingConfiguration.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/MappingConfiguration.scala new file mode 100644 index 00000000000..013f6d84387 --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/MappingConfiguration.scala @@ -0,0 +1,254 @@ +package com.vesoft.tools + +import play.api.libs.json.{Reads, Writes, _} + +import scala.io.{Codec, Source} +import scala.language.implicitConversions + +/** + * column mapping + * + * @param columnName hive column name + * @param propertyName what's the property name this column maps to + * @param `type` map to which data type of nebula graph + */ +case class Column(columnName: String, propertyName: String, `type`: String = "string") + +object Column { + implicit val ColumnWrites: Writes[Column] = new Writes[Column] { + override def writes(col: Column): JsValue = { + Json.obj(col.columnName -> Json.obj( + "name" -> col.propertyName, + "type" -> col.`type` + )) + } + } + + implicit val ColumnReads: Reads[Column] = new Reads[Column] { + override def reads(json: JsValue): JsResult[Column] = { + json match { + case 
JsObject(mapping) => { + assert(mapping.keys.nonEmpty && mapping.keys.size == 1) + assert(mapping.values.nonEmpty) + + val columnName = mapping.keys.toSeq(0) + val mappingContent = mapping.values.toSeq(0) + + val propertyName = (mappingContent \ "name").as[String] + val tp = (mappingContent \ "type").get.as[String] + JsSuccess(Column(columnName, propertyName, tp)) + } + case a@_ => throw new IllegalStateException(s"Illegal mapping format:${a}") + } + } + } +} + +/** + * a trait that both Tag and Edge should extends + */ +trait WithColumnMapping { + + def tableName: String + + def name: String + + def columnMappings: Option[Seq[Column]] +} + +/** + * tag section of configuration file + * + * @param tableName hive table name + * @param name tag name + * @param primaryKey which column is PK + * @param datePartitionKey date partition column,Hive table in production usually is Date partitioned + * @param typePartitionKey type partition columns, when different vertex/edge's properties are identical,they are stored in one hive table, and partition by a `type` column + * @param columnMappings map of hive table column to properties + */ +case class Tag(override val tableName: String, override val name: String, primaryKey: String, datePartitionKey: Option[String] = None, typePartitionKey: Option[String] = None, override val columnMappings: Option[Seq[Column]] = None) extends WithColumnMapping { + def allColumnNames(): Option[Seq[String]] = columnMappings.map(_.map(_.columnName)) + + def getColumn(name: String): Option[Column] = { + columnMappings.map(_.filter { + col => { + col.columnName.equalsIgnoreCase(name) + } + }.headOption).head + } +} + +object Tag { + // json implicit converter + implicit val TagWrites: Writes[Tag] = new Writes[Tag] { + override def writes(tag: Tag): JsValue = { + tag.columnMappings.fold( + Json.obj("table_name" -> JsString(tag.tableName), + "tag_name" -> JsString(tag.name), + "primary_key" -> JsString(tag.primaryKey), + "date_partition_key" -> (if (tag.datePartitionKey.isDefined) JsString(tag.datePartitionKey.get) else JsNull), + "type_partition_key" -> (if (tag.typePartitionKey.isDefined) JsString(tag.typePartitionKey.get) else JsNull) + )) { + cols => + Json.obj("table_name" -> JsString(tag.tableName), + "tag_name" -> JsString(tag.name), + "primary_key" -> JsString(tag.primaryKey), + "date_partition_key" -> (if (tag.datePartitionKey.isDefined) JsString(tag.datePartitionKey.get) else JsNull), + "type_partition_key" -> (if (tag.typePartitionKey.isDefined) JsString(tag.typePartitionKey.get) else JsNull), + "mappings" -> Json.toJson(cols)) + } + } + } + + implicit val TagReads: Reads[Tag] = new Reads[Tag] { + override def reads(json: JsValue): JsResult[Tag] = { + val tableName = (json \ "table_name").as[String] + val tagName = (json \ "tag_name").as[String] + val pk = (json \ "primary_key").as[String] + val datePartitionKey = (json \ "date_partition_key").asOpt[String] + val typePartitionKey = (json \ "type_partition_key").asOpt[String] + val columnMappings = (json \ "mappings").asOpt[Seq[Column]] + + JsSuccess(Tag(tableName, tagName, pk, datePartitionKey, typePartitionKey, columnMappings)) + } + } +} + + +/** + * edge section of configuration file + * + * @param tableName hive table name + * @param name edge type name + * @param fromForeignKeyColumn which column is srcID + * @param fromReferenceTag which Tag does srcID column reference + * @param toForeignKeyColumn which column is dstID + * @param toReferenceTag which Tag does dstID column reference + * @param 
columnMappings map of hive table column to properties + */ +case class Edge(override val tableName: String, override val name: String, fromForeignKeyColumn: String, fromReferenceTag: String, toForeignKeyColumn: String, toReferenceTag: String, datePartitionKey: Option[String] = None, typePartitionKey: Option[String] = None, override val columnMappings: Option[Seq[Column]] = None) extends WithColumnMapping { + def allColumnNames(): Option[Seq[String]] = columnMappings.map(columns => columns.map(_.columnName)) +} + +object Edge { + implicit val EdgeWrites: Writes[Edge] = new Writes[Edge] { + override def writes(edge: Edge): JsValue = { + edge.columnMappings.fold( + Json.obj("table_name" -> JsString(edge.tableName), + "edge_name" -> JsString(edge.name), + "date_partition_key" -> (if (edge.datePartitionKey.isDefined) JsString(edge.datePartitionKey.get) else JsNull), + "type_partition_key" -> (if (edge.typePartitionKey.isDefined) JsString(edge.typePartitionKey.get) else JsNull), + "from_foreign_key_column" -> JsString(edge.fromForeignKeyColumn), + "from_tag" -> JsString(edge.fromReferenceTag), + "to_foreign_key_column" -> JsString(edge.toForeignKeyColumn), + "to_tag" -> JsString(edge.toReferenceTag) + )) { + cols => + Json.obj("table_name" -> JsString(edge.tableName), + "edge_name" -> JsString(edge.name), + "date_partition_key" -> (if (edge.datePartitionKey.isDefined) JsString(edge.datePartitionKey.get) else JsNull), + "type_partition_key" -> (if (edge.typePartitionKey.isDefined) JsString(edge.typePartitionKey.get) else JsNull), + "from_foreign_key_column" -> JsString(edge.fromForeignKeyColumn), + "from_tag" -> JsString(edge.fromReferenceTag), + "to_foreign_key_column" -> JsString(edge.toForeignKeyColumn), + "to_tag" -> JsString(edge.toReferenceTag), + "mappings" -> Json.toJson(cols) + ) + } + } + } + + implicit val EdgeReads: Reads[Edge] = new Reads[Edge] { + override def reads(json: JsValue): JsResult[Edge] = { + + val tableName = (json \ "table_name").as[String] + val edgeName = (json \ "edge_name").as[String] + val datePartitionKey = (json \ "date_partition_key").asOpt[String] + val typePartitionKey = (json \ "type_partition_key").asOpt[String] + + val fromForeignKeyColumn = (json \ "from_foreign_key_column").as[String] + val fromTag = (json \ "from_tag").as[String] + val toForeignKeyColumn = (json \ "to_foreign_key_column").as[String] + val toTag = (json \ "to_tag").as[String] + + val columnMappings = (json \ "mappings").asOpt[Seq[Column]] + + JsSuccess(Edge(tableName, edgeName, fromForeignKeyColumn, fromTag, toForeignKeyColumn, toTag, datePartitionKey, typePartitionKey, columnMappings)) + } + } +} + +/** + * a mapping file in-memory representation + * + * @param databaseName hive database name for this mapping configuration + * @param partitions partition number of the target graphspace + * @param tags tag's mapping + * @param edges edge's mapping + * @param keyPolicy policy which used to generate unique id, default=hash_primary_key + */ +case class MappingConfiguration(databaseName: String, partitions: Int, tags: Seq[Tag], edges: Seq[Edge], keyPolicy: Option[String] = Some("hash_primary_key")) + +object MappingConfiguration { + implicit val MappingConfigurationWrites: Writes[MappingConfiguration] = new Writes[MappingConfiguration] { + override def writes(mapping: MappingConfiguration): JsValue = { + Json.obj(mapping.databaseName -> mapping.keyPolicy.fold( + Json.obj("partitions" -> mapping.partitions, + "tags" -> mapping.tags, + "edges" -> mapping.edges + )) { + keyPolicy => + 
Json.obj("key_policy" -> JsString(keyPolicy), + "partitions" -> mapping.partitions, + "tags" -> mapping.tags, + "edges" -> mapping.edges + ) + }) + } + } + + implicit val MappingConfigurationReads: Reads[MappingConfiguration] = new Reads[MappingConfiguration] { + override def reads(json: JsValue): JsResult[MappingConfiguration] = { + json match { + case JsObject(mapping) => { + // should be one mapping file for a single graph space + //TODO: fold when ill-formatted + assert(mapping.keys.nonEmpty && mapping.keys.size == 1) + assert(mapping.values.nonEmpty) + + val graphSpaceName = mapping.keys.toSeq(0) + val mappingContent = mapping.values.toSeq(0) + + val keyPolicy = (mappingContent \ "key_policy").asOpt[String] + val partitions = (mappingContent \ "partitions").as[Int] + val tags = (mappingContent \ "tags").get.as[Seq[Tag]] + val edges = (mappingContent \ "edges").get.as[Seq[Edge]] + + // make sure all edge reference existing tags + val allReferencedTag = (edges.map(_.fromReferenceTag) ++ edges.map(_.toReferenceTag)).distinct + if (tags.map(_.name).intersect(allReferencedTag).size != allReferencedTag.size) { + throw new IllegalStateException("Edge's from/to tag reference non-existing tag") + } + + JsSuccess(MappingConfiguration(graphSpaceName, partitions, tags, edges, keyPolicy)) + } + case a@_ => throw new IllegalStateException(s"Illegal mapping format:${a}") + } + } + } + + /** + * construct from a mapping file + * + * @param mappingFile mapping file, should be provided through "--files" option, and specified the application arg "---mapping_file_input"(--mi for short) at the same time, + * this file will be consumed as a classpath resource + * @return MappingConfiguration instance + */ + def apply(mappingFile: String): MappingConfiguration = { + val bufferedSource = Source.fromFile(mappingFile)(Codec("UTF-8")) + val toString = bufferedSource.mkString + val config = Json.parse(toString).as[MappingConfiguration] + bufferedSource.close + config + } +} \ No newline at end of file diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/PropertyValueAndTypeWritable.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/PropertyValueAndTypeWritable.scala new file mode 100644 index 00000000000..677fdbf4101 --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/PropertyValueAndTypeWritable.scala @@ -0,0 +1,36 @@ +package com.vesoft.tools + +import java.io.{DataInput, DataOutput} + +import com.vesoft.tools.VertexOrEdgeEnum.VertexOrEdgeEnum +import org.apache.hadoop.io.{BytesWritable, Writable} + +object VertexOrEdgeEnum extends Enumeration { + type VertexOrEdgeEnum = Value + + val Vertex = Value(0, "vertex") + val Edge = Value(1, "edge") +} + +/** + * composite value for SstRecordWriter + * + * @param values encoded values by nebula native client + * @param vertexOrEdgeEnum which indicate what the encoded values represents, a vertex or an edge, default=vertex + */ +class PropertyValueAndTypeWritable(var values: BytesWritable, var vertexOrEdgeEnum: VertexOrEdgeEnum = VertexOrEdgeEnum.Vertex) extends Writable { + override def write(out: DataOutput): Unit = { + out.write(vertexOrEdgeEnum.id) + values.write(out) + } + + override def readFields(in: DataInput): Unit = { + vertexOrEdgeEnum = in.readInt() match { + case 0 => VertexOrEdgeEnum.Vertex + case 1 => VertexOrEdgeEnum.Edge + case a@_ => throw new IllegalStateException(s"Non supported value type:${a}") + } + + values.readFields(in) + } +} diff --git 
a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SparkSstFileGenerator.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SparkSstFileGenerator.scala new file mode 100644 index 00000000000..f87a4d08824 --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SparkSstFileGenerator.scala @@ -0,0 +1,444 @@ +package com.vesoft.tools + +import java.nio.charset.{Charset, UnsupportedCharsetException} + +import com.vesoft.client.NativeClient +import javax.xml.bind.DatatypeConverter +import org.apache.commons.cli.{CommandLine, DefaultParser, HelpFormatter, Options, ParseException, Option => CliOption} +import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.{Partitioner, SparkConf, SparkContext} +import org.slf4j.LoggerFactory + +/** + * Use spark to generate sst files in batch, which will be ingested by Nebula engine. + * $HADOOP_HOME env need to be set. + * + * The following use cases are supported: + * + *

+ * Generate sst files from a Hive table data source, guided by a mapping file which maps Hive tables to vertexes and edges.
+ * Multiple vertexes or edges may map to a single Hive table, in which case a partition column is used to distinguish the
+ * different vertexes or edges.
+ * The Hive tables may be regenerated periodically by your business system to reflect the latest data, and may be
+ * partitioned by a time column that indicates when the data were generated.
+ *
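+ * A typical launch, mirroring the spark-submit example in the README (the paths, dates, queue and mapping file name below are only placeholders):
+ * {{{
+ * spark-submit --master yarn \
+ *   --class com.vesoft.tools.SparkSstFileGenerator \
+ *   --files mapping.json nebula-spark-sstfile-generator.jar \
+ *   -mi mapping.json -pi dt -di "2019-05-13" \
+ *   -so file:///path/to/local/sst/output -ho /path/on/hdfs/for/sst
+ * }}}
+ *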

+ */ +object SparkSstFileGenerator { + private[this] val log = LoggerFactory.getLogger(this.getClass) + + /** + * configuration key for sst file output + */ + val SSF_OUTPUT_LOCAL_DIR_CONF_KEY = "nebula.graph.spark.sst.file.local.dir" + val SSF_OUTPUT_HDFS_DIR_CONF_KEY = "nebula.graph.spark.sst.file.hdfs.dir" + + /** + * cmd line's options, which's name following the convention: input will suffix with "i", output will suffix with "o" + */ + lazy val options: Options = { + val dataSourceTypeInput = CliOption.builder("ti").longOpt("datasource_type_input") + .hasArg() + .desc("Data source types supported, must be among [hive|hbase|csv] for now, default=hive") + .build + + val defaultColumnMapPolicy = CliOption.builder("ci").longOpt("default_column_mapping_policy") + .hasArg() + .desc("If omitted, what policy to use when mapping column to property," + + "all columns except primary_key's column will be mapped to tag's property with the same name by default") + .build + + val mappingFileInput = CliOption.builder("mi").longOpt("mapping_file_input") + .required() + .hasArg() + .desc("Hive tables to nebula graph schema mapping file") + .build + + val localSstFileOutput = CliOption.builder("so").longOpt("local_sst_file_output") + .required() + .hasArg() + .desc("Which local directory those generated sst files will be put, should starts with file:///") + .build + + val hdfsSstFileOutput = CliOption.builder("ho").longOpt("hdfs_sst_file_output") + .required() + .hasArg() + .desc("Which hdfs directory will those sstfiles be put, should not starts with file:///") + .build + + val datePartitionKey = CliOption.builder("pi").longOpt("date_partition_input") + .required() + .hasArg() + .desc("A partition field of type String of hive table, which represent a Date, and has format of YYY-MM-dd") + .build + + // when the newest data arrive, used in non-incremental environment + val latestDate = CliOption.builder("di").longOpt("latest_date_input") + .required() + .hasArg() + .desc("Latest date to query,date format YYYY-MM-dd") + .build + + val repartitionNumber = CliOption.builder("ri").longOpt("repartition_number_input") + .hasArg() + .desc("Repartition number. Some optimization trick to improve generation speed and data skewness. 
Need tuning to suit your data.") + .build + + // may be used in some test run to prove the correctness + val limit = CliOption.builder("li").longOpt("limit_input") + .hasArg() + .desc("Return at most this number of edges/vertex, usually used in POC stage, when omitted, fetch all data.") + .build + + val charset = CliOption.builder("hi").longOpt("string_value_charset_input") + .hasArg() + .desc("When the value is of type String,what charset is used when encoded,default to UTF-8") + .build + + val opts = new Options() + opts.addOption(defaultColumnMapPolicy) + opts.addOption(dataSourceTypeInput) + opts.addOption(mappingFileInput) + opts.addOption(localSstFileOutput) + opts.addOption(hdfsSstFileOutput) + opts.addOption(datePartitionKey) + opts.addOption(latestDate) + opts.addOption(repartitionNumber) + opts.addOption(charset) + opts.addOption(limit) + + } + + // cmd line formatter when something is wrong with options + lazy val formatter = { + val format = new HelpFormatter + format.setWidth(300) + format + } + + /** + * composite key for vertex/edge RDD, the partitionId part is used by Partitioner, + * the valueEncoded part is used by SortWithPartition + * + * @param partitionId partition number + * @param `type` tag/edge type + * @param valueEncoded vertex/edge key encoded by native client + */ + case class GraphPartitionIdAndKeyValueEncoded(partitionId: Int, `type`: Int, valueEncoded: BytesWritable) + + /** + * partition by the partitionId part of key + */ + class SortByKeyPartitioner(num: Int) extends Partitioner { + override def numPartitions: Int = num + + override def getPartition(key: Any): Int = { + (key.asInstanceOf[GraphPartitionIdAndKeyValueEncoded].partitionId % numPartitions) + } + } + + val DefaultVersion = 1 + + // default charset when encoding String type + val DefaultCharset = "UTF-8" + + def main(args: Array[String]): Unit = { + val parser = new DefaultParser + + var cmd: CommandLine = null + try { + cmd = parser.parse(options, args) + } + catch { + case e: ParseException => { + log.error("Illegal arguments", e) + formatter.printHelp("nebula spark sst file generator", options) + System.exit(-1) + } + } + + var dataSourceTypeInput: String = cmd.getOptionValue("ti") + if (dataSourceTypeInput == null) { + dataSourceTypeInput = "hive" + } + + var columnMapPolicy: String = cmd.getOptionValue("ci") + if (columnMapPolicy == null) { + columnMapPolicy = "hash_primary_key" + } + + val mappingFileInput: String = cmd.getOptionValue("mi") + var localSstFileOutput: String = cmd.getOptionValue("so") + while (localSstFileOutput.endsWith("/")) { + localSstFileOutput = localSstFileOutput.stripSuffix("/") + } + + // make sure use local file system to write sst file + if (!localSstFileOutput.toLowerCase.startsWith("file://")) { + throw new IllegalArgumentException("Argument: -so --local_sst_file_output should start with file:///") + } + + var hdfsSstFileOutput: String = cmd.getOptionValue("ho") + while (hdfsSstFileOutput.endsWith("/")) { + hdfsSstFileOutput = hdfsSstFileOutput.stripSuffix("/") + } + + if (hdfsSstFileOutput.toLowerCase.startsWith("file:///")) { + throw new IllegalArgumentException("Argument: -ho --hdfs_sst_file_output should not start with file:///") + } + + + val limitOption: String = cmd.getOptionValue("li") + val limit = if (limitOption != null && limitOption.nonEmpty) { + try { + s"LIMIT ${limitOption.toLong}" + } + catch { + case _: NumberFormatException => "" + } + } else "" + + //when date partition is used, we should use the LATEST data + val datePartitionKey: String = 
cmd.getOptionValue("pi") + val latestDate = cmd.getOptionValue("di") + + val repartitionNumberOpt = cmd.getOptionValue("ri") + val repartitionNumber: Option[Int] = + if (repartitionNumberOpt == null || repartitionNumberOpt.isEmpty) { + None + } else { + try { + Some(repartitionNumberOpt.toInt) + } catch { + case e: Exception => { + log.error(s"Argument: -ri --repartition_number_input should be int, but found:${repartitionNumberOpt}") + None + } + } + } + + // to test whether charset is supported + val charsetOpt = cmd.getOptionValue("hi") + val charset = + if (charsetOpt == null || charsetOpt.isEmpty) { + DefaultCharset + } else { + try { + try { + Charset.forName(charsetOpt) + charsetOpt + } catch { + case e: UnsupportedCharsetException => { + log.error(s"Argument: -hi --string_value_charset_input is a not supported charset:${repartitionNumberOpt}") + DefaultCharset + } + } + } + } + + // parse mapping file + val mappingConfiguration: MappingConfiguration = MappingConfiguration(mappingFileInput) + + val sparkConf = new SparkConf().setAppName("nebula-graph-sstFileGenerator") + val sc = new SparkContext(sparkConf) + val sqlContext = new HiveContext(sc) + + // to pass sst file dir to SstFileOutputFormat + sc.hadoopConfiguration.set(SSF_OUTPUT_LOCAL_DIR_CONF_KEY, localSstFileOutput) + sc.hadoopConfiguration.set(SSF_OUTPUT_HDFS_DIR_CONF_KEY, hdfsSstFileOutput) + + // disable file output compression, because rocksdb can't recognize it + sc.hadoopConfiguration.set(FileOutputFormat.COMPRESS, "false") + + // id generator lambda, use FNV hash for now + //TODO: support id generator function other than FNV hash + //TODO: handle hash collision, might cause data corruption + val idGeneratorFunction = mappingConfiguration.keyPolicy.map(_.toLowerCase) match { + case Some("hash_primary_key") => (key: String) => FNVHash.hash64(key) + case Some(a@_) => throw new IllegalStateException(s"Not supported key generator=${a}") + case None => (key: String) => FNVHash.hash64(key) + } + + // implicit ordering used by PairedRDD.repartitionAndSortWithinPartitions which's key is of type PartitionIdAndBytesEncoded + implicit def ordering[A <: GraphPartitionIdAndKeyValueEncoded]: Ordering[A] = new Ordering[A] { + override def compare(x: A, y: A): Int = { + x.valueEncoded.compareTo(y.valueEncoded) + } + } + + //1) handle vertex, encode all column except PK column as a single Tag's properties + mappingConfiguration.tags.zipWithIndex.foreach { + //tag index used as tagType + case (tag: Tag, tagType: Int) => { + //all column w/o PK column, allColumns does not include primaryKey + val (allColumns, _) = validateColumns(sqlContext, tag, Seq(tag.primaryKey), Seq(tag.primaryKey), mappingConfiguration.databaseName) + val columnExpression = { + if (allColumns.isEmpty) { + log.warn(s"Tag:${tag.name} in database doesn't has any column, so three will be no property defined.") + } + + allColumns.map(_.columnName).fold(tag.primaryKey) { (acc, column) => acc + "," + column } + } + + val whereClause = tag.typePartitionKey.map(key => s"${key}='${tag.name}' AND ${datePartitionKey}='${latestDate}'").getOrElse(s"${datePartitionKey}='${latestDate}'") + //TODO:to handle multiple partition columns' Cartesian product + val sql = s"SELECT ${columnExpression} FROM ${mappingConfiguration.databaseName}.${tag.tableName} WHERE ${whereClause} ${limit}" + log.debug(s"sql=s${sql}") + val tagDF = sqlContext.sql(sql) + //RDD[(businessKey->values)] + val tagKeyAndValues: RDD[(String, Seq[AnyRef])] = tagDF.map(row => { + (row.getAs[String](tag.primaryKey) + 
"_" + tag.tableName, //businessId_tableName will be unique, and used as key before HASH + allColumns.filter(!_.columnName.equalsIgnoreCase(tag.primaryKey)).map(valueExtractor(row, _, charset)) + ) + }) + + tagKeyAndValues.map { + case (key, values) => { + val vertexId: Long = idGeneratorFunction.apply(key) + // hash function generated sign long, but partition id should be unsigned + val graphPartitionId: Int = (Math.abs(vertexId) % mappingConfiguration.partitions).asInstanceOf[Int] + + // use NativeClient to generate key and encode values + val keyEncoded: Array[Byte] = NativeClient.createVertexKey(graphPartitionId, vertexId, tagType, DefaultVersion) + val valuesEncoded: Array[Byte] = NativeClient.encode(values.toArray) + log.debug(s"Tag(partition=${graphPartitionId}): " + DatatypeConverter.printHexBinary(keyEncoded) + " = " + DatatypeConverter.printHexBinary(valuesEncoded)) + (GraphPartitionIdAndKeyValueEncoded(graphPartitionId, tagType, new BytesWritable(keyEncoded)), new PropertyValueAndTypeWritable(new BytesWritable(valuesEncoded))) + } + }.repartitionAndSortWithinPartitions(new SortByKeyPartitioner(repartitionNumber.getOrElse(tagKeyAndValues.partitions.length))).saveAsNewAPIHadoopFile(localSstFileOutput, classOf[GraphPartitionIdAndKeyValueEncoded], classOf[PropertyValueAndTypeWritable], classOf[SstFileOutputFormat]) + } + } + + // For now nebula doesn't support expanding through all edgeTypes(The wildcard in the following nGQL `go from src over * where $.prop1="pin2mac" yield src.id, dst.id`) + // so we workaround it: All edges are of same type, and give it a fixed name. Using edge name as a extra property to distinguish between them. + // TODO: when nebula support the above feature, we should undo those changes. + //2) handle edges + mappingConfiguration.edges.zipWithIndex.foreach { + //edge index used as edge_type + case (edge: Edge, edgeType: Int) => { + //all column w/o PK column + val (allColumns, _) = validateColumns(sqlContext, edge, Seq(edge.fromForeignKeyColumn), Seq(edge.fromForeignKeyColumn, edge.toForeignKeyColumn), mappingConfiguration.databaseName) + + val columnExpression = { + assert(allColumns.size > 0) + s"${edge.fromForeignKeyColumn},${edge.toForeignKeyColumn}," + allColumns.map(_.columnName).mkString(",") + } + + val whereClause = edge.typePartitionKey.map(key => s"${key}='${edge.name}' AND ${datePartitionKey}='${latestDate}'").getOrElse(s"${datePartitionKey}='${latestDate}'") + + //TODO: join FROM_COLUMN and join TO_COLUMN from the table where this columns referencing, to make sure that the claimed id really exists in the reference table.BUT with HUGE Perf penalty + val edgeDf = sqlContext.sql(s"SELECT ${columnExpression} FROM ${mappingConfiguration.databaseName}.${edge.tableName} WHERE ${whereClause} ${limit}") + assert(edgeDf.count() > 0) + //RDD[Tuple3(from_vertex_businessKey,end_vertex_businessKey,values)] + val edgeKeyAndValues: RDD[(String, String, Seq[AnyRef])] = edgeDf.map(row => { + (row.getAs[String](edge.fromForeignKeyColumn), // consistent with vertexId generation logic, to make sure that vertex and its' outbound edges are in the same partition + row.getAs[String](edge.toForeignKeyColumn), + allColumns.filterNot(col => (col.columnName.equalsIgnoreCase(edge.fromForeignKeyColumn) || col.columnName.equalsIgnoreCase(edge.toForeignKeyColumn))).map(valueExtractor(row, _, charset)) + ) + }) + + edgeKeyAndValues.map { + case (srcIDString, dstIdString, values) => { + val id = idGeneratorFunction.apply(srcIDString) + val graphPartitionId: Int = (Math.abs(id) % 
mappingConfiguration.partitions).asInstanceOf[Int] + + val srcId = Math.abs(idGeneratorFunction.apply(srcIDString)) + val dstId = Math.abs(idGeneratorFunction.apply(dstIdString)) + + // TODO: support edge ranking,like create_time desc + val keyEncoded = NativeClient.createEdgeKey(graphPartitionId, srcId, 1, -1L, dstId, DefaultVersion) + //val keyEncoded = NativeClient.createEdgeKey(partitionId, srcId, edgeType, -1L, dstId, DefaultVersion) + + // TODO: only support a single edge type , put edge_type value in 0th index. Nebula server side must define extra edge property: edge_type + val valuesEncoded: Array[Byte] = NativeClient.encode((edge.name.getBytes(charset) +: values).toArray) + //val valuesEncoded: Array[Byte] = NativeClient.encode(values.toArray) + log.debug(s"Edge(partition=${graphPartitionId}): " + DatatypeConverter.printHexBinary(keyEncoded) + " = " + DatatypeConverter.printHexBinary(valuesEncoded)) + (GraphPartitionIdAndKeyValueEncoded(graphPartitionId, 1, new BytesWritable(keyEncoded)), new PropertyValueAndTypeWritable(new BytesWritable(valuesEncoded), VertexOrEdgeEnum.Edge)) + } + }.repartitionAndSortWithinPartitions(new SortByKeyPartitioner(repartitionNumber.getOrElse(edgeKeyAndValues.partitions.length))).saveAsNewAPIHadoopFile(localSstFileOutput, classOf[GraphPartitionIdAndKeyValueEncoded], classOf[PropertyValueAndTypeWritable], classOf[SstFileOutputFormat]) + } + } + } + + /** + * extract value from a column + */ + private def valueExtractor(row: Row, col: Column, charset: String) = { + col.`type`.toUpperCase match { + case "INTEGER" => Int.box(row.getAs[Int](col.columnName)) + case "STRING" => row.getAs[String](col.columnName).getBytes(charset) + case "FLOAT" => Float.box(row.getAs[Float](col.columnName)) + case "LONG" => Long.box(row.getAs[Long](col.columnName)) + case "DOUBLE" => Double.box(row.getAs[Double](col.columnName)) + case "BOOL" => Boolean.box(row.getAs[Boolean](col.columnName)) + case a@_ => throw new IllegalStateException(s"Unsupported edge data type ${a}") + } + } + + /** + * check that columns claimed in mapping configuration file are indeed defined in db(hive) + * and its type is compatible, when not, throw exception, return all required column definitions + * + * @return Tuple2(AllColumns w/o partition columns, partition columns) + */ + private def validateColumns(sqlContext: HiveContext, edge: WithColumnMapping, colsMustCheck: Seq[String], colsMustFilter: Seq[String], databaseName: String): (Seq[Column], Seq[String]) = { + val descriptionDF = sqlContext.sql(s"DESC ${databaseName}.${edge.tableName}") + // all columns' name ---> type mapping in db + val allColumnsMapInDB: Seq[(String, String)] = descriptionDF.map { + case Row(colName: String, colType: String, _) => { + (colName.toUpperCase, colType.toUpperCase) + } + }.collect.toSeq + + // columns that generated by DESC are separated by comments, before comments are non-partition columns, after comments are partition columns + val commentsStart = allColumnsMapInDB.indexWhere(_._1.startsWith("#")) + val (allColumnMap, partitionColumns): (Map[String, String], Seq[(String, String)]) = + if (commentsStart == -1) { + (allColumnsMapInDB.toMap, Seq.empty[(String, String)]) + } + else { + val commentsEnd = allColumnsMapInDB.lastIndexWhere(_._1.startsWith("#")) + assert((commentsEnd >= commentsStart) && ((commentsEnd + 1) < allColumnsMapInDB.size)) + // all columns except partition columns + (allColumnsMapInDB.slice(0, commentsStart).toMap, allColumnsMapInDB.slice(commentsEnd + 1, allColumnsMapInDB.size)) + } + + // 
check the claimed columns really exist in db + colsMustCheck.map(_.toUpperCase).foreach { + col => + if (allColumnMap.get(col).isEmpty) { + throw new IllegalStateException(s"${edge.name}'s from column: ${col} not defined in table=${edge.tableName}") + } + } + + if (edge.columnMappings.isEmpty) { + //only (from,to) columns are checked, but all columns should be returned + (allColumnMap.filter(!partitionColumns.contains(_)).filter(!colsMustFilter.contains(_)).map { + case (colName, colType) => { + Column(colName, colName, colType) // propertyName default=colName + } + }.toSeq, partitionColumns.map(_._1)) + } + else { + // tag/edge's columnMappings should be checked and returned + val columnMappings = edge.columnMappings.get + val notValid = columnMappings.filter( + col => { + val typeInDb = allColumnMap.get(col.columnName.toUpperCase) + typeInDb.isEmpty || !DataTypeCompatibility.isCompatible(col.`type`, typeInDb.get) + } + ).map { + case col => s"name=${col.columnName},type=${col.`type`}" + } + + if (notValid.nonEmpty) { + throw new IllegalStateException(s"${edge.name}'s columns: ${notValid.mkString("\t")} not defined in or compatible with db's definitions") + } + else { + (columnMappings, partitionColumns.map(_._1)) + } + } + } +} diff --git a/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SstFileOutputFormat.scala b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SstFileOutputFormat.scala new file mode 100644 index 00000000000..16db6c3d279 --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/main/scala/com/vesoft/tools/SstFileOutputFormat.scala @@ -0,0 +1,233 @@ +package com.vesoft.tools + +import java.io.{File, IOException} + +import com.vesoft.tools.SparkSstFileGenerator.GraphPartitionIdAndKeyValueEncoded +import com.vesoft.tools.VertexOrEdgeEnum.VertexOrEdgeEnum +import javax.xml.bind.DatatypeConverter +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.mapred.InvalidJobConfException +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.security.TokenCache +import org.rocksdb.{EnvOptions, Options, SstFileWriter} +import org.slf4j.LoggerFactory + +import scala.collection.mutable +import scala.sys.process._ + +/** + * Custom outputFormat, which generate a sub dir per partition per worker, the local dir structure of EACH worker node: + * + * ${LOCAL_ROOT} (local dir will be stripped off when `hdfs -copyFromLocal`, specified by user through cmd line) + * |---1 (this is PARTITION number) + * | | ---- vertex-${FIRST_KEY_IN_THIS_FILE}.sst + * | | ---- vertex-${FIRST_KEY_IN_THIS_FILE}.sst + * | | ---- edge-${FIRST_KEY_IN_THIS_FILE}.sst + * | | ---- edge-${FIRST_KEY_IN_THIS_FILE}.sst + * |---2 + * | ---- vertex-${FIRST_KEY_IN_THIS_FILE}.sst + * | ---- edge-${FIRST_KEY_IN_THIS_FILE}.sst + * | ---- edge-${FIRST_KEY_IN_THIS_FILE}.sst + * .... + * + * Sst file name convention is {TYPE}-${FIRST_KEY_IN_THIS_FILE}.sst, where type=vertex OR edge, FIRST_KEY_IN_THIS_FILE is the first key the file sees. + * This combination will make the sst file name unique between all worker nodes. 
+ * + * After `hdfs -copyFromLocal`, the final hdfs dir layout is: + * + * ${HDFS_ROOT} (specified by the user through the cmd line) + * |---1 (this is the PARTITION number; it holds all sst files from every worker node with the same PARTITION number) + * | | ---- vertex-${FIRST_KEY_IN_THIS_FILE}.sst (may be from worker node #1) + * | | ---- vertex-${FIRST_KEY_IN_THIS_FILE}.sst (may be from worker node #2) + * | | ---- edge-${FIRST_KEY_IN_THIS_FILE}.sst (may be from worker node #1) + * | | ---- edge-${FIRST_KEY_IN_THIS_FILE}.sst (may be from worker node #2) + * |---2 (same as above) + * | ---- vertex-${FIRST_KEY_IN_THIS_FILE}.sst + * + **/ +class SstFileOutputFormat extends FileOutputFormat[GraphPartitionIdAndKeyValueEncoded, PropertyValueAndTypeWritable] { + override def getRecordWriter(job: TaskAttemptContext): RecordWriter[GraphPartitionIdAndKeyValueEncoded, PropertyValueAndTypeWritable] = { + if (FileOutputFormat.getCompressOutput(job)) { + job.getConfiguration.setBoolean(FileOutputFormat.COMPRESS, false) + } + + val sstFileOutput = job.getConfiguration.get(SparkSstFileGenerator.SSF_OUTPUT_LOCAL_DIR_CONF_KEY) + new SstRecordWriter(sstFileOutput, job.getConfiguration) + } + + // do not check whether the output dir exists, because edge & vertex data coexist in the same partition dir + override def checkOutputSpecs(job: JobContext): Unit = { + val outDir = FileOutputFormat.getOutputPath(job) + if (outDir == null) throw new InvalidJobConfException("Output directory not set.") + // get delegation token for outDir's file system + TokenCache.obtainTokensForNamenodes(job.getCredentials, Array[Path](outDir), job.getConfiguration) + } +} + +/** + * Custom RecordWriter which writes each record into a per-partition, per-type (vertex/edge) sst file. + * + * @param localSstFileOutput spark conf item, which points to the local dir where rocksdb sst files will be put + * @param configuration hadoop configuration + */ +class SstRecordWriter(localSstFileOutput: String, configuration: Configuration) extends RecordWriter[GraphPartitionIdAndKeyValueEncoded, PropertyValueAndTypeWritable] { + private[this] val log = LoggerFactory.getLogger(this.getClass) + + /** + * all opened sst files, keyed by (vertexOrEdge, partitionId) -> (SstFileWriter, localSstFilePath, hdfsParentPath) + */ + private var sstFilesMap = mutable.Map.empty[(VertexOrEdgeEnum, Int), (SstFileWriter, String, String)] + + /** + * local file system only, because rocksdb's SstFileWriter can't write to a remote file system for now (it can only back up to HDFS) + */ + private val localFileSystem = FileSystem.get(new Configuration(false)) + + /** + * hdfs file system, used to create the destination sst file dir + */ + private val hdfsFileSystem = FileSystem.get(configuration) + + if (!hdfsFileSystem.getScheme.equalsIgnoreCase("hdfs")) { + throw new IllegalStateException("File system is not hdfs") + } + + /** + * the hdfs dir where sst files are put + */ + private val hdfsParentDir = configuration.get(SparkSstFileGenerator.SSF_OUTPUT_HDFS_DIR_CONF_KEY) + + log.debug(s"SstRecordWriter read hdfs dir:${hdfsParentDir}") + + // all RocksObject instances should be closed + private var env: EnvOptions = _ + private var options: Options = _ + + override def write(key: GraphPartitionIdAndKeyValueEncoded, value: PropertyValueAndTypeWritable): Unit = { + var sstFileWriter: SstFileWriter = null + + // cache one sst file writer per partition and per vertex/edge type + val sstWriterOptional = sstFilesMap.get((value.vertexOrEdgeEnum, key.partitionId)) + if (sstWriterOptional.isEmpty) { + /** + * https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files + * +
* Please note that: + * + * 1. Options passed to SstFileWriter will be used to figure out the table type, compression options, etc. that will be used to create the SST file. + * 2. The Comparator that is passed to the SstFileWriter must be exactly the same as the Comparator used in the DB that this file will be ingested into. + * 3. Rows must be inserted in a strictly increasing order. + */ + // TODO: how to make sure the options used here are consistent with the server's + // What's the fastest way to load data into RocksDB: https://rocksdb.org.cn/doc/RocksDB-FAQ.html + env = new EnvOptions + // TODO: verify that setWritableFileMaxBufferSize actually takes effect + options = new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true).setWritableFileMaxBufferSize(1024 * 100).setMaxBackgroundFlushes(5).prepareForBulkLoad() + sstFileWriter = new SstFileWriter(env, options) + + // TODO: roll over to another file when the file size > some THRESHOLD, or based on some other criteria + // Each partition can generate multiple sst files; keys are ordered within each file, but key ranges may overlap between different sst files. + // All these sst files will be copied by `hdfs -copyFromLocal` to the same HDFS dir (and consumed by a subsequent nebula `IMPORT` command), so different suffixes are needed to distinguish them. + val hdfsSubDirectory = s"${File.separator}${key.partitionId}${File.separator}" + + val localDir = s"${localSstFileOutput}${hdfsSubDirectory}" + val sstFileName = s"${value.vertexOrEdgeEnum}-${key.`type`}-${DatatypeConverter.printHexBinary(key.valueEncoded.getBytes)}.sst".toLowerCase + + val localDirPath = new Path(localDir) + if (!localFileSystem.exists(localDirPath)) { + localFileSystem.mkdirs(localDirPath) + } + else { + if (localFileSystem.isFile(localDirPath)) { + localFileSystem.delete(localDirPath, true) + } + } + + var localSstFile = s"${localDir}${sstFileName}" + localFileSystem.create(new Path(localSstFile)) + + if (localSstFile.startsWith("file:///")) { + localSstFile = localSstFile.substring(7) + } + + sstFileWriter.open(localSstFile) + sstFilesMap += (value.vertexOrEdgeEnum, key.partitionId) -> (sstFileWriter, localSstFile, hdfsSubDirectory) + } else { + sstFileWriter = sstWriterOptional.get._1 + } + + // TODO: could the puts be batched?
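+ // Note: SstFileWriter expects keys in strictly increasing order (see the rocksdb note above);
+ // within a partition that ordering is provided upstream by the repartitionAndSortWithinPartitions
+ // call in SparkSstFileGenerator, so a plain put per record is used here.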
+ sstFileWriter.put(key.valueEncoded.getBytes(), value.values.getBytes) + } + + override def close(context: TaskAttemptContext): Unit = { + if (env != null) { + env.close() + } + + if (options != null) { + options.close() + } + + sstFilesMap.values.foreach { case (sstFile, localSstFile, hdfsDirectory) => + try { + sstFile.finish() + sstFile.close() + + runHdfsCopyFromLocal(localSstFile, hdfsDirectory) + } + catch { + case e: Exception => { + log.error("Error when closing a sst file", e) + } + } + + try { + // There could be multiple containers on a single host and the parent dir is shared between them, + // so delete the individual file rather than the parent dir + localFileSystem.delete(new Path(localSstFile), true) + } + catch { + case e: Exception => { + log.error(s"Error when deleting local file:${localSstFile}", e) + } + } + } + } + + /** + * Assemble an `hdfs dfs -copyFromLocal` command line to put a local sst file onto hdfs; the $HADOOP_HOME env variable needs to be set. + */ + private def runHdfsCopyFromLocal(localSstFile: String, hdfsDirectory: String) = { + var hadoopHome = System.getenv("HADOOP_HOME") + while (hadoopHome.endsWith("/")) { + hadoopHome = hadoopHome.stripSuffix("/") + } + + val destinationHdfsDir = s"${hdfsParentDir}${hdfsDirectory}" + val destinationPath = new Path(destinationHdfsDir) + + try { + if (!hdfsFileSystem.exists(destinationPath)) { + hdfsFileSystem.mkdirs(destinationPath) + } + } + catch { + case e: IOException => { + log.error(s"Error when making hdfs dir ${destinationPath}", e) + throw e + } + } + + val command = List(s"${hadoopHome}/bin/hdfs", "dfs", "-copyFromLocal", s"${localSstFile}", destinationHdfsDir) + val exitCode = command.! + log.debug(s"Running command:${command.mkString(" ")}, exitCode=${exitCode}") + + if (exitCode != 0) { + throw new IllegalStateException(s"Can't put local file `${localSstFile}` to hdfs dir: `${destinationHdfsDir}`, " + + "sst files will reside on each worker's local file system only! 
Need to run `hdfs dfs -copyFromLocal` manually") + } + } +} diff --git a/src/tools/spark-sstfile-generator/src/test/resources/mapping-ill-format.json b/src/tools/spark-sstfile-generator/src/test/resources/mapping-ill-format.json new file mode 100644 index 00000000000..6720566e6e6 --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/test/resources/mapping-ill-format.json @@ -0,0 +1,58 @@ +{ + "fma": { + "key_policy": "hash_primary_key", + "partitions": 3, + "tags": [ + { + "table_name": "dmt_risk_graph_idmp_node_s_d", + "tag_name": "mac", + "date_partition_key": "dt", + "type_partition_key": "flag", + "primary_key": "node", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + }, + { + "table_name": "dmt_risk_graph_idmp_node_s_d", + "tag_name": "user_pin", + "date_partition_key": "dt", + "type_partition_key": "flag", + "primary_key": "node", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + } + ], + "edges": [ + { + "table_name": "dmt_risk_graph_idmp_edge_s_d", + "edge_name": "pin2mac", + "date_partition_key": "dt", + "type_partition_key": "flag", + "from_foreign_key_column": "from_node", + "from_tag": "user_pin", + "to_foreign_key_column": "to_node", + "to_tag": "qq", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + } + ] + } +} \ No newline at end of file diff --git a/src/tools/spark-sstfile-generator/src/test/resources/mapping.json b/src/tools/spark-sstfile-generator/src/test/resources/mapping.json new file mode 100644 index 00000000000..56a94f70032 --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/test/resources/mapping.json @@ -0,0 +1,58 @@ +{ + "fma": { + "key_policy": "hash_primary_key", + "partitions": 3, + "tags": [ + { + "table_name": "dmt_risk_graph_idmp_node_s_d", + "tag_name": "mac", + "date_partition_key": "dt", + "type_partition_key": "flag", + "primary_key": "node", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + }, + { + "table_name": "dmt_risk_graph_idmp_node_s_d", + "tag_name": "user_pin", + "date_partition_key": "dt", + "type_partition_key": "flag", + "primary_key": "node", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + } + ], + "edges": [ + { + "table_name": "dmt_risk_graph_idmp_edge_s_d", + "edge_name": "pin2mac", + "date_partition_key": "dt", + "type_partition_key": "flag", + "from_foreign_key_column": "from_node", + "from_tag": "user_pin", + "to_foreign_key_column": "to_node", + "to_tag": "mac", + "mappings": [ + { + "src_pri_value": { + "name": "src_pri_value", + "type": "string" + } + } + ] + } + ] + } +} \ No newline at end of file diff --git a/src/tools/spark-sstfile-generator/src/test/scala/com/vesoft/tools/MappingConfigurationTest.scala b/src/tools/spark-sstfile-generator/src/test/scala/com/vesoft/tools/MappingConfigurationTest.scala new file mode 100644 index 00000000000..1084b2140da --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/test/scala/com/vesoft/tools/MappingConfigurationTest.scala @@ -0,0 +1,102 @@ +package com.vesoft.tools + +import org.scalatest.Matchers._ +import org.scalatest.{BeforeAndAfter, FlatSpec} +import play.api.libs.json.Json + +import scala.io.Source +import scala.language.implicitConversions + +class MappingConfigurationTest extends FlatSpec with BeforeAndAfter { + "an Edge" should "be serializable to and deserializable from json" in { + // edge w/o properties + 
val emptyPropertyEdge = Edge("table1", "edge1", "from_column1", "table1", "to_column1", "table2", Some("dt"), Some("pin2mac")) + val jsonString = Json.toJson(emptyPropertyEdge).toString() + val expectedResult = + """{"table_name":"table1","edge_name":"edge1","date_partition_key":"dt","type_partition_key":"pin2mac","from_foreign_key_column":"from_column1","from_tag":"table1","to_foreign_key_column":"to_column1","to_tag":"table2"}""" + assert(expectedResult == jsonString) + + val deserializedEdge = Json.parse(expectedResult).as[Edge] + assert(deserializedEdge == emptyPropertyEdge) + + // edge w/ properties + val withPropertiesEdge = Edge("table1", "edge1", "from_column1", "table1", "to_column1", "table2", Some("dt"), Some("pin2mac"), Some(Seq(Column("col1", "prop1"), Column("col2", "prop2")))) + val withPropertiesEdgeJsonString = Json.toJson(withPropertiesEdge).toString() + val expectedResultWithPropertiesEdge = + """{"table_name":"table1","edge_name":"edge1","date_partition_key":"dt","type_partition_key":"pin2mac","from_foreign_key_column":"from_column1","from_tag":"table1","to_foreign_key_column":"to_column1","to_tag":"table2","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]}""" + assert(expectedResultWithPropertiesEdge == withPropertiesEdgeJsonString) + } + + "a Tag" should "be serializable to and deserializable from json" in { + // tag w/o properties + val emptyPropertyTag = Tag("table1", "tag1", "column1") + val jsonString = Json.toJson(emptyPropertyTag).toString() + val expectedResult ="""{"table_name":"table1","tag_name":"tag1","primary_key":"column1","date_partition_key":null,"type_partition_key":null}""" + assert(expectedResult == jsonString) + + val deserializedTag = Json.parse(expectedResult).as[Tag] + assert(deserializedTag == emptyPropertyTag) + + // tag w/ properties + val withPropertiesTag = Tag("table1", "tag1", "column1", Some("dt"), Some("pin2mac"), Some(Seq(Column("col1", "prop1"), Column("col2", "prop2")))) + val withPropertiesTagJsonString = Json.toJson(withPropertiesTag).toString() + val expectedResultWithPropertiesTag = + """{"table_name":"table1","tag_name":"tag1","primary_key":"column1","date_partition_key":"dt","type_partition_key":"pin2mac","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]}""" + assert(expectedResultWithPropertiesTag == withPropertiesTagJsonString) + } + + "a MappingConfiguration" should "be serializable to and deserializable from json" in { + // MappingConfiguration w/o custom ken generation policy + val tag1 = Tag("table1", "tag1", "column1", Some("dt"), Some("pin2mac"), Some(Seq(Column("col1", "prop1"), Column("col2", "prop2")))) + val tag2 = Tag("table2", "tag2", "column2", Some("dt"), Some("pin2qq"), Some(Seq(Column("col1", "prop1"), Column("col2", "prop2")))) + val tag3 = Tag("table3", "tag3", "column3", Some("dt"), Some("pin2wallet"), Some(Seq(Column("col1", "prop1"), Column("col2", "prop2")))) + + // edge w/ properties + val edge1 = Edge("table4", "edge1", "from_column1", "tag1", "to_column1", "tag2", Some("dt"), Some("pin2qq"), Some(Seq(Column("col1", "prop1"), Column("col2", "prop2")))) + val edge2 = Edge("table5", "edge2", "from_column1", "tag2", "to_column1", "tag3", Some("dt"), Some("pin2mac"), Some(Seq(Column("col1", "prop1"), Column("col2", "prop2")))) + + val mappingConfig = MappingConfiguration("fma", 3, Seq(tag1, tag2, tag3), Seq(edge1, edge2)) + val jsonString = Json.toJson(mappingConfig).toString() + val expectedResult = + 
"""{"fma":{"key_policy":"hash_primary_key","partitions":3,"tags":[{"table_name":"table1","tag_name":"tag1","primary_key":"column1","date_partition_key":"dt","type_partition_key":"pin2mac","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]},{"table_name":"table2","tag_name":"tag2","primary_key":"column2","date_partition_key":"dt","type_partition_key":"pin2qq","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]},{"table_name":"table3","tag_name":"tag3","primary_key":"column3","date_partition_key":"dt","type_partition_key":"pin2wallet","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]}],"edges":[{"table_name":"table4","edge_name":"edge1","date_partition_key":"dt","type_partition_key":"pin2qq","from_foreign_key_column":"from_column1","from_tag":"tag1","to_foreign_key_column":"to_column1","to_tag":"tag2","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]},{"table_name":"table5","edge_name":"edge2","date_partition_key":"dt","type_partition_key":"pin2mac","from_foreign_key_column":"from_column1","from_tag":"tag2","to_foreign_key_column":"to_column1","to_tag":"tag3","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]}]}}""" + assert(expectedResult == jsonString) + + val deserializedMappingConfig = Json.parse(expectedResult).as[MappingConfiguration] + assert(deserializedMappingConfig == mappingConfig) + + // MappingConfiguration w/ custom ken generation policy + val mappingConfigWithCustomPolicy = MappingConfiguration("fma", 3, Seq(tag1, tag2, tag3), Seq(edge1, edge2), Some("some_other_policy")) + val mappingConfigWithCustomPolicyJson = Json.toJson(mappingConfigWithCustomPolicy).toString() + val mappingConfigWithCustomPolicyExpected = + """{"fma":{"key_policy":"some_other_policy","partitions":3,"tags":[{"table_name":"table1","tag_name":"tag1","primary_key":"column1","date_partition_key":"dt","type_partition_key":"pin2mac","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]},{"table_name":"table2","tag_name":"tag2","primary_key":"column2","date_partition_key":"dt","type_partition_key":"pin2qq","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]},{"table_name":"table3","tag_name":"tag3","primary_key":"column3","date_partition_key":"dt","type_partition_key":"pin2wallet","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]}],"edges":[{"table_name":"table4","edge_name":"edge1","date_partition_key":"dt","type_partition_key":"pin2qq","from_foreign_key_column":"from_column1","from_tag":"tag1","to_foreign_key_column":"to_column1","to_tag":"tag2","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]},{"table_name":"table5","edge_name":"edge2","date_partition_key":"dt","type_partition_key":"pin2mac","from_foreign_key_column":"from_column1","from_tag":"tag2","to_foreign_key_column":"to_column1","to_tag":"tag3","mappings":[{"col1":{"name":"prop1","type":"string"}},{"col2":{"name":"prop2","type":"string"}}]}]}}""" + assert(mappingConfigWithCustomPolicyExpected == mappingConfigWithCustomPolicyJson) + + val deserializedMappingConfigWithCustomPolicy = Json.parse(mappingConfigWithCustomPolicyExpected).as[MappingConfiguration] + assert(deserializedMappingConfigWithCustomPolicy == mappingConfigWithCustomPolicy) + } + + 
"a MappingConfiguration" should "be constructed from a configuration file" in { + val config = MappingConfiguration("mapping.json") + assert(config != null) + assert(config.databaseName == "fma") + assert(config.keyPolicy.get == "hash_primary_key") + + val tag1 = Tag("dmt_risk_graph_idmp_node_s_d", "mac", "node", Some("dt"), Some("flag"), Some(Seq(Column("src_pri_value", "src_pri_value", "string")))) + val tag2 = Tag("dmt_risk_graph_idmp_node_s_d", "user_pin", "node", Some("dt"), Some("flag"), Some(Seq(Column("src_pri_value", "src_pri_value", "string")))) + + // edge w/ properties + val edge1 = Edge("dmt_risk_graph_idmp_edge_s_d", "pin2mac", "from_node", "user_pin", "to_node", "mac", Some("dt"), Some("flag"), Some(Seq(Column("src_pri_value", "src_pri_value", "string")))) + + Seq(tag1, tag2) should contain theSameElementsInOrderAs config.tags + List(edge1) should contain theSameElementsInOrderAs config.edges + } + + "a MappingConfiguration" should "be throw exception from an ill-formatted configuration file" in { + intercept[IllegalStateException] { + val bufferedSource = Source.fromInputStream(getClass.getClassLoader.getResourceAsStream("mapping-ill-format.json")) + val toString = bufferedSource.mkString + Json.parse(toString).as[MappingConfiguration] + bufferedSource.close + } + } +} diff --git a/src/tools/spark-sstfile-generator/src/test/scala/com/vesoft/tools/SparkSstFileGeneratorTest.scala b/src/tools/spark-sstfile-generator/src/test/scala/com/vesoft/tools/SparkSstFileGeneratorTest.scala new file mode 100644 index 00000000000..8a3b289b28f --- /dev/null +++ b/src/tools/spark-sstfile-generator/src/test/scala/com/vesoft/tools/SparkSstFileGeneratorTest.scala @@ -0,0 +1,13 @@ +package com.vesoft.tools + +import com.vesoft.client.NativeClient +import org.scalatest.{BeforeAndAfter, FlatSpec} + +class SparkSstFileGeneratorTest extends FlatSpec with BeforeAndAfter { + // need to put the dir which contains nebula_native_client.so in java.library.path before run this test + "an Seq[Any]" should "be encoded by NativeClient" in { + val values = Seq("Hello World".getBytes("UTF-8"), Boolean.box(false), Long.box(1024L), Int.box(7), Float.box(3.1415f), Double.box(9.12)) + val bytes: Array[Byte] = NativeClient.encode(values.toArray) + assert(bytes == Array(0, 11, 96, -28, -108, -21, 0, 0, 0, 0, -72, 65, 20, 7, 86, 14, 73, 64, 61, 10, -41, -93, 112, 61, 34, 64)) + } +}