Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37670: [C++] IO FileInterface extend from enable_shared_from_this #37713

Merged
merged 8 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 49 additions & 36 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1454,14 +1454,7 @@ class ObjectOutputStream final : public io::OutputStream {

// OutputStream interface

Status Close() override {
auto fut = CloseAsync();
return fut.status();
}

Future<> CloseAsync() override {
if (closed_) return Status::OK();

Status EnsureReadyToFlushFromClose() {
if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
Expand All @@ -1472,36 +1465,56 @@ class ObjectOutputStream final : public io::OutputStream {
RETURN_NOT_OK(UploadPart("", 0));
}

// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([this]() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
return Status::OK();
}

// At this point, all part uploads have finished successfully
DCHECK_GT(part_number_, 1);
DCHECK_EQ(upload_state_->completed_parts.size(),
static_cast<size_t>(part_number_ - 1));

S3Model::CompletedMultipartUpload completed_upload;
completed_upload.SetParts(upload_state_->completed_parts);
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetMultipartUpload(std::move(completed_upload));

auto outcome =
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When completing multiple part upload for key '",
path_.key, "' in bucket '", path_.bucket, "': "),
"CompleteMultipartUpload", outcome.GetError());
}
// Completes the multipart upload and marks the stream as closed.
//
// Intended to run once the pending part uploads have been flushed (both
// Close() and CloseAsync() call it only after flushing).
//
// Returns a non-OK Status if the client lock cannot be obtained or if the
// CompleteMultipartUpload request fails. In either failure case `holder_`
// and `closed_` are left untouched.
Status FinishPartUploadAfterFlush() {
  ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

  // At this point, all part uploads have finished successfully
  DCHECK_GT(part_number_, 1);
  DCHECK_EQ(upload_state_->completed_parts.size(),
            static_cast<size_t>(part_number_ - 1));

  // Assemble the request from the parts recorded in the shared upload state.
  S3Model::CompletedMultipartUpload completed_upload;
  completed_upload.SetParts(upload_state_->completed_parts);
  S3Model::CompleteMultipartUploadRequest req;
  req.SetBucket(ToAwsString(path_.bucket));
  req.SetKey(ToAwsString(path_.key));
  req.SetUploadId(upload_id_);
  req.SetMultipartUpload(std::move(completed_upload));

  auto outcome =
      client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
  if (!outcome.IsSuccess()) {
    return ErrorToStatus(
        std::forward_as_tuple("When completing multiple part upload for key '",
                              path_.key, "' in bucket '", path_.bucket, "': "),
        "CompleteMultipartUpload", outcome.GetError());
  }

  // Only after the upload has been completed successfully do we drop the
  // client holder and flag the stream as closed.
  holder_ = nullptr;
  closed_ = true;
  return Status::OK();
}

// Synchronously closes the stream.
//
// A no-op returning OK when the stream is already closed. Otherwise prepares
// the final part, waits for the flush to finish, and then completes the
// multipart upload. The first failing step's Status is returned as-is.
Status Close() override {
  if (!closed_) {
    RETURN_NOT_OK(EnsureReadyToFlushFromClose());
    RETURN_NOT_OK(Flush());
    return FinishPartUploadAfterFlush();
  }
  return Status::OK();
}

// Asynchronous counterpart of Close().
//
// Returns an already-finished OK future when the stream is closed. Otherwise
// prepares the final part and chains the multipart-upload completion onto the
// future returned by FlushAsync().
Future<> CloseAsync() override {
  if (closed_) {
    return Status::OK();
  }

  RETURN_NOT_OK(EnsureReadyToFlushFromClose());

  // The continuation owns a strong reference to this stream, so the stream
  // stays valid even if the caller releases its own pointer before the
  // flush completes.
  auto keep_alive = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
  // Wait for in-progress uploads to finish (if async writes are enabled)
  return FlushAsync().Then([keep_alive]() {
    return keep_alive->FinishPartUploadAfterFlush();
  });
}

bool closed() const override { return closed_; }
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,21 @@ class TestS3FS : public S3TestMixin {
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

void TestOpenOutputStreamCloseAsyncDestructor() {
std::shared_ptr<io::OutputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
ASSERT_OK(stream->Write("new data"));
// Destructor implicitly closes stream and completes the multipart upload.
// GH-37670: Testing it doesn't matter whether flush is triggered asynchronously
// after CloseAsync or synchronously after stream.reset() since we're just
// checking that `closeAsyncFut` keeps the stream alive until completion
// rather than segfaulting on a dangling stream
auto closeAsyncFut = stream->CloseAsync();
bkietz marked this conversation as resolved.
Show resolved Hide resolved
stream.reset();
ASSERT_OK(closeAsyncFut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

protected:
S3Options options_;
std::shared_ptr<S3FileSystem> fs_;
Expand Down Expand Up @@ -1177,6 +1192,16 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
TestOpenOutputStreamDestructor();
}

// Runs the CloseAsync-then-release scenario against the fixture's filesystem
// as already configured (presumably with background writes enabled, going by
// the test name - confirm against the fixture setup).
TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) {
TestOpenOutputStreamCloseAsyncDestructor();
}

// Runs the CloseAsync-then-release scenario with background writes turned off.
TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
options_.background_writes = false;
// Rebuild the filesystem after changing the option (presumably required for
// the new setting to take effect - confirm in MakeFileSystem).
MakeFileSystem();
TestOpenOutputStreamCloseAsyncDestructor();
}

TEST_F(TestS3FS, OpenOutputStreamMetadata) {
std::shared_ptr<io::OutputStream> stream;

Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ Result<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadata() {
// executor
Future<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadataAsync(
    const IOContext& ctx) {
  // enable_shared_from_this is inherited through FileInterface, so
  // shared_from_this() yields a std::shared_ptr<FileInterface>. FileInterface
  // is a virtual base of InputStream, which rules out a static downcast;
  // a dynamic cast is required here.
  std::shared_ptr<InputStream> self =
      std::dynamic_pointer_cast<InputStream>(shared_from_this());
  // The task captures `self`, so this stream stays alive until the task ran.
  return DeferNotOk(internal::SubmitIO(ctx, [self] { return self->ReadMetadata(); }));
}

Expand Down Expand Up @@ -165,7 +166,7 @@ Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
                                                            int64_t position,
                                                            int64_t nbytes) {
  // shared_from_this() comes from the FileInterface base, so its result has to
  // be downcast to RandomAccessFile before ReadAt() can be called on it.
  // NOTE(review): a dynamic cast is used on the assumption that FileInterface
  // is reached through a virtual base here as well - confirm in interfaces.h.
  auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
  // The task captures `self`, so this file stays alive until the read ran.
  return DeferNotOk(internal::SubmitIO(
      ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
}
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct ARROW_EXPORT IOContext {
StopToken stop_token_;
};

class ARROW_EXPORT FileInterface {
class ARROW_EXPORT FileInterface : public std::enable_shared_from_this<FileInterface> {
public:
virtual ~FileInterface() = 0;

Expand Down Expand Up @@ -205,9 +205,7 @@ class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable
OutputStream() = default;
};

class ARROW_EXPORT InputStream : virtual public FileInterface,
virtual public Readable,
public std::enable_shared_from_this<InputStream> {
class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Readable {
public:
/// \brief Advance or skip stream indicated number of bytes
/// \param[in] nbytes the number to move forward
Expand Down