From e081b55a4f90514b447ac9bed67d153e2f0612f2 Mon Sep 17 00:00:00 2001 From: Rafael RL Date: Thu, 8 Feb 2024 18:19:19 +0100 Subject: [PATCH] Fix match --- kuksa_databroker/databroker/src/broker.rs | 80 ++++++++++++----------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index 23b0bcc4..22be8b32 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -1502,60 +1502,64 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { existing_fields.extend(fields.clone())) .or_insert(fields.clone()); } - _ => { + types::ChangeType::Continuous => { entries_continuous .entry(id) .and_modify(|existing_fields| existing_fields.extend(fields.clone())) .or_insert(fields.clone()); } + types::ChangeType::Static => {} } } let (sender, receiver) = mpsc::channel(10); - let sender_on_changed = sender.clone(); - let subscription = ChangeSubscription { - entries: entries_on_changed, - sender: sender_on_changed, - permissions: self.permissions.clone(), - }; - - { - // Send everything subscribed to in an initial notification - let db = self.broker.database.read().await; - if subscription.notify(None, &db).await.is_err() { - warn!("Failed to create initial notification"); + if !entries_on_changed.is_empty() { + let sender_on_changed = sender.clone(); + let subscription = ChangeSubscription { + entries: entries_on_changed, + sender: sender_on_changed, + permissions: self.permissions.clone(), + }; + + { + // Send everything subscribed to in an initial notification + let db = self.broker.database.read().await; + if subscription.notify(None, &db).await.is_err() { + warn!("Failed to create initial notification"); + } } + + self.broker + .subscriptions + .write() + .await + .add_change_subscription(subscription); } - self.broker - .subscriptions - .write() - .await - .add_change_subscription(subscription); - - - let subscription_continuous = ContinuousSubscription { - entries: entries_continuous, - sender, - permissions: self.permissions.clone(), - database: Arc::clone(&self.broker.database), - }; - - { - // Send everything subscribed to in an initial notification - //let db = self.broker.database.read().await; - if subscription_continuous.notify(None).await.is_err() { - warn!("Failed to create initial notification"); + if !entries_continuous.is_empty() { + let subscription_continuous = ContinuousSubscription { + entries: entries_continuous, + sender, + permissions: self.permissions.clone(), + database: Arc::clone(&self.broker.database), + }; + + { + // Send everything subscribed to in an initial notification + //let db = self.broker.database.read().await; + if subscription_continuous.notify(None).await.is_err() { + warn!("Failed to create initial notification"); + } } + + self.broker + .subscriptions + .write() + .await + .add_continuous_subscription(subscription_continuous); } - self.broker - .subscriptions - .write() - .await - .add_continuous_subscription(subscription_continuous); - let stream = ReceiverStream::new(receiver); Ok(stream) }