Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add binary request example #1865

Merged
merged 4 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/core/azure_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ hmac_openssl = ["dep:openssl"]
test_e2e = []
azurite_workaround = []
xml = ["typespec_client_core/xml"]
tokio_fs = ["tokio/fs", "tokio/sync", "tokio/io-util"]
tokio_fs = ["typespec_client_core/tokio_fs"]
tokio_sleep = ["typespec_client_core/tokio_sleep"]

[package.metadata.docs.rs]
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure_core/src/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
// Licensed under the MIT License.

#[cfg(feature = "tokio_fs")]
pub mod fs;
pub use typespec_client_core::fs::*;
16 changes: 8 additions & 8 deletions sdk/typespec/typespec_client_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,26 @@ tokio = { workspace = true, features = ["macros", "rt", "time"] }
[dev-dependencies]
once_cell.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
typespec_derive.workspace = true

[features]
default = [
"http",
"json",
"reqwest",
"reqwest_gzip",
"reqwest_rustls",
"tokio_sleep",
]
default = ["http", "json", "reqwest", "reqwest_gzip", "reqwest_rustls"]
derive = ["dep:typespec_derive"]
http = ["dep:http-types", "typespec/http"]
json = ["typespec/json"]
reqwest = ["dep:reqwest", "reqwest/default-tls"]
reqwest_gzip = ["reqwest/gzip"]
reqwest_rustls = ["reqwest/rustls-tls"]
tokio_fs = ["tokio/fs", "tokio/sync", "tokio/io-util"]
tokio_sleep = ["tokio/time"]
xml = ["dep:quick-xml"]

[[example]]
name = "binary_data_request"
required-features = ["tokio_fs", "tokio_sleep"]

