Skip to content

Commit

Permalink
feat(config, reader): add postgresql reader
Browse files Browse the repository at this point in the history
  • Loading branch information
马晓茜 authored and 马晓茜 committed Jan 21, 2022
1 parent 1fc042b commit 761b7b9
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,18 @@ object Configs {
config.getString("password"),
getOrElse(config, "sentence", "")
)
// PostgreSQL source: build the config entry from the per-tag/edge block.
// host/port/database/schema/table/user/password are mandatory keys
// (getString/getInt throw if absent); "sentence" — the SQL later executed
// by the reader — falls back to the empty string when not configured.
case SourceCategory.POSTGRESQL =>
PostgresSQLSourceConfigEntry(
SourceCategory.POSTGRESQL,
config.getString("host"),
config.getInt("port"),
config.getString("database"),
config.getString("schema"),
config.getString("table"),
config.getString("user"),
config.getString("password"),
getOrElse(config, "sentence", "")
)
case SourceCategory.KAFKA =>
val intervalSeconds =
if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object SourceCategory extends Enumeration {
val HBASE = Value("HBASE")
val MAXCOMPUTE = Value("MAXCOMPUTE")
val CLICKHOUSE = Value("CLICKHOUSE")
val POSTGRESQL = Value("POSTGRESQL")

val SOCKET = Value("SOCKET")
val KAFKA = Value("KAFKA")
Expand Down Expand Up @@ -157,6 +158,38 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
}
}

/**
  * PostgresSQLSourceConfigEntry — connection settings for a PostgreSQL data source.
  *
  * @param category the source category; expected to be SourceCategory.POSTGRESQL
  * @param host     PostgreSQL server host
  * @param port     PostgreSQL server port; must be positive
  * @param database database to connect to
  * @param schema   schema qualifying the table (e.g. "public")
  * @param table    table to read from
  * @param user     login user
  * @param password login password
  * @param sentence SQL statement executed by the reader against the temp view
  *                 registered under the table name
  */
case class PostgresSQLSourceConfigEntry(override val category: SourceCategory.Value,
                                        host: String,
                                        port: Int,
                                        database: String,
                                        schema: String,
                                        table: String,
                                        user: String,
                                        password: String,
                                        override val sentence: String)
    extends ServerDataSourceConfigEntry {
  require(
    host.trim.length != 0 && port > 0 && database.trim.length > 0 && table.trim.length > 0 && user.trim.length > 0,
    "PostgreSQL source requires a non-empty host, database, table and user, and a positive port"
  )

  // NOTE: these entries are logged verbatim (Exchange logs the whole config via
  // LOG.info), so the password must be masked here rather than printed in clear.
  override def toString: String = {
    s"PostgreSql source host: ${host}, port: ${port}, database: ${database}, table: ${table}, " +
      s"user: ${user}, password: *****, sentence: ${sentence}"
  }
}

/**
* TODO: Support more com.vesoft.exchange.common.config item about Kafka Consumer
*
Expand Down
26 changes: 26 additions & 0 deletions exchange-common/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,32 @@
batch: 256
partition: 32
}

# PostgreSQL
{
name: tag9
type: {
source: postgresql
sink: client
}
host: "127.0.0.1"
port: "5432"
database: database
schema: public
table: table
user: root
password: nebula
sentence: "select mysql-field0, mysql-field1, mysql-field2 from database.table"
fields: [mysql-field-0, mysql-field-1, mysql-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: mysql-field-0
# policy: "hash"
}
batch: 256
partition: 32
}
]

# Processing edges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.vesoft.exchange.common.config.{
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
MySQLSourceConfigEntry,
PostgresSQLSourceConfigEntry,
Neo4JSourceConfigEntry,
SinkCategory,
SourceCategory
Expand Down Expand Up @@ -156,6 +157,18 @@ class ConfigsSuite {
assert(mysql.database.equals("database"))
assert(mysql.table.equals("table"))
}
case SourceCategory.POSTGRESQL => {
  // tag9 in application.conf is the PostgreSQL fixture; verify every field
  // of the parsed entry exactly once (the original asserted `database` twice).
  val postgresql = tagConfig.dataSourceConfigEntry.asInstanceOf[PostgresSQLSourceConfigEntry]
  assert(label.equals("tag9"))
  assert(postgresql.host.equals("127.0.0.1"))
  assert(postgresql.port == 5432)
  assert(postgresql.database.equals("database"))
  assert(postgresql.schema.equals("public"))
  assert(postgresql.table.equals("table"))
  assert(postgresql.user.equals("root"))
  assert(postgresql.password.equals("nebula"))
}
case _ => {}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,12 @@
package com.vesoft.nebula.exchange

import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File

import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
ORCReader,
ParquetReader,
PulsarReader
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, FileBaseSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgresSQLSourceConfigEntry, PulsarSourceConfigEntry, SinkCategory, SourceCategory}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
Expand Down Expand Up @@ -285,6 +256,11 @@ object Exchange {
LOG.info(s"Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}")
val reader = new MySQLReader(session, mysqlConfig)
Some(reader.read())
// PostgreSQL source: delegate to PostgreSQLReader, which reads the table
// over Spark's JDBC data source and applies the configured `sentence`.
// NOTE(review): the logged config includes the password via toString — verify
// it is masked before this lands in production logs.
case SourceCategory.POSTGRESQL =>
val postgreConfig = config.asInstanceOf[PostgresSQLSourceConfigEntry]
LOG.info(s"Loading from postgre com.vesoft.exchange.common.config: ${postgreConfig}")
val reader = new PostgreSQLReader(session, postgreConfig)
Some(reader.read())
case SourceCategory.PULSAR =>
val pulsarConfig = config.asInstanceOf[PulsarSourceConfigEntry]
LOG.info(s"Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,7 @@

package com.vesoft.nebula.exchange.reader

import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
ServerDataSourceConfigEntry
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgresSQLSourceConfigEntry, ServerDataSourceConfigEntry}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
Expand Down Expand Up @@ -78,6 +69,30 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
}
}

/**
  * PostgreSQLReader loads rows from a PostgreSQL table through Spark's JDBC
  * data source, registers them as a temp view named after the table, and
  * returns the result of running the configured SQL `sentence` over that view.
  *
  * @param session       the SparkSession
  * @param postgreConfig the PostgreSQL source configuration
  */
class PostgreSQLReader(override val session: SparkSession,
                       postgreConfig: PostgresSQLSourceConfigEntry)
    extends ServerBaseReader(session, postgreConfig.sentence) {

  override def read(): DataFrame = {
    val url =
      s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}"
    // All JDBC connection options gathered in one place.
    val jdbcOptions = Map(
      "url"      -> url,
      "dbtable"  -> s"${postgreConfig.schema}.${postgreConfig.table}",
      "user"     -> postgreConfig.user,
      "password" -> postgreConfig.password
    )
    val tableDf = session.read.format("jdbc").options(jdbcOptions).load()
    // Expose the frame under the bare table name so `sentence` can query it.
    tableDf.createOrReplaceTempView(postgreConfig.table)
    session.sql(sentence)
  }
}

/**
* Neo4JReader extends the ServerBaseReader
* this reader support checkpoint by sacrificing performance
Expand Down

0 comments on commit 761b7b9

Please sign in to comment.