Skip to content

Commit

Permalink
Implement all kinds of attachments v4
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon committed Oct 20, 2024
1 parent 3c8b760 commit 9c2d865
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 89 deletions.
38 changes: 32 additions & 6 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,20 @@ pub enum SignalServers {
#[derive(Debug)]
pub enum Endpoint<'a> {
Absolute(Url),
Service { path: Cow<'a, str> },
Storage { path: Cow<'a, str> },
Cdn { path: Cow<'a, str>, cdn_id: u32 },
ContactDiscovery { path: Cow<'a, str> },
Service {
path: Cow<'a, str>,
},
Storage {
path: Cow<'a, str>,
},
Cdn {
cdn_id: u32,
path: Cow<'a, str>,
query: Option<Cow<'a, str>>,
},
ContactDiscovery {
path: Cow<'a, str>,
},
}

impl<'a> Endpoint<'a> {
Expand All @@ -89,8 +99,17 @@ impl<'a> Endpoint<'a> {

pub fn cdn(cdn_id: u32, path: impl Into<Cow<'a, str>>) -> Self {
Self::Cdn {
cdn_id,
path: path.into(),
query: None,
}
}

pub fn cdn_url(cdn_id: u32, url: &'a Url) -> Self {
Self::Cdn {
cdn_id,
path: url.path().into(),
query: url.query().map(Into::into),
}
}

Expand All @@ -109,8 +128,15 @@ impl<'a> Endpoint<'a> {
Endpoint::Storage { path } => {
service_configuration.storage_url.join(&path)
},
Endpoint::Cdn { path, ref cdn_id } => {
service_configuration.cdn_urls[cdn_id].join(&path)
Endpoint::Cdn {
ref cdn_id,
path,
query,
} => {
let mut url = service_configuration.cdn_urls[cdn_id].clone();
url.set_path(&path);
url.set_query(query.as_deref());
Ok(url)
},
Endpoint::ContactDiscovery { path } => {
service_configuration.contact_discovery_url.join(&path)
Expand Down
258 changes: 175 additions & 83 deletions src/push_service/cdn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::{

use futures::TryStreamExt;
use reqwest::{
header::{CONTENT_LENGTH, CONTENT_TYPE},
header::{CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, RANGE},
multipart::Part,
Method,
Method, StatusCode,
};
use serde::Deserialize;
use tracing::{debug, trace};
Expand Down Expand Up @@ -49,18 +49,6 @@ pub struct AttachmentDigest {
pub incremental_mac_chunk_size: u64,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResumableUploadSpec {
attachment_key: Vec<u8>,
attachment_iv: Vec<u8>,
cdn_key: String,
cdn_number: u32,
resume_location: String,
expiration_timestamp: u64,
headers: HashMap<String, String>,
}

#[derive(Debug)]
pub struct ResumeInfo {
pub content_range: Option<String>,
Expand Down Expand Up @@ -175,16 +163,79 @@ impl PushService {
.parse()?)
}

#[tracing::instrument(skip(self))]
async fn get_attachment_resume_info_cdn2(
&mut self,
resumable_url: &Url,
content_length: u64,
) -> Result<ResumeInfo, ServiceError> {
let response = self
.request(
Method::PUT,
Endpoint::cdn_url(2, resumable_url),
HttpAuthOverride::Unidentified,
)?
.header(CONTENT_RANGE, format!("bytes */{content_length}"))
.send()
.await?
.error_for_status()?;

let status = response.status();

if status.is_success() {
Ok(ResumeInfo {
content_range: None,
content_start: content_length,
})
} else if status == StatusCode::PERMANENT_REDIRECT {
let offset =
match response.headers().get(RANGE) {
Some(range) => range
.to_str()
.map_err(|_| ServiceError::InvalidFrame {
reason: "invalid format for Range HTTP header",
})?
.split("-")
.nth(1)
.ok_or_else(|| ServiceError::InvalidFrame {
reason:
"invalid value format for Range HTTP header",
})?
.parse::<u64>()
.map_err(|_| ServiceError::InvalidFrame {
reason:
"invalid number format for Range HTTP header",
})?
+ 1,
None => 0,
};

Ok(ResumeInfo {
content_range: Some(format!(
"bytes {}-{}/{}",
offset,
content_length - 1,
content_length
)),
content_start: offset,
})
} else {
Err(ServiceError::InvalidFrame {
reason: "failed to get resumable upload data from CDN2",
})
}
}

#[tracing::instrument(skip(self))]
async fn get_attachment_resume_info_cdn3(
&mut self,
resumable_url: &Url,
headers: &HashMap<String, String>,
) -> Result<ResumeInfo, ServiceError> {
trace!("getting attachment resume info from CDN3");
let mut request = self
.request(
Method::HEAD,
Endpoint::cdn(3, resumable_url.path()),
Endpoint::cdn_url(3, resumable_url),
HttpAuthOverride::Unidentified,
)?
.header("Tus-Resumable", "1.0.0");
Expand Down Expand Up @@ -220,85 +271,32 @@ impl PushService {
///
/// Returns attachment ID and the attachment digest
#[tracing::instrument(skip(self, headers, content))]
pub(crate) async fn upload_attachment_v4<
's,
R: std::io::Read + std::io::Seek + Send + 's,
>(
pub(crate) async fn upload_attachment_v4(
&mut self,
cdn_id: u32,
resumable_url: &Url,
content_type: &str,
length: u64,
content_length: u64,
headers: HashMap<String, String>,
mut content: R,
content: impl std::io::Read + std::io::Seek + Send,
) -> Result<AttachmentDigest, ServiceError> {
if cdn_id == 2 {
unimplemented!()
// self.post_to_cdn2(
// attachment.getResumableUploadSpec().getResumeLocation(),
// attachment.getData(),
// "application/octet-stream",
// attachment.getDataSize(),
// attachment.getIncremental(),
// attachment.getOutputStreamFactory(),
// attachment.getListener(),
// attachment.getCancelationSignal(),
// )
self.upload_to_cdn2(resumable_url, content_length, content)
.await
} else {
let resume_info = self
.get_attachment_resume_info_cdn3(resumable_url, &headers)
.await?;

trace!(?resume_info, "got resume info");

if resume_info.content_start == length {
let mut digester =
crate::digeststream::DigestingReader::new(&mut content);
let mut buf = Vec::new();
digester.read_to_end(&mut buf).unwrap();
return Ok(AttachmentDigest {
digest: digester.finalize(),
incremental_digest: None,
incremental_mac_chunk_size: 0,
});
}

let mut digester =
crate::digeststream::DigestingReader::new(&mut content);
digester
.seek(SeekFrom::Start(resume_info.content_start))
.unwrap();

let mut buf = Vec::new();
digester.read_to_end(&mut buf).unwrap();

trace!("digested content");

self.request(
Method::PATCH,
Endpoint::cdn(3, resumable_url.path()),
HttpAuthOverride::Unidentified,
)?
.header("Tus-Resumable", "1.0.0")
.header("Upload-Offset", resume_info.content_start)
.header("Upload-Length", buf.len())
.header(CONTENT_TYPE, content_type)
.body(buf)
.send()
.await?;

trace!("attachment uploaded");

Ok(AttachmentDigest {
digest: digester.finalize(),
incremental_digest: None,
incremental_mac_chunk_size: 0,
})
self.upload_to_cdn3(
resumable_url,
&headers,
content_type,
content_length,
content,
)
.await
}
}

#[tracing::instrument(skip(self, upload_attributes, reader))]
pub async fn post_to_cdn0(
pub async fn upload_to_cdn0(
&mut self,
path: &str,
upload_attributes: AttachmentV2UploadAttributes,
Expand Down Expand Up @@ -344,4 +342,98 @@ impl PushService {

Ok(())
}

async fn upload_to_cdn2(
&mut self,
resumable_url: &Url,
content_length: u64,
mut content: impl std::io::Read + std::io::Seek + Send,
) -> Result<AttachmentDigest, ServiceError> {
let resume_info = self
.get_attachment_resume_info_cdn2(resumable_url, content_length)
.await?;

let mut digester =
crate::digeststream::DigestingReader::new(&mut content);

let mut buf = Vec::new();
digester.read_to_end(&mut buf)?;

trace!("digested content");

let mut request = self.request(
Method::PUT,
Endpoint::cdn_url(2, resumable_url),
HttpAuthOverride::Unidentified,
)?;

if let Some(content_range) = resume_info.content_range {
request = request.header(CONTENT_RANGE, content_range);
}

request.body(buf).send().await?.error_for_status()?;

Ok(AttachmentDigest {
digest: digester.finalize(),
incremental_digest: None,
incremental_mac_chunk_size: 0,
})
}

async fn upload_to_cdn3(
&mut self,
resumable_url: &Url,
headers: &HashMap<String, String>,
content_type: &str,
content_length: u64,
mut content: impl std::io::Read + std::io::Seek + Send,
) -> Result<AttachmentDigest, ServiceError> {
let resume_info = self
.get_attachment_resume_info_cdn3(resumable_url, &headers)

Check warning on line 392 in src/push_service/cdn.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> src/push_service/cdn.rs:392:61 | 392 | .get_attachment_resume_info_cdn3(resumable_url, &headers) | ^^^^^^^^ help: change this to: `headers` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default
.await?;

trace!(?resume_info, "got resume info");

if resume_info.content_start == content_length {
let mut digester =
crate::digeststream::DigestingReader::new(&mut content);
let mut buf = Vec::new();
digester.read_to_end(&mut buf)?;
return Ok(AttachmentDigest {
digest: digester.finalize(),
incremental_digest: None,
incremental_mac_chunk_size: 0,
});
}

let mut digester =
crate::digeststream::DigestingReader::new(&mut content);
digester.seek(SeekFrom::Start(resume_info.content_start))?;

let mut buf = Vec::new();
digester.read_to_end(&mut buf)?;

trace!("digested content");

self.request(
Method::PATCH,
Endpoint::cdn(3, resumable_url.path()),
HttpAuthOverride::Unidentified,
)?
.header("Tus-Resumable", "1.0.0")
.header("Upload-Offset", resume_info.content_start)
.header("Upload-Length", buf.len())
.header(CONTENT_TYPE, content_type)
.body(buf)
.send()
.await?;

trace!("attachment uploaded");

Ok(AttachmentDigest {
digest: digester.finalize(),
incremental_digest: None,
incremental_mac_chunk_size: 0,
})
}
}

0 comments on commit 9c2d865

Please sign in to comment.