support PostgreSQL data source #62
Conversation
(cherry picked from commit 8bdff07)
Thanks so much @ianhhhhhhhhe for the amazing work ;-)
One test failed.
fix postgresql config bug
Codecov Report
@@             Coverage Diff              @@
##             master      #62      +/-   ##
============================================
+ Coverage     54.51%   55.14%   +0.62%
  Complexity       76       76
============================================
  Files            17       17
  Lines          1317     1342      +25
  Branches        250      254       +4
============================================
+ Hits            718      740      +22
+ Misses          474      473       -1
- Partials        125      129       +4
Continue to review full report at Codecov.
assert(postgresql.port == 5432)
assert(postgresql.user.equals("root"))
assert(postgresql.password.equals("nebula"))
assert(postgresql.database.equals("database"))
duplicate test
PulsarReader
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, FileBaseSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgresSQLSourceConfigEntry, PulsarSourceConfigEntry, SinkCategory, SourceCategory}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, ParquetReader, PostgreSQLReader, PulsarReader}
please format the import
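For illustration, the long selector import on the reviewed line could be re-flowed like this (purely a reformatting of the identifiers already listed above; the exact wrapping is up to the project's formatter settings):

import com.vesoft.exchange.common.config.{
  ClickHouseConfigEntry,
  Configs,
  DataSourceConfigEntry,
  FileBaseSourceConfigEntry,
  HBaseSourceConfigEntry,
  HiveSourceConfigEntry,
  JanusGraphSourceConfigEntry,
  KafkaSourceConfigEntry,
  MaxComputeConfigEntry,
  MySQLSourceConfigEntry,
  Neo4JSourceConfigEntry,
  PostgresSQLSourceConfigEntry,
  PulsarSourceConfigEntry,
  SinkCategory,
  SourceCategory
}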
Neo4JSourceConfigEntry,
ServerDataSourceConfigEntry
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgresSQLSourceConfigEntry, ServerDataSourceConfigEntry}
please format the import
.option("user", postgreConfig.user)
.option("password", postgreConfig.password)
.load()
df.createOrReplaceTempView(postgreConfig.table)
How do you test the PostgreSQL data source? If your table is configured as table, but the table name in the sentence is db.table, can the sentence be executed successfully?
I tested with
df.createOrReplaceTempView("table")
session.sql("select * from db.table")
and got exceptions like:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `db`.`table`; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation `db`.`table`
So please change the table name in the config's sentence to match the table configured in the config file.
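To make the failure concrete, here is a minimal sketch of the behaviour described above, assuming a local SparkSession and the placeholder view name src_table (neither is the PR's actual code):

import org.apache.spark.sql.SparkSession

object TempViewNameSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[1]").appName("temp-view-sketch").getOrCreate()
    val df = session.range(3).toDF("id")

    // The temp view is registered under the bare name only, never under a db-qualified name.
    df.createOrReplaceTempView("src_table")

    // Works: the sentence references the same name passed to createOrReplaceTempView.
    session.sql("select * from src_table").show()

    // Fails with AnalysisException "Table or view not found: `db`.`src_table`",
    // because nothing is registered under the qualified name.
    // session.sql("select * from db.src_table").show()

    session.stop()
  }
}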
* @param password
* @param sentence
*/
case class PostgresSQLSourceConfigEntry(override val category: SourceCategory.Value,
Should we refactor PostgresSQLSourceConfigEntry to PostgreSQLSourceConfigEntry?
Can you also add the PostgreSQL data source for Spark 2.2 and Spark 2.4?
add postgresql dependency to avoid exception:
available postgresql versions are: 9.4.1207 - 9.4.1212, for example:
…n test, declare driver in pom
Sure ^ - ^
.option("password", postgreConfig.password)
.load()
df.createOrReplaceTempView(postgreConfig.table)
session.sql(sentence)
In the configs, if the user does not configure sentence, the default sentence value is "", which will throw spark.sql.catalyst.parser.ParseException when executing session.sql(sentence). So please add a check around session.sql:
if (!"".equals(sentence))
  session.sql(sentence)
What if I check it when the user sets the configs?
require(
host.trim.length != 0 &&
port > 0 &&
database.trim.length > 0 &&
table.trim.length > 0 &&
user.trim.length > 0 &&
sentence.trim.length > 0
)
It will save many if checks.
That's contradictory with the default sentence value: getOrElse(config, "sentence", "").
- For the default sentence value, I prefer to use null rather than "".
- Users are allowed to not configure the sentence, so we still need to handle both situations for sentence.
If sentence is not configured, executing select * from ${table} might be better.
If sentence is not configured, executing select * from ${table} might be better.

No need to execute select * from table; by default, a Spark SQL read from the DB reads all the data.
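Putting these points together, the PostgreSQL reader could end up looking roughly like the sketch below: when sentence is null or empty, the JDBC read is returned as-is; otherwise the temp view is registered under the configured table name and the sentence is executed. The name readPostgres and the url/dbtable/driver options are assumptions for illustration; only the user/password options, createOrReplaceTempView, and session.sql come from the snippets in this PR.

import org.apache.spark.sql.{DataFrame, SparkSession}

object PostgresReadSketch {
  // Hypothetical helper mirroring the reader logic discussed in this thread.
  def readPostgres(session: SparkSession,
                   host: String,
                   port: Int,
                   database: String,
                   table: String,
                   user: String,
                   password: String,
                   sentence: String): DataFrame = {
    val df = session.read
      .format("jdbc")
      .option("url", s"jdbc:postgresql://$host:$port/$database")
      .option("dbtable", table)
      .option("driver", "org.postgresql.Driver")
      .option("user", user)
      .option("password", password)
      .load()

    if (sentence == null || sentence.trim.isEmpty) {
      // No sentence configured: the plain JDBC read already returns all rows.
      df
    } else {
      // Register the view under the configured table name so the sentence can reference it.
      df.createOrReplaceTempView(table)
      session.sql(sentence)
    }
  }
}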
.option("password", postgreConfig.password)
.load()
df.createOrReplaceTempView(postgreConfig.table)
session.sql(sentence)
ditto
.option("password", postgreConfig.password)
.load()
df.createOrReplaceTempView(postgreConfig.table)
session.sql(sentence)
ditto
Great work, thanks~
spark_v3.0