Skip to content

Commit

Permalink
fix: none-shared source to secret dependence (#19263)
Browse files Browse the repository at this point in the history
Signed-off-by: tabversion <[email protected]>
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Nov 5, 2024
1 parent 0f78e6a commit a945f52
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 2 deletions.
3 changes: 2 additions & 1 deletion ci/scripts/e2e-test-parallel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ kill_cluster
echo "--- e2e, ci-3streaming-2serving-3fe, batch"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel"
# Exclude files that contain ALTER SYSTEM commands
find ./e2e_test/ddl -name "*.slt" -type f -exec grep -L "ALTER SYSTEM" {} \; | xargs -r sqllogictest "${host_args[@]}" -d dev --junit "parallel-batch-ddl-${profile}" --label "parallel"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel"

kill_cluster
Expand Down
32 changes: 32 additions & 0 deletions e2e_test/source_inline/kafka/secret_dep.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Note: control substitution on will force us to use "\n" instead of "\n" in commands
control substitution on

# for non-shared source
statement ok
set streaming_use_shared_source to false;

system ok
rpk topic create test_secret_ref -p 3

statement ok
CREATE SECRET sec WITH (backend = 'meta') AS 'test_secret_ref';

statement ok
CREATE SOURCE s(x varchar)
WITH(
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = secret sec,
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON;

statement error
DROP SECRET sec;

statement ok
DROP SOURCE s;

statement ok
DROP SECRET sec;

statement ok
set streaming_use_shared_source to true;
26 changes: 25 additions & 1 deletion src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ use crate::controller::utils::{
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::manager::{
get_referred_secret_ids_from_source, MetaSrvEnv, NotificationVersion,
IGNORED_NOTIFICATION_VERSION,
};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -1232,6 +1235,9 @@ impl CatalogController {
)
.await?;

// handle secret ref
let secret_ids = get_referred_secret_ids_from_source(&pb_source)?;

let source_obj = Self::create_object(
&txn,
ObjectType::Source,
Expand All @@ -1245,6 +1251,19 @@ impl CatalogController {
let source: source::ActiveModel = pb_source.clone().into();
Source::insert(source).exec(&txn).await?;

// add secret dependency
if !secret_ids.is_empty() {
ObjectDependency::insert_many(secret_ids.iter().map(|id| {
object_dependency::ActiveModel {
oid: Set(*id as _),
used_by: Set(source_id as _),
..Default::default()
}
}))
.exec(&txn)
.await?;
}

txn.commit().await?;

let version = self
Expand Down Expand Up @@ -3506,6 +3525,8 @@ async fn update_internal_tables(
#[cfg(test)]
mod tests {

use risingwave_pb::catalog::StreamSourceInfo;

use super::*;

const TEST_DATABASE_ID: DatabaseId = 1;
Expand Down Expand Up @@ -3665,6 +3686,9 @@ mod tests {
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON"#
.to_string(),
info: Some(StreamSourceInfo {
..Default::default()
}),
..Default::default()
};
mgr.create_source(pb_source).await?;
Expand Down

0 comments on commit a945f52

Please sign in to comment.