[[example]]
name = "stream_response"
required-features = ["derive"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use tokio::fs;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::fmt::format::FmtSpan;
use typespec_client_core::{fs::FileStreamBuilder, http::RequestContent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Log traces to stdout.
let _log = tracing_subscriber::fmt()
.with_span_events(FmtSpan::ENTER | FmtSpan::EXIT)
.with_max_level(LevelFilter::DEBUG)
.init();

// Asynchronously read the whole file into memory.
let body = RequestContent::from(fs::read(file!()).await?);
client::put_binary_data(body).await?;

// Asynchronously stream the file with the service client request.
#[cfg(not(target_arch = "wasm32"))]
{
let file = fs::File::open(file!()).await?;
let file = FileStreamBuilder::new(file)
// Simulate a slow, chunky request.
.buffer_size(512usize)
.build()
.await?;
client::put_binary_data(file.into()).await?;
}

Ok(())
}

mod client {
use futures::StreamExt;
use tracing::debug;
use typespec_client_core::{
http::{headers::Headers, Body, RequestContent, Response, StatusCode},
stream::BytesStream,
};

#[tracing::instrument(skip(body))]
pub async fn put_binary_data(
body: RequestContent<()>,
heaths marked this conversation as resolved.
Show resolved Hide resolved
) -> typespec_client_core::Result<Response<()>> {
let body: RequestContent<()> = body.into();
let body: Body = body.into();

let content = match body {
Body::Bytes(ref bytes) => {
debug!("received {} bytes", bytes.len());
bytes.to_owned()
}
Body::SeekableStream(mut stream) => {
debug!("received stream");
let stream = stream.as_mut();

let mut bytes = Vec::new();
while let Some(Ok(buf)) = stream.next().await {
debug!("read {} bytes from stream", buf.len());
bytes.extend(buf);
}

bytes.into()
}
};

// Assume bytes are a string in this example.
let content = String::from_utf8(content.into())?;
println!("{content}");
heaths marked this conversation as resolved.
Show resolved Hide resolved

Ok(Response::new(
StatusCode::NoContent,
Headers::new(),
Box::pin(BytesStream::new_empty()),
))
}
}
25 changes: 19 additions & 6 deletions sdk/typespec/typespec_client_core/examples/stream_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@
// Licensed under the MIT License.

use futures::StreamExt;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::fmt::format::FmtSpan;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Log traces to stdout.
let _log = tracing_subscriber::fmt()
.with_span_events(FmtSpan::ENTER | FmtSpan::EXIT)
.with_max_level(LevelFilter::DEBUG)
.init();
heaths marked this conversation as resolved.
Show resolved Hide resolved

// Get a response from a service client.
let response = client::get_binary_data_response()?;
let response = client::get_binary_data()?;

// Normally you'd deserialize into a type or `collect()` the body,
// but this better simulates fetching multiple chunks from a slow response.
Expand All @@ -18,7 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

// You can also deserialize into a model from a slow response.
let team = client::get_model_response()?.deserialize_body().await?;
let team = client::get_model()?.deserialize_body().await?;
println!("{team:#?}");

Ok(())
Expand All @@ -29,6 +37,7 @@ mod client {
use futures::Stream;
use serde::Deserialize;
use std::{cmp::min, task::Poll, time::Duration};
use tracing::debug;
use typespec_client_core::{
http::{headers::Headers, Model, Response, StatusCode},
Bytes,
Expand All @@ -47,7 +56,8 @@ mod client {
pub name: Option<String>,
}

pub fn get_binary_data_response() -> typespec_client_core::Result<Response<()>> {
#[tracing::instrument]
pub fn get_binary_data() -> typespec_client_core::Result<Response<()>> {
let bytes = Bytes::from_static(b"Hello, world!");
let response = SlowResponse {
bytes: bytes.repeat(5).into(),
Expand All @@ -62,7 +72,8 @@ mod client {
))
}

pub fn get_model_response() -> typespec_client_core::Result<Response<Team>> {
#[tracing::instrument]
pub fn get_model() -> typespec_client_core::Result<Response<Team>> {
let bytes = br#"{
"name": "Contoso Dev Team",
"members": [
Expand Down Expand Up @@ -104,7 +115,9 @@ mod client {
) -> Poll<Option<Self::Item>> {
let self_mut = self.get_mut();
if self_mut.bytes_read < self_mut.bytes.len() {
eprintln!("getting partial response...");
debug!("writing partial response...");

// Simulate a slow response.
std::thread::sleep(Duration::from_millis(200));

let end = self_mut.bytes_read
Expand All @@ -116,7 +129,7 @@ mod client {
self_mut.bytes_read += bytes.len();
Poll::Ready(Some(Ok(bytes)))
} else {
eprintln!("done");
debug!("done");
Poll::Ready(None)
}
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/typespec/typespec_client_core/src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#[cfg(feature = "tokio_fs")]
mod tokio;

#[cfg(feature = "tokio_fs")]
pub use tokio::*;
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use crate::{
http::{Body, RequestContent},
setters,
stream::{SeekableStream, DEFAULT_BUFFER_SIZE},
};
use futures::{task::Poll, Future};
use std::{cmp::min, io::SeekFrom, pin::Pin, sync::Arc, task::Context};
use tokio::{
Expand All @@ -9,11 +14,6 @@ use tokio::{
sync::Mutex,
};
use tracing::debug;
use typespec_client_core::{
http::Body,
setters,
stream::{SeekableStream, DEFAULT_BUFFER_SIZE},
};

#[derive(Debug)]
pub struct FileStreamBuilder {
Expand Down Expand Up @@ -168,3 +168,17 @@ impl From<FileStream> for Body {
Body::SeekableStream(Box::new(stream))
}
}

#[cfg(not(target_arch = "wasm32"))]
impl<T> From<&FileStream> for RequestContent<T> {
fn from(stream: &FileStream) -> Self {
Body::from(stream).into()
}
}

#[cfg(not(target_arch = "wasm32"))]
impl<T> From<FileStream> for RequestContent<T> {
fn from(stream: FileStream) -> Self {
Body::from(stream).into()
}
}
9 changes: 9 additions & 0 deletions sdk/typespec/typespec_client_core/src/http/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ impl<T> From<RequestContent<T>> for Body {
}
}

impl<T> From<Body> for RequestContent<T> {
heaths marked this conversation as resolved.
Show resolved Hide resolved
fn from(body: Body) -> Self {
Self {
body,
phantom: PhantomData,
}
}
}

impl<T> TryFrom<Bytes> for RequestContent<T> {
type Error = crate::Error;
fn try_from(body: Bytes) -> Result<Self, Self::Error> {
Expand Down
1 change: 1 addition & 0 deletions sdk/typespec/typespec_client_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod macros;
pub mod base64;
pub mod date;
pub mod error;
pub mod fs;
#[cfg(feature = "http")]
pub mod http;
#[cfg(feature = "json")]
Expand Down