Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

Commit

Permalink
Fix match
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Feb 8, 2024
1 parent 79fc150 commit e081b55
Showing 1 changed file with 42 additions and 38 deletions.
80 changes: 42 additions & 38 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1502,60 +1502,64 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
existing_fields.extend(fields.clone()))
.or_insert(fields.clone());
}
_ => {
types::ChangeType::Continuous => {
entries_continuous
.entry(id)
.and_modify(|existing_fields|
existing_fields.extend(fields.clone()))
.or_insert(fields.clone());
}
types::ChangeType::Static => {}
}
}

let (sender, receiver) = mpsc::channel(10);
let sender_on_changed = sender.clone();
let subscription = ChangeSubscription {
entries: entries_on_changed,
sender: sender_on_changed,
permissions: self.permissions.clone(),
};

{
// Send everything subscribed to in an initial notification
let db = self.broker.database.read().await;
if subscription.notify(None, &db).await.is_err() {
warn!("Failed to create initial notification");
if !entries_on_changed.is_empty() {
let sender_on_changed = sender.clone();
let subscription = ChangeSubscription {
entries: entries_on_changed,
sender: sender_on_changed,
permissions: self.permissions.clone(),
};

{
// Send everything subscribed to in an initial notification
let db = self.broker.database.read().await;
if subscription.notify(None, &db).await.is_err() {
warn!("Failed to create initial notification");
}
}

self.broker
.subscriptions
.write()
.await
.add_change_subscription(subscription);
}

self.broker
.subscriptions
.write()
.await
.add_change_subscription(subscription);


let subscription_continuous = ContinuousSubscription {
entries: entries_continuous,
sender,
permissions: self.permissions.clone(),
database: Arc::clone(&self.broker.database),
};

{
// Send everything subscribed to in an initial notification
//let db = self.broker.database.read().await;
if subscription_continuous.notify(None).await.is_err() {
warn!("Failed to create initial notification");
if !entries_continuous.is_empty() {
let subscription_continuous = ContinuousSubscription {
entries: entries_continuous,
sender,
permissions: self.permissions.clone(),
database: Arc::clone(&self.broker.database),
};

{
// Send everything subscribed to in an initial notification
//let db = self.broker.database.read().await;
if subscription_continuous.notify(None).await.is_err() {
warn!("Failed to create initial notification");
}
}

self.broker
.subscriptions
.write()
.await
.add_continuous_subscription(subscription_continuous);
}

self.broker
.subscriptions
.write()
.await
.add_continuous_subscription(subscription_continuous);

let stream = ReceiverStream::new(receiver);
Ok(stream)
}
Expand Down

0 comments on commit e081b55

Please sign in to comment.