Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: none-shared source to secret dependence #19263

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/scripts/e2e-test-parallel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ kill_cluster
echo "--- e2e, ci-3streaming-2serving-3fe, batch"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel"
# Exclude files that contain ALTER SYSTEM commands
find ./e2e_test/ddl -name "*.slt" -type f -exec grep -L "ALTER SYSTEM" {} \; | xargs -r sqllogictest "${host_args[@]}" -d dev --junit "parallel-batch-ddl-${profile}" --label "parallel"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @BugenZhao for skipping tests if the file modifies SYSTEM PARAMETER.

sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel"

kill_cluster
Expand Down
32 changes: 32 additions & 0 deletions e2e_test/source_inline/kafka/secret_dep.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Note: control substitution on will force us to use "\n" instead of "\n" in commands
control substitution on

# for non-shared source
statement ok
set streaming_use_shared_source to false;

system ok
rpk topic create test_secret_ref -p 3

statement ok
CREATE SECRET sec WITH (backend = 'meta') AS 'test_secret_ref';

statement ok
CREATE SOURCE s(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = secret sec,
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON;

statement error
DROP SECRET sec;

statement ok
DROP SOURCE s;

statement ok
DROP SECRET sec;

statement ok
set streaming_use_shared_source to true;
26 changes: 25 additions & 1 deletion src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ use crate::controller::utils::{
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::manager::{
get_referred_secret_ids_from_source, MetaSrvEnv, NotificationVersion,
IGNORED_NOTIFICATION_VERSION,
};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -1232,6 +1235,9 @@ impl CatalogController {
)
.await?;

// handle secret ref
let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;

let source_obj = Self::create_object(
&txn,
ObjectType::Source,
Expand All @@ -1245,6 +1251,19 @@ impl CatalogController {
let source: source::ActiveModel = pb_source.clone().into();
Source::insert(source).exec(&txn).await?;

// add secret dependency
if !secret_ids.is_empty() {
ObjectDependency::insert_many(secret_ids.iter().map(|id| {
object_dependency::ActiveModel {
oid: Set(*id as _),
used_by: Set(source_id as _),
..Default::default()
}
}))
.exec(&txn)
.await?;
}

txn.commit().await?;

let version = self
Expand Down Expand Up @@ -3506,6 +3525,8 @@ async fn update_internal_tables(
#[cfg(test)]
mod tests {

use risingwave_pb::catalog::StreamSourceInfo;

use super::*;

const TEST_DATABASE_ID: DatabaseId = 1;
Expand Down Expand Up @@ -3665,6 +3686,9 @@ mod tests {
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON"#
.to_string(),
info: Some(StreamSourceInfo {
..Default::default()
}),
..Default::default()
};
mgr.create_source(pb_source).await?;
Expand Down
Loading