Skip to content

Commit

Permalink
params
Browse files Browse the repository at this point in the history
  • Loading branch information
bcherry committed Oct 30, 2024
1 parent aa60a83 commit 9b73d6c
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 78 deletions.
109 changes: 54 additions & 55 deletions examples/rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

async fn register_receiver_methods(greeters_room: &Arc<Room>, math_genius_room: &Arc<Room>) {
greeters_room.local_participant().register_rpc_method(
"arrival".to_string(),
|data| {
Box::pin(async move {
println!(
"[{}] [Greeter] Oh {} arrived and said \"{}\"",
elapsed_time(),
data.caller_identity,
data.payload
);
sleep(Duration::from_secs(2)).await;
Ok("Welcome and have a wonderful day!".to_string())
})
},
);
greeters_room.local_participant().register_rpc_method("arrival".to_string(), |data| {
Box::pin(async move {
println!(
"[{}] [Greeter] Oh {} arrived and said \"{}\"",
elapsed_time(),
data.caller_identity,
data.payload
);
sleep(Duration::from_secs(2)).await;
Ok("Welcome and have a wonderful day!".to_string())
})
});

math_genius_room.local_participant().register_rpc_method(
"square-root".to_string(),
Expand All @@ -118,34 +115,36 @@ async fn register_receiver_methods(greeters_room: &Arc<Room>, math_genius_room:
},
);

math_genius_room.local_participant().register_rpc_method(
"divide".to_string(),
|data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let dividend = json_data["dividend"].as_i64().unwrap();
let divisor = json_data["divisor"].as_i64().unwrap();
println!(
"[{}] [Math Genius] {} wants me to divide {} by {}.",
elapsed_time(),
data.caller_identity,
dividend,
divisor
);

let result = dividend / divisor;
println!("[{}] [Math Genius] The result is {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
})
},
);
math_genius_room.local_participant().register_rpc_method("divide".to_string(), |data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let dividend = json_data["dividend"].as_i64().unwrap();
let divisor = json_data["divisor"].as_i64().unwrap();
println!(
"[{}] [Math Genius] {} wants me to divide {} by {}.",
elapsed_time(),
data.caller_identity,
dividend,
divisor
);

let result = dividend / divisor;
println!("[{}] [Math Genius] The result is {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
})
});
}

