Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upsert a contact when seeing it for the first time #211

Merged
merged 4 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 4 additions & 25 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ enum Cmd {
#[clap(long, help = "Force to register again if already registered")]
force: bool,
},
#[clap(about = "Unregister from Signal")]
Unregister,
#[clap(
about = "Generate a QR code to scan with Signal for iOS or Android to link this client as secondary device"
)]
LinkDevice {
/// Possible values: staging, production
#[clap(long, short = 's', default_value = "production")]
Expand All @@ -106,16 +101,6 @@ enum Cmd {
#[clap(long, value_parser = parse_base64_profile_key)]
profile_key: Option<ProfileKey>,
},
#[clap(about = "Set a name, status and avatar")]
UpdateProfile,
#[clap(about = "Check if a user is registered on Signal")]
GetUserStatus,
#[clap(about = "Block contacts or groups")]
Block,
#[clap(about = "Unblock contacts or groups")]
Unblock,
#[clap(about = "Update the details of a contact")]
UpdateContact,
#[clap(about = "Receive all pending messages and saves them to disk")]
Receive {
#[clap(long = "notifications", short = 'n')]
Expand Down Expand Up @@ -420,15 +405,15 @@ fn print_message<S: Store>(
let ts = content.timestamp();
let (prefix, body) = match msg {
Msg::Received(Thread::Contact(sender), body) => {
let contact = format_contact(*sender);
let contact = format_contact(sender);
(format!("From {contact} @ {ts}: "), body)
}
Msg::Sent(Thread::Contact(recipient), body) => {
let contact = format_contact(*recipient);
let contact = format_contact(recipient);
(format!("To {contact} @ {ts}"), body)
}
Msg::Received(Thread::Group(key), body) => {
let sender = format_contact(content.metadata.sender.uuid);
let sender = format_contact(&content.metadata.sender.uuid);
let group = format_group(*key);
(format!("From {sender} to group {group} @ {ts}: "), body)
}
Expand Down Expand Up @@ -571,7 +556,6 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re

send(&mut manager, Recipient::Group(master_key), data_message).await?;
}
Cmd::Unregister => unimplemented!(),
Cmd::RetrieveProfile {
uuid,
mut profile_key,
Expand Down Expand Up @@ -599,11 +583,6 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
};
println!("{profile:#?}");
}
Cmd::UpdateProfile => unimplemented!(),
Cmd::GetUserStatus => unimplemented!(),
Cmd::Block => unimplemented!(),
Cmd::Unblock => unimplemented!(),
Cmd::UpdateContact => unimplemented!(),
Cmd::ListGroups => {
let manager = Manager::load_registered(config_store).await?;
for group in manager.store().groups()? {
Expand Down Expand Up @@ -648,7 +627,7 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
}
Cmd::GetContact { ref uuid } => {
let manager = Manager::load_registered(config_store).await?;
match manager.store().contact_by_id(*uuid)? {
match manager.store().contact_by_id(uuid)? {
Some(contact) => println!("{contact:#?}"),
None => eprintln!("Could not find contact for {uuid}"),
}
Expand Down
13 changes: 4 additions & 9 deletions presage-store-sled/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,9 @@ impl ContentsStore for SledStore {
Ok(())
}

fn save_contacts(
&mut self,
contacts: impl Iterator<Item = Contact>,
) -> Result<(), SledStoreError> {
for contact in contacts {
self.insert(SLED_TREE_CONTACTS, contact.uuid, contact)?;
}
debug!("saved contacts");
fn save_contact(&mut self, contact: &Contact) -> Result<(), SledStoreError> {
self.insert(SLED_TREE_CONTACTS, contact.uuid, contact)?;
debug!("saved contact");
Ok(())
}

Expand All @@ -447,7 +442,7 @@ impl ContentsStore for SledStore {
})
}

fn contact_by_id(&self, id: Uuid) -> Result<Option<Contact>, SledStoreError> {
fn contact_by_id(&self, id: &Uuid) -> Result<Option<Contact>, SledStoreError> {
self.get(SLED_TREE_CONTACTS, id)
}

Expand Down
157 changes: 111 additions & 46 deletions presage/src/manager/registered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use libsignal_service::messagepipe::{Incoming, MessagePipe, ServiceCredentials};
use libsignal_service::models::Contact;
use libsignal_service::prelude::phonenumber::PhoneNumber;
use libsignal_service::prelude::Uuid;
use libsignal_service::profile_cipher::ProfileCipher;
use libsignal_service::proto::data_message::Delete;
use libsignal_service::proto::{
sync_message, AttachmentPointer, DataMessage, EditMessage, GroupContextV2, NullMessage,
SyncMessage,
SyncMessage, Verified,
};
use libsignal_service::protocol::SenderCertificate;
use libsignal_service::protocol::{PrivateKey, PublicKey};
Expand Down Expand Up @@ -508,6 +509,7 @@ impl<S: Store> Manager<S, Registered> {
encrypted_messages: S,
message_receiver: MessageReceiver<HyperPushService>,
service_cipher: ServiceCipher<C>,
push_service: HyperPushService,
store: C,
groups_manager: GroupsManager<HyperPushService, InMemoryCredentialsCache>,
mode: ReceivingMode,
Expand All @@ -517,6 +519,7 @@ impl<S: Store> Manager<S, Registered> {
encrypted_messages: Box::pin(self.receive_messages_encrypted().await?),
message_receiver: MessageReceiver::new(self.identified_push_service()),
service_cipher: self.new_service_cipher()?,
push_service: self.identified_push_service(),
store: self.store.clone(),
groups_manager: self.groups_manager()?,
mode,
Expand All @@ -539,15 +542,11 @@ impl<S: Store> Manager<S, Registered> {
match state.message_receiver.retrieve_contacts(contacts).await {
Ok(contacts) => {
let _ = state.store.clear_contacts();
match state
.store
.save_contacts(contacts.filter_map(Result::ok))
{
Ok(()) => {
info!("saved contacts");
}
Err(e) => {
info!("saving contacts");
for contact in contacts.filter_map(Result::ok) {
if let Err(e) = state.store.save_contact(&contact) {
warn!("failed to save contacts: {e}");
break;
}
}
}
Expand Down Expand Up @@ -603,7 +602,13 @@ impl<S: Store> Manager<S, Registered> {
}
}

if let Err(e) = save_message(&mut state.store, content.clone()) {
if let Err(e) = save_message(
&mut state.store,
&mut state.push_service,
content.clone(),
)
.await
{
error!("Error saving message to store: {}", e);
}

Expand Down Expand Up @@ -704,7 +709,8 @@ impl<S: Store> Manager<S, Registered> {
body: content_body,
};

save_message(&mut self.store, content)?;
let mut push_service = self.identified_push_service();
save_message(&mut self.store, &mut push_service, content).await?;

Ok(())
}
Expand Down Expand Up @@ -802,7 +808,8 @@ impl<S: Store> Manager<S, Registered> {
body: content_body,
};

save_message(&mut self.store, content)?;
let mut push_service = self.identified_push_service();
save_message(&mut self.store, &mut push_service, content).await?;

Ok(())
}
Expand Down Expand Up @@ -899,7 +906,7 @@ impl<S: Store> Manager<S, Registered> {
pub async fn thread_title(&self, thread: &Thread) -> Result<String, Error<S::Error>> {
match thread {
Thread::Contact(uuid) => {
let contact = match self.store.contact_by_id(*uuid) {
let contact = match self.store.contact_by_id(uuid) {
Ok(contact) => contact,
Err(e) => {
info!("Error getting contact by id: {}, {:?}", e, uuid);
Expand All @@ -925,7 +932,7 @@ impl<S: Store> Manager<S, Registered> {
/// Note: this only currently works when linked as secondary device (the contacts are sent by the primary device at linking time)
#[deprecated = "use the store handle directly"]
pub fn contact_by_id(&self, id: &Uuid) -> Result<Option<Contact>, Error<S::Error>> {
Ok(self.store.contact_by_id(*id)?)
Ok(self.store.contact_by_id(id)?)
}

/// Returns an iterator on contacts stored in the [Store].
Expand Down Expand Up @@ -1010,52 +1017,110 @@ async fn upsert_group<S: Store>(
Ok(store.group(master_key_bytes.try_into()?)?)
}

fn save_message<S: Store>(store: &mut S, message: Content) -> Result<(), Error<S::Error>> {
async fn save_message<S: Store>(
store: &mut S,
push_service: &mut HyperPushService,
message: Content,
) -> Result<(), Error<S::Error>> {
// derive the thread from the message type
let thread = Thread::try_from(&message)?;

// only save DataMessage and SynchronizeMessage (sent)
let message = match message.body {
ContentBody::NullMessage(_) => Some(message),
ContentBody::DataMessage(ref data_message)
ContentBody::DataMessage(
ref data_message @ DataMessage {
ref profile_key, ..
},
)
| ContentBody::SynchronizeMessage(SyncMessage {
sent:
Some(sync_message::Sent {
message: Some(ref data_message),
message:
Some(
ref data_message @ DataMessage {
ref profile_key, ..
},
),
..
}),
..
}) => match data_message {
DataMessage {
profile_key: Some(profile_key_bytes),
delete:
Some(Delete {
target_sent_timestamp: Some(ts),
}),
..
} => {
// update recipient profile key
if let Ok(profile_key_bytes) = profile_key_bytes.clone().try_into() {
let sender_uuid = message.metadata.sender.uuid;
let profile_key = ProfileKey::create(profile_key_bytes);
debug!("inserting profile key for {sender_uuid}");
store.upsert_profile_key(&sender_uuid, profile_key)?;
}) => {
// update recipient profile key if changed
if let Some(profile_key_bytes) = profile_key.clone().and_then(|p| p.try_into().ok()) {
let sender_uuid = message.metadata.sender.uuid;
let profile_key = ProfileKey::create(profile_key_bytes);
debug!("inserting profile key for {sender_uuid}");

// Either:
// - insert a new contact with the profile information
// - update the contact if the profile key has changed
// TODO: mark this contact as "created by us" maybe to know whether we should update it or not
if store.contact_by_id(&sender_uuid)?.is_none()
|| !store
.profile_key(&sender_uuid)?
.is_some_and(|p| p.bytes == profile_key.bytes)
{
let encrypted_profile = push_service
.retrieve_profile_by_id(sender_uuid.into(), Some(profile_key))
.await?;
let profile_cipher = ProfileCipher::from(profile_key);
let decrypted_profile = encrypted_profile.decrypt(profile_cipher).unwrap();

let contact = Contact {
uuid: sender_uuid,
phone_number: None,
name: decrypted_profile
.name
// FIXME: this assumes [firstname] [lastname]
.map(|pn| {
if let Some(family_name) = pn.family_name {
format!("{} {}", pn.given_name, family_name)
} else {
pn.given_name
}
})
.unwrap_or_default(),
profile_key: profile_key.bytes.to_vec(),
color: None,
blocked: false,
expire_timer: 0,
inbox_position: 0,
archived: false,
avatar: None,
verified: Verified::default(),
};

info!("saved contact after seeing {sender_uuid} for the first time");
store.save_contact(&contact)?;
}

// replace an existing message by an empty NullMessage
if let Some(mut existing_msg) = store.message(&thread, *ts)? {
existing_msg.metadata.sender.uuid = Uuid::nil();
existing_msg.body = NullMessage::default().into();
store.save_message(&thread, existing_msg)?;
debug!("message in thread {thread} @ {ts} deleted");
None
} else {
warn!("could not find message to delete in thread {thread} @ {ts}");
None
store.upsert_profile_key(&sender_uuid, profile_key)?;
}

match data_message {
DataMessage {
delete:
Some(Delete {
target_sent_timestamp: Some(ts),
}),
..
} => {
// replace an existing message by an empty NullMessage
if let Some(mut existing_msg) = store.message(&thread, *ts)? {
existing_msg.metadata.sender.uuid = Uuid::nil();
existing_msg.body = NullMessage::default().into();
store.save_message(&thread, existing_msg)?;
debug!("message in thread {thread} @ {ts} deleted");
None
} else {
warn!("could not find message to delete in thread {thread} @ {ts}");
None
}
}
_ => Some(message),
}
_ => Some(message),
},
}
ContentBody::EditMessage(EditMessage {
target_sent_timestamp: Some(ts),
data_message: Some(data_message),
Expand Down Expand Up @@ -1088,8 +1153,8 @@ fn save_message<S: Store>(store: &mut S, message: Content) -> Result<(), Error<S
call_event: Some(_),
..
}) => Some(message),
ContentBody::SynchronizeMessage(s) => {
debug!("skipping saving sync message without interesting fields: {s:?}");
ContentBody::SynchronizeMessage(_) => {
debug!("skipping saving sync message without interesting fields");
None
}
ContentBody::ReceiptMessage(_) => {
Expand Down
11 changes: 4 additions & 7 deletions presage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub trait ContentsStore {
/// or [Group::disappearing_messages_timer].
fn expire_timer(&self, thread: &Thread) -> Result<Option<u32>, Self::ContentsStoreError> {
match thread {
Thread::Contact(uuid) => Ok(self.contact_by_id(*uuid)?.map(|c| c.expire_timer)),
Thread::Contact(uuid) => Ok(self.contact_by_id(uuid)?.map(|c| c.expire_timer)),
Thread::Group(key) => Ok(self
.group(*key)?
.and_then(|g| g.disappearing_messages_timer)
Expand All @@ -175,17 +175,14 @@ pub trait ContentsStore {
/// Clear all saved synchronized contact data
fn clear_contacts(&mut self) -> Result<(), Self::ContentsStoreError>;

/// Replace all contact data
fn save_contacts(
&mut self,
contacts: impl Iterator<Item = Contact>,
) -> Result<(), Self::ContentsStoreError>;
/// Save a contact
fn save_contact(&mut self, contacts: &Contact) -> Result<(), Self::ContentsStoreError>;

/// Get an iterator on all stored (synchronized) contacts
fn contacts(&self) -> Result<Self::ContactsIter, Self::ContentsStoreError>;

/// Get contact data for a single user by its [Uuid].
fn contact_by_id(&self, id: Uuid) -> Result<Option<Contact>, Self::ContentsStoreError>;
fn contact_by_id(&self, id: &Uuid) -> Result<Option<Contact>, Self::ContentsStoreError>;

/// Delete all cached group data
fn clear_groups(&mut self) -> Result<(), Self::ContentsStoreError>;
Expand Down
Loading