Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Migrate (get, replace, delete)_collection to pipeline architecture #373

Merged
merged 3 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sdk/cosmos/examples/collection.rs.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let collection_response = database_client
.clone()
.into_collection_client(collection.id)
.get_collection()
.execute()
.get_collection(Context::new(), GetCollectionOptions::new())
.await?;

println!("\tcollection_response {:?}", collection_response);
Expand Down
8 changes: 6 additions & 2 deletions sdk/cosmos/examples/create_delete_database.rs.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {

let db_collection = db_client.clone().into_collection_client("panzadoro");

let get_collection_response = db_collection.get_collection().execute().await?;
let get_collection_response = db_collection
.get_collection(Context::new(), GetCollectionOptions::new())
.await?;
println!("get_collection_response == {:#?}", get_collection_response);

let stream = db_client.list_collections();
Expand All @@ -77,7 +79,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("res == {:#?}", res);
}

let delete_response = db_collection.delete_collection().execute().await?;
let delete_response = db_collection
.delete_collection(Context::new(), DeleteCollectionOptions::new())
.await?;
println!("collection deleted: {:#?}", delete_response);
}

Expand Down
7 changes: 4 additions & 3 deletions sdk/cosmos/examples/database_00.rs.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {

println!("\nReplacing collection");
let replace_collection_response = collection_client
.replace_collection()
.indexing_policy(&indexing_policy_new)
.execute("/age")
.replace_collection(
Context::new(),
ReplaceCollectionOptions::new("/age").indexing_policy(indexing_policy_new),
)
.await?;
println!(
"replace_collection_response == {:#?}",
Expand Down
5 changes: 4 additions & 1 deletion sdk/cosmos/examples/database_01.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::Context;
use azure_cosmos::prelude::*;
use std::error::Error;

Expand All @@ -20,7 +21,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("collections == {:#?}", collections);

let collection_client = database_client.into_collection_client("cnt");
let collection = collection_client.get_collection().execute().await?;
let collection = collection_client
.get_collection(Context::new(), GetCollectionOptions::new())
.await?;
println!("collection == {:#?}", collection);

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions sdk/cosmos/examples/document_00.rs.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.clone()
.into_database_client(DATABASE.to_owned())
.into_collection_client(COLLECTION.to_owned())
.delete_collection()
.execute()
.delete_collection(Context::new(), DeleteCollectionOptions::new())
.await?;
println!("collection deleted");

Expand Down
8 changes: 6 additions & 2 deletions sdk/cosmos/examples/permission_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.await?;
println!("get_database_response == {:#?}", get_database_response);

let get_collection_response = collection_client.get_collection().execute().await?;
let get_collection_response = collection_client
.get_collection(Context::new(), GetCollectionOptions::new())
.await?;
println!("get_collection_response == {:#?}", get_collection_response);

let get_collection2_response = collection2_client.get_collection().execute().await?;
let get_collection2_response = collection2_client
.get_collection(Context::new(), GetCollectionOptions::new())
.await?;
println!(
"get_collection2_response == {:#?}",
get_collection2_response
Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/examples/user_permission_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_collection_client(collection_name.clone());
let user_client = database_client.into_user_client(user_name);

let get_collection_response = collection_client.get_collection().execute().await?;
let get_collection_response = collection_client
.get_collection(Context::new(), GetCollectionOptions::new())
.await?;
println!("get_collection_response == {:#?}", get_collection_response);

let create_user_response = user_client
Expand Down
84 changes: 65 additions & 19 deletions sdk/cosmos/src/clients/collection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,69 @@ impl CollectionClient {
}

/// Get a collection
pub fn get_collection(&self) -> requests::GetCollectionBuilder<'_> {
requests::GetCollectionBuilder::new(self)
pub async fn get_collection(
&self,
ctx: Context,
options: GetCollectionOptions,
) -> Result<GetCollectionResponse, crate::Error> {
let mut request = self.prepare_request_with_collection_name(http::Method::GET);

let mut pipeline_context = PipelineContext::new(ctx, ResourceType::Collections.into());

options.decorate_request(&mut request)?;

let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(GetCollectionResponse::try_from(response).await?)
}

/// Delete a collection
pub fn delete_collection(&self) -> requests::DeleteCollectionBuilder<'_> {
requests::DeleteCollectionBuilder::new(self)
pub async fn delete_collection(
&self,
ctx: Context,
options: DeleteCollectionOptions,
) -> Result<DeleteCollectionResponse, crate::Error> {
let mut request = self.prepare_request_with_collection_name(http::Method::DELETE);

let mut pipeline_context = PipelineContext::new(ctx, ResourceType::Collections.into());

options.decorate_request(&mut request)?;

let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::NO_CONTENT)
.await?;

Ok(DeleteCollectionResponse::try_from(response).await?)
}

/// Replace a collection
pub fn replace_collection(&self) -> requests::ReplaceCollectionBuilder<'_, '_> {
requests::ReplaceCollectionBuilder::new(self)
pub async fn replace_collection(
&self,
ctx: Context,
options: ReplaceCollectionOptions,
) -> Result<ReplaceCollectionResponse, crate::Error> {
let mut request = self.prepare_request_with_collection_name(http::Method::PUT);

let mut pipeline_context = PipelineContext::new(ctx, ResourceType::Collections.into());

options.decorate_request(&mut request, self.collection_name())?;

let response = self
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

Ok(ReplaceCollectionResponse::try_from(response).await?)
}

/// list documents in a collection
Expand Down Expand Up @@ -139,19 +190,14 @@ impl CollectionClient {
StoredProcedureClient::new(self, stored_procedure_name)
}

pub(crate) fn prepare_request_with_collection_name(
&self,
method: http::Method,
) -> http::request::Builder {
self.cosmos_client().prepare_request(
&format!(
"dbs/{}/colls/{}",
self.database_client().database_name(),
self.collection_name()
),
method,
ResourceType::Collections,
)
fn prepare_request_with_collection_name(&self, http_method: http::Method) -> Request {
let path = &format!(
"dbs/{}/colls/{}",
self.database_client().database_name(),
self.collection_name()
);
self.cosmos_client()
.prepare_request_pipeline(&path, http_method)
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
Expand Down
89 changes: 89 additions & 0 deletions sdk/cosmos/src/operations/delete_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::prelude::*;
use crate::{headers::from_headers::*, ResourceQuota};
use azure_core::headers::{content_type_from_headers, session_token_from_headers};
use azure_core::{Request as HttpRequest, Response as HttpResponse};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone)]
pub struct DeleteCollectionOptions {
consistency_level: Option<ConsistencyLevel>,
}

impl DeleteCollectionOptions {
pub fn new() -> Self {
Self {
consistency_level: None,
}
}

setters! {
consistency_level: ConsistencyLevel => Some(consistency_level),
}

pub(crate) fn decorate_request(&self, request: &mut HttpRequest) -> Result<(), crate::Error> {
azure_core::headers::add_optional_header2(&self.consistency_level, request)?;

Ok(())
}
}

#[derive(Debug, Clone)]
pub struct DeleteCollectionResponse {
pub last_state_change: DateTime<Utc>,
pub resource_quota: Vec<ResourceQuota>,
pub resource_usage: Vec<ResourceQuota>,
pub collection_partition_index: u64,
pub collection_service_index: u64,
pub schema_version: String,
pub alt_content_path: String,
pub quorum_acked_lsn: u64,
pub current_write_quorum: u64,
pub current_replica_set_size: u64,
pub charge: f64,
pub service_version: String,
pub activity_id: uuid::Uuid,
pub session_token: String,
pub gateway_version: String,
pub cosmos_llsn: u64,
pub lsn: u64,
pub date: DateTime<Utc>,
pub transport_request_id: u64,
pub xp_role: u32,
pub server: String,
pub cosmos_quorum_acked_llsn: u64,
pub content_location: String,
pub content_type: String,
}

impl DeleteCollectionResponse {
pub async fn try_from(response: HttpResponse) -> Result<Self, crate::Error> {
let (_status_code, headers, _pinned_stream) = response.deconstruct();

Ok(Self {
last_state_change: last_state_change_from_headers(&headers)?,
collection_partition_index: collection_partition_index_from_headers(&headers)?,
collection_service_index: collection_service_index_from_headers(&headers)?,
schema_version: schema_version_from_headers(&headers)?.to_owned(),
alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(),
charge: request_charge_from_headers(&headers)?,
service_version: service_version_from_headers(&headers)?.to_owned(),
activity_id: activity_id_from_headers(&headers)?,
session_token: session_token_from_headers(&headers)?,
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
cosmos_llsn: cosmos_llsn_from_headers(&headers)?,
quorum_acked_lsn: quorum_acked_lsn_from_headers(&headers)?,
current_write_quorum: current_write_quorum_from_headers(&headers)?,
current_replica_set_size: current_replica_set_size_from_headers(&headers)?,
lsn: lsn_from_headers(&headers)?,
cosmos_quorum_acked_llsn: cosmos_quorum_acked_llsn_from_headers(&headers)?,
server: server_from_headers(&headers)?.to_owned(),
xp_role: role_from_headers(&headers)?,
content_type: content_type_from_headers(&headers)?.to_owned(),
content_location: content_location_from_headers(&headers)?.to_owned(),
transport_request_id: transport_request_id_from_headers(&headers)?,
date: date_from_headers(&headers)?,
resource_quota: resource_quota_from_headers(&headers)?,
resource_usage: resource_usage_from_headers(&headers)?,
})
}
}
95 changes: 95 additions & 0 deletions sdk/cosmos/src/operations/get_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::prelude::*;

use crate::headers::from_headers::*;
use azure_core::headers::{
content_type_from_headers, etag_from_headers, session_token_from_headers,
};
use azure_core::{collect_pinned_stream, Request as HttpRequest, Response as HttpResponse};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone)]
pub struct GetCollectionOptions {
consistency_level: Option<ConsistencyLevel>,
}

impl GetCollectionOptions {
pub fn new() -> Self {
Self {
consistency_level: None,
}
}

setters! {
consistency_level: ConsistencyLevel => Some(consistency_level),
}

pub(crate) fn decorate_request(&self, request: &mut HttpRequest) -> Result<(), crate::Error> {
azure_core::headers::add_optional_header2(&self.consistency_level, request)?;

Ok(())
}
}

#[derive(Debug, Clone)]
pub struct GetCollectionResponse {
pub collection: Collection,
pub last_state_change: DateTime<Utc>,
pub etag: String,
pub collection_partition_index: u64,
pub collection_service_index: u64,
pub lsn: u64,
pub schema_version: String,
pub alt_content_path: String,
pub content_path: String,
pub global_committed_lsn: u64,
pub number_of_read_regions: u32,
pub item_lsn: u64,
pub transport_request_id: u64,
pub cosmos_llsn: u64,
pub cosmos_item_llsn: u64,
pub charge: f64,
pub service_version: String,
pub activity_id: uuid::Uuid,
pub session_token: String,
pub gateway_version: String,
pub server: String,
pub xp_role: u32,
pub content_type: String,
pub content_location: String,
pub date: DateTime<Utc>,
}

impl GetCollectionResponse {
pub async fn try_from(response: HttpResponse) -> Result<Self, crate::Error> {
let (_status_code, headers, pinned_stream) = response.deconstruct();
let body = collect_pinned_stream(pinned_stream).await?;

Ok(Self {
collection: serde_json::from_slice(&body)?,
last_state_change: last_state_change_from_headers(&headers)?,
etag: etag_from_headers(&headers)?,
collection_partition_index: collection_partition_index_from_headers(&headers)?,
collection_service_index: collection_service_index_from_headers(&headers)?,
lsn: lsn_from_headers(&headers)?,
schema_version: schema_version_from_headers(&headers)?.to_owned(),
alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(),
content_path: content_path_from_headers(&headers)?.to_owned(),
global_committed_lsn: global_committed_lsn_from_headers(&headers)?,
number_of_read_regions: number_of_read_regions_from_headers(&headers)?,
item_lsn: item_lsn_from_headers(&headers)?,
transport_request_id: transport_request_id_from_headers(&headers)?,
cosmos_llsn: cosmos_llsn_from_headers(&headers)?,
cosmos_item_llsn: cosmos_item_llsn_from_headers(&headers)?,
charge: request_charge_from_headers(&headers)?,
service_version: service_version_from_headers(&headers)?.to_owned(),
activity_id: activity_id_from_headers(&headers)?,
session_token: session_token_from_headers(&headers)?,
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
server: server_from_headers(&headers)?.to_owned(),
xp_role: role_from_headers(&headers)?,
content_type: content_type_from_headers(&headers)?.to_owned(),
content_location: content_location_from_headers(&headers)?.to_owned(),
date: date_from_headers(&headers)?,
})
}
}
Loading