Skip to content

Commit

Permalink
RPC updates (#476)
Browse files Browse the repository at this point in the history
* Check RPC version

* invocation data, duration

* Duration

* fmt

* params

* data
  • Loading branch information
bcherry authored Oct 31, 2024
1 parent e9cebf6 commit 91fcf57
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 155 deletions.
120 changes: 61 additions & 59 deletions examples/rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,71 +78,73 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

async fn register_receiver_methods(greeters_room: &Arc<Room>, math_genius_room: &Arc<Room>) {
greeters_room.local_participant().register_rpc_method(
"arrival".to_string(),
|_, caller_identity, payload, _| {
Box::pin(async move {
println!(
"[{}] [Greeter] Oh {} arrived and said \"{}\"",
elapsed_time(),
caller_identity,
payload
);
sleep(Duration::from_secs(2)).await;
Ok("Welcome and have a wonderful day!".to_string())
})
},
);

math_genius_room.local_participant().register_rpc_method("square-root".to_string(), |_, caller_identity, payload, response_timeout_ms| {
greeters_room.local_participant().register_rpc_method("arrival".to_string(), |data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&payload).unwrap();
let number = json_data["number"].as_f64().unwrap();
println!(
"[{}] [Math Genius] I guess {} wants the square root of {}. I've only got {} seconds to respond but I think I can pull it off.",
"[{}] [Greeter] Oh {} arrived and said \"{}\"",
elapsed_time(),
caller_identity,
number,
response_timeout_ms.as_secs()
data.caller_identity,
data.payload
);

println!("[{}] [Math Genius] *doing math*…", elapsed_time());
sleep(Duration::from_secs(2)).await;

let result = number.sqrt();
println!("[{}] [Math Genius] Aha! It's {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
Ok("Welcome and have a wonderful day!".to_string())
})
});

math_genius_room.local_participant().register_rpc_method(
"divide".to_string(),
|_, caller_identity, payload, _| {
"square-root".to_string(),
|data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&payload).unwrap();
let dividend = json_data["dividend"].as_i64().unwrap();
let divisor = json_data["divisor"].as_i64().unwrap();
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let number = json_data["number"].as_f64().unwrap();
println!(
"[{}] [Math Genius] {} wants me to divide {} by {}.",
"[{}] [Math Genius] I guess {} wants the square root of {}. I've only got {} seconds to respond but I think I can pull it off.",
elapsed_time(),
caller_identity,
dividend,
divisor
data.caller_identity,
number,
data.response_timeout.as_secs()
);

let result = dividend / divisor;
println!("[{}] [Math Genius] The result is {}", elapsed_time(), result);
println!("[{}] [Math Genius] *doing math*…", elapsed_time());
sleep(Duration::from_secs(2)).await;

let result = number.sqrt();
println!("[{}] [Math Genius] Aha! It's {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
})
},
);

math_genius_room.local_participant().register_rpc_method("divide".to_string(), |data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let dividend = json_data["dividend"].as_i64().unwrap();
let divisor = json_data["divisor"].as_i64().unwrap();
println!(
"[{}] [Math Genius] {} wants me to divide {} by {}.",
elapsed_time(),
data.caller_identity,
dividend,
divisor
);

let result = dividend / divisor;
println!("[{}] [Math Genius] The result is {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
})
});
}

