Skip to content

Commit

Permalink
Move create document to pipeline architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
rylev committed Jul 21, 2021
1 parent fed8f1c commit 635f022
Show file tree
Hide file tree
Showing 21 changed files with 262 additions and 273 deletions.
4 changes: 4 additions & 0 deletions sdk/core/src/headers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ pub fn add_mandatory_header<T: AddAsHeader>(item: &T, builder: Builder) -> Build
item.add_as_header(builder)
}

pub fn add_mandatory_header2<T: AddAsHeader>(item: &T, request: &mut crate::Request) {
item.add_as_header2(request);
}

pub const SERVER: &str = "server";
pub const SOURCE_IF_MODIFIED_SINCE: &str = "x-ms-source-if-modified-since";
pub const SOURCE_IF_UNMODIFIED_SINCE: &str = "x-ms-source-if-unmodified-since";
Expand Down
6 changes: 5 additions & 1 deletion sdk/cosmos/examples/attachments_00.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::Context;
use azure_cosmos::prelude::*;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
Expand Down Expand Up @@ -56,7 +57,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
};

// let's add an entity.
match client.create_document().execute(&doc).await {
match client
.create_document(Context::new(), &doc, CreateDocumentOptions::new())
.await
{
Ok(_) => {
println!("document created");
}
Expand Down
9 changes: 6 additions & 3 deletions sdk/cosmos/examples/database_00.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::Context;
use azure_cosmos::prelude::*;
use serde_json::Value;
use std::error::Error;
Expand Down Expand Up @@ -40,10 +41,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}"#;
let document: Value = serde_json::from_str(data)?;

let resp = collection_client
.create_document()
let options = CreateDocumentOptions::new()
.is_upsert(true)
.execute_with_partition_key(&document, &43u32)
.partition_key(&43u32)
.unwrap();
let resp = collection_client
.create_document(Context::new(), &document, options)
.await?;

println!("resp == {:?}", resp);
Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/examples/document_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// The method create_document will return, upon success,
// the document attributes.

let create_document_response = collection_client.create_document().execute(&doc).await?;
let create_document_response = collection_client
.create_document(Context::new(), &doc, CreateDocumentOptions::new())
.await?;
println!(
"create_document_response == {:#?}",
create_document_response
Expand Down
6 changes: 5 additions & 1 deletion sdk/cosmos/examples/document_entries_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
};

// let's add an entity.
response = Some(client.create_document().execute(&doc).await?);
response = Some(
client
.create_document(Context::new(), &doc, CreateDocumentOptions::new())
.await?,
);
}

