Skip to content

Commit

Permalink
GH-37670: [C++] IO Output extend from enable_shared_from_this
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Sep 14, 2023
1 parent 15a8ac3 commit ec41eee
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
24 changes: 13 additions & 11 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1472,21 +1472,23 @@ class ObjectOutputStream final : public io::OutputStream {
RETURN_NOT_OK(UploadPart("", 0));
}

auto self =
::arrow::internal::checked_pointer_cast<ObjectOutputStream>(shared_from_this());
// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([this]() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
return FlushAsync().Then([self]() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());

// At this point, all part uploads have finished successfully
DCHECK_GT(part_number_, 1);
DCHECK_EQ(upload_state_->completed_parts.size(),
static_cast<size_t>(part_number_ - 1));
DCHECK_GT(self->part_number_, 1);
DCHECK_EQ(self->upload_state_->completed_parts.size(),
static_cast<size_t>(self->part_number_ - 1));

S3Model::CompletedMultipartUpload completed_upload;
completed_upload.SetParts(upload_state_->completed_parts);
completed_upload.SetParts(self->upload_state_->completed_parts);
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetBucket(ToAwsString(self->path_.bucket));
req.SetKey(ToAwsString(self->path_.key));
req.SetUploadId(self->upload_id_);
req.SetMultipartUpload(std::move(completed_upload));

auto outcome =
Expand All @@ -1498,8 +1500,8 @@ class ObjectOutputStream final : public io::OutputStream {
"CompleteMultipartUpload", outcome.GetError());
}

holder_ = nullptr;
closed_ = true;
self->holder_ = nullptr;
self->closed_ = true;
return Status::OK();
});
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ class ARROW_EXPORT Readable {
virtual const IOContext& io_context() const;
};

class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable {
class ARROW_EXPORT OutputStream : virtual public FileInterface,
public Writable,
public std::enable_shared_from_this<OutputStream> {
protected:
OutputStream() = default;
};
Expand Down

0 comments on commit ec41eee

Please sign in to comment.