diff --git a/conf-template/client_import/csv_datasource.conf b/conf-template/client_import/csv_datasource.conf
new file mode 100644
index 00000000..9936bdda
--- /dev/null
+++ b/conf-template/client_import/csv_datasource.conf
@@ -0,0 +1,94 @@
+# Use the following command to submit the exchange job:
+
+# spark-submit \
+#   --master "spark://master_ip:7077" \
+#   --driver-memory=2G --executor-memory=30G \
+#   --num-executors=3 --executor-cores=20 \
+#   --class com.vesoft.nebula.exchange.Exchange \
+#   nebula-exchange-3.0-SNAPSHOT.jar -c csv_datasource.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address:{
+      graph:["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please configure the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta:["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures; if the number of failures exceeds this value, the application exits.
+      max: 32
+      # failed data will be recorded in the output path, formatted as nGQL
+      output: /tmp/errors
+    }
+
+    # use Google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # acquire a permit from the RateLimiter, unit: MILLISECONDS
+      # if a permit cannot be obtained within the specified timeout, the request is given up.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: csv
+        sink: client
+      }
+      # if your file is not in HDFS, use "file:///path/test.csv"
+      path: "hdfs://ip:port/path/test.csv"
+      # if your csv file has no header, use _c0, _c1, _c2, ... to indicate the fields
+      fields: [csv-field-0, csv-field-1, csv-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: csv-field-0
+      separator: ","
+      header: true
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # Processing edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: csv
+        sink: client
+      }
+      path: "hdfs://ip:port/path/test.csv"
+      fields: [csv-field-0, csv-field-1, csv-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: csv-field-0
+      target: csv-field-1
+      ranking: csv-field-2
+      separator: ","
+      header: true
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/conf-template/client_import/hive_datasource.conf b/conf-template/client_import/hive_datasource.conf
new file mode 100644
index 00000000..2b2c6e26
--- /dev/null
+++ b/conf-template/client_import/hive_datasource.conf
@@ -0,0 +1,88 @@
+# Use the following command to submit the exchange job:
+
+# spark-submit \
+#   --master "spark://master_ip:7077" \
+#   --driver-memory=2G --executor-memory=30G \
+#   --num-executors=3 --executor-cores=20 \
+#   --class com.vesoft.nebula.exchange.Exchange \
+#   nebula-exchange-3.0-SNAPSHOT.jar -c hive_datasource.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address:{
+      graph:["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please configure the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta:["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures; if the number of failures exceeds this value, the application exits.
+      max: 32
+      # failed data will be recorded in the output path, formatted as nGQL
+      output: /tmp/errors
+    }
+
+    # use Google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # acquire a permit from the RateLimiter, unit: MILLISECONDS
+      # if a permit cannot be obtained within the specified timeout, the request is given up.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: hive
+        sink: client
+      }
+      exec: "select hive-field-0, hive-field-1, hive-field-2 from database.table"
+      fields: [hive-field-0, hive-field-1, hive-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: hive-field-0
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # Processing edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: hive
+        sink: client
+      }
+      exec: "select hive-field-0, hive-field-1, hive-field-2 from database.table"
+      fields: [hive-field-0, hive-field-1, hive-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: hive-field-0
+      target: hive-field-1
+      ranking: hive-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/conf-template/sst_import/csv_datasource.conf b/conf-template/sst_import/csv_datasource.conf
new file mode 100644
index 00000000..dd20466d
--- /dev/null
+++ b/conf-template/sst_import/csv_datasource.conf
@@ -0,0 +1,101 @@
+# Use the following command to submit the exchange job:
+
+# spark-submit \
+#   --master "spark://master_ip:7077" \
+#   --driver-memory=2G --executor-memory=30G \
+#   --num-executors=3 --executor-cores=20 \
+#   --class com.vesoft.nebula.exchange.Exchange \
+#   nebula-exchange-3.0-SNAPSHOT.jar -c csv_datasource.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address:{
+      graph:["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please configure the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta:["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    path:{
+      # any path with read and write access is ok
+      local:"/tmp"
+      remote:"/sst"
+      hdfs.namenode: "hdfs://name_node:9000"
+    }
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures; if the number of failures exceeds this value, the application exits.
+      max: 32
+      # failed data will be recorded in the output path, formatted as nGQL
+      output: /tmp/errors
+    }
+
+    # use Google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # acquire a permit from the RateLimiter, unit: MILLISECONDS
+      # if a permit cannot be obtained within the specified timeout, the request is given up.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: csv
+        sink: sst
+      }
+      # if your file is not in HDFS, use "file:///path/test.csv"
+      path: "hdfs://ip:port/path/test.csv"
+      # if your csv file has no header, use _c0, _c1, _c2, ... to indicate the fields
+      fields: [csv-field-0, csv-field-1, csv-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: csv-field-0
+      separator: ","
+      header: true
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # Processing edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: csv
+        sink: sst
+      }
+      path: "hdfs://ip:port/path/test.csv"
+      fields: [csv-field-0, csv-field-1, csv-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: csv-field-0
+      target: csv-field-1
+      ranking: csv-field-2
+      separator: ","
+      header: true
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/conf-template/sst_import/hive_datasource.conf b/conf-template/sst_import/hive_datasource.conf
new file mode 100644
index 00000000..fdbbc0e0
--- /dev/null
+++ b/conf-template/sst_import/hive_datasource.conf
@@ -0,0 +1,95 @@
+# Use the following command to submit the exchange job:
+
+# spark-submit \
+#   --master "spark://master_ip:7077" \
+#   --driver-memory=2G --executor-memory=30G \
+#   --num-executors=3 --executor-cores=20 \
+#   --class com.vesoft.nebula.exchange.Exchange \
+#   nebula-exchange-3.0-SNAPSHOT.jar -c hive_datasource.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address:{
+      graph:["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please configure the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta:["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    path:{
+      # any path with read and write access is ok
+      local:"/tmp"
+      remote:"/sst"
+      hdfs.namenode: "hdfs://name_node:9000"
+    }
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures; if the number of failures exceeds this value, the application exits.
+      max: 32
+      # failed data will be recorded in the output path, formatted as nGQL
+      output: /tmp/errors
+    }
+
+    # use Google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # acquire a permit from the RateLimiter, unit: MILLISECONDS
+      # if a permit cannot be obtained within the specified timeout, the request is given up.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: hive
+        sink: sst
+      }
+      exec: "select hive-field-0, hive-field-1, hive-field-2 from database.table"
+      fields: [hive-field-0, hive-field-1, hive-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: hive-field-0
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # Processing edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: hive
+        sink: sst
+      }
+      exec: "select hive-field-0, hive-field-1, hive-field-2 from database.table"
+      fields: [hive-field-0, hive-field-1, hive-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: hive-field-0
+      target: hive-field-1
+      ranking: hive-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/nebula-exchange_spark_2.4/src/main/resources/application.conf b/nebula-exchange_spark_2.4/src/main/resources/application.conf
index c3028227..131f0cf7 100644
--- a/nebula-exchange_spark_2.4/src/main/resources/application.conf
+++ b/nebula-exchange_spark_2.4/src/main/resources/application.conf
@@ -1,23 +1,8 @@
 {
-  # Spark relation com.vesoft.exchange.common.config
+  # Spark config
   spark: {
     app: {
-      name: Nebula Exchange 2.0
-    }
-
-    master:local
-
-    driver: {
-      cores: 1
-      maxResultSize: 1G
-    }
-
-    executor: {
-      memory:1G
-    }
-
-    cores:{
-      max: 16
+      name: Nebula Exchange
     }
   }
 
@@ -33,7 +18,7 @@
   #  }
 
 
-  # Nebula Graph relation com.vesoft.exchange.common.config
+  # Nebula Graph config
   nebula: {
     address:{
       graph:["127.0.0.1:9669"]
@@ -118,8 +103,8 @@
         field:parquet-field-0
         #policy:hash
       }
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # HDFS csv
@@ -139,8 +124,8 @@
       }
       separator: ","
      header: true
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
       # optional config, default is false
       # config repartitionWithNebula as true means: repartition spark dataframe with nebula partition number to write sst files.
       repartitionWithNebula: false
@@ -160,8 +145,8 @@
         field: json-field-0
         #policy: hash
       }
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # Hive
@@ -178,8 +163,8 @@
         field: hive-field-0
         # policy: "hash"
       }
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # neo4j
@@ -199,8 +184,8 @@
         field:neo4j-field-0
         # policy:hash
       }
-      partition: 10
-      batch: 1000
+      partition: 60
+      batch: 2000
       check_point_path: /tmp/test
     }
 
@@ -221,8 +206,8 @@
       vertex: {
        field:rowkey
       }
-      partition: 10
-      batch: 1000
+      partition: 60
+      batch: 2000
     }
 
     # Pulsar
@@ -243,8 +228,8 @@
       vertex: {
        field:pulsar-field-0
       }
-      partition: 10
-      batch: 1000
+      partition: 60
+      batch: 2000
       interval.seconds: 10
     }
 
@@ -262,8 +247,8 @@
       vertex: {
        field: kafka-field-0
       }
-      partition: 10
-      batch: 10
+      partition: 60
+      batch: 2000
       interval.seconds: 10
     }
 
@@ -290,8 +275,8 @@
       vertex:{
        field: maxcompute-field-2
      }
-      partition:10
-      batch:10
+      partition:60
+      batch:2000
     }
 
     # ClickHouse
@@ -313,8 +298,8 @@
        field:clickhouse-field-0
        #policy:hash
      }
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # PostgreSQL
@@ -338,8 +323,8 @@
        field: postgre-field-0
        # policy: "hash"
      }
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # Oracle
@@ -361,8 +346,8 @@
        field: oracle-field-0
        # policy: "hash"
      }
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
   ]
 
@@ -389,8 +374,8 @@
        #policy:hash
      }
      ranking: parquet-field-2
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # HDFS csv
@@ -413,8 +398,8 @@
      ranking: csv-field-2
      separator: ","
      header: true
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
      # optional config, default is false
      # config repartitionWithNebula as true means: repartition spark dataframe with nebula partition number to write sst files.
      repartitionWithNebula: false
@@ -438,8 +423,8 @@
        field: json-field-1
      }
      ranking: json-field-2
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # Hive
@@ -455,8 +440,8 @@
      source: hive-field-0
      target: hive-field-1
      ranking: hive-filed-2
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # Neo4j
@@ -479,8 +464,8 @@
        field: b.neo4j-target-field
      }
      ranking: neo4j-field-2
-      partition: 10
-      batch: 1000
+      partition: 60
+      batch: 2000
      check_point_path: /tmp/test
     }
 
@@ -504,8 +489,8 @@
        field: hbase-column-h
      }
      ranking: hbase-column-t
-      partition: 10
-      batch: 1000
+      partition: 60
+      batch: 2000
     }
 
 
@@ -532,8 +517,8 @@
        field: pulsar-field-1
      }
      ranking: pulsar-field-2
-      partition: 10
-      batch: 10
+      partition: 60
+      batch: 2000
      interval.seconds: 10
     }
 
@@ -551,8 +536,8 @@
      source: kafka-field-0
      target: kafka-field-1
      ranking: kafka-field-2
-      partition: 10
-      batch: 1000
+      partition: 60
+      batch: 2000
      interval.seconds: 10
     }
 
@@ -581,8 +566,8 @@
        field: maxcompute-field-3
      }
      ranking: maxcompute-field-4
-      partition:10
-      batch:10
+      partition:60
+      batch:2000
     }
 
     # ClickHouse
@@ -609,8 +594,8 @@
        #policy:hash
      }
      ranking:clickhouse-field-3
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # PostgreSQL
@@ -639,8 +624,8 @@
        # policy: "hash"
      }
      ranking: postgre-field-1
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
 
     # Oracle
@@ -666,8 +651,8 @@
        field: oracle-field-1
      }
      ranking: oracle-field-2
-      batch: 256
-      partition: 32
+      batch: 2000
+      partition: 60
     }
   ]
 }
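
# Note on submitting the Hive templates: the spark-submit examples in the template headers only pass the config file
# with -c. When the data source is Hive, Exchange additionally needs Hive support enabled at submit time; in recent
# Exchange releases this is done with the -h (--hive) option, but verify the option name against the Exchange version
# you build. A minimal sketch, assuming nebula-exchange-3.0-SNAPSHOT.jar and the client_import Hive template:
#
#   spark-submit \
#     --master "spark://master_ip:7077" \
#     --class com.vesoft.nebula.exchange.Exchange \
#     nebula-exchange-3.0-SNAPSHOT.jar -c hive_datasource.conf -h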