Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sending note to self as primary without secondary devices #339

Merged
merged 5 commits into from
Oct 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::time;
use std::{collections::HashSet, time::SystemTime};

use chrono::prelude::*;
Expand All @@ -7,7 +6,7 @@ use libsignal_protocol::{
ProtocolStore, SenderCertificate, SenderKeyStore, SignalProtocolError,
};
use rand::{CryptoRng, Rng};
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, trace, warn};
use tracing_futures::Instrument;
use uuid::Uuid;
use zkgroup::GROUP_IDENTIFIER_LEN;
Expand Down Expand Up @@ -132,6 +131,9 @@ pub enum MessageSenderError {

#[error("Recipient not found: {addr:?}")]
NotFound { addr: ServiceAddress },

#[error("no messages were encrypted: this should not really happen and most likely implies a logic error")]
NoMessagesToSend,
}

pub type GroupV2Id = [u8; GROUP_IDENTIFIER_LEN];
Expand All @@ -142,6 +144,12 @@ pub enum ThreadIdentifier {
Group(GroupV2Id),
}

#[derive(Debug)]
pub struct EncryptedMessages {
messages: Vec<OutgoingPushMessage>,
used_identity_key: IdentityKey,
}

impl<S, R> MessageSender<S, R>
where
S: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone,
Expand Down Expand Up @@ -349,6 +357,7 @@ where
) -> SendMessageResult {
let content_body = message.into();
let message_to_self = recipient == &self.local_aci;
let is_multi_device = self.is_multi_device().await;

use crate::proto::data_message::Flags;

Expand All @@ -360,7 +369,7 @@ where
};

// only send a sync message when sending to self and skip the rest of the process
if message_to_self {
if message_to_self && is_multi_device {
debug!("sending note to self");
let sync_message =
Self::create_multi_device_sent_transcript_content(
Expand Down Expand Up @@ -403,7 +412,7 @@ where
_ => false,
};

if needs_sync || self.is_multi_device().await {
if needs_sync || is_multi_device {
debug!("sending multi-device sync message");
let sync_message =
Self::create_multi_device_sent_transcript_content(
Expand Down Expand Up @@ -533,13 +542,22 @@ where
let content_bytes = content.encode_to_vec();

for _ in 0..4u8 {
let (messages, used_identity_key) = self
let Some(EncryptedMessages {
messages,
used_identity_key,
}) = self
.create_encrypted_messages(
&recipient,
unidentified_access.map(|x| &x.certificate),
&content_bytes,
)
.await?;
.await?
else {
// this can happen for example when a device is primary, without any secondaries
// and we send a message to ourselves (which is only a SyncMessage { sent: ... })
// addressed to self
return Err(MessageSenderError::NoMessagesToSend);
};
rubdos marked this conversation as resolved.
Show resolved Hide resolved

let messages = OutgoingPushMessages {
destination: recipient.uuid,
Expand Down Expand Up @@ -839,8 +857,7 @@ where
recipient: &ServiceAddress,
unidentified_access: Option<&SenderCertificate>,
content: &[u8],
) -> Result<(Vec<OutgoingPushMessage>, IdentityKey), MessageSenderError>
{
) -> Result<Option<EncryptedMessages>, MessageSenderError> {
let mut messages = vec![];

let mut devices: HashSet<DeviceId> = self
Expand Down Expand Up @@ -891,32 +908,24 @@ where
messages.push(message);
break;
},
Err(
e @ MessageSenderError::ServiceError(
ServiceError::SignalProtocolError(
SignalProtocolError::SessionNotFound(_),
),
Err(MessageSenderError::ServiceError(
ServiceError::SignalProtocolError(
SignalProtocolError::SessionNotFound(addr),
),
) => {
let MessageSenderError::ServiceError(
ServiceError::SignalProtocolError(
SignalProtocolError::SessionNotFound(addr),
),
) = &e
else {
// We can't bind to addr above, because we move into `e`.
unreachable!()
};
)) => {
// SessionNotFound is returned on certain session corruption.
// Since delete_session *creates* a session if it doesn't exist,
// the NotFound error is an indicator of session corruption.
// Try to delete this session, if it gets succesfully deleted, retry. Otherwise, fail.
tracing::warn!("Potential session corruption for {}, deleting session", addr);
match self.protocol_store.delete_session(addr).await {
match self.protocol_store.delete_session(&addr).await {
Ok(()) => continue,
Err(_e) => {
tracing::warn!("Failed to delete session for {}, failing message. {}", addr, _e);
return Err(e);
Err(error) => {
tracing::warn!(%error, %addr, "failed to delete session");
return Err(
SignalProtocolError::SessionNotFound(addr)
.into(),
);
},
}
},
Expand All @@ -925,15 +934,22 @@ where
}
}

let identity_key = self
.protocol_store
.get_identity(&recipient.to_protocol_address(DEFAULT_DEVICE_ID))
.await?
.ok_or(MessageSenderError::UntrustedIdentity {
address: *recipient,
})?;

Ok((messages, identity_key))
if messages.is_empty() {
Ok(None)
} else {
Ok(Some(EncryptedMessages {
messages,
used_identity_key: self
.protocol_store
.get_identity(
&recipient.to_protocol_address(DEFAULT_DEVICE_ID),
)
.await?
.ok_or(MessageSenderError::UntrustedIdentity {
address: *recipient,
})?,
}))
}
}

/// Equivalent to `getEncryptedMessage`
Expand Down
Loading