-
Notifications
You must be signed in to change notification settings - Fork 597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(batch): support basic postgres tvf #18811
Conversation
|
GitGuardian id | GitGuardian status | Secret | Commit | Filename | |
---|---|---|---|---|---|
9425213 | Triggered | Generic Password | b1a286d | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | e2e20f0 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | e2e20f0 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | e2e20f0 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | e2e20f0 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | b1a286d | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | 111a7a5 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | b5e76a5 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | b5e76a5 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | b5e76a5 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | b5e76a5 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | 5b66d01 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | c254e71 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | 66464d9 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | fe10528 | e2e_test/source/tvf/postgres_query.slt | View secret |
9425213 | Triggered | Generic Password | 8ea47ed | e2e_test/source/tvf/postgres_query.slt | View secret |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely. Learn here the best practices.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future consider
- following these best practices for managing and storing secrets including API keys and other credentials
- install secret detection on pre-commit to catch secret before it leaves your machine and ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
e27da02
to
ad7db51
Compare
} | ||
|
||
pub fn clone_with_dist(&self) -> Self { | ||
let base = self.base.clone_with_new_distribution(Distribution::Single); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed file_scan
uses Distribution::SomeShard
instead here. But that's a problem because when using that, 4 instances of the PostgresQueryExecutor
were spawned. It should only spawn a single one.
I changed it to Distribution::Single
for batch_postgres_query.rs
and it fixed it for me.
Just flagging it, in case a similar bug can occur for file_scan
source. cc @chenzl25
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw table function can also be part of ProjectSet, although practically we can leave it as unsupported:
dev=> select postgres_query('localhost', '5432', 'postgres', 'postgres', 'test', 'select * from t');
ERROR: Failed to run the query
Caused by these errors (recent errors listed first):
1: failed to build executor
2: Expr error
3: Unsupported function: postgres_query(character varying, character varying, character varying, character varying, character varying, character varying) -> struct<s character varying>
Also wants to mentioned #14763. Without it:
With it:
|
Co-authored-by: xiangjinwu <[email protected]>
Co-authored-by: xiangjinwu <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feature should be a TECHNICAL PREVIEW.
sounds good
echo "--- postgres_query tvf test" | ||
risedev slt './e2e_test/source/tvf/postgres_query.slt' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to put in e2e_test/source_inline
, and no need to modify CI script
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently inline source test does not have a postgres service managed by risedev. We can refactor it after #18099.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -0,0 +1,40 @@ | |||
system ok | |||
PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test psql -c " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to use $RISEDEV_ envvars, instead of hard-coded ones. Then you can run it easier locally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current e2e-source-test does not have a postgres service managed by risedev. We can refactor it after #18099.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we already have risedev managed pg #16662
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, we already have a dedicated risedev profile, so I don't think it will affect other stuff..
risingwave/ci/scripts/e2e-source-test.sh
Lines 38 to 41 in c80cbf3
echo "--- e2e, inline test" | |
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ | |
risedev ci-start ci-inline-source-test | |
risedev slt './e2e_test/source_inline/**/*.slt' |
Lines 952 to 979 in c80cbf3
ci-inline-source-test: | |
config-path: src/config/ci-recovery.toml | |
steps: | |
- use: minio | |
- use: etcd | |
unsafe-no-fsync: true | |
- use: meta-node | |
meta-backend: etcd | |
- use: compute-node | |
enable-tiered-cache: true | |
- use: frontend | |
- use: compactor | |
- use: pubsub | |
persist-data: true | |
- use: kafka | |
user-managed: true | |
address: message_queue | |
port: 29092 | |
- use: schema-registry | |
user-managed: true | |
address: schemaregistry | |
port: 8082 | |
- use: mysql | |
port: 3306 | |
address: mysql | |
user: root | |
password: 123456 | |
user-managed: true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just mean to write the new tests introduced in the new style, without changing existing tests. If you mean migrating all tests to the style later after that PR, it also sounds good
@@ -127,6 +128,13 @@ pub enum BatchError { | |||
ParquetError, | |||
), | |||
|
|||
#[error(transparent)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if the error messages from Postgres can be clearly distinguished from ours, given that we're using transparent
and not including any prompt here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we introduce more external system queries and directly interact with them in the batch
crate, I'm concerned that BatchError
might become another maintainability hell just like ConnectorError
.
Just FYI, in #15086, we made ConnectorError
a wrapper of anyhow::Error
because it has too many variants (may check the documentation on def_anyhow_wrapper
for the underlying ideas). For example, shall we introduce a ExtSystemError
with this approach (or even reuse ConnectorError
) and make it the only variant for all external system queries in BatchError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just FYI, in #15086, we made ConnectorError a wrapper of anyhow::Error because it has too many variants (may check the documentation on def_anyhow_wrapper for the underlying ideas). For example, shall we introduce a ExtSystemError with this approach (or even reuse ConnectorError) and make it the only variant for all external system queries in BatchError?
I think this a fair point. There's quite a few tvf to be supported. And we should preemptively introduce an ExtSystemError to deal with it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realize there's more external system errors like iceberg, parquet from before this PR, and not just postgres error, which was introduced in this PR.
I will handle it in a separate PR since this is a larger refactor: #18860
#[error(transparent)]
Iceberg(
#[from]
#[backtrace]
iceberg::Error,
),
#[error(transparent)]
Parquet(
#[from]
#[backtrace]
ParquetError,
),
#[error(transparent)]
Postgres(
#[from]
#[backtrace]
tokio_postgres::Error,
),
let schema = tokio::task::block_in_place(|| { | ||
RUNTIME.block_on(async { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May note that this still blocks the current thread, meaning that the concurrency of binding Postgres TVF cannot exceed the TOKIO_WORKER_THREADS
. As we're introducing more in the future, we may have to consider refactoring this part. Like
- make binder async (can be abused)
- resolve necessary info ahead of time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolve necessary info ahead of time
I was considering the latter approach. Something like splitting the binder into 2 phases, first is async phase which can resolve bindings like this by traversing the ast. Second is the current binder, just a pure function.
So it will be something like:
async fn bind(&self, ast) {
let pure_ast = self.bind_async(ast);
let ast = self.bind_sync(pure_ast);
}
async fn bind_async() {...}
fn bind_sync() {...}
This also means we need 2 IRs in binder. Something like:
PartialBoundQuery --- artifact from bind_async
BoundQuery -- artifact from bind_sync
But this approach seems quite heavy, we now need to maintain 2 IRs rather than jsut 1 in binder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just mentioning a third option. I do not prefer it but it may inspire an improved fourth option:
- Let the user specify return schema and defer the async validation (instead of
ahead of time
)
The following syntax exists in PostgreSQL but we have not supported it yet:
select * from json_to_recordset('[{"a":1,"b":"foo"}, {"a":"2","c":"bar"}]') as x(a int, b text)
-- Similarly: select * from postgres_query(...) as x(a int, b text)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- make binder async (can be abused)
Historical context on binder not being async, although most of the discussion happened in slack private channel:
#173 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far the simplest way, while preserving good ux, seems to be just making the binder async.
The other approaches in this thread will either make maintainence more cumbersome (extra IR in binder), or lead to complexity in ux.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify making binder async
may not be easy. We can discuss the topic separately, not blocking this PR but necessary before going further with such batch TVFs.
schema: Schema, | ||
hostname: String, | ||
port: String, | ||
username: String, | ||
password: String, | ||
database: String, | ||
query: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we extract these fields into a struct to make it clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will be removing some/all of these fields and using Connection
instead down the road. So no need to refactor now.
Thanks for the many reviews, but still need one for |
Just to confirm, can we use |
I'll add a test for it subsequently. Added to the PR description of followup tasks. |
Wait for |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR introduces postgres tvf, in the form of
postgres_query(...)
. You can look at the release notes for more details.We implement the full postgres tvf, in frontend and batch engine. The key parts are firstly in the binder, where we instantiate a pg connection, then
prepare
the pg query to get the output schema. Subsequently, we will rewrite it into its own plan node (PostgresQueryNode
), and its own batch executor. Then in the batch executor, we again instantiate a client and run the actual query against postgres. Finally we deserialize the output from postgres and send it to downstream batch executors.There's still things missing. We can support these in separate PRs:
--
Although there's >740 LOC, most of these changes are boilerplate.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
This feature should be a TECHNICAL PREVIEW. See below.
This PR introduces the
postgres_query
tvf. Its syntax will very likely change down the road. So the interface and examples you see below will be obsolete in subsequent versions. That being said the core functionality will still exist.postgres_query
is defined as follows:An example of its usage:
Postgres:
RisingWave: