From d73e7f3abf21a19124d187549a8971d0c8b81de7 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Wed, 10 Apr 2024 13:16:18 +0200 Subject: [PATCH 1/4] Close subscription after expired token --- databroker/src/broker.rs | 58 +++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index dfa61eef..05ca954b 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -155,7 +155,7 @@ pub struct QuerySubscription { pub struct ChangeSubscription { entries: HashMap>, - sender: mpsc::Sender, + sender: Option>, permissions: Permissions, } @@ -607,7 +607,7 @@ impl Subscriptions { } pub async fn notify( - &self, + &mut self, changed: Option<&HashMap>>, db: &Database, ) -> Result>, NotificationError> { @@ -627,7 +627,7 @@ impl Subscriptions { } } - for sub in &self.change_subscriptions { + for sub in &mut self.change_subscriptions { match sub.notify(changed, db).await { Ok(_) => {} Err(err) => error = Some(err), @@ -660,12 +660,24 @@ impl Subscriptions { true } }); - self.change_subscriptions.retain(|sub| { - if sub.sender.is_closed() { + self.change_subscriptions.retain_mut(|sub| { + if let Some(sender) = &sub.sender { + if sender.is_closed() { + info!("Subscriber gone: removing subscription"); + false + } else { + match &sub.permissions.expired() { + Ok(()) => true, + Err(PermissionError::Expired) => { + sub.sender = None; + false + } + Err(err) => panic!("Error: {:?}", err), + } + } + } else { info!("Subscriber gone: removing subscription"); false - } else { - true } }); } @@ -693,7 +705,7 @@ impl ChangeSubscription { // notify let notifications = { let mut notifications = EntryUpdates::default(); - + let mut error = None; for (id, changed_fields) in changed { if let Some(fields) = self.entries.get(id) { if !fields.is_disjoint(changed_fields) { @@ -723,22 +735,32 @@ impl ChangeSubscription { fields: notify_fields, }); } + Err(ReadError::PermissionExpired) => { + debug!("notify: token expired, closing subscription channel"); + error = Some(NotificationError {}); + break; + } Err(_) => { - debug!("notify: could not find entry with id {}", id) + debug!("notify: could not find entry with id {}", id); } } } } } + if let Some(err) = error { + return Err(err); + } notifications }; if notifications.updates.is_empty() { Ok(()) - } else { - match self.sender.send(notifications).await { + } else if let Some(sender) = &self.sender { + match sender.send(notifications).await { Ok(()) => Ok(()), Err(_) => Err(NotificationError {}), } + } else { + Err(NotificationError {}) } } else { Ok(()) @@ -775,9 +797,13 @@ impl ChangeSubscription { } notifications }; - match self.sender.send(notifications).await { - Ok(()) => Ok(()), - Err(_) => Err(NotificationError {}), + if let Some(sender) = &self.sender { + match sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), + } + } else { + Err(NotificationError {}) } } } @@ -1411,7 +1437,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { match self .broker .subscriptions - .read() + .write() .await .notify(Some(&changed), &db) .await @@ -1457,7 +1483,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let (sender, receiver) = mpsc::channel(10); let subscription = ChangeSubscription { entries: valid_entries, - sender, + sender: Some(sender), permissions: self.permissions.clone(), }; From 4c522ca2566bfcbf997a1db289c8da6d5c6d26cc Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Fri, 12 Apr 2024 15:18:05 +0200 Subject: [PATCH 2/4] Refinement --- databroker/src/broker.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 05ca954b..6a48052b 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -669,7 +669,7 @@ impl Subscriptions { match &sub.permissions.expired() { Ok(()) => true, Err(PermissionError::Expired) => { - sub.sender = None; + info!("Token expired: removing subscription"); false } Err(err) => panic!("Error: {:?}", err), @@ -705,7 +705,6 @@ impl ChangeSubscription { // notify let notifications = { let mut notifications = EntryUpdates::default(); - let mut error = None; for (id, changed_fields) in changed { if let Some(fields) = self.entries.get(id) { if !fields.is_disjoint(changed_fields) { @@ -737,8 +736,7 @@ impl ChangeSubscription { } Err(ReadError::PermissionExpired) => { debug!("notify: token expired, closing subscription channel"); - error = Some(NotificationError {}); - break; + return Err(NotificationError {}); } Err(_) => { debug!("notify: could not find entry with id {}", id); @@ -747,9 +745,6 @@ impl ChangeSubscription { } } } - if let Some(err) = error { - return Err(err); - } notifications }; if notifications.updates.is_empty() { From 7abd7c7cf85c93aa08b01608f3c98e689df94e28 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Wed, 17 Apr 2024 18:12:59 +0200 Subject: [PATCH 3/4] Remove optional --- databroker/src/broker.rs | 45 +++++++++++++++------------------------- 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 6a48052b..4b88d2cd 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -155,7 +155,7 @@ pub struct QuerySubscription { pub struct ChangeSubscription { entries: HashMap>, - sender: Option>, + sender: mpsc::Sender, permissions: Permissions, } @@ -661,23 +661,18 @@ impl Subscriptions { } }); self.change_subscriptions.retain_mut(|sub| { - if let Some(sender) = &sub.sender { - if sender.is_closed() { - info!("Subscriber gone: removing subscription"); - false - } else { - match &sub.permissions.expired() { - Ok(()) => true, - Err(PermissionError::Expired) => { - info!("Token expired: removing subscription"); - false - } - Err(err) => panic!("Error: {:?}", err), - } - } - } else { + if sub.sender.is_closed() { info!("Subscriber gone: removing subscription"); false + } else { + match &sub.permissions.expired() { + Ok(()) => true, + Err(PermissionError::Expired) => { + info!("Token expired: removing subscription"); + false + } + Err(err) => panic!("Error: {:?}", err), + } } }); } @@ -749,13 +744,11 @@ impl ChangeSubscription { }; if notifications.updates.is_empty() { Ok(()) - } else if let Some(sender) = &self.sender { - match sender.send(notifications).await { + } else { + match &self.sender.send(notifications).await { Ok(()) => Ok(()), Err(_) => Err(NotificationError {}), } - } else { - Err(NotificationError {}) } } else { Ok(()) @@ -792,13 +785,9 @@ impl ChangeSubscription { } notifications }; - if let Some(sender) = &self.sender { - match sender.send(notifications).await { - Ok(()) => Ok(()), - Err(_) => Err(NotificationError {}), - } - } else { - Err(NotificationError {}) + match &self.sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), } } } @@ -1478,7 +1467,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let (sender, receiver) = mpsc::channel(10); let subscription = ChangeSubscription { entries: valid_entries, - sender: Some(sender), + sender, permissions: self.permissions.clone(), }; From a55966014adb1e5d21ef05330ba70662d5ac1b66 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Tue, 23 Apr 2024 09:24:32 +0200 Subject: [PATCH 4/4] Remove unnecessary mut references --- databroker/src/broker.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 4b88d2cd..180cf2d4 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -607,7 +607,7 @@ impl Subscriptions { } pub async fn notify( - &mut self, + &self, changed: Option<&HashMap>>, db: &Database, ) -> Result>, NotificationError> { @@ -627,7 +627,7 @@ impl Subscriptions { } } - for sub in &mut self.change_subscriptions { + for sub in &self.change_subscriptions { match sub.notify(changed, db).await { Ok(_) => {} Err(err) => error = Some(err), @@ -660,7 +660,7 @@ impl Subscriptions { true } }); - self.change_subscriptions.retain_mut(|sub| { + self.change_subscriptions.retain(|sub| { if sub.sender.is_closed() { info!("Subscriber gone: removing subscription"); false @@ -671,7 +671,10 @@ impl Subscriptions { info!("Token expired: removing subscription"); false } - Err(err) => panic!("Error: {:?}", err), + Err(err) => { + info!("Error: {:?} -> removing subscription", err); + false + } } } }); @@ -745,7 +748,7 @@ impl ChangeSubscription { if notifications.updates.is_empty() { Ok(()) } else { - match &self.sender.send(notifications).await { + match self.sender.send(notifications).await { Ok(()) => Ok(()), Err(_) => Err(NotificationError {}), } @@ -785,7 +788,7 @@ impl ChangeSubscription { } notifications }; - match &self.sender.send(notifications).await { + match self.sender.send(notifications).await { Ok(()) => Ok(()), Err(_) => Err(NotificationError {}), } @@ -1421,7 +1424,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { match self .broker .subscriptions - .write() + .read() .await .notify(Some(&changed), &db) .await