Skip to content

Commit

Permalink
feat(cmn): Resumable upload implemented
Browse files Browse the repository at this point in the history
With all bells and whisles. For now, we don't have a good return value
to indicate that the operation was cancelled, which needs fixing.
  • Loading branch information
Byron committed Mar 22, 2015
1 parent 065753c commit 29ee94b
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 29 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ To find a library of your interest, you might want to proceed looking at the [AP
# Project Features

* provide an idiomatic rust implementation for google APIs
* first-class documentation with cross-links and complete code-examples to increase ease-of-use
* support all features, including uploads and resumable uploads
* first-class documentation with cross-links and complete code-examples
* support all features, including downloads and resumable uploads
* safety and resilience are built-in, allowing you to create highly available tools on top of it. For example, you can trigger retries for all operations that may temporarily fail, e.g. due to network outage.
* *(soon)* Feature-complete command line tool to interact with each API.

Expand Down
80 changes: 72 additions & 8 deletions gen/groupsmigration1/src/cmn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ pub struct JsonServerError {
pub struct DummyNetworkStream;

impl Read for DummyNetworkStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
Ok(0)
}
}

impl Write for DummyNetworkStream {
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
fn write(&mut self, _: &[u8]) -> io::Result<usize> {
Ok(0)
}

Expand Down Expand Up @@ -175,6 +175,21 @@ pub trait Delegate {
/// It's also useful as you can be sure that a request will definitely be made.
fn pre_request(&mut self) { }

/// Return the size of each chunk of a resumable upload.
/// Must be a power of two, with 1<<18 being the smallest allowed chunk size.
/// Will be called once before starting any resumable upload.
fn chunk_size(&mut self) -> u64 {
1 << 23
}

/// Called before the given chunk is uploaded to the server.
/// If true is returned, the upload will be interrupted.
/// However, it may be resumable if you stored the upload URL in a previous call
/// to `store_upload_url()`
fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
let _ = chunk;
false
}

/// Called before the API request method returns, in every case. It can be used to clean up
/// internal state between calls to the API.
Expand Down Expand Up @@ -417,7 +432,7 @@ impl Header for ContentRange {
}

/// We are not parsable, as parsing is done by the `Range` header
fn parse_header(raw: &[Vec<u8>]) -> Option<ContentRange> {
fn parse_header(_: &[Vec<u8>]) -> Option<ContentRange> {
None
}
}
Expand Down Expand Up @@ -465,7 +480,7 @@ impl Header for RangeResponseHeader {

impl HeaderFormat for RangeResponseHeader {
/// No implmentation necessary, we just need to parse
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fn fmt_header(&self, _: &mut fmt::Formatter) -> fmt::Result {
Err(fmt::Error)
}
}
Expand All @@ -488,7 +503,7 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
where NC: hyper::net::NetworkConnector,
A: oauth2::GetToken {

fn query_transfer_status(&'a mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
fn query_transfer_status(&mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
loop {
match self.client.post(self.url)
.header(UserAgent(self.user_agent.to_string()))
Expand Down Expand Up @@ -521,14 +536,63 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
}
}

pub fn upload(&'a mut self) -> hyper::HttpResult<hyper::client::Response> {
let start = match self.start_at {
pub fn upload(&mut self) -> hyper::HttpResult<hyper::client::Response> {
let mut start = match self.start_at {
Some(s) => s,
None => match self.query_transfer_status() {
(Some(s), _) => s,
(_, result) => return result
}
};
Err(hyper::error::HttpError::HttpStatusError)

const MIN_CHUNK_SIZE: u64 = 1 << 18;
let chunk_size = match self.delegate.chunk_size() {
cs if cs > MIN_CHUNK_SIZE => cs,
_ => MIN_CHUNK_SIZE
};

loop {
let request_size = match self.content_length - start {
rs if rs > chunk_size => chunk_size,
rs => rs
};

self.reader.seek(SeekFrom::Start(start)).unwrap();
let mut section_reader = self.reader.take(request_size);
let range_header = ContentRange {
range: Some(Chunk {first: start, last: start + request_size - 1}),
total_length: self.content_length
};
start += request_size;
if self.delegate.cancel_chunk_upload(&range_header) {
return Err(hyper::error::HttpError::HttpStatusError)
}
match self.client.post(self.url)
.header(range_header)
.header(ContentType(self.media_type.clone()))
.header(UserAgent(self.user_agent.to_string()))
.body(&mut section_reader)
.send() {
Ok(res) => {
if res.status == StatusCode::PermanentRedirect {
continue
}
if res.status != StatusCode::Ok {
if let Retry::After(d) = self.delegate.http_failure(&res, None) {
sleep(d);
continue;
}
}
return Ok(res)
},
Err(err) => {
if let Retry::After(d) = self.delegate.http_error(&err) {
sleep(d);
continue;
}
return Err(err)
}
}
}
}
}
5 changes: 2 additions & 3 deletions gen/groupsmigration1/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
// We don't warn about this, as depending on the API, some data structures or facilities are never used.
// Instead of pre-determining this, we just disable the lint. It's manually tuned to not have any
// unused imports in fully featured APIs. Same with unused_mut ... .
#![allow(unused_imports, unused_mut)]
#![allow(unused_imports, unused_mut, dead_code)]
// Required for serde annotations
#![feature(custom_derive, custom_attribute, plugin)]
#![plugin(serde_macros)]
Expand Down Expand Up @@ -450,7 +450,6 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network
/// Perform the operation you have build so far.
fn doit<RS>(mut self, mut reader: RS, reader_mime_type: mime::Mime, protocol: &'static str) -> Result<(hyper::client::Response, Groups)>
where RS: ReadSeek {
use hyper::client::IntoBody;
use std::io::{Read, Seek};
use hyper::header::{ContentType, ContentLength, Authorization, UserAgent, Location};
let mut dd = DefaultDelegate;
Expand Down Expand Up @@ -558,7 +557,7 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network
}
req = req.header(ContentType(reader_mime_type.clone()))
.header(ContentLength(size))
.body(reader.into_body());
.body(&mut reader);
}
upload_url_from_server = true;
if protocol == "resumable" {
Expand Down
2 changes: 1 addition & 1 deletion src/mako/lib.rs.mako
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ${lib.docs(c)}
// We don't warn about this, as depending on the API, some data structures or facilities are never used.
// Instead of pre-determining this, we just disable the lint. It's manually tuned to not have any
// unused imports in fully featured APIs. Same with unused_mut ... .
#![allow(unused_imports, unused_mut)]
#![allow(unused_imports, unused_mut, dead_code)]
// Required for serde annotations
#![feature(custom_derive, custom_attribute, plugin)]
#![plugin(serde_macros)]
Expand Down
10 changes: 3 additions & 7 deletions src/mako/lib/mbuild.mako
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,6 @@ match result {
% if URL_ENCODE in special_cases:
use url::{percent_encode, FORM_URLENCODED_ENCODE_SET};
% endif
## TODO: IntoBody is called explicilty, even though it should be working implicitly.
## However, the compiler complains about
## "the trait `core::marker::Sized` is not implemented for the type `std::io::Read`"
use hyper::client::IntoBody;
use std::io::{Read, Seek};
use hyper::header::{ContentType, ContentLength, Authorization, UserAgent, Location};
let mut dd = DefaultDelegate;
Expand Down Expand Up @@ -720,11 +716,11 @@ else {
.header(ContentType(json_mime_type.clone()))
.header(ContentLength(request_size as u64))
.body(request_value_reader.into_body())\
.body(&mut request_value_reader)\
% else:
.header(content_type)
.body(body_reader.into_body())\
.body(&mut body_reader)\
% endif ## not simple_media_param
% endif
;
Expand All @@ -733,7 +729,7 @@ else {
${READER_SEEK | indent_all_but_first_by(4)}
req = req.header(ContentType(reader_mime_type.clone()))
.header(ContentLength(size))
.body(reader.into_body());
.body(&mut reader);
}
% endif ## media upload handling
% if resumable_media_param:
Expand Down
80 changes: 72 additions & 8 deletions src/rust/cmn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ pub struct JsonServerError {
pub struct DummyNetworkStream;

impl Read for DummyNetworkStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
Ok(0)
}
}

impl Write for DummyNetworkStream {
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
fn write(&mut self, _: &[u8]) -> io::Result<usize> {
Ok(0)
}

Expand Down Expand Up @@ -173,6 +173,21 @@ pub trait Delegate {
/// It's also useful as you can be sure that a request will definitely be made.
fn pre_request(&mut self) { }

/// Return the size of each chunk of a resumable upload.
/// Must be a power of two, with 1<<18 being the smallest allowed chunk size.
/// Will be called once before starting any resumable upload.
fn chunk_size(&mut self) -> u64 {
1 << 23
}

/// Called before the given chunk is uploaded to the server.
/// If true is returned, the upload will be interrupted.
/// However, it may be resumable if you stored the upload URL in a previous call
/// to `store_upload_url()`
fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
let _ = chunk;
false
}

/// Called before the API request method returns, in every case. It can be used to clean up
/// internal state between calls to the API.
Expand Down Expand Up @@ -415,7 +430,7 @@ impl Header for ContentRange {
}

/// We are not parsable, as parsing is done by the `Range` header
fn parse_header(raw: &[Vec<u8>]) -> Option<ContentRange> {
fn parse_header(_: &[Vec<u8>]) -> Option<ContentRange> {
None
}
}
Expand Down Expand Up @@ -463,7 +478,7 @@ impl Header for RangeResponseHeader {

impl HeaderFormat for RangeResponseHeader {
/// No implmentation necessary, we just need to parse
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fn fmt_header(&self, _: &mut fmt::Formatter) -> fmt::Result {
Err(fmt::Error)
}
}
Expand All @@ -486,7 +501,7 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
where NC: hyper::net::NetworkConnector,
A: oauth2::GetToken {

fn query_transfer_status(&'a mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
fn query_transfer_status(&mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
loop {
match self.client.post(self.url)
.header(UserAgent(self.user_agent.to_string()))
Expand Down Expand Up @@ -519,14 +534,63 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
}
}

pub fn upload(&'a mut self) -> hyper::HttpResult<hyper::client::Response> {
let start = match self.start_at {
pub fn upload(&mut self) -> hyper::HttpResult<hyper::client::Response> {
let mut start = match self.start_at {
Some(s) => s,
None => match self.query_transfer_status() {
(Some(s), _) => s,
(_, result) => return result
}
};
Err(hyper::error::HttpError::HttpStatusError)

const MIN_CHUNK_SIZE: u64 = 1 << 18;
let chunk_size = match self.delegate.chunk_size() {
cs if cs > MIN_CHUNK_SIZE => cs,
_ => MIN_CHUNK_SIZE
};

loop {
let request_size = match self.content_length - start {
rs if rs > chunk_size => chunk_size,
rs => rs
};

self.reader.seek(SeekFrom::Start(start)).unwrap();
let mut section_reader = self.reader.take(request_size);
let range_header = ContentRange {
range: Some(Chunk {first: start, last: start + request_size - 1}),
total_length: self.content_length
};
start += request_size;
if self.delegate.cancel_chunk_upload(&range_header) {
return Err(hyper::error::HttpError::HttpStatusError)
}
match self.client.post(self.url)
.header(range_header)
.header(ContentType(self.media_type.clone()))
.header(UserAgent(self.user_agent.to_string()))
.body(&mut section_reader)
.send() {
Ok(res) => {
if res.status == StatusCode::PermanentRedirect {
continue
}
if res.status != StatusCode::Ok {
if let Retry::After(d) = self.delegate.http_failure(&res, None) {
sleep(d);
continue;
}
}
return Ok(res)
},
Err(err) => {
if let Retry::After(d) = self.delegate.http_error(&err) {
sleep(d);
continue;
}
return Err(err)
}
}
}
}
}

0 comments on commit 29ee94b

Please sign in to comment.