Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle metadata #205

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,18 @@ message RoomEvent {
TrackMuted track_muted = 11;
TrackUnmuted track_unmuted = 12;
ActiveSpeakersChanged active_speakers_changed = 13;
ConnectionQualityChanged connection_quality_changed = 14;
DataReceived data_received = 15;
ConnectionStateChanged connection_state_changed = 16;
// Connected connected = 17;
Disconnected disconnected = 18;
Reconnecting reconnecting = 19;
Reconnected reconnected = 20;
E2eeStateChanged e2ee_state_changed = 21;
RoomEOS eos = 22; // The stream of room events has ended
RoomMetadataChanged room_metadata_changed = 14;
ParticipantMetadataChanged participant_metadata_changed = 15;
ParticipantNameChanged participant_name_changed = 16;
ConnectionQualityChanged connection_quality_changed = 17;
DataReceived data_received = 18;
ConnectionStateChanged connection_state_changed = 19;
// Connected connected = 20;
Disconnected disconnected = 21;
Reconnecting reconnecting = 22;
Reconnected reconnected = 23;
E2eeStateChanged e2ee_state_changed = 24;
RoomEOS eos = 25; // The stream of room events has ended
}
}

Expand Down Expand Up @@ -292,6 +295,23 @@ message E2eeStateChanged {

message ActiveSpeakersChanged { repeated string participant_sids = 1; }

message RoomMetadataChanged {
string old_metadata = 1;
string metadata = 2;
}

message ParticipantMetadataChanged {
string participant_sid = 1;
string old_metadata = 2;
string metadata = 3;
}

message ParticipantNameChanged {
string participant_sid = 1;
string old_name = 2;
string name = 3;
}

message ConnectionQualityChanged {
string participant_sid = 1;
ConnectionQuality quality = 2;
Expand All @@ -310,4 +330,4 @@ message Disconnected {}
message Reconnecting {}
message Reconnected {}

message RoomEOS {}
message RoomEOS {}
52 changes: 43 additions & 9 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 18, 19, 20, 21, 22")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 24, 25")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand Down Expand Up @@ -1305,22 +1305,28 @@ pub mod room_event {
#[prost(message, tag="13")]
ActiveSpeakersChanged(super::ActiveSpeakersChanged),
#[prost(message, tag="14")]
ConnectionQualityChanged(super::ConnectionQualityChanged),
RoomMetadataChanged(super::RoomMetadataChanged),
#[prost(message, tag="15")]
DataReceived(super::DataReceived),
ParticipantMetadataChanged(super::ParticipantMetadataChanged),
#[prost(message, tag="16")]
ConnectionStateChanged(super::ConnectionStateChanged),
/// Connected connected = 17;
ParticipantNameChanged(super::ParticipantNameChanged),
#[prost(message, tag="17")]
ConnectionQualityChanged(super::ConnectionQualityChanged),
#[prost(message, tag="18")]
Disconnected(super::Disconnected),
DataReceived(super::DataReceived),
#[prost(message, tag="19")]
ConnectionStateChanged(super::ConnectionStateChanged),
/// Connected connected = 20;
#[prost(message, tag="21")]
Disconnected(super::Disconnected),
#[prost(message, tag="22")]
Reconnecting(super::Reconnecting),
#[prost(message, tag="20")]
#[prost(message, tag="23")]
Reconnected(super::Reconnected),
#[prost(message, tag="21")]
#[prost(message, tag="24")]
E2eeStateChanged(super::E2eeStateChanged),
/// The stream of room events has ended
#[prost(message, tag="22")]
#[prost(message, tag="25")]
Eos(super::RoomEos),
}
}
Expand Down Expand Up @@ -1446,6 +1452,34 @@ pub struct ActiveSpeakersChanged {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RoomMetadataChanged {
#[prost(string, tag="1")]
pub old_metadata: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub metadata: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ParticipantMetadataChanged {
#[prost(string, tag="1")]
pub participant_sid: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub old_metadata: ::prost::alloc::string::String,
#[prost(string, tag="3")]
pub metadata: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ParticipantNameChanged {
#[prost(string, tag="1")]
pub participant_sid: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub old_name: ::prost::alloc::string::String,
#[prost(string, tag="3")]
pub name: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConnectionQualityChanged {
#[prost(string, tag="1")]
pub participant_sid: ::prost::alloc::string::String,
Expand Down
44 changes: 43 additions & 1 deletion livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,46 @@ async fn forward_event(
))
.await;
}
RoomEvent::RoomMetadataChanged {
old_metadata,
metadata,
} => {
let _ = send_event(proto::room_event::Message::RoomMetadataChanged(
proto::RoomMetadataChanged {
old_metadata,
metadata,
},
))
.await;
}
RoomEvent::ParticipantMetadataChanged {
participant,
old_metadata,
metadata,
} => {
let _ = send_event(proto::room_event::Message::ParticipantMetadataChanged(
proto::ParticipantMetadataChanged {
participant_sid: participant.sid().to_string(),
old_metadata,
metadata,
},
))
.await;
}
RoomEvent::ParticipantNameChanged {
participant,
old_name,
name,
} => {
let _ = send_event(proto::room_event::Message::ParticipantNameChanged(
proto::ParticipantNameChanged {
participant_sid: participant.sid().to_string(),
old_name,
name,
},
))
.await;
}
RoomEvent::ActiveSpeakersChanged { speakers } => {
let participant_sids = speakers
.iter()
Expand Down Expand Up @@ -710,7 +750,9 @@ async fn forward_event(
))
.await;
}
_ => {}
_ => {
log::warn!("unhandled room event: {:?}", event);
}
};
}

