Skip to content

Commit

Permalink
fix bugs and format error
Browse files Browse the repository at this point in the history
1. fix url format error in search client
2. reexport tracing crate
3. change Option<String> to Option<&str>
  • Loading branch information
4t145 committed Oct 8, 2023
1 parent fda9ee2 commit 46e8f1a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 54 deletions.
3 changes: 3 additions & 0 deletions tardis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ pub use serde_json;
pub use testcontainers;
pub use tokio;
pub use tracing as log;
// we still need to pub use tracing for some macros
// in tracing relies on `$crate` witch infers `tracing`.
pub use tracing;
pub use url;

use basic::error::TardisErrorWithExt;
Expand Down
40 changes: 20 additions & 20 deletions tardis/src/os/os_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,32 +86,32 @@ impl TardisOSClient {
self.get_client().bucket_delete(bucket_name).await
}

pub async fn object_create(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<String>) -> TardisResult<()> {
pub async fn object_create(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> {
trace!("[Tardis.OSClient] Creating object {}", path);
self.get_client().object_create(path, content, content_type, bucket_name).await
}

pub async fn object_get(&self, path: &str, bucket_name: Option<String>) -> TardisResult<Vec<u8>> {
pub async fn object_get(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<Vec<u8>> {
trace!("[Tardis.OSClient] Getting object {}", path);
self.get_client().object_get(path, bucket_name).await
}

pub async fn object_delete(&self, path: &str, bucket_name: Option<String>) -> TardisResult<()> {
pub async fn object_delete(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<()> {
trace!("[Tardis.OSClient] Deleting object {}", path);
self.get_client().object_delete(path, bucket_name).await
}

pub fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String> {
pub fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Creating object url {}", path);
self.get_client().object_create_url(path, expire_sec, bucket_name)
}

pub fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String> {
pub fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Getting object url {}", path);
self.get_client().object_get_url(path, expire_sec, bucket_name)
}

pub fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String> {
pub fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Deleting object url {}", path);
self.get_client().object_delete_url(path, expire_sec, bucket_name)
}
Expand All @@ -123,17 +123,17 @@ trait TardisOSOperations {

async fn bucket_delete(&self, bucket_name: &str) -> TardisResult<()>;

async fn object_create(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<String>) -> TardisResult<()>;
async fn object_create(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()>;

async fn object_get(&self, path: &str, bucket_name: Option<String>) -> TardisResult<Vec<u8>>;
async fn object_get(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<Vec<u8>>;

async fn object_delete(&self, path: &str, bucket_name: Option<String>) -> TardisResult<()>;
async fn object_delete(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<()>;

fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String>;
fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;

fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String>;
fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;

fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String>;
fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;
}

#[async_trait]
Expand Down Expand Up @@ -174,7 +174,7 @@ impl TardisOSOperations for TardisOSS3Client {
}
}

async fn object_create(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<String>) -> TardisResult<()> {
async fn object_create(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> {
let bucket = self.get_bucket(bucket_name)?;
let response_data = if let Some(content_type) = content_type {
bucket.put_object_with_content_type(path, content, content_type).await?
Expand All @@ -197,7 +197,7 @@ impl TardisOSOperations for TardisOSS3Client {
}
}

async fn object_get(&self, path: &str, bucket_name: Option<String>) -> TardisResult<Vec<u8>> {
async fn object_get(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<Vec<u8>> {
let bucket = self.get_bucket(bucket_name)?;
let response_data = bucket.get_object(path).await?;
if response_data.status_code() == 200 {
Expand All @@ -216,7 +216,7 @@ impl TardisOSOperations for TardisOSS3Client {
}
}

async fn object_delete(&self, path: &str, bucket_name: Option<String>) -> TardisResult<()> {
async fn object_delete(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<()> {
let bucket = self.get_bucket(bucket_name)?;
let response_data = bucket.delete_object(path).await?;
if response_data.status_code() == 200 || response_data.status_code() == 204 {
Expand All @@ -235,23 +235,23 @@ impl TardisOSOperations for TardisOSS3Client {
}
}

fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String> {
fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None)?)
}

fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String> {
fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_get(path, expire_sec, None)?)
}

fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<String>) -> TardisResult<String> {
fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_delete(path, expire_sec)?)
}
}

impl TardisOSS3Client {
fn get_bucket(&self, bucket_name: Option<String>) -> TardisResult<Bucket> {
fn get_bucket(&self, bucket_name: Option<&str>) -> TardisResult<Bucket> {
if let Some(bucket_name) = bucket_name {
Ok(Bucket::new(&bucket_name, self.region.clone(), self.credentials.clone())?.with_path_style())
Ok(Bucket::new(bucket_name, self.region.clone(), self.credentials.clone())?.with_path_style())
} else {
let bucket =
self.default_bucket.as_ref().ok_or_else(|| TardisError::not_found("[Tardis.OSClient] No default bucket configured", "404-tardis-os-default-bucket-not-exist"))?;
Expand Down
51 changes: 27 additions & 24 deletions tardis/src/search/search_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ impl TardisSearchClient {
TardisResult::Ok(TardisSearchClient { client, server_url: url.clone() })
}

fn get_url_with_path<'a>(&self, path: impl IntoIterator<Item = &'a str>) -> Url {
let mut url = self.server_url.clone();
url.path_segments_mut().expect("search server_url can't be a base").extend(path);
url
}
/// Create index / 创建索引
///
/// # Arguments
Expand All @@ -102,8 +107,8 @@ impl TardisSearchClient {
/// ```
pub async fn create_index(&self, index_name: &str, mappings: Option<&str>) -> TardisResult<()> {
trace!("[Tardis.SearchClient] Creating index: {}", index_name);
let url = format!("{}/{}", self.server_url, index_name);
let resp = self.client.put_str_to_str(&url, mappings.unwrap_or_default(), None).await?;
let url = self.get_url_with_path(Some(index_name));
let resp = self.client.put_str_to_str(url, mappings.unwrap_or_default(), None).await?;
if resp.code >= 200 && resp.code <= 300 {
Ok(())
} else {
Expand All @@ -129,8 +134,8 @@ impl TardisSearchClient {
/// ```
pub async fn create_record(&self, index_name: &str, data: &str) -> TardisResult<String> {
trace!("[Tardis.SearchClient] Creating record: {}, data:{}", index_name, data);
let url = format!("{}/{}/_doc/", self.server_url, index_name);
let resp = self.client.post_str_to_str(&url, data, None).await?;
let url = self.get_url_with_path([index_name, "_doc"]);
let resp = self.client.post_str_to_str(url, data, None).await?;
if resp.code >= 200 && resp.code <= 300 {
let result = TardisFuns::json.str_to_json(&resp.body.unwrap_or_default())?;
Ok(result["_id"].as_str().ok_or_else(|| TardisError::bad_request("[Tardis.SearchClient] [_id] structure not found", "400-tardis-search-id-not-exist"))?.to_string())
Expand All @@ -157,8 +162,8 @@ impl TardisSearchClient {
/// ```
pub async fn get_record(&self, index_name: &str, id: &str) -> TardisResult<String> {
trace!("[Tardis.SearchClient] Getting record: {}, id:{}", index_name, id);
let url = format!("{}/{}/_doc/{}", self.server_url, index_name, id);
let resp = self.client.get_to_str(&url, None).await?;
let url = self.get_url_with_path([index_name, "_doc", id]);
let resp = self.client.get_to_str(url, None).await?;
if resp.code >= 200 && resp.code <= 300 {
let result = TardisFuns::json.str_to_json(&resp.body.unwrap_or_default())?;
Ok(result["_source"].to_string())
Expand All @@ -185,8 +190,9 @@ impl TardisSearchClient {
/// ```
pub async fn simple_search(&self, index_name: &str, q: &str) -> TardisResult<Vec<String>> {
trace!("[Tardis.SearchClient] Simple search: {}, q:{}", index_name, q);
let url = format!("{}/{}/_search?q={}", self.server_url, index_name, q);
let resp = self.client.get_to_str(&url, None).await?;
let mut url = self.get_url_with_path([index_name, "_search"]);
url.query_pairs_mut().append_pair("q", q);
let resp = self.client.get_to_str(url, None).await?;
if resp.code >= 200 && resp.code <= 300 {
Self::parse_search_result(&resp.body.unwrap_or_default())
} else {
Expand Down Expand Up @@ -242,22 +248,18 @@ impl TardisSearchClient {
from,
track_scores
);
let mut url = format!("{}/{}/_search", self.server_url, index_name);
let mut queries = vec![];

let mut url = self.server_url.clone();
url.path_segments_mut().expect("search server_url can't be a base").extend([index_name, "_search"]);
if let Some(size) = size {
queries.push(format!("size={}", size));
url.query_pairs_mut().append_pair("size", &size.to_string());
}
if let Some(from) = from {
queries.push(format!("from={}", from));
url.query_pairs_mut().append_pair("from", &from.to_string());
}
if let Some(track_scores) = track_scores {
queries.push(format!("track_scores={}", track_scores));
}
if !queries.is_empty() {
url = format!("{}?{}", url, queries.join("&").as_str());
url.query_pairs_mut().append_pair("track_scores", &track_scores.to_string());
}
let resp = self.client.post_str_to_str(&url, q, None).await?;
let resp = self.client.post_str_to_str(url, q, None).await?;
if resp.code >= 200 && resp.code <= 300 {
trace!("[Tardis.SearchClient] resp.body: {:?}", &resp.body);
Ok(TardisFuns::json.str_to_obj(&resp.body.unwrap_or_default())?)
Expand All @@ -283,8 +285,8 @@ impl TardisSearchClient {
/// ```
pub async fn check_index_exist(&self, index_name: &str) -> TardisResult<bool> {
trace!("[Tardis.SearchClient] Check index exist: {}", index_name);
let url = format!("{}/{}", self.server_url, index_name);
let resp = self.client.head_to_void(&url, None).await?;
let url = self.get_url_with_path([index_name]);
let resp = self.client.head_to_void(url, None).await?;
match resp.code {
200 => Ok(true),
404 => Ok(false),
Expand Down Expand Up @@ -321,8 +323,9 @@ impl TardisSearchClient {
let params = params_vec.join(",");
let q = format!(r#"{{ "script": {{"source": "{source}", "params":{{{params}}}}}}}"#);
debug!("[Tardis.SearchClient] Update: {}, q:{}", index_name, q);
let url = format!("{}/{}/_update/{}?refresh=true", self.server_url, index_name, id);
let resp = self.client.post_str_to_str(&url, &q, None).await?;
let mut url = self.get_url_with_path([index_name, "_update", id]);
url.query_pairs_mut().append_pair("refresh", "true");
let resp = self.client.post_str_to_str(url, &q, None).await?;
if resp.code >= 200 && resp.code <= 300 {
trace!("[Tardis.SearchClient] resp.body: {:?}", &resp.body);
Ok(())
Expand All @@ -348,8 +351,8 @@ impl TardisSearchClient {
/// TardisFuns::search().delete_by_query("test_index" r#"{}"#).await.unwrap();
/// ```
pub async fn delete_by_query(&self, index_name: &str, q: &str) -> TardisResult<()> {
let url = format!("{}/{}/_delete_by_query", self.server_url, index_name);
let resp = self.client.post_str_to_str(&url, q, None).await?;
let url = self.get_url_with_path([index_name, "_delete_by_query"]);
let resp = self.client.post_str_to_str(url, q, None).await?;
if resp.code >= 200 && resp.code <= 300 {
debug!("[Tardis.SearchClient] resp.body: {:?}", &resp.body);
Ok(())
Expand Down
20 changes: 10 additions & 10 deletions tardis/tests/test_os_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,30 @@ async fn test_os_client() -> TardisResult<()> {
TardisTestContainer::minio(|url| async move {
let os_module_config = OSModuleConfig::builder().kind("s3").endpoint(url).ak("minioadmin").sk("minioadmin").region("us-east-1").build();
TardisFuns::init_conf(TardisConfig::builder().fw(FrameworkConfig::builder().os(os_module_config).build()).build()).await?;
let bucket_name = "test".to_string();
let bucket_name = "test";

TardisFuns::os().bucket_create_simple(&bucket_name, true).await?;
let resp = TardisFuns::os().bucket_create_simple(&bucket_name, true).await;
TardisFuns::os().bucket_create_simple(bucket_name, true).await?;
let resp = TardisFuns::os().bucket_create_simple(bucket_name, true).await;
assert_eq!(resp.err().unwrap().code, "409");

TardisFuns::os().object_create("test/test.txt", "I want to go to S3 测试".as_bytes(), None, Some(bucket_name.clone())).await?;
TardisFuns::os().object_create("test/test.txt", "I want to go to S3 测试".as_bytes(), None, Some(bucket_name)).await?;

let data = TardisFuns::os().object_get("test/test.txt", Some(bucket_name.clone())).await?;
let data = TardisFuns::os().object_get("test/test.txt", Some(bucket_name)).await?;
assert_eq!(String::from_utf8(data).unwrap(), "I want to go to S3 测试");

info!("object_get_url = {}", TardisFuns::os().object_get_url("test/test.txt", 60, Some(bucket_name.clone()))?);
info!("object_get_url = {}", TardisFuns::os().object_get_url("test/test.txt", 60, Some(bucket_name))?);

//info!("object_create_url = {}", TardisFuns::os().object_create_url("test/test2.txt", 1, Some(bucket_name.clone()))?);
//
//info!("object_delete_url = {}", TardisFuns::os().object_delete_url("test/test.txt", 60, Some(bucket_name.clone()))?);

let data = TardisFuns::os().object_get("test/test.txt", Some(bucket_name.clone())).await?;
let data = TardisFuns::os().object_get("test/test.txt", Some(bucket_name)).await?;
assert_eq!(String::from_utf8(data).unwrap(), "I want to go to S3 测试");

TardisFuns::os().object_delete("test/test.txt", Some(bucket_name.clone())).await?;
assert!(TardisFuns::os().object_get("test/test.txt", Some(bucket_name.clone())).await.is_err());
TardisFuns::os().object_delete("test/test.txt", Some(bucket_name)).await?;
assert!(TardisFuns::os().object_get("test/test.txt", Some(bucket_name)).await.is_err());

TardisFuns::os().bucket_delete(&bucket_name).await?;
TardisFuns::os().bucket_delete(bucket_name).await?;

Ok(())
})
Expand Down

0 comments on commit 46e8f1a

Please sign in to comment.