Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use samllvec to reduce heap usage, use inline functions and general improvements #115

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ lazy_static = "1.4.0"
thiserror = "1.0.47"
futures = { version = "0.3.28" }
async-trait = "0.1.82"
smallvec = "1.13.2"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you should update the lock file as well, or?


# VISS
axum = { version = "0.6.20", optional = true, features = ["ws"] }
Expand Down
155 changes: 122 additions & 33 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
use crate::permissions::{PermissionError, Permissions};
pub use crate::types;

use smallvec::SmallVec;

use crate::query;
pub use crate::types::{ChangeType, DataType, DataValue, EntryType};

Expand All @@ -22,7 +24,7 @@ use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -112,6 +114,10 @@ pub enum Field {
ActuatorTarget,
MetadataUnit,
}
#[derive(Debug, Clone)]
pub struct StackVecField {
pub svec: SmallVec<[Field; 3]>,
}

#[derive(Default)]
pub struct Database {
Expand Down Expand Up @@ -142,7 +148,7 @@ pub struct QueryField {
pub struct ChangeNotification {
pub id: i32,
pub update: EntryUpdate,
pub fields: HashSet<Field>,
pub fields: StackVecField,
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -201,7 +207,7 @@ pub struct QuerySubscription {
}

pub struct ChangeSubscription {
entries: HashMap<i32, HashSet<Field>>,
entries: HashMap<i32, StackVecField>,
sender: broadcast::Sender<EntryUpdates>,
permissions: Permissions,
}
Expand Down Expand Up @@ -233,7 +239,63 @@ pub struct EntryUpdate {
pub unit: Option<String>,
}

impl StackVecField {
#[inline]
pub fn new() -> Self {
Self {
svec: SmallVec::new(),
}
}

#[inline]
pub fn push(&mut self, value: Field) {
self.svec.push(value);
}

#[inline]
pub fn contains(&self, element: &Field) -> bool {
self.svec.contains(element)
}

#[inline]
pub fn extend_from_stack(&mut self, other: &StackVecField) {
self.svec.extend(other.svec.iter().cloned());
}

#[inline]
pub fn with_elements(elements: SmallVec<[Field; 3]>) -> Self {
Self { svec: elements }
}

#[inline]
pub fn are_disjoint(&self, other: &StackVecField) -> bool {
for item in &self.svec {
if other.svec.contains(item) {
return false; // Found a common element
}
}
true // No common elements found
}

#[inline]
pub fn is_empty(&self) -> bool {
self.svec.is_empty()
}

#[inline]
pub fn iter(&self) -> impl Iterator<Item = &Field> {
self.svec.iter()
}
}

impl Default for StackVecField {
fn default() -> Self {
Self::new()
}
}

impl Entry {
#[inline]
pub fn diff(&self, mut update: EntryUpdate) -> EntryUpdate {
if let Some(datapoint) = &update.datapoint {
if self.metadata.change_type != ChangeType::Continuous {
Expand Down Expand Up @@ -710,16 +772,17 @@ impl Entry {
self.lag_datapoint = self.datapoint.clone();
}

pub fn apply(&mut self, update: EntryUpdate) -> HashSet<Field> {
let mut changed = HashSet::new();
#[inline]
pub fn apply(&mut self, update: EntryUpdate) -> StackVecField {
let mut changed = StackVecField::new();
if let Some(datapoint) = update.datapoint {
self.lag_datapoint = self.datapoint.clone();
self.datapoint = datapoint;
changed.insert(Field::Datapoint);
changed.push(Field::Datapoint);
}
if let Some(actuator_target) = update.actuator_target {
self.actuator_target = actuator_target;
changed.insert(Field::ActuatorTarget);
changed.push(Field::ActuatorTarget);
}

if let Some(updated_allowed) = update.allowed {
Expand Down Expand Up @@ -755,10 +818,23 @@ impl Subscriptions {

pub async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &Database,
) -> Result<Option<HashMap<String, ()>>, NotificationError> {
let mut error = None;

for sub in &self.change_subscriptions {
match sub.notify(changed, db).await {
Ok(_) => {}
Err(err) => error = Some(err),
}
}

//Leave method here if error is none and query_subscription is empty
if error.is_none() && self.query_subscriptions.is_empty() {
return Ok(None);
}

let mut lag_updates: HashMap<String, ()> = HashMap::new();
for sub in &self.query_subscriptions {
match sub.notify(changed, db).await {
Expand All @@ -774,13 +850,6 @@ impl Subscriptions {
}
}

for sub in &self.change_subscriptions {
match sub.notify(changed, db).await {
Ok(_) => {}
Err(err) => error = Some(err),
}
}

match error {
Some(err) => Err(err),
None => {
Expand Down Expand Up @@ -837,7 +906,7 @@ impl Subscriptions {
impl ChangeSubscription {
async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &Database,
) -> Result<(), NotificationError> {
let db_read = db.authorized_read_access(&self.permissions);
Expand All @@ -846,7 +915,7 @@ impl ChangeSubscription {
let mut matches = false;
for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
if !fields.are_disjoint(changed_fields) {
matches = true;
break;
}
Expand All @@ -858,25 +927,25 @@ impl ChangeSubscription {
let mut notifications = EntryUpdates::default();
for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
if !fields.are_disjoint(changed_fields) {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
let mut notify_fields = StackVecField::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if changed_fields.contains(&Field::Datapoint)
&& fields.contains(&Field::Datapoint)
{
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
notify_fields.push(Field::Datapoint);
}
if changed_fields.contains(&Field::ActuatorTarget)
&& fields.contains(&Field::ActuatorTarget)
{
update.actuator_target =
Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
notify_fields.push(Field::ActuatorTarget);
}
// fill unit field always
update.unit.clone_from(&entry.metadata.unit);
Expand Down Expand Up @@ -922,16 +991,16 @@ impl ChangeSubscription {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
let mut notify_fields = StackVecField::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if fields.contains(&Field::Datapoint) {
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
notify_fields.push(Field::Datapoint);
}
if fields.contains(&Field::ActuatorTarget) {
update.actuator_target = Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
notify_fields.push(Field::ActuatorTarget);
}
notifications.updates.push(ChangeNotification {
id: *id,
Expand Down Expand Up @@ -989,7 +1058,7 @@ impl QuerySubscription {
}
fn check_if_changes_match(
query: &CompiledQuery,
changed_origin: Option<&HashMap<i32, HashSet<Field>>>,
changed_origin: Option<&HashMap<i32, StackVecField>>,
db: &DatabaseReadAccess,
) -> bool {
match changed_origin {
Expand Down Expand Up @@ -1039,7 +1108,7 @@ impl QuerySubscription {
}
fn generate_input(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &DatabaseReadAccess,
) -> Option<impl ExecutionInput> {
let id_used_in_query = QuerySubscription::check_if_changes_match(&self.query, changed, db);
Expand All @@ -1055,7 +1124,7 @@ impl QuerySubscription {

async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &Database,
) -> Result<Option<impl query::ExecutionInput>, NotificationError> {
let db_read = db.authorized_read_access(&self.permissions);
Expand Down Expand Up @@ -1190,6 +1259,11 @@ impl<'a, 'b> DatabaseReadAccess<'a, 'b> {
self.db.entries.get(&id).map(|entry| &entry.metadata)
}

#[inline]
pub fn contains_id(&self, id: i32) -> bool {
self.db.entries.contains_key(&id)
}

pub fn get_metadata_by_path(&self, path: &str) -> Option<&Metadata> {
let id = self.db.path_to_id.get(path)?;
self.get_metadata_by_id(*id)
Expand All @@ -1208,7 +1282,7 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> {
&mut self,
path: &str,
update: EntryUpdate,
) -> Result<HashSet<Field>, UpdateError> {
) -> Result<StackVecField, UpdateError> {
match self.db.path_to_id.get(path) {
Some(id) => self.update(*id, update),
None => Err(UpdateError::NotFound),
Expand All @@ -1228,7 +1302,7 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> {
}
}

pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<HashSet<Field>, UpdateError> {
pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<StackVecField, UpdateError> {
match self.db.entries.get_mut(&id) {
Some(entry) => {
if update.path.is_some()
Expand Down Expand Up @@ -1493,6 +1567,15 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.cloned()
}

pub async fn contains_id(&self, id: i32) -> bool {
self.broker
.database
.read()
.await
.authorized_read_access(self.permissions)
.contains_id(id)
}

pub async fn get_metadata_by_path(&self, path: &str) -> Option<Metadata> {
self.broker
.database
Expand Down Expand Up @@ -1569,7 +1652,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

let cleanup_needed = {
let changed = {
let mut changed = HashMap::<i32, HashSet<Field>>::new();
let mut changed = HashMap::<i32, StackVecField>::new();
for (id, update) in updates {
debug!("setting id {} to {:?}", id, update);
match db_write.update(id, update) {
Expand Down Expand Up @@ -1631,7 +1714,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
valid_entries: HashMap<i32, StackVecField>,
buffer_size: Option<usize>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
Expand Down Expand Up @@ -4262,7 +4345,10 @@ pub mod tests {

let mut stream = broker
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
HashMap::from([(
id1,
StackVecField::with_elements(smallvec::smallvec![Field::Datapoint]),
)]),
buffer_size,
)
.await
Expand Down Expand Up @@ -4371,7 +4457,10 @@ pub mod tests {

match broker
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
HashMap::from([(
id1,
StackVecField::with_elements(smallvec::smallvec![Field::Datapoint]),
)]),
// 1001 is just outside valid range 0-1000
Some(1001),
)
Expand Down
Loading
Loading