Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(secret): alter secret in catalog #19495

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions e2e_test/ddl/secret.slt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ create secret secret_1 with (
backend = 'meta'
) as 'demo_secret';

statement ok
alter secret secret_1 with (
backend = 'meta'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The backend option should not appear in the alter secret statement. To follow the convention of other alter commands, only the changed part should be provided. Here the backend option is obviously untouched, so users should not write it here.

But, furthermore, I think backend shouldn't be a per-secret option. In my mind it should be a global-wise config and can't be changed after cluster initialized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, furthermore, I think backend shouldn't be a per-secret option. In my mind it should be a global-wise config and can't be changed after cluster initialized.

It's a bit off-topic. We can create a new issue

Copy link
Contributor Author

@yuhao-su yuhao-su Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the concern here. I was thinking the same. But there are 2 problems

  1. We are doing something unusual here. The backend info in with option is also encrypted. We need to decrypt the original secret to write the new one. I can do this.
  2. For hashvault, there is no value we can alter, we can only alter the with. For meta backend, user can alter the secret to using hashvault or just the value. So I try to ask user to write both WITH and AS so they know what they are doing.

So anyway, I think get rid of the WITH here is also a good choice. Will change this.

) as 'demo_secret_altered';

statement error
alter secret secret_2 with (
backend = 'meta'
) as 'demo_secret_altered';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Catalog error
2: secret not found: secret_2


# wait for support for hashicorp_vault backend
# statement ok
# create secret secret_2 with (
Expand Down
5 changes: 5 additions & 0 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
CREATE SECRET iceberg_s3_access_key WITH (
backend = 'meta'
) as 'hummockadmin_wrong';

statement ok
ALTER SECRET iceberg_s3_access_key WITH (
backend = 'meta'
) as 'hummockadmin';

statement ok
Expand Down
27 changes: 26 additions & 1 deletion e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,32 @@ mysql --protocol=tcp -u root mytest < e2e_test/source_legacy/cdc/mysql_init_data
statement ok
create secret mysql_pwd with (
backend = 'meta'
) as 'incorrect_password';

# create a cdc source job, with incorrct password
statement error
create source mysql_mytest with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = secret mysql_pwd,
database.name = 'mytest',
server.id = '5601'
);
# The detailed error message is commented out because the user IP in error message may vary in different environments.
# ----
# db error: ERROR: Failed to run the query
# Caused by these errors (recent errors listed first):
# 1: gRPC request to meta service failed: Internal error
# 2: failed to create source worker
# 3: failed to create SplitEnumerator
# 4: source cannot pass validation
# 5: Internal error: Access denied for user 'rwcdc'@'172.17.0.1' (using password: YES)

statement ok
alter secret mysql_pwd with (
backend = 'meta'
) as '${MYSQL_PWD:}';

# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON`
Expand All @@ -36,7 +62,6 @@ create source mysql_mytest with (
server.id = '5601'
);


statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source
create materialized view mv as select * from mysql_mytest;

Expand Down
14 changes: 14 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,19 @@ message DropSecretResponse {
WaitVersion version = 1;
}

message AlterSecretRequest {
uint32 secret_id = 1;
string name = 2;
bytes value = 3;
uint32 database_id = 4;
uint32 schema_id = 5;
uint32 owner_id = 6;
}

message AlterSecretResponse {
WaitVersion version = 1;
}

