Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move storage blobs to pipeline architecture #843

Merged
merged 26 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions sdk/storage/src/account/operations/find_blobs_by_tags.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::core::prelude::*;
use crate::xml::read_xml;
use crate::{core::clients::ServiceType, core::prelude::*, xml::read_xml};
use azure_core::headers::{date_from_headers, request_id_from_headers};
use azure_core::prelude::*;
use azure_core::{collect_pinned_stream, RequestId, Response as HttpResponse};
Expand Down Expand Up @@ -57,8 +56,7 @@ impl FindBlobsByTagsBuilder {
let response = self
.client
.storage_account_client()
.pipeline()
.send(&mut self.context, &mut request)
.send(&mut self.context, &mut request, ServiceType::Blob)
.await?;

ListBlobsByTagsResponse::try_from(response).await
Expand Down
26 changes: 19 additions & 7 deletions sdk/storage/src/core/clients/storage_account_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use crate::{
AccountSharedAccessSignatureBuilder, ClientAccountSharedAccessSignature,
},
};
use azure_core::auth::TokenCredential;
use azure_core::error::{Error, ErrorKind, ResultExt};
use azure_core::{headers, Request};
use azure_core::{headers::*, Pipeline};
use azure_core::{ClientOptions, HttpClient};
use azure_core::{
auth::TokenCredential,
error::{Error, ErrorKind, ResultExt},
headers::*,
ClientOptions, Context, HttpClient, Pipeline, Request, Response,
};
Comment on lines +10 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we don't seem to have a consistent way of structuring imports. I'd like to figure out a consistent way of structuring these things and stick to that across all of the SDKs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, preferably using imports_granularity once it stabilizes.

use bytes::Bytes;
use http::method::Method;
use std::sync::Arc;
Expand Down Expand Up @@ -446,8 +447,8 @@ impl StorageAccountClient {
None => request.insert_header(CONTENT_LENGTH, "0"),
};

request.insert_header(headers::MS_DATE, time);
request.insert_header(headers::VERSION, AZURE_VERSION);
request.insert_header(MS_DATE, time);
request.insert_header(VERSION, AZURE_VERSION);

// We sign the request only if it is not already signed (with the signature of an
// SAS token for example)
Expand Down Expand Up @@ -496,6 +497,17 @@ impl StorageAccountClient {
&self.pipeline
}

pub async fn send(
&self,
context: &mut Context,
request: &mut Request,
service_type: ServiceType,
) -> azure_core::Result<Response> {
self.pipeline
.send(context.insert(service_type), request)
.await
}

/// Prepares' an `azure_core::Request`.
pub(crate) fn blob_storage_request(&self, http_method: http::Method) -> Request {
Request::new(self.blob_storage_url().clone(), http_method)
Expand Down
17 changes: 15 additions & 2 deletions sdk/storage/src/core/clients/storage_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::core::clients::{ServiceType, StorageAccountClient};
use crate::operations::*;
use azure_core::error::{Error, ErrorKind};
use azure_core::Request;
use azure_core::{
error::{Error, ErrorKind},
Context, Request, Response,
};
use bytes::Bytes;
use http::method::Method;
use std::sync::Arc;
Expand Down Expand Up @@ -94,4 +96,15 @@ impl StorageClient {
self.storage_account_client
.prepare_request(url, method, ServiceType::Blob, request_body)
}

pub async fn send(
&self,
context: &mut Context,
request: &mut Request,
service_type: ServiceType,
) -> azure_core::Result<Response> {
self.storage_account_client
.send(context, request, service_type)
.await
}
}
18 changes: 8 additions & 10 deletions sdk/storage_blobs/examples/connection_string.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use azure_storage::core::prelude::*;
use azure_storage_blobs::prelude::*;
use futures::stream::StreamExt;
use futures::StreamExt;
use std::{num::NonZeroU32, time::Duration};