println!("Created 5 documents.");
Expand Down
9 changes: 6 additions & 3 deletions sdk/cosmos/examples/document_entries_01.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::Context;
use azure_cosmos::prelude::*;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
Expand Down Expand Up @@ -49,9 +50,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {

// let's add an entity.
let create_document_response = client
.create_document()
.is_upsert(true)
.execute(&doc)
.create_document(
Context::new(),
&doc,
CreateDocumentOptions::new().is_upsert(true),
)
.await?;

println!(
Expand Down
9 changes: 6 additions & 3 deletions sdk/cosmos/examples/readme.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::Context;
use serde::{Deserialize, Serialize};
// Using the prelude module of the Cosmos crate makes easier to use the Rust Azure SDK for Cosmos.
use azure_cosmos::prelude::*;
Expand Down Expand Up @@ -79,9 +80,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// insert it and store the returned session token for later use!
session_token = Some(
collection_client
.create_document()
.is_upsert(true) // this option will overwrite a preexisting document (if any)
.execute(&document_to_insert)
.create_document(
Context::new(),
&document_to_insert,
CreateDocumentOptions::new().is_upsert(true),
)
.await?
.session_token, // get only the session token, if everything else was ok!
);
Expand Down
23 changes: 17 additions & 6 deletions sdk/cosmos/examples/user_permission_token.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::Context;
use azure_cosmos::prelude::*;
use std::error::Error;

Expand Down Expand Up @@ -112,9 +113,14 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.clone()
.into_database_client(database_name.clone())
.into_collection_client(collection_name.clone())
.create_document()
.is_upsert(true)
.execute_with_partition_key(&document, &"Gianluigi Bombatomica")
.create_document(
Context::new(),
&document,
CreateDocumentOptions::new()
.is_upsert(true)
.partition_key(&"Gianluigi Bombatomica")
.unwrap(),
)
.await
{
Ok(_) => panic!("this should not happen!"),
Expand Down Expand Up @@ -151,9 +157,14 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let create_document_response = client
.into_database_client(database_name)
.into_collection_client(collection_name)
.create_document()
.is_upsert(true)
.execute_with_partition_key(&document, &"Gianluigi Bombatomica")
.create_document(
Context::new(),
&document,
CreateDocumentOptions::new()
.is_upsert(true)
.partition_key(&"Gianluigi Bombatomica")
.unwrap(),
)
.await?;
println!(
"create_document_response == {:#?}",
Expand Down
27 changes: 24 additions & 3 deletions sdk/cosmos/src/clients/collection_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::{DatabaseClient, UserDefinedFunctionClient};
use crate::clients::*;
use crate::operations::*;
use crate::requests;
use crate::resources::ResourceType;
use crate::ReadonlyString;
use azure_core::HttpClient;
use azure_core::{pipeline::Pipeline, Context, HttpClient};
use serde::Serialize;

/// A client for Cosmos collection resources.
Expand Down Expand Up @@ -60,8 +61,24 @@ impl CollectionClient {
}

/// create a document in a collection
pub fn create_document(&self) -> requests::CreateDocumentBuilder<'_, '_> {
requests::CreateDocumentBuilder::new(self)
pub async fn create_document<D: Serialize>(
&self,
ctx: Context,
document: &D,
options: CreateDocumentOptions<'_>,
) -> Result<CreateDocumentResponse, crate::Error> {
let request = self.prepare_request_with_collection_name(http::Method::POST);
let mut request = request.body(bytes::Bytes::new()).unwrap().into();
let mut ctx = ctx.clone();
options.decorate_request(&mut request, document)?;
let response = self
.pipeline()
.send(&mut ctx, &mut request)
.await?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateDocumentResponse::try_from(response).await?)
}

/// query documents in a collection
Expand Down Expand Up @@ -137,4 +154,8 @@ impl CollectionClient {
pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.cosmos_client().http_client()
}

pub(crate) fn pipeline(&self) -> &Pipeline {
self.cosmos_client().pipeline()
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/src/consistency_level.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::headers;
use crate::operations::*;
use crate::responses::*;
use azure_core::AddAsHeader;
use http::request;
Expand Down
25 changes: 11 additions & 14 deletions sdk/cosmos/src/cosmos_entity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::headers;
use azure_core::Request as HttpRequest;
use http::request::Builder;
use serde::Serialize;

Expand All @@ -18,20 +19,6 @@ pub(crate) fn serialize_partition_key<PK: Serialize>(pk: &PK) -> Result<String,
serde_json::to_string(&[pk])
}

// Here we do not implement add_as_header because the trait does not support errors and serializing
// with serde_json returns a Result. I am not sure why a serialization could fail (memory
// allocation)? In case we are confident that no errors should arise we can implement the trait and just
// unwrap the result of serde_json::to_string.
pub(crate) fn add_as_partition_key_header<'a, P: CosmosEntity<'a>>(
pk: &'a P,
builder: Builder,
) -> Result<Builder, serde_json::Error> {
Ok(builder.header(
headers::HEADER_DOCUMENTDB_PARTITIONKEY,
&serialize_partition_key(&pk.partition_key())?,
))
}

pub(crate) fn add_as_partition_key_header_serialized(
partition_key_serialized: &str,
builder: Builder,
Expand All @@ -41,3 +28,13 @@ pub(crate) fn add_as_partition_key_header_serialized(
partition_key_serialized,
)
}

pub(crate) fn add_as_partition_key_header_serialized2(
partition_key_serialized: &str,
request: &mut HttpRequest,
) {
request.headers_mut().insert(
headers::HEADER_DOCUMENTDB_PARTITIONKEY,
http::header::HeaderValue::from_str(partition_key_serialized).unwrap(),
);
}
147 changes: 147 additions & 0 deletions sdk/cosmos/src/operations/create_document.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use crate::cosmos_entity::{add_as_partition_key_header_serialized2, serialize_partition_key};
use crate::headers::from_headers::*;
use crate::prelude::*;
use crate::resources::document::DocumentAttributes;
use crate::ResourceQuota;
use azure_core::headers::{etag_from_headers, session_token_from_headers};
use azure_core::prelude::*;
use chrono::{DateTime, Utc};
use http::StatusCode;
use serde::Serialize;
use std::convert::TryFrom;

use azure_core::{collect_pinned_stream, Request as HttpRequest, Response as HttpResponse};

#[derive(Debug, Clone)]
pub struct CreateDocumentOptions<'a> {
is_upsert: IsUpsert,
indexing_directive: IndexingDirective,
if_match_condition: Option<IfMatchCondition<'a>>,
if_modified_since: Option<IfModifiedSince<'a>>,
consistency_level: Option<ConsistencyLevel>,
allow_tentative_writes: TenativeWritesAllowance,
partition_key: Option<String>,
}

impl<'a> CreateDocumentOptions<'a> {
pub fn new() -> Self {
Self {
is_upsert: IsUpsert::No,
indexing_directive: IndexingDirective::Default,
if_match_condition: None,
if_modified_since: None,
consistency_level: None,
allow_tentative_writes: TenativeWritesAllowance::Deny,
partition_key: None,
}
}

setters! {
consistency_level: ConsistencyLevel => Some(consistency_level),
if_match_condition: IfMatchCondition<'a> => Some(if_match_condition),
if_modified_since: &'a DateTime<Utc> => Some(IfModifiedSince::new(if_modified_since)),
allow_tentative_writes: TenativeWritesAllowance,
is_upsert: bool => if is_upsert { IsUpsert::Yes } else { IsUpsert::No },
indexing_directive: IndexingDirective,
}

pub fn partition_key<PK: Serialize>(
mut self,
partition_key: &PK,
) -> Result<Self, serde_json::Error> {
self.partition_key = Some(serialize_partition_key(partition_key)?);
Ok(self)
}

pub(crate) fn decorate_request<'b, DOC>(
&self,
req: &mut HttpRequest,
document: &'b DOC,
) -> Result<(), crate::Error>
where
DOC: Serialize,
{
let serialized = serde_json::to_string(document)?;
let partition_key = self
.partition_key
.clone()
.unwrap_or_else(|| serialize_partition_key(document).unwrap());

add_as_partition_key_header_serialized2(&partition_key, req);
azure_core::headers::add_optional_header2(&self.if_match_condition, req);
azure_core::headers::add_optional_header2(&self.if_modified_since, req);
azure_core::headers::add_optional_header2(&self.consistency_level, req);
azure_core::headers::add_mandatory_header2(&self.is_upsert, req);
azure_core::headers::add_mandatory_header2(&self.indexing_directive, req);
azure_core::headers::add_mandatory_header2(&self.allow_tentative_writes, req);

req.set_body(bytes::Bytes::from(serialized).into());
Ok(())
}
}

#[derive(Debug, Clone)]
pub struct CreateDocumentResponse {
pub document_attributes: DocumentAttributes,
pub is_update: bool,
pub last_state_change: DateTime<Utc>,
pub etag: String,
pub resource_quota: Vec<ResourceQuota>,
pub resource_usage: Vec<ResourceQuota>,
pub lsn: u64,
pub schema_version: String,
pub alt_content_path: String,
pub content_path: String,
pub quorum_acked_lsn: u64,
pub current_write_quorum: u64,
pub current_replica_set_size: u64,
pub role: u32,
pub global_committed_lsn: u64,
pub number_of_read_regions: u32,
pub transport_request_id: u64,
pub cosmos_llsn: u64,
pub cosmos_quorum_acked_llsn: u64,
pub session_token: String,
pub charge: f64,
pub service_version: String,
pub activity_id: uuid::Uuid,
pub gateway_version: String,
pub date: DateTime<Utc>,
}

impl CreateDocumentResponse {
pub async fn try_from(response: HttpResponse) -> Result<Self, crate::Error> {
let (status_code, headers, pinned_stream) = response.deconstruct();
let body = collect_pinned_stream(pinned_stream).await?;

Ok(CreateDocumentResponse {
is_update: status_code == StatusCode::OK,

last_state_change: last_state_change_from_headers(&headers)?,
etag: etag_from_headers(&headers)?,
resource_quota: resource_quota_from_headers(&headers)?,
resource_usage: resource_usage_from_headers(&headers)?,
lsn: lsn_from_headers(&headers)?,
schema_version: schema_version_from_headers(&headers)?.to_owned(),
alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(),
content_path: content_path_from_headers(&headers)?.to_owned(),
quorum_acked_lsn: quorum_acked_lsn_from_headers(&headers)?,
current_write_quorum: current_write_quorum_from_headers(&headers)?,
current_replica_set_size: current_replica_set_size_from_headers(&headers)?,
role: role_from_headers(&headers)?,
global_committed_lsn: global_committed_lsn_from_headers(&headers)?,
number_of_read_regions: number_of_read_regions_from_headers(&headers)?,
transport_request_id: transport_request_id_from_headers(&headers)?,
cosmos_llsn: cosmos_llsn_from_headers(&headers)?,
cosmos_quorum_acked_llsn: cosmos_quorum_acked_llsn_from_headers(&headers)?,
session_token: session_token_from_headers(&headers)?,
charge: request_charge_from_headers(&headers)?,
service_version: service_version_from_headers(&headers)?.to_owned(),
activity_id: activity_id_from_headers(&headers)?,
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
date: date_from_headers(&headers)?,

document_attributes: DocumentAttributes::try_from(body)?,
})
}
}
Loading

0 comments on commit 635f022

Please sign in to comment.