From f628fb9a4f594a0b10b3d1ac48fb4afa333c11bd Mon Sep 17 00:00:00 2001
From: Anqi
Date: Mon, 3 Jul 2023 09:37:59 +0800
Subject: [PATCH] register googleDialect for jdbc to support BigQuery
 datasource (#147)

* add note for version restriction

* register googleDialect for jdbc to support BigQuery datasource

* add config template for bigquery
---
 README-CN.md                                 |  2 +
 .../client_import/bigquery_datasource.conf   | 84 +++++++++++++++++++
 .../exchange/reader/ServerBaseReader.scala   | 11 +++
 .../exchange/reader/ServerBaseReader.scala   | 11 +++
 .../exchange/reader/ServerBaseReader.scala   | 10 +++
 5 files changed, 118 insertions(+)
 create mode 100644 conf-template/client_import/bigquery_datasource.conf

diff --git a/README-CN.md b/README-CN.md
index 87cc754d..510dde54 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -10,6 +10,8 @@ The Spark versions supported by Exchange are 2.2, 2.4 and 3.0; the corresponding tool package name
 > - Version 3.4.0 does not support kafka or pulsar. To import kafka or pulsar data into NebulaGraph, please use version 3.0.0, 3.3.0 or 3.5.0.
 > - This repository only supports NebulaGraph 2.x and 3.x. If you are using NebulaGraph v1.x, please use [NebulaExchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange), or refer to the Exchange 1.0 documentation: [NebulaExchange user manual](https://docs.nebula-graph.com.cn/nebula-exchange/about-exchange/ex-ug-what-is-exchange/ "Go to the NebulaGraph website").
 
+> Note: version 3.4.0 does not support kafka or pulsar. To import kafka or pulsar data into NebulaGraph, please use version 3.0.0, 3.3.0 or 3.5.0.
+
 ## How to get Exchange
 
 1. Build and package the latest Exchange.

diff --git a/conf-template/client_import/bigquery_datasource.conf b/conf-template/client_import/bigquery_datasource.conf
new file mode 100644
index 00000000..94f244ea
--- /dev/null
+++ b/conf-template/client_import/bigquery_datasource.conf
@@ -0,0 +1,84 @@
+# Use the following command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --executor-cores=20 \
+# --jars $(echo /bigquery-jdbc-dependency-path/*.jar | tr ' ' ',') \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange-3.0-SNAPSHOT.jar -c bigquery_datasource.conf
+
+# You can get all dependency jars for BigQuery from https://cloud.google.com/bigquery/docs/reference/odbc-jdbc-drivers?hl=zh-cn#jdbc_release_1331004
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address:{
+      graph: ["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network such as k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures; if the number of failures exceeds max, the application exits.
+      max: 32
+      # failed data will be recorded in the output path, formatted as ngql
+      output: /tmp/errors
+    }
+
+    # use Google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, the request is given up.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: jdbc
+        sink: client
+      }
+
+      # BigQuery url; the auth method is configured in the url. In this example, the OAuthPvtKeyPath file /tmp/bq-reader-sa-key.json must be accessible to all Spark workers.
+      url:"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=nebula-cloud-test;OAuthType=0;OAuthServiceAcctEmail=bq-reader@nebula-cloud-test.iam.gserviceaccount.com;OAuthPvtKeyPath=/tmp/bq-reader-sa-key.json"
+      # JDBC driver
+      driver:"com.simba.googlebigquery.jdbc.Driver"
+
+      user:"bq-reader@nebula-cloud-test.iam.gserviceaccount.com"
+      password:"12345"
+
+      table:"db.person"
+      sentence:"select id,firstName,lastName,gender from person"
+
+      fields: [firstName, lastName, gender]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: {
+        field: id
+      }
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
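For orientation, here is a minimal sketch (not part of the patch) of the Spark read that a `source: jdbc` tag like the one above drives; the url, driver, credential, and table values are the placeholders from the template, the object name `BigQueryJdbcReadSketch` is hypothetical, and the Simba BigQuery JDBC jars are assumed to be on the driver and executor classpaths (see the `--jars` option above).

```scala
// Minimal sketch of the JDBC read behind the template above.
// Assumptions: placeholder values from the template; BigQuery JDBC jars on the classpath.
import org.apache.spark.sql.{DataFrame, SparkSession}

object BigQueryJdbcReadSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("NebulaGraph Exchange").getOrCreate()

    val df: DataFrame = session.read
      .format("jdbc")
      .option("url", "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;" +
        "ProjectId=nebula-cloud-test;OAuthType=0;" +
        "OAuthServiceAcctEmail=bq-reader@nebula-cloud-test.iam.gserviceaccount.com;" +
        "OAuthPvtKeyPath=/tmp/bq-reader-sa-key.json")
      .option("driver", "com.simba.googlebigquery.jdbc.Driver")
      .option("user", "bq-reader@nebula-cloud-test.iam.gserviceaccount.com")
      .option("password", "12345")
      .option("dbtable", "db.person") // the template's `table`; this sketch reads the whole table
      .load()

    df.show(10)
  }
}
```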
+ url:"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=nebula-cloud-test;OAuthType=0;OAuthServiceAcctEmail=bq-reader@nebula-cloud-test.iam.gserviceaccount.com;OAuthPvtKeyPath=/tmp/bq-reader-sa-key.json" + # JDBC driver + driver:"com.simba.googlebigquery.jdbc.Driver" + + user:"bq-reader@nebula-cloud-test.iam.gserviceaccount.com" + password:"12345" + + table:"db.person" + sentence:"select id,firstName,lastName,gender from person" + + fields: [firstName, lastName, gender] + nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] + vertex: { + field: id + } + batch: 2000 + partition: 60 + } + ] +} diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala index eaba95c0..577a05d4 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala @@ -326,6 +326,17 @@ class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry extends ServerBaseReader(session, jdbcConfig.sentence) { Class.forName(jdbcConfig.driver) override def read(): DataFrame = { + import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect} + val GoogleDialect = new JdbcDialect { + override def canHandle(url: String): Boolean = + url.startsWith("jdbc:bigquery") || url.contains("bigquery") + + override def quoteIdentifier(colName: String): String = { + s"`$colName`" + } + } + JdbcDialects.registerDialect(GoogleDialect) + var dfReader = session.read .format("jdbc") .option("url", jdbcConfig.url) diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala index d26ddfe8..0bd59fac 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala @@ -384,6 +384,17 @@ class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry extends ServerBaseReader(session, jdbcConfig.sentence) { Class.forName(jdbcConfig.driver) override def read(): DataFrame = { + import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect} + val GoogleDialect = new JdbcDialect { + override def canHandle(url: String): Boolean = + url.startsWith("jdbc:bigquery") || url.contains("bigquery") + + override def quoteIdentifier(colName: String): String = { + s"`$colName`" + } + } + JdbcDialects.registerDialect(GoogleDialect) + var dfReader = session.read .format("jdbc") .option("url", jdbcConfig.url) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala index 8799b276..2d006813 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala @@ -273,6 +273,16 @@ class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry extends ServerBaseReader(session, jdbcConfig.sentence) { Class.forName(jdbcConfig.driver) override def read(): DataFrame = { + import 
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
index 8799b276..2d006813 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
@@ -273,6 +273,16 @@ class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry
     extends ServerBaseReader(session, jdbcConfig.sentence) {
   Class.forName(jdbcConfig.driver)
   override def read(): DataFrame = {
+    import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect}
+    val GoogleDialect = new JdbcDialect {
+      override def canHandle(url: String): Boolean =
+        url.startsWith("jdbc:bigquery") || url.contains("bigquery")
+      override def quoteIdentifier(colName: String): String = {
+        s"`$colName`"
+      }
+    }
+    JdbcDialects.registerDialect(GoogleDialect)
+
     var dfReader = session.read
      .format("jdbc")
      .option("url", jdbcConfig.url)
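One design note, as a hypothetical refinement rather than something the PR does: each `read()` call allocates a fresh anonymous dialect and registers it again. Since `JdbcDialects.registerDialect` only replaces previously registered dialects that compare equal, distinct anonymous instances can accumulate over repeated reads in one JVM. A sketch that registers a single shared instance once, using the hypothetical name `GoogleDialectRegistry`:

```scala
// Hypothetical refinement (not in the patch): register the BigQuery dialect once per JVM.
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

object GoogleDialectRegistry {
  private val dialect: JdbcDialect = new JdbcDialect {
    override def canHandle(url: String): Boolean =
      url.startsWith("jdbc:bigquery") || url.contains("bigquery")
    override def quoteIdentifier(colName: String): String = s"`$colName`"
  }

  // A Scala object's initializer runs exactly once, on first access,
  // so every read() can safely call ensureRegistered().
  JdbcDialects.registerDialect(dialect)

  def ensureRegistered(): Unit = ()
}
```

`read()` would then start with `GoogleDialectRegistry.ensureRegistered()`, which would also keep the three per-Spark-version copies of the snippet in sync.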