Skip to content

Commit

Permalink
[Cosmos] Start migrating get_document to pipelines
Browse files Browse the repository at this point in the history
This change begins the migration process for moving get_document to the
new pipelines architecture. Most of the code is moved over now, but the
example programs still need updated.

Issue Azure#290
  • Loading branch information
eholk committed Sep 1, 2021
1 parent 1f2162a commit 4fdb3ef
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 105 deletions.
40 changes: 37 additions & 3 deletions sdk/cosmos/src/clients/document_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::{AttachmentClient, CollectionClient, CosmosClient, DatabaseClient};
use crate::prelude::{GetDocumentOptions, GetDocumentResponse};
use crate::resources::ResourceType;
use crate::{requests, ReadonlyString};
use azure_core::HttpClient;
use azure_core::{Context, HttpClient, PipelineContext, Request};
use serde::de::DeserializeOwned;
use serde::Serialize;

/// A client for Cosmos document resources.
Expand Down Expand Up @@ -61,8 +63,28 @@ impl DocumentClient {
}

/// Get a document
pub fn get_document(&self) -> requests::GetDocumentBuilder<'_, '_> {
requests::GetDocumentBuilder::new(self)
pub async fn get_document<T>(
&self,
ctx: Context,
options: GetDocumentOptions<'_, '_>,
) -> Result<GetDocumentResponse<T>, crate::Error>
where
T: DeserializeOwned,
{
let mut request = self.prepare_request_pipeline_with_document_name(http::Method::GET);
let mut pipeline_context = PipelineContext::new(ctx, ResourceType::Databases.into());

options.decorate_request(&mut request)?;

let response = self
.cosmos_client()
.pipeline()
.send(&mut pipeline_context, &mut request)
.await?
.validate(http::StatusCode::OK)
.await?;

GetDocumentResponse::try_from(response).await
}

/// Delete a document
Expand Down Expand Up @@ -99,6 +121,18 @@ impl DocumentClient {
)
}