Expand Down
77 changes: 76 additions & 1 deletion livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ pub enum RoomEvent {
publication: RemoteTrackPublication,
participant: RemoteParticipant,
},
// TODO(theomonnom): Should we also add track for muted events?
TrackMuted {
participant: Participant,
publication: TrackPublication,
Expand All @@ -107,6 +106,20 @@ pub enum RoomEvent {
participant: Participant,
publication: TrackPublication,
},
RoomMetadataChanged {
old_metadata: String,
metadata: String,
},
ParticipantMetadataChanged {
participant: Participant,
old_metadata: String,
metadata: String,
},
ParticipantNameChanged {
participant: Participant,
old_name: String,
name: String,
},
ActiveSpeakersChanged {
speakers: Vec<Participant>,
},
Expand Down Expand Up @@ -284,6 +297,30 @@ impl Room {
}
});

local_participant.on_metadata_changed({
let dispatcher = dispatcher.clone();
move |participant, old_metadata, metadata| {
let event = RoomEvent::ParticipantMetadataChanged {
participant,
old_metadata,
metadata,
};
dispatcher.dispatch(&event);
}
});

local_participant.on_name_changed({
let dispatcher = dispatcher.clone();
move |participant, old_name, name| {
let event = RoomEvent::ParticipantNameChanged {
participant,
old_name,
name,
};
dispatcher.dispatch(&event);
}
});

let room_info = join_response.room.unwrap();
let inner = Arc::new(RoomSession {
sid: room_info.sid.try_into().unwrap(),
Expand Down Expand Up @@ -500,6 +537,7 @@ impl RoomSession {
stream,
transceiver,
} => self.handle_media_track(track, stream, transceiver),
EngineEvent::RoomUpdate { room } => self.handle_room_update(room),
EngineEvent::Resuming(tx) => self.handle_resuming(tx),
EngineEvent::Resumed(tx) => self.handle_resumed(tx),
EngineEvent::SignalResumed {
Expand Down Expand Up @@ -803,6 +841,17 @@ impl RoomSession {
.await;
}

fn handle_room_update(self: &Arc<Self>, room: proto::Room) {
let mut info = self.info.write();
let old_metadata = std::mem::replace(&mut info.metadata, room.metadata.clone());
if old_metadata != room.metadata {
self.dispatcher.dispatch(&RoomEvent::RoomMetadataChanged {
old_metadata,
metadata: info.metadata.clone(),
});
}
}

fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
if self.update_connection_state(ConnectionState::Reconnecting) {
self.dispatcher.dispatch(&RoomEvent::Reconnecting);
Expand Down Expand Up @@ -906,6 +955,8 @@ impl RoomSession {
.update_info(join_response.participant.unwrap()); // The sid may have changed

self.handle_participant_update(join_response.other_participants);
self.handle_room_update(join_response.room.unwrap());

let _ = tx.send(());
}

Expand Down Expand Up @@ -1018,6 +1069,30 @@ impl RoomSession {
}
});

participant.on_metadata_changed({
let dispatcher = self.dispatcher.clone();
move |participant, old_metadata, metadata| {
let event = RoomEvent::ParticipantMetadataChanged {
participant,
old_metadata,
metadata,
};
dispatcher.dispatch(&event);
}
});

participant.on_name_changed({
let dispatcher = self.dispatcher.clone();
move |participant, old_name, name| {
let event = RoomEvent::ParticipantNameChanged {
participant,
old_name,
name,
};
dispatcher.dispatch(&event);
}
});

let mut participants = self.participants.write();
participants.0.insert(sid, participant.clone());
participants.1.insert(identity, participant.clone());
Expand Down
40 changes: 40 additions & 0 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ impl LocalParticipant {
super::on_track_unmuted(&self.inner, handler)
}

pub(crate) fn on_metadata_changed(
&self,
handler: impl Fn(Participant, String, String) + Send + 'static,
) {
super::on_metadata_changed(&self.inner, handler)
}

pub(crate) fn on_name_changed(
&self,
handler: impl Fn(Participant, String, String) + Send + 'static,
) {
super::on_name_changed(&self.inner, handler)
}

pub(crate) fn add_publication(&self, publication: TrackPublication) {
super::add_publication(&self.inner, &Participant::Local(self.clone()), publication);
}
Expand Down Expand Up @@ -218,6 +232,32 @@ impl LocalParticipant {
Ok(publication)
}

pub async fn update_metadata(&self, metadata: String) -> RoomResult<()> {
self.inner
.rtc_engine
.send_request(proto::signal_request::Message::UpdateMetadata(
proto::UpdateParticipantMetadata {
metadata,
name: self.name(),
},
))
.await;
Ok(())
}

pub async fn update_name(&self, name: String) -> RoomResult<()> {
self.inner
.rtc_engine
.send_request(proto::signal_request::Message::UpdateMetadata(
proto::UpdateParticipantMetadata {
metadata: self.metadata(),
name,
},
))
.await;
Ok(())
}

pub async fn unpublish_track(
&self,
sid: &TrackSid,
Expand Down
Loading