Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

Commit

Permalink
Continuous subscription at a specific frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Mar 6, 2024
1 parent 10592bb commit 960e057
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 10 deletions.
22 changes: 18 additions & 4 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,11 @@ impl Subscriptions {
self.change_subscriptions.push(subscription)
}

pub fn add_continuous_subscription(&mut self, subscription: ContinuousSubscription) {
pub fn add_continuous_subscription(
&mut self,
subscription: ContinuousSubscription,
frequency: u64,
) {
let local_subscription = subscription.clone();
self.continuous_subscriptions
.lock()
Expand All @@ -627,7 +631,7 @@ impl Subscriptions {
while !local_subscription.sender.is_closed() {
let _ = local_subscription.notify(None).await;
// Simulate some asynchronous work
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await;
}
});
}
Expand Down Expand Up @@ -1593,6 +1597,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
frequency: Option<u64>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
Expand Down Expand Up @@ -1636,6 +1641,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

let (sender, receiver) = mpsc::channel(10);
if !entries_on_changed.is_empty() {
if frequency.is_some() && entries_continuous.is_empty() {
return Err(SubscriptionError::InvalidInput);
}
let subscription = ChangeSubscription {
entries: entries_on_changed,
sender: sender.clone(),
Expand All @@ -1658,6 +1666,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
}

if !entries_continuous.is_empty() {
if frequency.is_none() {
return Err(SubscriptionError::InvalidInput);
}
let subscription_continuous = ContinuousSubscription {
entries: entries_continuous,
sender,
Expand All @@ -1677,7 +1688,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.subscriptions
.write()
.await
.add_continuous_subscription(subscription_continuous);
.add_continuous_subscription(subscription_continuous, frequency.unwrap());
}

let stream = ReceiverStream::new(receiver);
Expand Down Expand Up @@ -3161,7 +3172,10 @@ mod tests {
.expect("Register datapoint should succeed");

let mut stream = broker
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
None,
)
.await
.expect("subscription should succeed");

Expand Down
2 changes: 1 addition & 1 deletion kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

match broker.subscribe(entries).await {
match broker.subscribe(entries, request.frequency_hertz).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);
Ok(tonic::Response::new(Box::pin(stream)))
Expand Down
2 changes: 1 addition & 1 deletion kuksa_databroker/databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl Viss for Server {
});
};

match broker.subscribe(entries).await {
match broker.subscribe(entries, None).await {
Ok(stream) => {
let subscription_id = SubscriptionId::new();

Expand Down
15 changes: 12 additions & 3 deletions kuksa_databroker/lib/kuksa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand Down Expand Up @@ -321,7 +324,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand All @@ -346,7 +352,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand Down
3 changes: 2 additions & 1 deletion proto/kuksa/val/v1/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ message SubscribeEntry {
// Subscribe to changes in datapoints.
message SubscribeRequest {
repeated SubscribeEntry entries = 1;
optional uint64 frequency_hertz = 2;
}

// A subscription response
Expand All @@ -112,4 +113,4 @@ message GetServerInfoRequest {
message GetServerInfoResponse {
string name = 1;
string version = 2;
}
}

0 comments on commit 960e057

Please sign in to comment.