fn prepare_request_pipeline_with_document_name(&self, method: http::Method) -> Request {
self.cosmos_client().prepare_request_pipeline(
&format!(
"dbs/{}/colls/{}/docs/{}",
self.database_client().database_name(),
self.collection_client().collection_name(),
self.document_name()
),
method,
)
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.cosmos_client().http_client()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,85 @@
use crate::headers::from_headers::*;
use crate::prelude::*;
use crate::resources::Document;
use crate::ResourceQuota;
use azure_core::headers::{etag_from_headers, session_token_from_headers};
use azure_core::prelude::*;
use azure_core::SessionToken;
use azure_core::{collect_pinned_stream, Request as HttpRequest, Response as HttpResponse};
use chrono::{DateTime, Utc};
use http::response::Response;
use http::StatusCode;
use http::{HeaderMap, StatusCode};
use serde::de::DeserializeOwned;

#[derive(Debug, Clone)]
pub struct GetDocumentOptions<'a, 'b> {
document_client: &'a DocumentClient,
if_match_condition: Option<IfMatchCondition<'b>>,
if_modified_since: Option<IfModifiedSince<'b>>,
activity_id: Option<ActivityId<'b>>,
consistency_level: Option<ConsistencyLevel>,
}

impl<'a, 'b> GetDocumentOptions<'a, 'b> {
pub fn new(document_client: &'a DocumentClient) -> Self {
Self {
document_client,
if_match_condition: None,
if_modified_since: None,
activity_id: None,
consistency_level: None,
}
}

setters! {
activity_id: &'b str => Some(ActivityId::new(activity_id)),
consistency_level: ConsistencyLevel => Some(consistency_level),
if_match_condition: IfMatchCondition<'b> => Some(if_match_condition),
if_modified_since: &'b DateTime<Utc> => Some(IfModifiedSince::new(if_modified_since)),
}

pub(crate) fn decorate_request(&self, request: &mut HttpRequest) -> Result<(), crate::Error> {
// add trait headers
azure_core::headers::add_optional_header2(&self.if_match_condition, request)?;
azure_core::headers::add_optional_header2(&self.if_modified_since, request)?;
azure_core::headers::add_optional_header2(&self.activity_id, request)?;
azure_core::headers::add_optional_header2(&self.consistency_level, request)?;

crate::cosmos_entity::add_as_partition_key_header_serialized2(
self.document_client.partition_key_serialized(),
request,
);

request.set_body(bytes::Bytes::from_static(EMPTY_BODY).into());

Ok(())
}
}

#[derive(Debug, Clone)]
pub enum GetDocumentResponse<T> {
Found(Box<FoundDocumentResponse<T>>),
NotFound(Box<NotFoundDocumentResponse>),
}

impl<T> std::convert::TryFrom<Response<bytes::Bytes>> for GetDocumentResponse<T>
impl<T> GetDocumentResponse<T>
where
T: DeserializeOwned,
{
type Error = crate::Error;

fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
let status_code = response.status();
pub async fn try_from(response: HttpResponse) -> Result<Self, crate::Error> {
let (status_code, headers, pinned_stream) = response.deconstruct();

let has_been_found =
status_code == StatusCode::OK || status_code == StatusCode::NOT_MODIFIED;

let body = collect_pinned_stream(pinned_stream).await?;

if has_been_found {
Ok(GetDocumentResponse::Found(Box::new(
FoundDocumentResponse::try_from(response)?,
FoundDocumentResponse::try_from(&headers, body).await?,
)))
} else {
Ok(GetDocumentResponse::NotFound(Box::new(
NotFoundDocumentResponse::try_from(response)?,
NotFoundDocumentResponse::try_from(&headers).await?,
)))
}
}
Expand Down Expand Up @@ -65,18 +112,13 @@ pub struct FoundDocumentResponse<T> {
pub date: DateTime<Utc>,
}

impl<T> std::convert::TryFrom<Response<bytes::Bytes>> for FoundDocumentResponse<T>
impl<T> FoundDocumentResponse<T>
where
T: DeserializeOwned,
{
type Error = crate::Error;

fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
let headers = response.headers();
let body: &[u8] = response.body();

async fn try_from(headers: &HeaderMap, body: bytes::Bytes) -> Result<Self, crate::Error> {
Ok(Self {
document: Document::try_from((headers, body))?,
document: serde_json::from_slice(&body)?,

content_location: content_location_from_headers(headers)?.to_owned(),
last_state_change: last_state_change_from_headers(headers)?,
Expand Down Expand Up @@ -126,12 +168,8 @@ pub struct NotFoundDocumentResponse {
pub date: DateTime<Utc>,
}

impl std::convert::TryFrom<Response<bytes::Bytes>> for NotFoundDocumentResponse {
type Error = crate::Error;

fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
let headers = response.headers();

impl NotFoundDocumentResponse {
async fn try_from(headers: &HeaderMap) -> Result<Self, crate::Error> {
Ok(Self {
content_location: content_location_from_headers(headers)?.to_owned(),
last_state_change: last_state_change_from_headers(headers)?,
Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmos/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod create_permission;
mod create_user;
mod delete_permission;
mod get_database;
mod get_document;
mod get_permission;
mod get_user;
mod list_databases;
Expand All @@ -22,6 +23,7 @@ pub use create_permission::*;
pub use create_user::*;
pub use delete_permission::*;
pub use get_database::*;
pub use get_document::*;
pub use get_permission::*;
pub use get_user::*;
pub use list_databases::*;
Expand Down
75 changes: 0 additions & 75 deletions sdk/cosmos/src/requests/get_document_builder.rs

This file was deleted.

2 changes: 0 additions & 2 deletions sdk/cosmos/src/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ mod delete_user_defined_function_builder;
mod execute_stored_procedure_builder;
mod get_attachment_builder;
mod get_collection_builder;
mod get_document_builder;
mod get_partition_key_ranges_builder;
mod list_attachments_builder;
mod list_collections_builder;
Expand Down Expand Up @@ -55,7 +54,6 @@ pub use delete_user_defined_function_builder::DeleteUserDefinedFunctionBuilder;
pub use execute_stored_procedure_builder::ExecuteStoredProcedureBuilder;
pub use get_attachment_builder::GetAttachmentBuilder;
pub use get_collection_builder::GetCollectionBuilder;
pub use get_document_builder::GetDocumentBuilder;
pub use get_partition_key_ranges_builder::GetPartitionKeyRangesBuilder;
pub use list_attachments_builder::ListAttachmentsBuilder;
pub use list_collections_builder::ListCollectionsBuilder;
Expand Down
2 changes: 0 additions & 2 deletions sdk/cosmos/src/responses/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod delete_user_response;
mod execute_stored_procedure_response;
mod get_attachment_response;
mod get_collection_response;
mod get_document_response;
mod get_partition_key_ranges_response;
mod list_attachments_response;
mod list_collections_response;
Expand Down Expand Up @@ -51,7 +50,6 @@ pub use delete_user_response::DeleteUserResponse;
pub use execute_stored_procedure_response::ExecuteStoredProcedureResponse;
pub use get_attachment_response::GetAttachmentResponse;
pub use get_collection_response::GetCollectionResponse;
pub use get_document_response::GetDocumentResponse;
pub use get_partition_key_ranges_response::GetPartitionKeyRangesResponse;
pub use list_attachments_response::ListAttachmentsResponse;
pub use list_collections_response::ListCollectionsResponse;
Expand Down

0 comments on commit 4fdb3ef

Please sign in to comment.