From 761b7b9fa7d6eb29d2abb85ac12f9057b7cb98b5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=A9=AC=E6=99=93=E8=8C=9C?=
Date: Fri, 21 Jan 2022 12:46:08 +0800
Subject: [PATCH] feat(config, reader): add postgresql reader

---
 .../exchange/common/config/Configs.scala      | 12 ++++++
 .../common/config/SourceConfigs.scala         | 33 +++++++++++++++
 .../src/test/resources/application.conf       | 26 ++++++++++++
 .../exchange/common/config/ConfigsSuite.scala | 13 ++++++
 .../com/vesoft/nebula/exchange/Exchange.scala | 40 ++++---------------
 .../exchange/reader/ServerBaseReader.scala    | 35 +++++++++++-----
 6 files changed, 117 insertions(+), 42 deletions(-)

diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 170b4509..07d80984 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -659,6 +659,18 @@ object Configs {
             config.getString("password"),
             getOrElse(config, "sentence", "")
           )
+        case SourceCategory.POSTGRESQL =>
+          PostgresSQLSourceConfigEntry(
+            SourceCategory.POSTGRESQL,
+            config.getString("host"),
+            config.getInt("port"),
+            config.getString("database"),
+            config.getString("schema"),
+            config.getString("table"),
+            config.getString("user"),
+            config.getString("password"),
+            getOrElse(config, "sentence", "")
+          )
         case SourceCategory.KAFKA =>
           val intervalSeconds =
             if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala
index 7fac5fec..d8fb2a4b 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala
@@ -26,6 +26,7 @@
object SourceCategory extends Enumeration {
   val HBASE      = Value("HBASE")
   val MAXCOMPUTE = Value("MAXCOMPUTE")
   val CLICKHOUSE = Value("CLICKHOUSE")
+  val POSTGRESQL = Value("POSTGRESQL")
 
   val SOCKET = Value("SOCKET")
   val KAFKA  = Value("KAFKA")
@@ -157,6 +158,38 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
   }
 }
 
