Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async upload sstream #117

Merged
merged 2 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/cos_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ class CosAPI {
/// \return 返回context
SharedAsyncContext AsyncPutObject(const AsyncPutObjectReq& req);

SharedAsyncContext AsyncPutObject(const AsyncPutObjectByStreamReq& req);

/// \brief
/// 异步上传对象,封装了初始化分块上传、分块上传、完成分块上传三步,支持断点续传
/// \param req MultiPutObjectAsync请求
Expand Down
2 changes: 1 addition & 1 deletion include/op/object_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class ObjectOp : public BaseOp {
///
/// \return 返回HTTP请求的状态码及错误信息
CosResult PutObject(const PutObjectByStreamReq& req,
PutObjectByStreamResp* resp);
PutObjectByStreamResp* resp, const SharedTransferHandler& handler=nullptr);

/// \brief 删除Object
///
Expand Down
9 changes: 9 additions & 0 deletions include/request/object_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -1712,6 +1712,15 @@ class AsyncPutObjectReq : public PutObjectByFileReq {
virtual ~AsyncPutObjectReq() {}
};

class AsyncPutObjectByStreamReq : public PutObjectByStreamReq {
public:
AsyncPutObjectByStreamReq(const std::string& bucket_name, const std::string& object_name,
std::istream& in_stream)
: PutObjectByStreamReq(bucket_name, object_name, in_stream) {}

virtual ~AsyncPutObjectByStreamReq() {}
};

class AsyncMultiPutObjectReq : public PutObjectByFileReq {
public:
AsyncMultiPutObjectReq(const std::string& bucket_name, const std::string& object_name,
Expand Down
20 changes: 19 additions & 1 deletion src/cos_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ int CosAPI::s_cos_obj_num = 0;
std::mutex g_init_lock;

Poco::TaskManager& GetGlobalTaskManager() {
static Poco::TaskManager task_manager;
static Poco::ThreadPool async_thread_pool("aysnc_pool", 2, CosSysConfig::GetAsynThreadPoolSize());
static Poco::TaskManager task_manager(async_thread_pool);
return task_manager;
}

Expand Down Expand Up @@ -501,6 +502,23 @@ SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectReq& req) {
return context;
}

SharedAsyncContext CosAPI::AsyncPutObject(const AsyncPutObjectByStreamReq& req) {
SharedTransferHandler handler(new TransferHandler());
handler->SetRequest(reinterpret_cast<const void*>(&req));
auto& is = req.GetStream();
is.seekg(0, std::ios::end);
handler->SetTotalSize(is.tellg());
is.seekg(0, std::ios::beg);
TaskFunc fn = [=]() {
PutObjectByStreamResp resp;
m_object_op.PutObject(req, &resp, handler);
};
GetGlobalTaskManager().start(new AsyncTask(std::move(fn)));
SharedAsyncContext context(new AsyncContext(handler));
return context;
}


SharedAsyncContext CosAPI::AsyncMultiPutObject(const AsyncMultiPutObjectReq& req) {
SharedTransferHandler handler(new TransferHandler());
handler->SetRequest(reinterpret_cast<const void*>(&req));
Expand Down
11 changes: 8 additions & 3 deletions src/op/object_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ CosResult ObjectOp::MultiGetObject(const GetObjectByFileReq& req,
}

CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req,
PutObjectByStreamResp* resp) {
PutObjectByStreamResp* resp, const SharedTransferHandler& handler) {
CosResult result;
std::string host = CosSysConfig::GetHost(GetAppId(), m_config->GetRegion(),
req.GetBucketName());
Expand Down Expand Up @@ -410,7 +410,7 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req,
}

result = UploadAction(host, path, req, additional_headers,
additional_params, is, resp);
additional_params, is, resp, handler);

// V4 Etag长度为40字节
if (result.IsSucc() && need_check_etag &&
Expand All @@ -423,7 +423,12 @@ CosResult ObjectOp::PutObject(const PutObjectByStreamReq& req,
md5_str.c_str(), resp->GetEtag().c_str(),
resp->GetXCosRequestId().c_str());
}

if(result.IsSucc() && handler) {
handler->UpdateStatus(TransferStatus::COMPLETED, result, resp->GetHeaders(),
resp->GetBody());
} else if(handler) {
handler->UpdateStatus(TransferStatus::FAILED, result);
}
return result;
}

Expand Down