#[tokio::main]
Expand All @@ -20,10 +20,10 @@ async fn main() -> azure_core::Result<()> {
let container_client = storage_client.as_container_client(&container_name);
let blob_service = storage_client.as_blob_service_client();

let mut stream = Box::pin(blob_service.list_containers().stream());
let mut stream = blob_service.list_containers().into_stream();
while let Some(result) = stream.next().await {
let container = result?;
for container in container.incomplete_vector.as_ref() {
let result = result?;
for container in result.containers {
if container.name == container_name {
panic!("The specified container must not exists!");
}
Expand Down Expand Up @@ -52,12 +52,10 @@ async fn main() -> azure_core::Result<()> {

let max_results = NonZeroU32::new(3).unwrap();

let mut stream = Box::pin(
container_client
.list_blobs()
.max_results(max_results)
.stream(),
);
let mut stream = container_client
.list_blobs()
.max_results(max_results)
.into_stream();

let mut cnt: i32 = 0;
while let Some(value) = stream.next().await {
Expand Down
26 changes: 11 additions & 15 deletions sdk/storage_blobs/examples/container_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,26 @@ async fn main() -> azure_core::Result<()> {
let container_client = storage_client.as_container_client(container_name);

let max_results = NonZeroU32::new(3).unwrap();
let mut iv = Box::pin(
blob_service_client
.list_containers()
.max_results(max_results)
.stream(),
);
let mut iv = blob_service_client
.list_containers()
.max_results(max_results)
.into_stream();

let mut count = 0;
while let Some(result) = iv.next().await {
let container = result?;
count += container.incomplete_vector.len();
for container in container.incomplete_vector.iter() {
let page = result?;
count += page.containers.len();
for container in page.containers.iter() {
println!("\t{}", container.name);
}
}

println!("List containers returned {} containers.", count);

let mut stream = Box::pin(
container_client
.list_blobs()
.max_results(max_results)
.stream(),
);
let mut stream = container_client
.list_blobs()
.max_results(max_results)
.into_stream();

let mut count = 0;
while let Some(result) = stream.next().await {
Expand Down
16 changes: 7 additions & 9 deletions sdk/storage_blobs/examples/container_and_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,13 @@ async fn main() -> azure_core::Result<()> {
println!("{:?}", res);

// only get the first set of blobs in the list
let res = Box::pin(
container_client
.list_blobs()
.include_metadata(true)
.stream(),
)
.next()
.await
.expect("stream failed")?;
let res = container_client
.list_blobs()
.include_metadata(true)
.into_stream()
.next()
.await
.expect("stream failed")?;
println!("{:?}", res);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage_blobs/examples/count_blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> azure_core::Result<()> {
.as_container_client(&container);

let mut count: usize = 0;
let mut list_blobs = Box::pin(container_client.list_blobs().stream());
let mut list_blobs = container_client.list_blobs().into_stream();
while let Some(list_blobs_response) = list_blobs.next().await {
let list_blobs_response = list_blobs_response?;
count += list_blobs_response.blobs.blobs.len();
Expand Down
4 changes: 3 additions & 1 deletion sdk/storage_blobs/examples/device_code_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ async fn main() -> azure_core::Result<()> {

// now we enumerate the containers in the
// specified storage account.
let containers = Box::pin(blob_service_client.list_containers().stream())
let containers = blob_service_client
.list_containers()
.into_stream()
.next()
.await
.expect("stream failed")?;
Expand Down
16 changes: 7 additions & 9 deletions sdk/storage_blobs/examples/emulator_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ async fn main() -> azure_core::Result<()> {
.await?;
println!("{:?}", res);

let res = Box::pin(
container_client
.list_blobs()
.include_metadata(true)
.stream(),
)
.next()
.await
.expect("stream failed")?;
let res = container_client
.list_blobs()
.include_metadata(true)
.into_stream()
.next()
.await
.expect("stream failed")?;
println!("{:?}", res);

Ok(())
Expand Down
38 changes: 18 additions & 20 deletions sdk/storage_blobs/examples/list_blobs_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ async fn main() -> azure_core::Result<()> {
let blob_service = storage_client.as_blob_service_client();
let container_client = storage_client.as_container_client(&container_name);

let iv = Box::pin(blob_service.list_containers().stream())
let page = blob_service
.list_containers()
.into_stream()
.next()
.await
.expect("stream failed")?;

if iv
.incomplete_vector
if page
.containers
.iter()
.any(|item| item.name == container_name)
{
Expand All @@ -55,27 +57,23 @@ async fn main() -> azure_core::Result<()> {
println!("\tAdded blob {}", i);
}

let iv = Box::pin(
container_client
.list_blobs()
.max_results(NonZeroU32::new(3u32).unwrap())
.stream(),
)
.next()
.await
.expect("stream failed")?;
let page = container_client
.list_blobs()
.max_results(NonZeroU32::new(3u32).unwrap())
.into_stream()
.next()
.await
.expect("stream failed")?;

println!("List blob returned {} blobs.", iv.blobs.blobs.len());
for cont in iv.blobs.blobs.iter() {
println!("List blob returned {} blobs.", page.blobs.blobs.len());
for cont in page.blobs.blobs.iter() {
println!("\t{}\t{} bytes", cont.name, cont.properties.content_length);
}

let mut stream = Box::pin(
container_client
.list_blobs()
.max_results(NonZeroU32::new(3u32).unwrap())
.stream(),
);
let mut stream = container_client
.list_blobs()
.max_results(NonZeroU32::new(3u32).unwrap())
.into_stream();

let mut cnt: i32 = 0;
while let Some(value) = stream.next().await {
Expand Down
Loading