async fn perform_greeting(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Error>> {
println!("[{}] Letting the greeter know that I've arrived", elapsed_time());
match room
.local_participant()
.perform_rpc("greeter".to_string(), "arrival".to_string(), "Hello".to_string(), None)
.perform_rpc(PerformRpcData {
destination_identity: "greeter".to_string(),
method: "arrival".to_string(),
payload: "Hello".to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -157,12 +159,12 @@ async fn perform_square_root(room: &Arc<Room>) -> Result<(), Box<dyn std::error:
println!("[{}] What's the square root of 16?", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"square-root".to_string(),
json!({"number": 16}).to_string(),
None,
)
.perform_rpc(PerformRpcData {
destination_identity: "math-genius".to_string(),
method: "square-root".to_string(),
payload: json!({"number": 16}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -180,12 +182,12 @@ async fn perform_quantum_hypergeometric_series(
println!("[{}] What's the quantum hypergeometric series of 42?", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"quantum-hypergeometric-series".to_string(),
json!({"number": 42}).to_string(),
None,
)
.perform_rpc(PerformRpcData {
destination_identity: "math-genius".to_string(),
method: "quantum-hypergeometric-series".to_string(),
payload: json!({"number": 42}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -207,12 +209,12 @@ async fn perform_division(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Er
println!("[{}] Let's try dividing 5 by 0", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"divide".to_string(),
json!({"dividend": 5, "divisor": 0}).to_string(),
None,
)
.perform_rpc(PerformRpcData {
destination_identity: "math-genius".to_string(),
method: "divide".to_string(),
payload: json!({"dividend": 5, "divisor": 0}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand Down
67 changes: 30 additions & 37 deletions livekit-ffi/src/server/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ impl FfiParticipant {

let handle = server.async_runtime.spawn(async move {
let result = local
.perform_rpc(
request.destination_identity.to_string(),
request.method,
request.payload,
request.response_timeout_ms,
)
.perform_rpc(PerformRpcData {
destination_identity: request.destination_identity.to_string(),
method: request.method,
payload: request.payload,
response_timeout: request
.response_timeout_ms
.map(|ms| Duration::from_millis(ms as u64))
.unwrap_or(PerformRpcData::default().response_timeout),
})
.await;

let callback = proto::PerformRpcCallback {
Expand Down Expand Up @@ -91,34 +94,27 @@ impl FfiParticipant {

let local_participant_handle = self.handle.clone();
let room: Arc<RoomInner> = self.room.clone();
local.register_rpc_method(
method.clone(),
move |request_id, caller_identity, payload, response_timeout| {
Box::pin({
let room = room.clone();
let method = method.clone();
async move {
forward_rpc_method_invocation(
server,
room,
local_participant_handle,
method,
request_id,
caller_identity,
payload,
response_timeout,
)
.await
}
})
},
);
local.register_rpc_method(method.clone(), move |data| {
Box::pin({
let room = room.clone();
let method = method.clone();
async move {
forward_rpc_method_invocation(
server,
room,
local_participant_handle,
method,
data,
)
.await
}
})
});
Ok(proto::RegisterRpcMethodResponse {})
}

pub fn unregister_rpc_method(
&self,
server: &'static FfiServer,
request: proto::UnregisterRpcMethodRequest,
) -> FfiResult<proto::UnregisterRpcMethodResponse> {
let local = match &self.participant {
Expand All @@ -139,10 +135,7 @@ async fn forward_rpc_method_invocation(
room: Arc<RoomInner>,
local_participant_handle: FfiHandleId,
method: String,
request_id: String,
caller_identity: ParticipantIdentity,
payload: String,
response_timeout: Duration,
data: RpcInvocationData,
) -> Result<String, RpcError> {
let (tx, rx) = oneshot::channel();
let invocation_id = server.next_id();
Expand All @@ -152,10 +145,10 @@ async fn forward_rpc_method_invocation(
local_participant_handle: local_participant_handle as u64,
invocation_id,
method,
request_id,
caller_identity: caller_identity.into(),
payload,
response_timeout_ms: response_timeout.as_millis() as u32,
request_id: data.request_id,
caller_identity: data.caller_identity.into(),
payload: data.payload,
response_timeout_ms: data.response_timeout.as_millis() as u32,
},
));

Expand Down
2 changes: 1 addition & 1 deletion livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ fn on_unregister_rpc_method(
) -> FfiResult<proto::UnregisterRpcMethodResponse> {
let ffi_participant =
server.retrieve_handle::<FfiParticipant>(request.local_participant_handle)?.clone();
return ffi_participant.unregister_rpc_method(server, request);
return ffi_participant.unregister_rpc_method(request);
}

fn on_rpc_method_invocation_response(
Expand Down
3 changes: 2 additions & 1 deletion livekit/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
pub use crate::{
id::*,
participant::{
ConnectionQuality, LocalParticipant, Participant, RemoteParticipant, RpcError, RpcErrorCode,
ConnectionQuality, LocalParticipant, Participant, PerformRpcData, RemoteParticipant,
RpcError, RpcErrorCode, RpcInvocationData,
},
publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication},
track::{
Expand Down
7 changes: 4 additions & 3 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub struct RpcRequest {
pub id: String,
pub method: String,
pub payload: String,
pub response_timeout_ms: u32,
pub response_timeout: Duration,
pub version: u32,
}

Expand Down Expand Up @@ -689,7 +689,7 @@ impl RoomSession {
request_id,
method,
payload,
response_timeout_ms,
response_timeout,
version,
} => {
if caller_identity.is_none() {
Expand All @@ -702,7 +702,8 @@ impl RoomSession {
request_id,
method,
payload,
response_timeout_ms,
response_timeout,
version,
)
.await;
}
Expand Down
Loading

0 comments on commit 91fcf57

Please sign in to comment.