diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 3a0ade3d2e322..13d6ead6ef686 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -1784,15 +1784,23 @@ class ObjectOutputStream final : public io::OutputStream { return Status::OK(); } + Status CleanupIfFailed(Status status) { + if (!status.ok()) { + RETURN_NOT_OK(CleanupAfterClose()); + return status; + } + return Status::OK(); + } + Status Close() override { if (closed_) return Status::OK(); - RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose())); - RETURN_NOT_OK(Flush()); + RETURN_NOT_OK(CleanupIfFailed(Flush())); if (IsMultipartCreated()) { - RETURN_NOT_OK(FinishPartUploadAfterFlush()); + RETURN_NOT_OK(CleanupIfFailed(FinishPartUploadAfterFlush())); } return CleanupAfterClose(); @@ -1801,12 +1809,12 @@ class ObjectOutputStream final : public io::OutputStream { Future<> CloseAsync() override { if (closed_) return Status::OK(); - RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose())); // Wait for in-progress uploads to finish (if async writes are enabled) return FlushAsync().Then([self = Self()]() { if (self->IsMultipartCreated()) { - RETURN_NOT_OK(self->FinishPartUploadAfterFlush()); + RETURN_NOT_OK(self->CleanupIfFailed(self->FinishPartUploadAfterFlush())); } return self->CleanupAfterClose(); }); @@ -2021,7 +2029,7 @@ class ObjectOutputStream final : public io::OutputStream { std::shared_ptr state, int32_t part_number, Aws::S3::Model::PutObjectOutcome outcome) { - HandleUploadUsingSingleRequestOutcome(state, request, outcome.GetResult()); + HandleUploadUsingSingleRequestOutcome(state, request, outcome); return Status::OK(); }; @@ -2072,7 +2080,7 @@ class ObjectOutputStream final : public io::OutputStream { std::shared_ptr state, int32_t part_number, Aws::S3::Model::UploadPartOutcome outcome) { - HandleUploadPartOutcome(state, part_number, request, outcome.GetResult()); + HandleUploadPartOutcome(state, part_number, request, outcome); return Status::OK(); }; @@ -2083,16 +2091,12 @@ class ObjectOutputStream final : public io::OutputStream { static void HandleUploadUsingSingleRequestOutcome( const std::shared_ptr& state, const S3Model::PutObjectRequest& req, - const Result& result) { + const S3Model::PutObjectOutcome& outcome) { std::unique_lock lock(state->mutex); - if (!result.ok()) { - state->status &= result.status(); - } else { - const auto& outcome = *result; - if (!outcome.IsSuccess()) { - state->status &= UploadUsingSingleRequestError(req, outcome); - } + if (!outcome.IsSuccess()) { + state->status &= UploadUsingSingleRequestError(req, outcome); } + // GH-41862: avoid potential deadlock if the Future's callback is called // with the mutex taken. auto fut = state->pending_uploads_completed; @@ -2103,18 +2107,14 @@ class ObjectOutputStream final : public io::OutputStream { static void HandleUploadPartOutcome(const std::shared_ptr& state, int part_number, const S3Model::UploadPartRequest& req, - const Result& result) { + const S3Model::UploadPartOutcome& outcome) { std::unique_lock lock(state->mutex); - if (!result.ok()) { - state->status &= result.status(); + if (!outcome.IsSuccess()) { + state->status &= UploadPartError(req, outcome); } else { - const auto& outcome = *result; - if (!outcome.IsSuccess()) { - state->status &= UploadPartError(req, outcome); - } else { - AddCompletedPart(state, part_number, outcome.GetResult()); - } + AddCompletedPart(state, part_number, outcome.GetResult()); } + // Notify completion if (--state->uploads_in_progress == 0) { // GH-41862: avoid potential deadlock if the Future's callback is called diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index b8f497d23c9a3..43091aaa986d9 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -572,7 +572,10 @@ class TestS3FS : public S3TestMixin { void TestOpenOutputStream(bool allow_delayed_open) { std::shared_ptr stream; - if (!allow_delayed_open) { + if (allow_delayed_open) { + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("nonexistent-bucket/somefile")); + ASSERT_RAISES(IOError, stream->Close()); + } else { // Nonexistent ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile")); }