diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 29f8882225ae3..08fbcde6fd9de 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1454,14 +1454,7 @@ class ObjectOutputStream final : public io::OutputStream {
 
   // OutputStream interface
 
-  Status Close() override {
-    auto fut = CloseAsync();
-    return fut.status();
-  }
-
-  Future<> CloseAsync() override {
-    if (closed_) return Status::OK();
-
+  Status EnsureReadyToFlushFromClose() {
     if (current_part_) {
       // Upload last part
       RETURN_NOT_OK(CommitCurrentPart());
@@ -1472,36 +1465,56 @@ class ObjectOutputStream final : public io::OutputStream {
       RETURN_NOT_OK(UploadPart("", 0));
     }
 
-    // Wait for in-progress uploads to finish (if async writes are enabled)
-    return FlushAsync().Then([this]() {
-      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    return Status::OK();
+  }
 
-      // At this point, all part uploads have finished successfully
-      DCHECK_GT(part_number_, 1);
-      DCHECK_EQ(upload_state_->completed_parts.size(),
-                static_cast<size_t>(part_number_ - 1));
-
-      S3Model::CompletedMultipartUpload completed_upload;
-      completed_upload.SetParts(upload_state_->completed_parts);
-      S3Model::CompleteMultipartUploadRequest req;
-      req.SetBucket(ToAwsString(path_.bucket));
-      req.SetKey(ToAwsString(path_.key));
-      req.SetUploadId(upload_id_);
-      req.SetMultipartUpload(std::move(completed_upload));
-
-      auto outcome =
-          client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
-      if (!outcome.IsSuccess()) {
-        return ErrorToStatus(
-            std::forward_as_tuple("When completing multiple part upload for key '",
-                                  path_.key, "' in bucket '", path_.bucket, "': "),
-            "CompleteMultipartUpload", outcome.GetError());
-      }
+  Status FinishPartUploadAfterFlush() {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
 
-      holder_ = nullptr;
-      closed_ = true;
-      return Status::OK();
-    });
+    // At this point, all part uploads have finished successfully
+    DCHECK_GT(part_number_, 1);
+    DCHECK_EQ(upload_state_->completed_parts.size(),
+              static_cast<size_t>(part_number_ - 1));
+
+    S3Model::CompletedMultipartUpload completed_upload;
+    completed_upload.SetParts(upload_state_->completed_parts);
+    S3Model::CompleteMultipartUploadRequest req;
+    req.SetBucket(ToAwsString(path_.bucket));
+    req.SetKey(ToAwsString(path_.key));
+    req.SetUploadId(upload_id_);
+    req.SetMultipartUpload(std::move(completed_upload));
+
+    auto outcome =
+        client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
+    if (!outcome.IsSuccess()) {
+      return ErrorToStatus(
+          std::forward_as_tuple("When completing multiple part upload for key '",
+                                path_.key, "' in bucket '", path_.bucket, "': "),
+          "CompleteMultipartUpload", outcome.GetError());
+    }
+
+    holder_ = nullptr;
+    closed_ = true;
+    return Status::OK();
+  }
+
+  Status Close() override {
+    if (closed_) return Status::OK();
+
+    RETURN_NOT_OK(EnsureReadyToFlushFromClose());
+
+    RETURN_NOT_OK(Flush());
+    return FinishPartUploadAfterFlush();
+  }
+
+  Future<> CloseAsync() override {
+    if (closed_) return Status::OK();
+
+    RETURN_NOT_OK(EnsureReadyToFlushFromClose());
+
+    auto self = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
+    // Wait for in-progress uploads to finish (if async writes are enabled)
+    return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); });
   }
 
   bool closed() const override { return closed_; }
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index e9f14fde72316..f88ee7eef9332 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -590,6 +590,21 @@ class TestS3FS : public S3TestMixin {
     AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
   }
 
+  void TestOpenOutputStreamCloseAsyncDestructor() {
+    std::shared_ptr<io::OutputStream> stream;
+    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
+    ASSERT_OK(stream->Write("new data"));
+    // Destructor implicitly closes stream and completes the multipart upload.
+    // GH-37670: It doesn't matter whether the flush is triggered asynchronously
+    // after CloseAsync or synchronously after stream.reset(); we're just
+    // checking that `closeAsyncFut` keeps the stream alive until completion
+    // rather than segfaulting on a dangling stream.
+    auto closeAsyncFut = stream->CloseAsync();
+    stream.reset();
+    ASSERT_OK(closeAsyncFut.MoveResult());
+    AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
+  }
+
  protected:
   S3Options options_;
   std::shared_ptr<S3FileSystem> fs_;
@@ -1177,6 +1192,16 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
   TestOpenOutputStreamDestructor();
 }
 
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) {
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
+  options_.background_writes = false;
+  MakeFileSystem();
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
 TEST_F(TestS3FS, OpenOutputStreamMetadata) {
   std::shared_ptr<io::OutputStream> stream;
 
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index e7819e139f67a..d3229fd067cbe 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -123,7 +123,8 @@ Result<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadata() {
 // executor
 Future<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadataAsync(
     const IOContext& ctx) {
-  auto self = shared_from_this();
+  std::shared_ptr<InputStream> self =
+      std::dynamic_pointer_cast<InputStream>(shared_from_this());
   return DeferNotOk(internal::SubmitIO(ctx, [self] { return self->ReadMetadata(); }));
 }
 
@@ -165,7 +166,7 @@ Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
 Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
                                                             int64_t position,
                                                             int64_t nbytes) {
-  auto self = checked_pointer_cast<RandomAccessFile>(shared_from_this());
+  auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
   return DeferNotOk(internal::SubmitIO(
       ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
 }
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index dcbe4feb261fb..d2a11b7b6d7ce 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -96,7 +96,7 @@ struct ARROW_EXPORT IOContext {
   StopToken stop_token_;
 };
 
-class ARROW_EXPORT FileInterface {
+class ARROW_EXPORT FileInterface : public std::enable_shared_from_this<FileInterface> {
  public:
   virtual ~FileInterface() = 0;
 
@@ -205,9 +205,7 @@ class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable
   OutputStream() = default;
 };
 
-class ARROW_EXPORT InputStream : virtual public FileInterface,
-                                 virtual public Readable,
-                                 public std::enable_shared_from_this<InputStream> {
+class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Readable {
  public:
   /// \brief Advance or skip stream indicated number of bytes
   /// \param[in] nbytes the number to move forward
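Note: the core of this change is that FileInterface now derives from std::enable_shared_from_this<FileInterface>, so ObjectOutputStream::CloseAsync() can capture a shared_ptr to itself ("self") in the FlushAsync() continuation and stay alive even if the caller drops its last reference before the close completes (GH-37670). Below is a minimal, self-contained sketch of that keep-alive pattern, not Arrow code: the names FileLike and StreamLike are invented for illustration, and std::async stands in for Arrow's Future<>/FlushAsync() machinery.

#include <chrono>
#include <future>
#include <memory>
#include <thread>

// Base interface that hands out shared_ptrs to itself, mirroring the role of
// FileInterface after this patch.
class FileLike : public std::enable_shared_from_this<FileLike> {
 public:
  virtual ~FileLike() = default;
};

class StreamLike : public FileLike {
 public:
  // Asynchronously "close" the stream; the returned future completes once the
  // simulated flush has finished.
  std::future<void> CloseAsync() {
    // enable_shared_from_this lives on the base, so shared_from_this() yields a
    // shared_ptr<FileLike>; downcast to the concrete stream type.
    auto self = std::dynamic_pointer_cast<StreamLike>(shared_from_this());
    // The continuation owns `self`, so `this` cannot dangle while it runs, even
    // if the caller resets its own pointer right after this call returns.
    return std::async(std::launch::async, [self] {
      std::this_thread::sleep_for(std::chrono::milliseconds(50));  // simulated flush
      self->closed_ = true;
    });
  }

  bool closed() const { return closed_; }

 private:
  bool closed_ = false;
};

int main() {
  auto stream = std::make_shared<StreamLike>();
  auto fut = stream->CloseAsync();
  stream.reset();  // caller lets go early, as in TestOpenOutputStreamCloseAsyncDestructor
  fut.wait();      // the close still completes safely; no dangling `this`
  return 0;
}

std::dynamic_pointer_cast is used for the downcasts here (and in interfaces.cc above) because InputStream inherits FileInterface as a virtual base, so a static downcast from the base is not possible once shared_from_this() returns a shared_ptr<FileInterface>.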