From 1a5f0cecc7a62cb588ae43886151fe6fbb871618 Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Thu, 17 Mar 2022 20:36:55 +0000 Subject: [PATCH] fix(storage): return an error on short writes Very rarely the retry policy is exhausted during an upload. If this happens when the last request was a query to find the status of the upload the `UploadChunk()` succeeds with a short write. We need to determine if that is a good idea (it might be). In the interim, we need to signal to the application that the stream is no longer usable. They can use `.Suspend()` to reset the stream (maybe in another process) if that can be useful. --- .../internal/object_write_streambuf.cc | 7 +++-- .../internal/object_write_streambuf_test.cc | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/internal/object_write_streambuf.cc b/google/cloud/storage/internal/object_write_streambuf.cc index 32092c49b2018..4667af7c4e7d5 100644 --- a/google/cloud/storage/internal/object_write_streambuf.cc +++ b/google/cloud/storage/internal/object_write_streambuf.cc @@ -180,7 +180,6 @@ void ObjectWriteStreambuf::FlushRoundChunk(ConstBufferSequence buffers) { // GCS upload returns an updated range header that sets the next expected // byte. Check to make sure it remains consistent with the bytes stored in the // buffer. - auto first_buffered_byte = upload_session_->next_expected_byte(); auto expected_next_byte = upload_session_->next_expected_byte() + actual_size; last_response_ = upload_session_->UploadChunk(payload); @@ -200,8 +199,10 @@ void ObjectWriteStreambuf::FlushRoundChunk(ConstBufferSequence buffers) { // even if no "final chunk" is sent. The resumable upload classes know how // to deal with this mess, so let's not duplicate that code here. auto actual_next_byte = upload_session_->next_expected_byte(); - if (actual_next_byte < expected_next_byte && - actual_next_byte < first_buffered_byte) { + if (actual_next_byte < expected_next_byte) { + // TODO(#8559) - if actual_next_byte < first_buffered_byte and there is + // enough space in the buffer we could save the bytes for a future + // write. std::ostringstream error_message; error_message << "Could not continue upload stream. GCS requested byte " << actual_next_byte << " which has already been uploaded."; diff --git a/google/cloud/storage/internal/object_write_streambuf_test.cc b/google/cloud/storage/internal/object_write_streambuf_test.cc index d949ec1ba82c4..2f5648eb9792e 100644 --- a/google/cloud/storage/internal/object_write_streambuf_test.cc +++ b/google/cloud/storage/internal/object_write_streambuf_test.cc @@ -406,6 +406,35 @@ TEST(ObjectWriteStreambufTest, NextExpectedByteDecreases) { EXPECT_THAT(streambuf.last_status(), StatusIs(StatusCode::kAborted)); } +/// @test verify that the upload steam transitions to a bad state on a partial +/// write. +TEST(ObjectWriteStreambufTest, PartialUploadChunk) { + auto mock = absl::make_unique(); + + auto const quantum = UploadChunkRequest::kChunkSizeQuantum; + std::string const payload = std::string(quantum * 4, '*'); + + auto next_byte = 2 * quantum - 1; + EXPECT_CALL(*mock, UploadChunk).WillOnce(InvokeWithoutArgs([&]() { + return make_status_or(ResumableUploadResponse{ + "", ResumableUploadResponse::kInProgress, next_byte, {}, {}}); + })); + EXPECT_CALL(*mock, next_expected_byte()).WillRepeatedly([&]() { + return next_byte; + }); + EXPECT_CALL(*mock, done).WillRepeatedly(Return(false)); + std::string id = "id"; + EXPECT_CALL(*mock, session_id).WillRepeatedly(ReturnRef(id)); + + ObjectWriteStreambuf streambuf( + std::move(mock), quantum, CreateNullHashFunction(), HashValues{}, + CreateNullHashValidator(), AutoFinalizeConfig::kEnabled); + std::ostream output(&streambuf); + output.write(payload.data(), payload.size()); + EXPECT_FALSE(output.good()); + EXPECT_THAT(streambuf.last_status(), StatusIs(StatusCode::kAborted)); +} + /// @test Verify that a stream flushes when mixing operations that add one /// character at a time and operations that add buffers. TEST(ObjectWriteStreambufTest, MixPutcPutn) {