message CreateConnectionRequest {
message PrivateLink {
catalog.Connection.PrivateLinkService.PrivateLinkProvider provider = 1;
Expand Down Expand Up @@ -510,6 +523,7 @@ service DdlService {
rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
rpc CreateSecret(CreateSecretRequest) returns (CreateSecretResponse);
rpc DropSecret(DropSecretRequest) returns (DropSecretResponse);
rpc AlterSecret(AlterSecretRequest) returns (AlterSecretResponse);
rpc AlterName(AlterNameRequest) returns (AlterNameResponse);
rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse);
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
Expand Down
18 changes: 17 additions & 1 deletion src/common/secret/src/secret_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,23 @@ impl LocalSecretManager {

pub fn add_secret(&self, secret_id: SecretId, secret: Vec<u8>) {
let mut secret_guard = self.secrets.write();
secret_guard.insert(secret_id, secret);
if secret_guard.insert(secret_id, secret).is_some() {
tracing::error!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I tend to make it a real error i.e. return Err(...) and reject the add_secret

The status quo is somehow weird to me - an error log is printed, but the action actually succeeded.

Copy link
Contributor Author

@yuhao-su yuhao-su Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually intentional or something we have to do.

  1. add_secret is called using the LocalSecretManager on each worker node asynchronizly by notification service, so we can't return an error just like any other notification serivces.
  2. alter secret result was already persisted to meta before we call add_secret. So returning an error here can confuse users unless we decide to roll back the meta commit. Besides, this secret_id cames from meta catalog, so we should trust the data from meta instead of LocalSecretManager

secret_id = secret_id,
"adding a secret but it already exists, overwriting it"
);
};
}

pub fn update_secret(&self, secret_id: SecretId, secret: Vec<u8>) {
let mut secret_guard = self.secrets.write();
if secret_guard.insert(secret_id, secret).is_none() {
tracing::error!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it trigger the existing actors to upgrade to the new secret?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it trigger the existing actors to upgrade to the new secret?

No. Already doced this limitation in PR.

secret_id = secret_id,
"updating a secret but it does not exist, adding it"
);
}
self.remove_secret_file_if_exist(&secret_id);
}

pub fn init_secrets(&self, secrets: Vec<PbSecret>) {
Expand Down
3 changes: 3 additions & 0 deletions src/compute/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ impl ObserverState for ComputeObserverNode {
Operation::Delete => {
LocalSecretManager::global().remove_secret(s.id);
}
Operation::Update => {
LocalSecretManager::global().update_secret(s.id, s.value);
}
_ => {
panic!("error type notification");
}
Expand Down
33 changes: 33 additions & 0 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ pub trait CatalogWriter: Send + Sync {

async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;

async fn alter_secret(
&self,
secret_id: u32,
secret_name: String,
database_id: u32,
schema_id: u32,
owner_id: u32,
payload: Vec<u8>,
) -> Result<()>;

async fn alter_name(
&self,
object_id: alter_name_request::Object,
Expand Down Expand Up @@ -506,6 +516,29 @@ impl CatalogWriter for CatalogWriterImpl {
let version = self.meta_client.alter_swap_rename(object).await?;
self.wait_version(version).await
}

async fn alter_secret(
&self,
secret_id: u32,
secret_name: String,
database_id: u32,
schema_id: u32,
owner_id: u32,
payload: Vec<u8>,
) -> Result<()> {
let version = self
.meta_client
.alter_secret(
secret_id,
secret_name,
database_id,
schema_id,
owner_id,
payload,
)
.await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/catalog/secret_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

use risingwave_pb::catalog::PbSecret;

use crate::catalog::{DatabaseId, OwnedByUserCatalog, SecretId};
use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, SecretId};
use crate::user::UserId;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SecretCatalog {
pub id: SecretId,
pub name: String,
pub database_id: DatabaseId,
pub schema_id: SchemaId,
pub value: Vec<u8>,
pub owner: UserId,
}
Expand All @@ -34,6 +35,7 @@ impl From<&PbSecret> for SecretCatalog {
owner: value.owner,
name: value.name.clone(),
value: value.value.clone(),
schema_id: value.schema_id,
}
}
}
Expand Down
68 changes: 68 additions & 0 deletions src/frontend/src/handler/alter_secret.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::StatementType;
use risingwave_common::license::Feature;
use risingwave_sqlparser::ast::{AlterSecretOperation, ObjectName, SqlOption};

use super::create_secret::get_secret_payload;
use super::drop_secret::fetch_secret_catalog_with_db_schema_id;
use crate::error::Result;
use crate::handler::{HandlerArgs, RwPgResponse};
use crate::WithOptions;

pub async fn handle_alter_secret(
handler_args: HandlerArgs,
secret_name: ObjectName,
sql_options: Vec<SqlOption>,
operation: AlterSecretOperation,
) -> Result<RwPgResponse> {
Feature::SecretManagement
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;

let session = handler_args.session;

if let Some((secret_catalog, _, _)) =
fetch_secret_catalog_with_db_schema_id(&session, &secret_name, false)?
{
let AlterSecretOperation::ChangeCredential { new_credential } = operation;

let with_options = WithOptions::try_from(sql_options.as_ref() as &[SqlOption])?;

let secret_payload = get_secret_payload(new_credential, with_options)?;

let catalog_writer = session.catalog_writer()?;

catalog_writer
.alter_secret(
secret_catalog.id.secret_id(),
secret_catalog.name.clone(),
secret_catalog.database_id,
secret_catalog.schema_id,
secret_catalog.owner,
secret_payload,
)
.await?;

Ok(RwPgResponse::empty_result(StatementType::ALTER_SECRET))
} else {
Ok(RwPgResponse::builder(StatementType::ALTER_SECRET)
.notice(format!(
"secret \"{}\" does not exist, skipping",
secret_name
))
.into())
}
}
85 changes: 42 additions & 43 deletions src/frontend/src/handler/create_secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,60 +37,21 @@ pub async fn handle_create_secret(

let session = handler_args.session.clone();
let db_name = session.database();
let (schema_name, connection_name) =
let (schema_name, secret_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.secret_name.clone())?;

if let Err(e) = session.check_secret_name_duplicated(stmt.secret_name.clone()) {
return if stmt.if_not_exists {
Ok(PgResponse::builder(StatementType::CREATE_SECRET)
.notice(format!("secret \"{}\" exists, skipping", connection_name))
.notice(format!("secret \"{}\" exists, skipping", secret_name))
.into())
} else {
Err(e)
};
}
let with_options = WithOptions::try_from(stmt.with_properties.0.as_ref() as &[SqlOption])?;

let secret = secret_to_str(&stmt.credential)?.as_bytes().to_vec();

// check if the secret backend is supported
let with_props = WithOptions::try_from(stmt.with_properties.0.as_ref() as &[SqlOption])?;
let secret_payload: Vec<u8> = {
if let Some(backend) = with_props.get(SECRET_BACKEND_KEY) {
match backend.to_lowercase().as_ref() {
SECRET_BACKEND_META => {
let backend = risingwave_pb::secret::Secret {
secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta(
risingwave_pb::secret::SecretMetaBackend { value: secret },
)),
};
backend.encode_to_vec()
}
SECRET_BACKEND_HASHICORP_VAULT => {
if stmt.credential != Value::Null {
return Err(ErrorCode::InvalidParameterValue(
"credential must be null for hashicorp_vault backend".to_string(),
)
.into());
}
bail_not_implemented!("hashicorp_vault backend is not implemented yet")
}
_ => {
return Err(ErrorCode::InvalidParameterValue(format!(
"secret backend \"{}\" is not supported. Supported backends are: {}",
backend,
[SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",")
))
.into());
}
}
} else {
return Err(ErrorCode::InvalidParameterValue(format!(
"secret backend is not specified in with clause. Supported backends are: {}",
[SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",")
))
.into());
}
};
let secret_payload = get_secret_payload(stmt.credential, with_options)?;

let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

Expand All @@ -117,3 +78,41 @@ fn secret_to_str(value: &Value) -> Result<String> {
.into()),
}
}

pub(crate) fn get_secret_payload(credential: Value, with_options: WithOptions) -> Result<Vec<u8>> {
let secret = secret_to_str(&credential)?.as_bytes().to_vec();

if let Some(backend) = with_options.get(SECRET_BACKEND_KEY) {
match backend.to_lowercase().as_ref() {
SECRET_BACKEND_META => {
let backend = risingwave_pb::secret::Secret {
secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta(
risingwave_pb::secret::SecretMetaBackend { value: secret },
)),
};
Ok(backend.encode_to_vec())
}
SECRET_BACKEND_HASHICORP_VAULT => {
if credential != Value::Null {
return Err(ErrorCode::InvalidParameterValue(
"credential must be null for hashicorp_vault backend".to_string(),
)
.into());
}
bail_not_implemented!("hashicorp_vault backend is not implemented yet")
}
_ => Err(ErrorCode::InvalidParameterValue(format!(
"secret backend \"{}\" is not supported. Supported backends are: {}",
backend,
[SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",")
))
.into()),
}
} else {
Err(ErrorCode::InvalidParameterValue(format!(
"secret backend is not specified in with clause. Supported backends are: {}",
[SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",")
))
.into())
}
}
Loading
Loading