register googleDialect for jdbc to support BigQuery datasource (#147)
* add note for version restriction

* register googleDialect for jdbc to support BigQuery datasource

* add config template for bigquery
Nicole00 authored Jul 3, 2023
1 parent e724860 commit f628fb9
Showing 5 changed files with 118 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README-CN.md
@@ -10,6 +10,8 @@ The Spark versions supported by Exchange include 2.2, 2.4 and 3.0; the corresponding tool package names
> - Version 3.4.0 does not support Kafka or Pulsar. To import Kafka or Pulsar data into NebulaGraph, use version 3.0.0, 3.3.0, or 3.5.0.
> - This repository only supports NebulaGraph 2.x and 3.x. If you are using NebulaGraph v1.x, please use [NebulaExchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange), or refer to the Exchange 1.0 documentation: [NebulaExchange User Manual](https://docs.nebula-graph.com.cn/nebula-exchange/about-exchange/ex-ug-what-is-exchange/ "Go to the NebulaGraph website")
> Note: version 3.4.0 does not support Kafka or Pulsar. To import Kafka or Pulsar data into NebulaGraph, use version 3.0.0, 3.3.0, or 3.5.0.
## How to Get

1. Compile and package the latest Exchange.
84 changes: 84 additions & 0 deletions conf-template/client_import/bigquery_datasource.conf
@@ -0,0 +1,84 @@
# Use the command to submit the exchange job:

# spark-submit \
# --master "spark://master_ip:7077" \
# --driver-memory=2G --executor-memory=30G \
# --num-executors=3 --executor-cores=20 \
# --jars $(echo /bigquery-jdbc-dependency-path/*.jar | tr ' ' ',') \
# --class com.vesoft.nebula.exchange.Exchange \
# nebula-exchange-3.0-SNAPSHOT.jar -c bigquery_datasource.conf

# You can get all the dependency jars for BigQuery from https://cloud.google.com/bigquery/docs/reference/odbc-jdbc-drivers?hl=zh-cn#jdbc_release_1331004
{
# Spark config
spark: {
app: {
name: NebulaGraph Exchange
}
}

# Nebula Graph config
nebula: {
address:{
graph: ["127.0.0.1:9669"]
# if your NebulaGraph server is in a virtual network such as k8s, configure the leader address of the meta service.
# use `SHOW meta leader` to check your meta leader's address
meta: ["127.0.0.1:9559"]
}
user: root
pswd: nebula
space: test

# nebula client connection parameters
connection {
# socket connect & execute timeout, unit: millisecond
timeout: 30000
}

error: {
# maximum number of failures; if the number of failures exceeds max, the application exits.
max: 32
# failed data will be recorded in the output path, formatted as nGQL
output: /tmp/errors
}

# use Google's RateLimiter to limit the requests sent to NebulaGraph
rate: {
# the stable throughput of RateLimiter
limit: 1024
# timeout for acquiring a permit from the RateLimiter, unit: millisecond
# if a permit cannot be obtained within the specified timeout, the request is given up.
timeout: 1000
}
}

# Processing tags
tags: [
{
name: tag-name-1
type: {
source: jdbc
sink: client
}

# BigQuery JDBC url; the authentication method is configured in the url (here OAuthType=0 selects service-account authentication). In this example, the key file /tmp/bq-reader-sa-key.json must be accessible to all Spark workers.
url:"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=nebula-cloud-test;OAuthType=0;OAuthServiceAcctEmail=bq-reader@nebula-cloud-test.iam.gserviceaccount.com;OAuthPvtKeyPath=/tmp/bq-reader-sa-key.json"
# JDBC driver
driver:"com.simba.googlebigquery.jdbc.Driver"

user:"[email protected]"
password:"12345"

table:"db.person"
sentence:"select id,firstName,lastName,gender from person"

fields: [firstName, lastName, gender]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: id
}
batch: 2000
partition: 60
}
]
}
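
For reference, the jdbc source settings above map one-to-one onto options of Spark's JDBC reader. Below is a minimal sketch of the equivalent plain Spark read, assuming the placeholder url, driver, and query from the template (the object name is made up; this is not part of the commit):

import org.apache.spark.sql.SparkSession

object BigQueryReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bigquery-read-sketch").getOrCreate()

    val df = spark.read
      .format("jdbc")
      // url and driver are the template's placeholder values
      .option("url", "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;" +
        "ProjectId=nebula-cloud-test;OAuthType=0;" +
        "OAuthServiceAcctEmail=bq-reader@nebula-cloud-test.iam.gserviceaccount.com;" +
        "OAuthPvtKeyPath=/tmp/bq-reader-sa-key.json")
      .option("driver", "com.simba.googlebigquery.jdbc.Driver")
      // the template's `sentence` becomes a pushed-down subquery
      .option("dbtable", "(select id, firstName, lastName, gender from person) t")
      .load()

    df.show()
  }
}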
@@ -326,6 +326,17 @@ class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry
extends ServerBaseReader(session, jdbcConfig.sentence) {
Class.forName(jdbcConfig.driver)
override def read(): DataFrame = {
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect}
val GoogleDialect = new JdbcDialect {
override def canHandle(url: String): Boolean =
url.startsWith("jdbc:bigquery") || url.contains("bigquery")

override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}
}
JdbcDialects.registerDialect(GoogleDialect)

var dfReader = session.read
.format("jdbc")
.option("url", jdbcConfig.url)
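
This dialect registration appears once per Spark-version-specific copy of JdbcReader; the two near-identical hunks follow below. It is needed because Spark's default JDBC dialect wraps identifiers in double quotes when generating the pushed-down SELECT, and in BigQuery Standard SQL double quotes denote string literals, so column names must be quoted with backticks instead. A standalone sketch of what the override changes (the val name is made up):

import org.apache.spark.sql.jdbc.JdbcDialect

// without the dialect Spark would emit "firstName"; BigQuery expects `firstName`
val bigQueryDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.contains("bigquery")
  override def quoteIdentifier(colName: String): String = s"`$colName`"
}

assert(bigQueryDialect.quoteIdentifier("firstName") == "`firstName`")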
@@ -384,6 +384,17 @@ class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry
extends ServerBaseReader(session, jdbcConfig.sentence) {
Class.forName(jdbcConfig.driver)
override def read(): DataFrame = {
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect}
val GoogleDialect = new JdbcDialect {
override def canHandle(url: String): Boolean =
url.startsWith("jdbc:bigquery") || url.contains("bigquery")

override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}
}
JdbcDialects.registerDialect(GoogleDialect)

var dfReader = session.read
.format("jdbc")
.option("url", jdbcConfig.url)
@@ -273,6 +273,16 @@ class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry
extends ServerBaseReader(session, jdbcConfig.sentence) {
Class.forName(jdbcConfig.driver)
override def read(): DataFrame = {
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect}
val GoogleDialect = new JdbcDialect {
override def canHandle(url: String): Boolean =
url.startsWith("jdbc:bigquery") || url.contains("bigquery")
override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}
}
JdbcDialects.registerDialect(GoogleDialect)

var dfReader = session.read
.format("jdbc")
.option("url", jdbcConfig.url)