Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bucket notifications - validate notifications on change (gh issue 8649) #8667

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ async function delete_bucket(data, force) {
*/
async function bucket_management(action, user_input) {
const data = action === ACTIONS.LIST ? undefined : await fetch_bucket_data(action, user_input);
await manage_nsfs_validations.validate_bucket_args(config_fs, data, action);
await manage_nsfs_validations.validate_bucket_args(config_fs, data, action, user_input);

let response = {};
if (action === ACTIONS.ADD) {
Expand Down
13 changes: 12 additions & 1 deletion src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { TYPES, ACTIONS, VALID_OPTIONS, OPTION_TYPE, FROM_FILE, BOOLEAN_STRING_VA
GLACIER_ACTIONS, LIST_UNSETABLE_OPTIONS, ANONYMOUS, DIAGNOSE_ACTIONS, UPGRADE_ACTIONS } = require('../manage_nsfs/manage_nsfs_constants');
const iam_utils = require('../endpoint/iam/iam_utils');
const { check_root_account_owns_user } = require('../nc/nc_utils');
const notifications_util = require('../util/notifications_util');

/////////////////////////////
//// GENERAL VALIDATIONS ////
Expand Down Expand Up @@ -348,8 +349,9 @@ async function check_new_access_key_exists(config_fs, action, data) {
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @param {object} data
* @param {string} action
* @param {object} user_input
*/
async function validate_bucket_args(config_fs, data, action) {
async function validate_bucket_args(config_fs, data, action, user_input) {
if (action === ACTIONS.ADD || action === ACTIONS.UPDATE) {
if (action === ACTIONS.ADD) native_fs_utils.validate_bucket_creation({ name: data.name });
if ((action === ACTIONS.UPDATE) && (data.new_name !== undefined)) native_fs_utils.validate_bucket_creation({ name: data.new_name });
Expand Down Expand Up @@ -403,6 +405,15 @@ async function validate_bucket_args(config_fs, data, action) {
}
}
}

//if there's a change to the bucket's notifications, we need to test them
//if one of the specified notifications fail, we need to fail the user's request
if (user_input.notifications) {
const test_notif_err = await notifications_util.test_notifications(user_input, config_fs.connections_dir_path);
if (test_notif_err) {
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
}
}
}

/////////////////////////////
Expand Down
32 changes: 24 additions & 8 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class HttpNotificator {
return;
}
dbg.error("Notify err =", err);
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(JSON.stringify(notif), err).then(resolve);
});
req.on('timeout', () => {
dbg.error("Notify timeout");
Expand Down Expand Up @@ -249,7 +249,7 @@ class KafkaNotificator {
Date.now(),
(err, offset) => {
if (err) {
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(JSON.stringify(notif), err).then(resolve);
} else {
resolve();
}
Expand Down Expand Up @@ -303,16 +303,32 @@ function get_connection(connect) {
async function test_notifications(bucket, connect_files_dir) {
const notificator = new Notificator({connect_files_dir});
for (const notif of bucket.notifications) {
const connect = await notificator.parse_connect_file(notif.connect);
dbg.log1("testing notif", notif);
let connect;
let connection;
let failure = false;
let notif_failure;
try {
const connection = get_connection(connect);
connect = await notificator.parse_connect_file(notif.topic[0]);
connection = get_connection(connect);
await connection.connect();
await connection.promise_notify({notif: "test notification"}, async err => err);
connection.destroy();
await connection.promise_notify({notif: "test notification"}, async (notif_cb, err_cb) => {
failure = true;
notif_failure = err_cb;
});
if (failure) {
if (notif_failure) {
throw notif_failure;
}
//no error was thrown during notify, throw generic error
throw new Error();
}
} catch (err) {
dbg.error("Connection failed for", connect);
dbg.error("Connection failed for", notif, ", connect =", connect, ", err = ", err);
return err;
} finally {
if (connection) {
connection.destroy();
}
}
}
}
Expand Down
Loading