+/**
+  * PostgresSQLSourceConfigEntry
+  *
+  * @param category
+  * @param host
+  * @param port
+  * @param database
+  * @param table
+  * @param user
+  * @param password
+  * @param sentence
+  */
+case class PostgresSQLSourceConfigEntry(override val category: SourceCategory.Value,
+                                        host: String,
+                                        port: Int,
+                                        database: String,
+                                        schema: String,
+                                        table: String,
+                                        user: String,
+                                        password: String,
+                                        override val sentence: String
+                                       )
+    extends ServerDataSourceConfigEntry {
+  require(
+    host.trim.length != 0 && port > 0 && database.trim.length > 0 && schema.trim.length > 0 && table.trim.length > 0 && user.trim.length > 0)
+
+  override def toString: String = {
+    s"PostgreSql source host: ${host}, port: ${port}, database: ${database}, table: ${table}, " +
+      s"user: ${user}, password: ${password}, sentence: ${sentence}"
+  }
+}
+
 /**
   * TODO: Support more com.vesoft.exchange.common.config item about Kafka Consumer
   *
diff --git a/exchange-common/src/test/resources/application.conf b/exchange-common/src/test/resources/application.conf
index 3ece57a0..8ccaa88b 100644
--- a/exchange-common/src/test/resources/application.conf
+++ b/exchange-common/src/test/resources/application.conf
@@ -266,6 +266,32 @@
       batch: 256
       partition: 32
     }
+
+    # PostgreSQL
+    {
+      name: tag9
+      type: {
+        source: postgresql
+        sink: client
+      }
+      user:root
+      host: "127.0.0.1"
+      port: "5432"
+      database: database
+      schema: public
+      table: table
+      user: root
+      password: nebula
+      sentence: "select mysql-field0, mysql-field1, mysql-field2 from database.table"
+      fields: [mysql-field-0, mysql-field-1, mysql-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: {
+        field: 
mysql-field-0
+        # policy: "hash"
+      }
+      batch: 256
+      partition: 32
+    }
   ]
 
   # Processing edges
diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala
index f4e42482..7fff13c4 100644
--- a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala
+++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala
@@ -17,6 +17,7 @@ import com.vesoft.exchange.common.config.{
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
   MySQLSourceConfigEntry,
   Neo4JSourceConfigEntry,
+  PostgresSQLSourceConfigEntry,
   SinkCategory,
   SourceCategory
@@ -156,6 +157,18 @@ class ConfigsSuite {
           assert(mysql.database.equals("database"))
           assert(mysql.table.equals("table"))
         }
+        case SourceCategory.POSTGRESQL => {
+          val postgresql = tagConfig.dataSourceConfigEntry.asInstanceOf[PostgresSQLSourceConfigEntry]
+          assert(label.equals("tag9"))
+          assert(postgresql.category == SourceCategory.POSTGRESQL)
+          assert(postgresql.host.equals("127.0.0.1"))
+          assert(postgresql.port == 5432)
+          assert(postgresql.user.equals("root"))
+          assert(postgresql.password.equals("nebula"))
+          assert(postgresql.schema.equals("public"))
+          assert(postgresql.database.equals("database"))
+          assert(postgresql.table.equals("table"))
+        }
         case _ => {}
       }
     }
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 28630d2a..f1c341f5 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -6,41 +6,12 @@
 package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import java.io.File
+import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, 
ErrorHandler}
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  Configs,
-  DataSourceConfigEntry,
-  FileBaseSourceConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  KafkaSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  PulsarSourceConfigEntry,
-  SinkCategory,
-  SourceCategory
-}
-import com.vesoft.nebula.exchange.reader.{
-  CSVReader,
-  ClickhouseReader,
-  HBaseReader,
-  HiveReader,
-  JSONReader,
-  JanusGraphReader,
-  KafkaReader,
-  MaxcomputeReader,
-  MySQLReader,
-  Neo4JReader,
-  ORCReader,
-  ParquetReader,
-  PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, FileBaseSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgresSQLSourceConfigEntry, PulsarSourceConfigEntry, SinkCategory, SourceCategory}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, ParquetReader, PostgreSQLReader, PulsarReader}
 import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
 import org.apache.log4j.Logger
@@ -285,6 +256,11 @@ object Exchange {
         LOG.info(s"Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}")
         val reader = new MySQLReader(session, mysqlConfig)
         Some(reader.read())
+      case SourceCategory.POSTGRESQL =>
+        val postgreConfig = config.asInstanceOf[PostgresSQLSourceConfigEntry]
+        LOG.info(s"Loading from postgresql com.vesoft.exchange.common.config: ${postgreConfig}")
+        val reader = new PostgreSQLReader(session, postgreConfig)
+        Some(reader.read())
       case SourceCategory.PULSAR =>
         val pulsarConfig = 
config.asInstanceOf[PulsarSourceConfigEntry]
         LOG.info(s"Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}")
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
index 4f893f12..76cd78be 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
@@ -5,16 +5,7 @@
 
 package com.vesoft.nebula.exchange.reader
 
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  ServerDataSourceConfigEntry
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgresSQLSourceConfigEntry, ServerDataSourceConfigEntry}
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.client.Result
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -78,6 +69,30 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
   }
 }
 
+/**
+  * PostgreSQLReader extends the ServerBaseReader and
+  * reads a PostgreSQL table through the Spark JDBC data source.
+  *
+  * @param session
+  * @param postgreConfig
+  */
+class PostgreSQLReader(override val session: SparkSession, postgreConfig: PostgresSQLSourceConfigEntry)
+    extends ServerBaseReader(session, postgreConfig.sentence) {
+  override def read(): DataFrame = {
+    val url =
+      s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}"
+    val df = session.read
+      .format("jdbc")
+      .option("url", url)
+      .option("dbtable", s"${postgreConfig.schema}.${postgreConfig.table}")
+      .option("user", postgreConfig.user)
+      .option("password", postgreConfig.password)
+      .load()
+    df.createOrReplaceTempView(postgreConfig.table)
+    session.sql(sentence)
+  }
+}
+
 /**
   * Neo4JReader extends the ServerBaseReader
   * this reader support checkpoint by sacrificing performance