Skip to content

Commit

Permalink
bucket notification - ensure all lines in a persitent log are process…
Browse files Browse the repository at this point in the history
…ed (gh issue 8653) (#8662)

Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz authored Jan 8, 2025
1 parent 02f26cb commit 038ce84
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,33 @@ class Notificator {
const file = new LogFile(fs_context, log_file);
const send_promises = [];
await file.collect_and_process(async str => {
const notif = JSON.parse(str);
dbg.log2("notifying with notification =", notif);
let connect = this.notif_to_connect.get(notif.meta.name);
if (!connect) {
connect = await this.parse_connect_file(notif.meta.connect);
this.notif_to_connect.set(notif.meta.name, connect);
}
let connection = this.connect_str_to_connection.get(notif.meta.name);
if (!connection) {
connection = get_connection(connect);
try {
await connection.connect();
} catch (err) {
//failed to connect
dbg.error("Connection failed for", connect);
await failure_append(str);
return;
try {
const notif = JSON.parse(str);
dbg.log2("notifying with notification =", notif);
let connect = this.notif_to_connect.get(notif.meta.name);
if (!connect) {
connect = await this.parse_connect_file(notif.meta.connect);
this.notif_to_connect.set(notif.meta.name, connect);
}
this.connect_str_to_connection.set(notif.meta.name, connection);
let connection = this.connect_str_to_connection.get(notif.meta.name);
if (!connection) {
connection = get_connection(connect);
try {
await connection.connect();
} catch (err) {
//failed to connect
dbg.error("Connection failed for", connect);
await failure_append(str);
return;
}
this.connect_str_to_connection.set(notif.meta.name, connection);
}
const send_promise = connection.promise_notify(notif, failure_append);
if (send_promise) send_promises.push(send_promise);
} catch (err) {
dbg.error("Failed to notify. err = ", err, ", str =", str);
await failure_append(str);
}
const send_promise = connection.promise_notify(notif, failure_append);
if (send_promise) send_promises.push(send_promise);
});
//note we can't reject promises here, since Promise.all() is rejected on
//first rejected promise, and that would not await other send_promises()
Expand Down

0 comments on commit 038ce84

Please sign in to comment.