Skip to content

Commit

Permalink
Remove optional
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Apr 17, 2024
1 parent 64f0cf6 commit b8d4757
Showing 1 changed file with 17 additions and 28 deletions.
45 changes: 17 additions & 28 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub struct QuerySubscription {

pub struct ChangeSubscription {
entries: HashMap<i32, HashSet<Field>>,
sender: Option<mpsc::Sender<EntryUpdates>>,
sender: mpsc::Sender<EntryUpdates>,
permissions: Permissions,
}

Expand Down Expand Up @@ -661,23 +661,18 @@ impl Subscriptions {
}
});
self.change_subscriptions.retain_mut(|sub| {
if let Some(sender) = &sub.sender {
if sender.is_closed() {
info!("Subscriber gone: removing subscription");
false
} else {
match &sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
info!("Token expired: removing subscription");
false
}
Err(err) => panic!("Error: {:?}", err),
}
}
} else {
if sub.sender.is_closed() {
info!("Subscriber gone: removing subscription");
false
} else {
match &sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
info!("Token expired: removing subscription");
false
}
Err(err) => panic!("Error: {:?}", err),
}
}
});
}
Expand Down Expand Up @@ -749,13 +744,11 @@ impl ChangeSubscription {
};
if notifications.updates.is_empty() {
Ok(())
} else if let Some(sender) = &self.sender {
match sender.send(notifications).await {
} else {
match &self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
} else {
Err(NotificationError {})
}
} else {
Ok(())
Expand Down Expand Up @@ -792,13 +785,9 @@ impl ChangeSubscription {
}
notifications
};
if let Some(sender) = &self.sender {
match sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
} else {
Err(NotificationError {})
match &self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
}
}
Expand Down Expand Up @@ -1478,7 +1467,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
let (sender, receiver) = mpsc::channel(10);
let subscription = ChangeSubscription {
entries: valid_entries,
sender: Some(sender),
sender,
permissions: self.permissions.clone(),
};

Expand Down

0 comments on commit b8d4757

Please sign in to comment.