From 9e4268984ebdfd58b44ad7a85a79f5f1e0d47232 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Tue, 11 Apr 2023 22:50:49 +0800 Subject: [PATCH] refactor(services/oss): Migrate to async reqsign Signed-off-by: suyanhanx --- core/src/services/oss/backend.rs | 686 ++----------------------------- core/src/services/oss/core.rs | 660 +++++++++++++++++++++++++++++ core/src/services/oss/mod.rs | 1 + core/src/services/oss/pager.rs | 24 +- core/src/services/oss/writer.rs | 49 ++- 5 files changed, 739 insertions(+), 681 deletions(-) create mode 100644 core/src/services/oss/core.rs diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 4f3e88bbd8f..e99a58b4fe6 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -23,21 +23,14 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Buf; -use bytes::Bytes; -use http::header::CONTENT_DISPOSITION; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; -use http::header::RANGE; -use http::Request; -use http::Response; use http::StatusCode; use http::Uri; use log::debug; -use reqsign::AliyunOssBuilder; -use reqsign::AliyunOssSigner; -use serde::Deserialize; -use serde::Serialize; +use reqsign_0_9::AliyunConfig; +use reqsign_0_9::AliyunLoader; +use reqsign_0_9::AliyunOssSigner; +use super::core::*; use super::error::parse_error; use super::pager::OssPager; use super::writer::OssWriter; @@ -113,7 +106,7 @@ use crate::*; /// Ok(()) /// } /// ``` -#[derive(Default, Clone)] +#[derive(Default)] pub struct OssBuilder { root: Option, @@ -135,19 +128,9 @@ impl Debug for OssBuilder { let mut d = f.debug_struct("Builder"); d.field("root", &self.root) .field("bucket", &self.bucket) - .field("endpoint", &self.endpoint) - .field("presign_endpoint", &self.presign_endpoint) - .field("allow_anonymous", &self.allow_anonymous); + .field("endpoint", &self.endpoint); - if self.access_key_id.is_some() { - d.field("access_key_id", &""); - } - - if self.access_key_secret.is_some() { - d.field("access_key_secret", &""); - } - - d.finish() + d.finish_non_exhaustive() } } @@ -338,66 +321,41 @@ impl Builder for OssBuilder { }; debug!("backend use presign_endpoint: {}", &presign_endpoint); - let mut signer_builder = AliyunOssBuilder::default(); + let mut cfg = AliyunConfig::default(); - if self.allow_anonymous { - signer_builder.allow_anonymous(); + if let Some(v) = self.access_key_id.take() { + cfg.access_key_id = Some(v); } - signer_builder.bucket(bucket); - - if let (Some(ak), Some(sk)) = (&self.access_key_id, &self.access_key_secret) { - signer_builder.access_key_id(ak); - signer_builder.access_key_secret(sk); + if let Some(v) = self.access_key_secret.take() { + cfg.access_key_secret = Some(v); } - let signer = signer_builder.build().map_err(|e| { - Error::new(ErrorKind::ConfigInvalid, "build AliyunOssSigner") - .with_context("service", Scheme::Oss) - .with_context("endpoint", &endpoint) - .with_context("bucket", bucket) - .set_source(e) - })?; + let loader = AliyunLoader::new(client.client(), cfg); + + let signer = AliyunOssSigner::new(bucket); - debug!("Backend build finished: {:?}", &self); + debug!("Backend build finished"); Ok(OssBackend { - root, - endpoint, - presign_endpoint, - host, - client, - bucket: self.bucket.clone(), - signer: Arc::new(signer), + core: Arc::new(OssCore { + root, + bucket: bucket.to_owned(), + endpoint, + host, + presign_endpoint, + signer, + loader, + client, + }), }) } } -#[derive(Clone)] +#[derive(Debug, Clone)] /// Aliyun Object Storage Service backend pub struct 
OssBackend { - pub client: HttpClient, - - root: String, - bucket: String, - /// buffered host string - /// - /// format: . - host: String, - endpoint: String, - presign_endpoint: String, - pub signer: Arc, -} - -impl Debug for OssBackend { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Backend") - .field("root", &self.root) - .field("bucket", &self.bucket) - .field("endpoint", &self.endpoint) - .field("host", &self.host) - .finish() - } + core: Arc, } #[async_trait] @@ -415,8 +373,8 @@ impl Accessor for OssBackend { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::Oss) - .set_root(&self.root) - .set_name(&self.bucket) + .set_root(&self.core.root) + .set_name(&self.core.bucket) .set_max_batch_operations(1000) .set_capabilities(Read | Write | Copy | List | Scan | Presign | Batch) .set_hints(ReadStreamable); @@ -426,6 +384,7 @@ impl Accessor for OssBackend { async fn create(&self, path: &str, _: OpCreate) -> Result { let resp = self + .core .oss_put_object(path, None, None, None, AsyncBody::Empty) .await?; let status = resp.status(); @@ -440,7 +399,7 @@ impl Accessor for OssBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.oss_get_object(path, args.range()).await?; + let resp = self.core.oss_get_object(path, args.range()).await?; let status = resp.status(); @@ -455,7 +414,7 @@ impl Accessor for OssBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let upload_id = if args.append() { - let resp = self.oss_initiate_upload(path).await?; + let resp = self.core.oss_initiate_upload(path).await?; match resp.status() { StatusCode::OK => { let bs = resp.into_body().bytes().await?; @@ -472,12 +431,12 @@ impl Accessor for OssBackend { Ok(( RpWrite::default(), - OssWriter::new(self.clone(), args, path.to_string(), upload_id), + OssWriter::new(self.core.clone(), args, path.to_string(), upload_id), )) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { - let resp = self.oss_copy_object(from, to).await?; + let resp = self.core.oss_copy_object(from, to).await?; let status = resp.status(); match status { @@ -495,7 +454,7 @@ impl Accessor for OssBackend { return Ok(RpStat::new(m)); } - let resp = self.oss_head_object(path).await?; + let resp = self.core.oss_head_object(path).await?; let status = resp.status(); @@ -511,7 +470,7 @@ impl Accessor for OssBackend { } async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.oss_delete_object(path).await?; + let resp = self.core.oss_delete_object(path).await?; let status = resp.status(); match status { StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => { @@ -525,23 +484,23 @@ impl Accessor for OssBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - OssPager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + OssPager::new(self.core.clone(), path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - OssPager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + OssPager::new(self.core.clone(), path, "", args.limit()), )) } async fn presign(&self, path: &str, args: OpPresign) -> Result { // We will not send this request out, just for signing. 
let mut req = match args.operation() { - PresignOperation::Stat(_) => self.oss_head_object_request(path, true)?, - PresignOperation::Read(v) => self.oss_get_object_request(path, v.range(), true)?, - PresignOperation::Write(v) => self.oss_put_object_request( + PresignOperation::Stat(_) => self.core.oss_head_object_request(path, true)?, + PresignOperation::Read(v) => self.core.oss_get_object_request(path, v.range(), true)?, + PresignOperation::Write(v) => self.core.oss_put_object_request( path, None, v.content_type(), @@ -551,13 +510,7 @@ impl Accessor for OssBackend { )?, }; - self.signer - .sign_query( - &mut req, - // TODO: convert to std::time::Duration - time::Duration::seconds_f64(args.expire().as_secs_f64()), - ) - .map_err(new_request_sign_error)?; + self.core.sign_query(&mut req, args.expire()).await?; // We don't need this request anymore, consume it directly. let (parts, _) = req.into_parts(); @@ -592,7 +545,7 @@ impl Accessor for OssBackend { }) .collect(); - let resp = self.oss_delete_objects(paths).await?; + let resp = self.core.oss_delete_objects(paths).await?; let status = resp.status(); @@ -604,7 +557,7 @@ impl Accessor for OssBackend { let mut batched_result = Vec::with_capacity(ops_len); for i in result.deleted { - let path = build_rel_path(&self.root, &i.key); + let path = build_rel_path(&self.core.root, &i.key); keys.remove(&path); batched_result.push((path, Ok(RpDelete::default().into()))); } @@ -625,552 +578,3 @@ impl Accessor for OssBackend { } } } - -impl OssBackend { - pub fn oss_put_object_request( - &self, - path: &str, - size: Option, - content_type: Option<&str>, - content_disposition: Option<&str>, - body: AsyncBody, - is_presign: bool, - ) -> Result> { - let p = build_abs_path(&self.root, path); - let endpoint = self.get_endpoint(is_presign); - let url = format!("{}/{}", endpoint, percent_encode_path(&p)); - - let mut req = Request::put(&url); - - req = req.header(CONTENT_LENGTH, size.unwrap_or_default()); - - if let Some(mime) = content_type { - req = req.header(CONTENT_TYPE, mime); - } - - if let Some(pos) = content_disposition { - req = req.header(CONTENT_DISPOSITION, pos); - } - - let req = req.body(body).map_err(new_request_build_error)?; - Ok(req) - } - - fn oss_get_object_request( - &self, - path: &str, - range: BytesRange, - is_presign: bool, - ) -> Result> { - let p = build_abs_path(&self.root, path); - let endpoint = self.get_endpoint(is_presign); - let url = format!("{}/{}", endpoint, percent_encode_path(&p)); - - let mut req = Request::get(&url); - req = req.header(CONTENT_TYPE, "application/octet-stream"); - - if !range.is_full() { - req = req.header(RANGE, range.to_header()); - // Adding `x-oss-range-behavior` header to use standard behavior. 
- // ref: https://help.aliyun.com/document_detail/39571.html - req = req.header("x-oss-range-behavior", "standard"); - } - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - fn oss_delete_object_request(&self, path: &str) -> Result> { - let p = build_abs_path(&self.root, path); - let endpoint = self.get_endpoint(false); - let url = format!("{}/{}", endpoint, percent_encode_path(&p)); - let req = Request::delete(&url); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - fn oss_head_object_request(&self, path: &str, is_presign: bool) -> Result> { - let p = build_abs_path(&self.root, path); - let endpoint = self.get_endpoint(is_presign); - let url = format!("{}/{}", endpoint, percent_encode_path(&p)); - - let req = Request::head(&url); - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - fn oss_list_object_request( - &self, - path: &str, - token: Option<&str>, - delimiter: &str, - limit: Option, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let endpoint = self.get_endpoint(false); - let url = format!( - "{}/?list-type=2&delimiter={delimiter}&prefix={}{}{}", - endpoint, - percent_encode_path(&p), - limit.map(|t| format!("&max-keys={t}")).unwrap_or_default(), - token - .map(|t| format!("&continuation-token={}", percent_encode_path(t))) - .unwrap_or_default(), - ); - - let req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - Ok(req) - } - - async fn oss_get_object( - &self, - path: &str, - range: BytesRange, - ) -> Result> { - let mut req = self.oss_get_object_request(path, range, false)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - self.client.send(req).await - } - - async fn oss_head_object(&self, path: &str) -> Result> { - let mut req = self.oss_head_object_request(path, false)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - self.client.send(req).await - } - - async fn oss_put_object( - &self, - path: &str, - size: Option, - content_type: Option<&str>, - content_disposition: Option<&str>, - body: AsyncBody, - ) -> Result> { - let mut req = self.oss_put_object_request( - path, - size, - content_type, - content_disposition, - body, - false, - )?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - self.client.send(req).await - } - - async fn oss_copy_object(&self, from: &str, to: &str) -> Result> { - let source = build_abs_path(&self.root, from); - let target = build_abs_path(&self.root, to); - - let url = format!( - "{}/{}", - self.get_endpoint(false), - percent_encode_path(&target) - ); - let source = format!("/{}/{}", self.bucket, percent_encode_path(&source)); - - let mut req = Request::put(&url) - .header("x-oss-copy-source", source) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - self.client.send(req).await - } - - pub(super) async fn oss_list_object( - &self, - path: &str, - token: Option<&str>, - delimiter: &str, - limit: Option, - ) -> Result> { - let mut req = self.oss_list_object_request(path, token, delimiter, limit)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - self.client.send(req).await - } - - async fn oss_delete_object(&self, path: &str) -> Result> { - let mut req = self.oss_delete_object_request(path)?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - self.client.send(req).await - } - - async fn 
oss_delete_objects(&self, paths: Vec) -> Result> { - let url = format!("{}/?delete", self.endpoint); - - let req = Request::post(&url); - - let content = quick_xml::se::to_string(&DeleteObjectsRequest { - object: paths - .into_iter() - .map(|path| DeleteObjectsRequestObject { - key: build_abs_path(&self.root, &path), - }) - .collect(), - }) - .map_err(new_xml_deserialize_error)?; - - // Make sure content length has been set to avoid post with chunked encoding. - let req = req.header(CONTENT_LENGTH, content.len()); - // Set content-type to `application/xml` to avoid mixed with form post. - let req = req.header(CONTENT_TYPE, "application/xml"); - // Set content-md5 as required by API. - let req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes())); - - let mut req = req - .body(AsyncBody::Bytes(Bytes::from(content))) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - fn get_endpoint(&self, is_presign: bool) -> &str { - if is_presign { - &self.presign_endpoint - } else { - &self.endpoint - } - } - - async fn oss_initiate_upload(&self, path: &str) -> Result> { - let req = self.oss_initiate_upload_request(path, None, None, AsyncBody::Empty, false)?; - self.client.send(req).await - } - - /// Creates a request that initiates multipart upload - fn oss_initiate_upload_request( - &self, - path: &str, - content_type: Option<&str>, - content_disposition: Option<&str>, - body: AsyncBody, - is_presign: bool, - ) -> Result> { - let path = build_abs_path(&self.root, path); - let endpoint = self.get_endpoint(is_presign); - let url = format!("{}/{}?uploads", endpoint, percent_encode_path(&path)); - let mut req = Request::post(&url); - if let Some(mime) = content_type { - req = req.header(CONTENT_TYPE, mime); - } - if let Some(disposition) = content_disposition { - req = req.header(CONTENT_DISPOSITION, disposition); - } - - let mut req = req.body(body).map_err(new_request_build_error)?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - Ok(req) - } - - /// Creates a request to upload a part - pub fn oss_upload_part_request( - &self, - path: &str, - upload_id: &str, - part_number: usize, - is_presign: bool, - size: Option, - body: AsyncBody, - ) -> Result> { - let p = build_abs_path(&self.root, path); - let endpoint = self.get_endpoint(is_presign); - - let url = format!( - "{}/{}?partNumber={}&uploadId={}", - endpoint, - percent_encode_path(&p), - part_number, - percent_encode_path(upload_id) - ); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size); - } - let mut req = req.body(body).map_err(new_request_build_error)?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - Ok(req) - } - - pub async fn oss_complete_multipart_upload_request( - &self, - path: &str, - upload_id: &str, - is_presign: bool, - parts: &[MultipartUploadPart], - ) -> Result> { - let p = build_abs_path(&self.root, path); - let endpoint = self.get_endpoint(is_presign); - let url = format!( - "{}/{}?uploadId={}", - endpoint, - percent_encode_path(&p), - percent_encode_path(upload_id) - ); - - let req = Request::post(&url); - - let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { - part: parts.to_vec(), - }) - .map_err(new_xml_deserialize_error)?; - // Make sure content length has been set to avoid post with chunked encoding. 
- let req = req.header(CONTENT_LENGTH, content.len()); - // Set content-type to `application/xml` to avoid mixed with form post. - let req = req.header(CONTENT_TYPE, "application/xml"); - - let mut req = req - .body(AsyncBody::Bytes(Bytes::from(content))) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - self.client.send(req).await - } -} - -/// Request of DeleteObjects. -#[derive(Default, Debug, Serialize)] -#[serde(default, rename = "Delete", rename_all = "PascalCase")] -struct DeleteObjectsRequest { - object: Vec, -} - -#[derive(Default, Debug, Serialize)] -#[serde(rename_all = "PascalCase")] -struct DeleteObjectsRequestObject { - key: String, -} - -/// Result of DeleteObjects. -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")] -struct DeleteObjectsResult { - deleted: Vec, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct DeleteObjectsResultDeleted { - key: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename_all = "PascalCase")] -struct DeleteObjectsResultError { - code: String, - key: String, - message: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct InitiateMultipartUploadResult { - #[cfg(test)] - bucket: String, - #[cfg(test)] - key: String, - upload_id: String, -} - -#[derive(Clone, Default, Debug, Serialize)] -#[serde(default, rename_all = "PascalCase")] -pub struct MultipartUploadPart { - #[serde(rename = "PartNumber")] - pub part_number: usize, - #[serde(rename = "ETag")] - pub etag: String, -} - -#[derive(Default, Debug, Serialize)] -#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")] -struct CompleteMultipartUploadRequest { - part: Vec, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct CompleteMultipartUploadResult { - pub location: String, - pub bucket: String, - pub key: String, - #[serde(rename = "ETag")] - pub etag: String, -} - -#[cfg(test)] -mod tests { - use bytes::Buf; - use bytes::Bytes; - - use super::*; - - /// This example is from https://www.alibabacloud.com/help/zh/object-storage-service/latest/deletemultipleobjects - #[test] - fn test_serialize_delete_objects_request() { - let req = DeleteObjectsRequest { - object: vec![ - DeleteObjectsRequestObject { - key: "multipart.data".to_string(), - }, - DeleteObjectsRequestObject { - key: "test.jpg".to_string(), - }, - DeleteObjectsRequestObject { - key: "demo.jpg".to_string(), - }, - ], - }; - - let actual = quick_xml::se::to_string(&req).expect("must succeed"); - - pretty_assertions::assert_eq!( - actual, - r#" - - multipart.data - - - test.jpg - - - demo.jpg - -"# - // Cleanup space and new line - .replace([' ', '\n'], "") - ) - } - - /// This example is from https://www.alibabacloud.com/help/zh/object-storage-service/latest/deletemultipleobjects - #[test] - fn test_deserialize_delete_objects_result() { - let bs = Bytes::from( - r#" - - - multipart.data - - - test.jpg - - - demo.jpg - -"#, - ); - - let out: DeleteObjectsResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - - assert_eq!(out.deleted.len(), 3); - assert_eq!(out.deleted[0].key, "multipart.data"); - assert_eq!(out.deleted[1].key, "test.jpg"); - assert_eq!(out.deleted[2].key, "demo.jpg"); - } - - #[test] - fn test_deserialize_initiate_multipart_upload_response() { - let bs = Bytes::from( - r#" - - oss-example - multipart.data - 
0004B9894A22E5B1888A1E29F823**** -"#, - ); - let out: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - - assert_eq!("0004B9894A22E5B1888A1E29F823****", out.upload_id); - assert_eq!("multipart.data", out.key); - assert_eq!("oss-example", out.bucket); - } - - #[test] - fn test_serialize_complete_multipart_upload_request() { - let req = CompleteMultipartUploadRequest { - part: vec![ - MultipartUploadPart { - part_number: 1, - etag: "\"3349DC700140D7F86A0784842780****\"".to_string(), - }, - MultipartUploadPart { - part_number: 5, - etag: "\"8EFDA8BE206636A695359836FE0A****\"".to_string(), - }, - MultipartUploadPart { - part_number: 8, - etag: "\"8C315065167132444177411FDA14****\"".to_string(), - }, - ], - }; - - // quick_xml::se::to_string() - let mut serializer = quick_xml::se::Serializer::new(String::new()); - serializer.indent(' ', 4); - let serialized = req.serialize(serializer).unwrap(); - pretty_assertions::assert_eq!( - serialized, - r#" - - 1 - "3349DC700140D7F86A0784842780****" - - - 5 - "8EFDA8BE206636A695359836FE0A****" - - - 8 - "8C315065167132444177411FDA14****" - -"# - .replace('"', """) /* Escape `"` by hand to address */ - ) - } - - #[test] - fn test_deserialize_complete_oss_multipart_result() { - let bytes = Bytes::from( - r#" - - url - http://oss-example.oss-cn-hangzhou.aliyuncs.com /multipart.data - oss-example - multipart.data - "B864DB6A936D376F9F8D3ED3BBE540****" -"#, - ); - - let result: CompleteMultipartUploadResult = - quick_xml::de::from_reader(bytes.reader()).unwrap(); - assert_eq!("\"B864DB6A936D376F9F8D3ED3BBE540****\"", result.etag); - assert_eq!( - "http://oss-example.oss-cn-hangzhou.aliyuncs.com /multipart.data", - result.location - ); - assert_eq!("oss-example", result.bucket); - assert_eq!("multipart.data", result.key); - } -} diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs new file mode 100644 index 00000000000..2755bc79c2c --- /dev/null +++ b/core/src/services/oss/core.rs @@ -0,0 +1,660 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; +use std::time::Duration; + +use bytes::Bytes; +use http::header::CONTENT_DISPOSITION; +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; +use http::header::RANGE; +use http::Request; +use http::Response; +use reqsign_0_9::AliyunCredential; +use reqsign_0_9::AliyunLoader; +use reqsign_0_9::AliyunOssSigner; +use serde::Deserialize; +use serde::Serialize; + +use crate::raw::*; +use crate::*; + +pub struct OssCore { + pub root: String, + pub bucket: String, + /// buffered host string + /// + /// format: . 
+ pub host: String, + pub endpoint: String, + pub presign_endpoint: String, + + pub client: HttpClient, + pub loader: AliyunLoader, + pub signer: AliyunOssSigner, +} + +impl Debug for OssCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("endpoint", &self.endpoint) + .field("host", &self.host) + .finish_non_exhaustive() + } +} + +impl OssCore { + async fn load_credential(&self) -> Result { + let cred = self + .loader + .load() + .await + .map_err(new_request_credential_error)?; + + if let Some(cred) = cred { + Ok(cred) + } else { + Err(Error::new( + ErrorKind::ConfigInvalid, + "no valid credential found", + )) + } + } + + pub async fn sign(&self, req: &mut Request) -> Result<()> { + let cred = self.load_credential().await?; + self.signer.sign(req, &cred).map_err(new_request_sign_error) + } + + pub async fn sign_query(&self, req: &mut Request, duration: Duration) -> Result<()> { + let cred = self.load_credential().await?; + self.signer + .sign_query(req, duration, &cred) + .map_err(new_request_sign_error) + } + + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } +} + +impl OssCore { + pub fn oss_put_object_request( + &self, + path: &str, + size: Option, + content_type: Option<&str>, + content_disposition: Option<&str>, + body: AsyncBody, + is_presign: bool, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(is_presign); + let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + + let mut req = Request::put(&url); + + req = req.header(CONTENT_LENGTH, size.unwrap_or_default()); + + if let Some(mime) = content_type { + req = req.header(CONTENT_TYPE, mime); + } + + if let Some(pos) = content_disposition { + req = req.header(CONTENT_DISPOSITION, pos); + } + + let req = req.body(body).map_err(new_request_build_error)?; + Ok(req) + } + + pub fn oss_get_object_request( + &self, + path: &str, + range: BytesRange, + is_presign: bool, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(is_presign); + let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + + let mut req = Request::get(&url); + req = req.header(CONTENT_TYPE, "application/octet-stream"); + + if !range.is_full() { + req = req.header(RANGE, range.to_header()); + // Adding `x-oss-range-behavior` header to use standard behavior. 
+ // ref: https://help.aliyun.com/document_detail/39571.html + req = req.header("x-oss-range-behavior", "standard"); + } + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + fn oss_delete_object_request(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(false); + let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + let req = Request::delete(&url); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + pub fn oss_head_object_request( + &self, + path: &str, + is_presign: bool, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(is_presign); + let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + + let req = Request::head(&url); + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + pub fn oss_list_object_request( + &self, + path: &str, + token: Option<&str>, + delimiter: &str, + limit: Option, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let endpoint = self.get_endpoint(false); + let url = format!( + "{}/?list-type=2&delimiter={delimiter}&prefix={}{}{}", + endpoint, + percent_encode_path(&p), + limit.map(|t| format!("&max-keys={t}")).unwrap_or_default(), + token + .map(|t| format!("&continuation-token={}", percent_encode_path(t))) + .unwrap_or_default(), + ); + + let req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + Ok(req) + } + + pub async fn oss_get_object( + &self, + path: &str, + range: BytesRange, + ) -> Result> { + let mut req = self.oss_get_object_request(path, range, false)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn oss_head_object(&self, path: &str) -> Result> { + let mut req = self.oss_head_object_request(path, false)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn oss_put_object( + &self, + path: &str, + size: Option, + content_type: Option<&str>, + content_disposition: Option<&str>, + body: AsyncBody, + ) -> Result> { + let mut req = self.oss_put_object_request( + path, + size, + content_type, + content_disposition, + body, + false, + )?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn oss_copy_object( + &self, + from: &str, + to: &str, + ) -> Result> { + let source = build_abs_path(&self.root, from); + let target = build_abs_path(&self.root, to); + + let url = format!( + "{}/{}", + self.get_endpoint(false), + percent_encode_path(&target) + ); + let source = format!("/{}/{}", self.bucket, percent_encode_path(&source)); + + let mut req = Request::put(&url) + .header("x-oss-copy-source", source) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn oss_list_object( + &self, + path: &str, + token: Option<&str>, + delimiter: &str, + limit: Option, + ) -> Result> { + let mut req = self.oss_list_object_request(path, token, delimiter, limit)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn oss_delete_object(&self, path: &str) -> Result> { + let mut req = self.oss_delete_object_request(path)?; + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn oss_delete_objects( + &self, + paths: Vec, + ) -> Result> { + let url = format!("{}/?delete", self.endpoint); + + let req = Request::post(&url); + + let content = 
quick_xml::se::to_string(&DeleteObjectsRequest { + object: paths + .into_iter() + .map(|path| DeleteObjectsRequestObject { + key: build_abs_path(&self.root, &path), + }) + .collect(), + }) + .map_err(new_xml_deserialize_error)?; + + // Make sure content length has been set to avoid post with chunked encoding. + let req = req.header(CONTENT_LENGTH, content.len()); + // Set content-type to `application/xml` to avoid mixed with form post. + let req = req.header(CONTENT_TYPE, "application/xml"); + // Set content-md5 as required by API. + let req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes())); + + let mut req = req + .body(AsyncBody::Bytes(Bytes::from(content))) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + fn get_endpoint(&self, is_presign: bool) -> &str { + if is_presign { + &self.presign_endpoint + } else { + &self.endpoint + } + } + + pub async fn oss_initiate_upload(&self, path: &str) -> Result> { + let req = self + .oss_initiate_upload_request(path, None, None, AsyncBody::Empty, false) + .await?; + self.send(req).await + } + + /// Creates a request that initiates multipart upload + async fn oss_initiate_upload_request( + &self, + path: &str, + content_type: Option<&str>, + content_disposition: Option<&str>, + body: AsyncBody, + is_presign: bool, + ) -> Result> { + let path = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(is_presign); + let url = format!("{}/{}?uploads", endpoint, percent_encode_path(&path)); + let mut req = Request::post(&url); + if let Some(mime) = content_type { + req = req.header(CONTENT_TYPE, mime); + } + if let Some(disposition) = content_disposition { + req = req.header(CONTENT_DISPOSITION, disposition); + } + + let mut req = req.body(body).map_err(new_request_build_error)?; + self.sign(&mut req).await?; + Ok(req) + } + + /// Creates a request to upload a part + pub async fn oss_upload_part_request( + &self, + path: &str, + upload_id: &str, + part_number: usize, + is_presign: bool, + size: Option, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(is_presign); + + let url = format!( + "{}/{}?partNumber={}&uploadId={}", + endpoint, + percent_encode_path(&p), + part_number, + percent_encode_path(upload_id) + ); + + let mut req = Request::put(&url); + + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size); + } + let mut req = req.body(body).map_err(new_request_build_error)?; + self.sign(&mut req).await?; + Ok(req) + } + + pub async fn oss_complete_multipart_upload_request( + &self, + path: &str, + upload_id: &str, + is_presign: bool, + parts: &[MultipartUploadPart], + ) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(is_presign); + let url = format!( + "{}/{}?uploadId={}", + endpoint, + percent_encode_path(&p), + percent_encode_path(upload_id) + ); + + let req = Request::post(&url); + + let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { + part: parts.to_vec(), + }) + .map_err(new_xml_deserialize_error)?; + // Make sure content length has been set to avoid post with chunked encoding. + let req = req.header(CONTENT_LENGTH, content.len()); + // Set content-type to `application/xml` to avoid mixed with form post. 
+ let req = req.header(CONTENT_TYPE, "application/xml"); + + let mut req = req + .body(AsyncBody::Bytes(Bytes::from(content))) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } +} + +/// Request of DeleteObjects. +#[derive(Default, Debug, Serialize)] +#[serde(default, rename = "Delete", rename_all = "PascalCase")] +pub struct DeleteObjectsRequest { + pub object: Vec, +} + +#[derive(Default, Debug, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectsRequestObject { + pub key: String, +} + +/// Result of DeleteObjects. +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")] +pub struct DeleteObjectsResult { + pub deleted: Vec, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct DeleteObjectsResultDeleted { + pub key: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct DeleteObjectsResultError { + pub code: String, + pub key: String, + pub message: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct InitiateMultipartUploadResult { + #[cfg(test)] + pub bucket: String, + #[cfg(test)] + pub key: String, + pub upload_id: String, +} + +#[derive(Clone, Default, Debug, Serialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct MultipartUploadPart { + #[serde(rename = "PartNumber")] + pub part_number: usize, + #[serde(rename = "ETag")] + pub etag: String, +} + +#[derive(Default, Debug, Serialize)] +#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")] +pub struct CompleteMultipartUploadRequest { + pub part: Vec, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct CompleteMultipartUploadResult { + pub location: String, + pub bucket: String, + pub key: String, + #[serde(rename = "ETag")] + pub etag: String, +} + +#[cfg(test)] +mod tests { + use bytes::Buf; + use bytes::Bytes; + + use super::*; + + /// This example is from https://www.alibabacloud.com/help/zh/object-storage-service/latest/deletemultipleobjects + #[test] + fn test_serialize_delete_objects_request() { + let req = DeleteObjectsRequest { + object: vec![ + DeleteObjectsRequestObject { + key: "multipart.data".to_string(), + }, + DeleteObjectsRequestObject { + key: "test.jpg".to_string(), + }, + DeleteObjectsRequestObject { + key: "demo.jpg".to_string(), + }, + ], + }; + + let actual = quick_xml::se::to_string(&req).expect("must succeed"); + + pretty_assertions::assert_eq!( + actual, + r#" + + multipart.data + + + test.jpg + + + demo.jpg + +"# + // Cleanup space and new line + .replace([' ', '\n'], "") + ) + } + + /// This example is from https://www.alibabacloud.com/help/zh/object-storage-service/latest/deletemultipleobjects + #[test] + fn test_deserialize_delete_objects_result() { + let bs = Bytes::from( + r#" + + + multipart.data + + + test.jpg + + + demo.jpg + +"#, + ); + + let out: DeleteObjectsResult = + quick_xml::de::from_reader(bs.reader()).expect("must success"); + + assert_eq!(out.deleted.len(), 3); + assert_eq!(out.deleted[0].key, "multipart.data"); + assert_eq!(out.deleted[1].key, "test.jpg"); + assert_eq!(out.deleted[2].key, "demo.jpg"); + } + + #[test] + fn test_deserialize_initiate_multipart_upload_response() { + let bs = Bytes::from( + r#" + + oss-example + multipart.data + 0004B9894A22E5B1888A1E29F823**** +"#, + ); + let out: InitiateMultipartUploadResult = + 
quick_xml::de::from_reader(bs.reader()).expect("must success"); + + assert_eq!("0004B9894A22E5B1888A1E29F823****", out.upload_id); + assert_eq!("multipart.data", out.key); + assert_eq!("oss-example", out.bucket); + } + + #[test] + fn test_serialize_complete_multipart_upload_request() { + let req = CompleteMultipartUploadRequest { + part: vec![ + MultipartUploadPart { + part_number: 1, + etag: "\"3349DC700140D7F86A0784842780****\"".to_string(), + }, + MultipartUploadPart { + part_number: 5, + etag: "\"8EFDA8BE206636A695359836FE0A****\"".to_string(), + }, + MultipartUploadPart { + part_number: 8, + etag: "\"8C315065167132444177411FDA14****\"".to_string(), + }, + ], + }; + + // quick_xml::se::to_string() + let mut serializer = quick_xml::se::Serializer::new(String::new()); + serializer.indent(' ', 4); + let serialized = req.serialize(serializer).unwrap(); + pretty_assertions::assert_eq!( + serialized, + r#" + + 1 + "3349DC700140D7F86A0784842780****" + + + 5 + "8EFDA8BE206636A695359836FE0A****" + + + 8 + "8C315065167132444177411FDA14****" + +"# + .replace('"', """) /* Escape `"` by hand to address */ + ) + } + + #[test] + fn test_deserialize_complete_oss_multipart_result() { + let bytes = Bytes::from( + r#" + + url + http://oss-example.oss-cn-hangzhou.aliyuncs.com /multipart.data + oss-example + multipart.data + "B864DB6A936D376F9F8D3ED3BBE540****" +"#, + ); + + let result: CompleteMultipartUploadResult = + quick_xml::de::from_reader(bytes.reader()).unwrap(); + assert_eq!("\"B864DB6A936D376F9F8D3ED3BBE540****\"", result.etag); + assert_eq!( + "http://oss-example.oss-cn-hangzhou.aliyuncs.com /multipart.data", + result.location + ); + assert_eq!("oss-example", result.bucket); + assert_eq!("multipart.data", result.key); + } +} diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs index 4fdaebc6de4..9bfd1692c7d 100644 --- a/core/src/services/oss/mod.rs +++ b/core/src/services/oss/mod.rs @@ -18,6 +18,7 @@ mod backend; pub use backend::OssBuilder as Oss; +mod core; mod error; mod pager; mod writer; diff --git a/core/src/services/oss/pager.rs b/core/src/services/oss/pager.rs index 0ecae280832..5a129b14ff9 100644 --- a/core/src/services/oss/pager.rs +++ b/core/src/services/oss/pager.rs @@ -25,7 +25,8 @@ use serde::Deserialize; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use super::backend::OssBackend; +use super::core::*; + use super::error::parse_error; use crate::raw::*; use crate::EntryMode; @@ -35,8 +36,8 @@ use crate::Metadata; use crate::Result; pub struct OssPager { - backend: Arc, - root: String, + core: Arc, + path: String, delimiter: String, limit: Option, @@ -46,16 +47,9 @@ pub struct OssPager { } impl OssPager { - pub fn new( - backend: Arc, - root: &str, - path: &str, - delimiter: &str, - limit: Option, - ) -> Self { + pub fn new(core: Arc, path: &str, delimiter: &str, limit: Option) -> Self { Self { - backend, - root: root.to_string(), + core, path: path.to_string(), delimiter: delimiter.to_string(), limit, @@ -75,7 +69,7 @@ impl oio::Page for OssPager { } let resp = self - .backend + .core .oss_list_object( &self.path, self.token.as_deref(), @@ -100,7 +94,7 @@ impl oio::Page for OssPager { for prefix in output.common_prefixes { let de = oio::Entry::new( - &build_rel_path(&self.root, &prefix.prefix), + &build_rel_path(&self.core.root, &prefix.prefix), Metadata::new(EntryMode::DIR), ); entries.push(de); @@ -125,7 +119,7 @@ impl oio::Page for OssPager { })?; meta.set_last_modified(dt); - let rel = build_rel_path(&self.root, &object.key); 
+ let rel = build_rel_path(&self.core.root, &object.key); let path = unescape(&rel) .map_err(|e| Error::new(ErrorKind::Unexpected, "excapse xml").set_source(e))?; let de = oio::Entry::new(&path, meta); diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index cb4ea23b276..8a41a47e44e 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -18,16 +18,18 @@ use async_trait::async_trait; use bytes::Bytes; use http::StatusCode; +use std::sync::Arc; + +use super::core::*; -use super::backend::MultipartUploadPart; -use super::backend::OssBackend; use super::error::parse_error; use crate::ops::OpWrite; use crate::raw::*; use crate::*; pub struct OssWriter { - backend: OssBackend, + core: Arc, + op: OpWrite, path: String, upload_id: Option, @@ -35,9 +37,9 @@ pub struct OssWriter { } impl OssWriter { - pub fn new(backend: OssBackend, op: OpWrite, path: String, upload_id: Option) -> Self { + pub fn new(core: Arc, op: OpWrite, path: String, upload_id: Option) -> Self { OssWriter { - backend, + core, op, path, upload_id, @@ -49,7 +51,7 @@ impl OssWriter { #[async_trait] impl oio::Write for OssWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - let mut req = self.backend.oss_put_object_request( + let mut req = self.core.oss_put_object_request( &self.path, Some(bs.len()), self.op.content_type(), @@ -58,12 +60,9 @@ impl oio::Write for OssWriter { false, )?; - self.backend - .signer - .sign(&mut req) - .map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.backend.client.send(req).await?; + let resp = self.core.send(req).await?; let status = resp.status(); @@ -82,21 +81,21 @@ impl oio::Write for OssWriter { ); // Aliyun OSS requires part number must between [1..=10000] let part_number = self.parts.len() + 1; - let mut req = self.backend.oss_upload_part_request( - &self.path, - upload_id, - part_number, - false, - Some(bs.len() as u64), - AsyncBody::Bytes(bs), - )?; + let mut req = self + .core + .oss_upload_part_request( + &self.path, + upload_id, + part_number, + false, + Some(bs.len() as u64), + AsyncBody::Bytes(bs), + ) + .await?; - self.backend - .signer - .sign(&mut req) - .map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.backend.client.send(req).await?; + let resp = self.core.send(req).await?; match resp.status() { StatusCode::OK => { let etag = parse_etag(resp.headers())? @@ -123,7 +122,7 @@ impl oio::Write for OssWriter { }; let resp = self - .backend + .core .oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts) .await?; match resp.status() {