async fn perform_greeting(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Error>> {
println!("[{}] Letting the greeter know that I've arrived", elapsed_time());
match room
.local_participant()
.perform_rpc("greeter".to_string(), "arrival".to_string(), "Hello".to_string(), None)
.perform_rpc(PerformRpcParams {
destination_identity: "greeter".to_string(),
method: "arrival".to_string(),
payload: "Hello".to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -160,12 +159,12 @@ async fn perform_square_root(room: &Arc<Room>) -> Result<(), Box<dyn std::error:
println!("[{}] What's the square root of 16?", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"square-root".to_string(),
json!({"number": 16}).to_string(),
None,
)
.perform_rpc(PerformRpcParams {
destination_identity: "math-genius".to_string(),
method: "square-root".to_string(),
payload: json!({"number": 16}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -183,12 +182,12 @@ async fn perform_quantum_hypergeometric_series(
println!("[{}] What's the quantum hypergeometric series of 42?", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"quantum-hypergeometric-series".to_string(),
json!({"number": 42}).to_string(),
None,
)
.perform_rpc(PerformRpcParams {
destination_identity: "math-genius".to_string(),
method: "quantum-hypergeometric-series".to_string(),
payload: json!({"number": 42}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -210,12 +209,12 @@ async fn perform_division(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Er
println!("[{}] Let's try dividing 5 by 0", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"divide".to_string(),
json!({"dividend": 5, "divisor": 0}).to_string(),
None,
)
.perform_rpc(PerformRpcParams {
destination_identity: "math-genius".to_string(),
method: "divide".to_string(),
payload: json!({"dividend": 5, "divisor": 0}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand Down
15 changes: 9 additions & 6 deletions livekit-ffi/src/server/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ impl FfiParticipant {

let handle = server.async_runtime.spawn(async move {
let result = local
.perform_rpc(
request.destination_identity.to_string(),
request.method,
request.payload,
request.response_timeout_ms.map(|ms| Duration::from_millis(ms as u64)),
)
.perform_rpc(PerformRpcParams {
destination_identity: request.destination_identity.to_string(),
method: request.method,
payload: request.payload,
response_timeout: request
.response_timeout_ms
.map(|ms| Duration::from_millis(ms as u64))
.unwrap_or(PerformRpcParams::default().response_timeout),
})
.await;

let callback = proto::PerformRpcCallback {
Expand Down
4 changes: 2 additions & 2 deletions livekit/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
pub use crate::{
id::*,
participant::{
ConnectionQuality, LocalParticipant, Participant, RemoteParticipant, RpcError,
RpcErrorCode, RpcInvocationData,
ConnectionQuality, LocalParticipant, Participant, PerformRpcParams, RemoteParticipant,
RpcError, RpcErrorCode, RpcInvocationData,
},
publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication},
track::{
Expand Down
22 changes: 7 additions & 15 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl RpcState {
}
}
}

struct LocalInfo {
events: LocalEvents,
encryption_type: EncryptionType,
Expand Down Expand Up @@ -638,17 +637,10 @@ impl LocalParticipant {
self.inner.info.read().kind
}

pub async fn perform_rpc(
&self,
destination_identity: String,
method: String,
payload: String,
response_timeout: Option<Duration>,
) -> Result<String, RpcError> {
let response_timeout = response_timeout.unwrap_or(Duration::from_millis(10000));
pub async fn perform_rpc(&self, params: PerformRpcParams) -> Result<String, RpcError> {
let max_round_trip_latency = Duration::from_millis(2000);

if payload.len() > MAX_PAYLOAD_BYTES {
if params.payload.len() > MAX_PAYLOAD_BYTES {
return Err(RpcError::built_in(RpcErrorCode::RequestPayloadTooLarge, None));
}

Expand All @@ -670,11 +662,11 @@ impl LocalParticipant {

match self
.publish_rpc_request(RpcRequest {
destination_identity: destination_identity.clone(),
destination_identity: params.destination_identity.clone(),
id: id.clone(),
method: method.clone(),
payload: payload.clone(),
response_timeout: response_timeout - max_round_trip_latency,
method: params.method.clone(),
payload: params.payload.clone(),
response_timeout: params.response_timeout - max_round_trip_latency,
version: 1,
})
.await
Expand Down Expand Up @@ -704,7 +696,7 @@ impl LocalParticipant {
}

// Wait for response timout
let response = match tokio::time::timeout(response_timeout, response_rx).await {
let response = match tokio::time::timeout(params.response_timeout, response_rx).await {
Err(_) => {
self.local.rpc_state.lock().pending_responses.remove(&id);
return Err(RpcError::built_in(RpcErrorCode::ResponseTimeout, None));
Expand Down
21 changes: 21 additions & 0 deletions livekit/src/room/participant/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@
use crate::room::participant::ParticipantIdentity;
use livekit_protocol::RpcError as RpcError_Proto;
use std::time::Duration;

/// Parameters for performing an RPC call
#[derive(Debug, Clone)]
pub struct PerformRpcParams {
pub destination_identity: String,
pub method: String,
pub payload: String,
pub response_timeout: Duration,
}

impl Default for PerformRpcParams {
fn default() -> Self {
Self {
destination_identity: Default::default(),
method: Default::default(),
payload: Default::default(),
response_timeout: Duration::from_secs(10),
}
}
}

/// Data passed to method handler for incoming RPC invocations
///
/// Attributes:
Expand Down

0 comments on commit 9b73d6c

Please sign in to comment.