Skip to content

Commit

Permalink
Merge pull request #445 from paul-hansen/streaming-upload
Browse files Browse the repository at this point in the history
Add upload_to_container_streaming to Container
  • Loading branch information
fussybeaver authored Aug 17, 2024
2 parents 31868e5 + 0d8d171 commit 834acfe
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ webpki-roots = { version = "0.26", optional = true }
flate2 = "1.0"
tar = "0.4"
tokio = { version = "1.38", features = ["fs", "rt-multi-thread", "macros"] }
tokio-util = { version = "0.7", features = ["io"] }
yup-hyper-mock = { version = "8.0.0" }
once_cell = "1.19"

Expand Down
80 changes: 75 additions & 5 deletions src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::hash::Hash;
use std::pin::Pin;

use super::Docker;
use crate::docker::BodyType;
use crate::docker::{body_stream, BodyType};
use crate::errors::Error;
use crate::models::*;
use crate::read::NewlineLogOutputDecoder;
Expand Down Expand Up @@ -2124,6 +2124,72 @@ impl Docker {
self.process_into_value(req).await
}

/// ---
///
/// # Stream Upload To Container
///
/// Stream an upload of a tar archive to be extracted to a path in the filesystem of container
/// id.
///
/// # Arguments
///
/// - Optional [Upload To Container Options](UploadToContainerOptions) struct.
///
/// # Returns
///
/// - unit type `()`, wrapped in a Future.
///
/// # Examples
///
/// ```rust,no_run
/// # use bollard::Docker;
/// use bollard::container::UploadToContainerOptions;
/// use futures_util::{StreamExt, TryFutureExt};
/// use tokio::fs::File;
/// use tokio_util::io::ReaderStream;
///
/// # #[tokio::main]
/// # async fn main() {
/// # let docker = Docker::connect_with_http_defaults().unwrap();
/// let options = Some(UploadToContainerOptions{
/// path: "/opt",
/// ..Default::default()
/// });
///
/// let file = File::open("tarball.tar.gz")
/// .map_ok(ReaderStream::new)
/// .try_flatten_stream()
/// .map(|x|x.expect("failed to stream file"));
///
/// docker
/// .upload_to_container_streaming("my-container", options, file)
/// .await
/// .expect("upload failed");
/// # }
/// ```
pub async fn upload_to_container_streaming<T>(
&self,
container_name: &str,
options: Option<UploadToContainerOptions<T>>,
tar: impl Stream<Item = Bytes> + Send + 'static,
) -> Result<(), Error>
where
T: Into<String> + Serialize,
{
let url = format!("/containers/{container_name}/archive");

let req = self.build_request(
&url,
Builder::new()
.method(Method::PUT)
.header(CONTENT_TYPE, "application/x-tar"),
options,
Ok(body_stream(tar)),
);

self.process_into_unit(req).await
}

/// ---
///
/// # Upload To Container
Expand All @@ -2142,13 +2208,13 @@ impl Docker {
///
/// ```rust,no_run
/// # use bollard::Docker;
/// # let docker = Docker::connect_with_http_defaults().unwrap();
/// use bollard::container::UploadToContainerOptions;
///
/// use std::default::Default;
/// use std::fs::File;
/// use std::io::Read;
///
/// # #[tokio::main]
/// # async fn main() {
/// # let docker = Docker::connect_with_http_defaults().unwrap();
/// let options = Some(UploadToContainerOptions{
/// path: "/opt",
/// ..Default::default()
Expand All @@ -2158,7 +2224,11 @@ impl Docker {
/// let mut contents = Vec::new();
/// file.read_to_end(&mut contents).unwrap();
///
/// docker.upload_to_container("my-container", options, contents.into());
/// docker
/// .upload_to_container("my-container", options, contents.into())
/// .await
/// .expect("upload failed");
/// # }
/// ```
pub async fn upload_to_container<T>(
&self,
Expand Down
56 changes: 40 additions & 16 deletions tests/container_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ async fn prune_containers_test(docker: Docker) -> Result<(), Error> {
Ok(())
}

async fn archive_container_test(docker: Docker) -> Result<(), Error> {
async fn archive_container_test(docker: Docker, streaming_upload: bool) -> Result<(), Error> {
let image = if cfg!(windows) {
format!("{}microsoft/nanoserver", registry_http_addr())
} else {
Expand Down Expand Up @@ -588,20 +588,43 @@ async fn archive_container_test(docker: Docker) -> Result<(), Error> {
)
.await?;

let _ = &docker
.upload_to_container(
"integration_test_archive_container",
Some(UploadToContainerOptions {
path: if cfg!(windows) {
"C:\\Windows\\Logs"
} else {
"/tmp"
},
..Default::default()
}),
payload.into(),
)
.await?;
if streaming_upload {
// Make payload live for the lifetime of the test and convert it to an async Bytes stream.
// Normally you would use an existing async stream.
let payload = Box::new(payload).leak();
let payload = payload.chunks(32);
let payload = futures_util::stream::iter(payload.map(bytes::Bytes::from));

let _ = &docker
.upload_to_container_streaming(
"integration_test_archive_container",
Some(UploadToContainerOptions {
path: if cfg!(windows) {
"C:\\Windows\\Logs"
} else {
"/tmp"
},
..Default::default()
}),
payload,
)
.await?;
} else {
let _ = &docker
.upload_to_container(
"integration_test_archive_container",
Some(UploadToContainerOptions {
path: if cfg!(windows) {
"C:\\Windows\\Logs"
} else {
"/tmp"
},
..Default::default()
}),
payload.into(),
)
.await?;
}

let res = docker.download_from_container(
"integration_test_archive_container",
Expand Down Expand Up @@ -904,7 +927,8 @@ fn integration_test_prune_containers() {

#[test]
fn integration_test_archive_containers() {
connect_to_docker_and_run!(archive_container_test);
connect_to_docker_and_run!(|docker| archive_container_test(docker, true));
connect_to_docker_and_run!(|docker| archive_container_test(docker, false));
}

#[test]
Expand Down

0 comments on commit 834acfe

Please sign in to comment.