Skip to content

Commit

Permalink
zeroize memory copy
Browse files Browse the repository at this point in the history
Change-Id: I2d389b24446623e8f639642a2b354120f20dbe56
  • Loading branch information
wu-hanqing committed Sep 22, 2020
1 parent 8a1c225 commit 455cbec
Show file tree
Hide file tree
Showing 46 changed files with 1,074 additions and 968 deletions.
12 changes: 10 additions & 2 deletions include/client/libcurve.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ namespace client {

class FileClient;

enum class UserDataType {
RawBuffer, // char*
IOBuffer // butil::IOBuf*
};

// 存储用户信息
typedef struct UserInfo {
// 当前执行的owner信息
Expand Down Expand Up @@ -461,17 +466,20 @@ class CurveClient {
* 异步读
* @param fd 文件fd
* @param aioctx 异步读写的io上下文
* @param dataType type of user buffer
* @return 返回错误码
*/
virtual int AioRead(int fd, CurveAioContext* aioctx);
virtual int AioRead(int fd, CurveAioContext* aioctx, UserDataType dataType);

/**
* 异步写
* @param fd 文件fd
* @param aioctx 异步读写的io上下文
* @param dataType type of user buffer
* @return 返回错误码
*/
virtual int AioWrite(int fd, CurveAioContext* aioctx);
virtual int AioWrite(int fd, CurveAioContext* aioctx,
UserDataType dataType);

/**
* 测试使用,设置fileclient
Expand Down
34 changes: 18 additions & 16 deletions nebd/src/part2/file_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,35 @@
#include <brpc/closure_guard.h>
#include <brpc/controller.h>

#include <butil/iobuf.h>

#include "nebd/src/part2/file_service.h"

namespace nebd {
namespace server {

using nebd::client::RetCode;

static void AioReadDeleter(void* m) {
delete[] reinterpret_cast<char*>(m);
}

void NebdFileServiceCallback(NebdServerAioContext* context) {
CHECK(context != nullptr);
std::unique_ptr<NebdServerAioContext> contextGuard(context);
std::unique_ptr<butil::IOBuf> iobufGuard(
reinterpret_cast<butil::IOBuf*>(context->buf));
brpc::ClosureGuard doneGuard(context->done);
switch (context->op) {
case LIBAIO_OP::LIBAIO_OP_READ:
{
nebd::client::ReadResponse* response =
dynamic_cast<nebd::client::ReadResponse*>(context->response);
butil::IOBuf readBuf;
readBuf.append_user_data(
context->buf, context->size, AioReadDeleter);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Read file failed. "
<< "return code: " << context->ret;
} else {
brpc::Controller* cntl =
dynamic_cast<brpc::Controller *>(context->cntl);
cntl->response_attachment().append(readBuf);
cntl->response_attachment() =
*reinterpret_cast<butil::IOBuf*>(context->buf);
response->set_retcode(RetCode::kOK);
}
break;
Expand All @@ -69,7 +67,6 @@ void NebdFileServiceCallback(NebdServerAioContext* context) {
} else {
response->set_retcode(RetCode::kOK);
}
delete[] reinterpret_cast<char*>(context->buf);
break;
}
case LIBAIO_OP::LIBAIO_OP_FLUSH:
Expand Down Expand Up @@ -141,19 +138,21 @@ void NebdFileServiceImpl::Write(
aioContext->cb = NebdFileServiceCallback;

brpc::Controller* cntl = dynamic_cast<brpc::Controller *>(cntl_base);
aioContext->buf = new char[aioContext->size];
size_t copySize =
cntl->request_attachment().copy_to(aioContext->buf, aioContext->size);

std::unique_ptr<butil::IOBuf> buf(new butil::IOBuf());
*buf = cntl->request_attachment();

size_t copySize = buf->size();
if (copySize != aioContext->size) {
LOG(ERROR) << "Copy attachment failed. "
<< "fd: " << request->fd()
<< ", offset: " << request->offset()
<< ", size: " << request->size()
<< ", copy size: " << copySize;
delete[] reinterpret_cast<char*>(aioContext->buf);
return;
}

aioContext->buf = buf.get();
aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
Expand All @@ -164,8 +163,8 @@ void NebdFileServiceImpl::Write(
<< ", offset: " << request->offset()
<< ", size: " << request->size()
<< ", return code: " << rc;
delete[] reinterpret_cast<char*>(aioContext->buf);
} else {
buf.release();
doneGuard.release();
}
}
Expand All @@ -184,7 +183,10 @@ void NebdFileServiceImpl::Read(
aioContext->size = request->size();
aioContext->op = LIBAIO_OP::LIBAIO_OP_READ;
aioContext->cb = NebdFileServiceCallback;
aioContext->buf = new char[request->size()];

std::unique_ptr<butil::IOBuf> buf(new butil::IOBuf());
aioContext->buf = buf.get();

aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
Expand All @@ -195,8 +197,8 @@ void NebdFileServiceImpl::Read(
<< ", offset: " << request->offset()
<< ", size: " << request->size()
<< ", return code: " << rc;
delete[] reinterpret_cast<char*>(aioContext->buf);
} else {
buf.release();
doneGuard.release();
}
}
Expand Down
6 changes: 4 additions & 2 deletions nebd/src/part2/request_executor_curve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ int CurveRequestExecutor::AioRead(
return -1;
}

ret = client_->AioRead(curveFd, &curveCombineCtx->curveCtx);
ret = client_->AioRead(curveFd, &curveCombineCtx->curveCtx,
curve::client::UserDataType::IOBuffer);
if (ret != LIBCURVE_ERROR::OK) {
delete curveCombineCtx;
return -1;
Expand All @@ -200,7 +201,8 @@ int CurveRequestExecutor::AioWrite(
return -1;
}

ret = client_->AioWrite(curveFd, &curveCombineCtx->curveCtx);
ret = client_->AioWrite(curveFd, &curveCombineCtx->curveCtx,
curve::client::UserDataType::IOBuffer);
if (ret != LIBCURVE_ERROR::OK) {
delete curveCombineCtx;
return -1;
Expand Down
2 changes: 1 addition & 1 deletion nebd/test/part1/nebd_client_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ TEST_F(NebdFileClientTest, CommonTest) {

ASSERT_EQ(0, Extend4Nebd(fd, kFileSize));
ASSERT_EQ(kFileSize, GetFileSize4Nebd(fd));
ASSERT_EQ(-1, GetInfo4Nebd(1));
ASSERT_EQ(kFileSize, GetInfo4Nebd(fd));
ASSERT_EQ(0, InvalidCache4Nebd(fd));

char buffer[kBufSize];
Expand Down
16 changes: 11 additions & 5 deletions nebd/test/part2/file_service_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,13 @@ TEST_F(FileServiceTest, WriteTest) {
EXPECT_CALL(*fileManager_, AioWrite(fd, NotNull()))
.WillOnce(DoAll(SaveArg<1>(&aioCtx), Return(0)));
fileService_->Write(&cntl, &request, &response, &done);
ASSERT_EQ(0, strncmp((char*)aioCtx->buf, buf, kSize)); // NOLINT

butil::IOBuf data;
data.append(buf, kSize);
ASSERT_EQ(
*reinterpret_cast<butil::IOBuf*>(aioCtx->buf),
data);

ASSERT_FALSE(done.IsRunned());

// write failed
Expand Down Expand Up @@ -342,7 +348,7 @@ TEST_F(FileServiceTest, CallbackTest) {
context->offset = 0;
context->size = 4096;
context->done = &done;
context->buf = new char[4096];
context->buf = new butil::IOBuf();
context->ret = 0;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
Expand All @@ -360,7 +366,7 @@ TEST_F(FileServiceTest, CallbackTest) {
context->offset = 0;
context->size = 4096;
context->done = &done;
context->buf = new char[4096];
context->buf = new butil::IOBuf();
context->ret = -1;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
Expand All @@ -378,7 +384,7 @@ TEST_F(FileServiceTest, CallbackTest) {
context->offset = 0;
context->size = 4096;
context->done = &done;
context->buf = new char[4096];
context->buf = new butil::IOBuf();
context->ret = 0;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
Expand All @@ -396,7 +402,7 @@ TEST_F(FileServiceTest, CallbackTest) {
context->offset = 0;
context->size = 4096;
context->done = &done;
context->buf = new char[4096];
context->buf = new butil::IOBuf();
context->ret = -1;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
Expand Down
6 changes: 4 additions & 2 deletions nebd/test/part2/mock_curve_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ class MockCurveClient : public ::curve::client::CurveClient {
MOCK_METHOD1(Close, int(int));
MOCK_METHOD2(Extend, int(const std::string&, int64_t));
MOCK_METHOD1(StatFile, int64_t(const std::string&));
MOCK_METHOD2(AioRead, int(int, CurveAioContext*));
MOCK_METHOD2(AioWrite, int(int, CurveAioContext*));
MOCK_METHOD3(AioRead,
int(int, CurveAioContext*, curve::client::UserDataType));
MOCK_METHOD3(AioWrite,
int(int, CurveAioContext*, curve::client::UserDataType));
};

} // namespace server
Expand Down
16 changes: 8 additions & 8 deletions nebd/test/part2/test_request_executor_curve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,15 @@ TEST_F(TestReuqestExecutorCurve, test_AioRead) {
// 1. nebdFileIns不是CurveFileInstance类型, 异步读失败
{
auto nebdFileIns = new NebdFileInstance();
EXPECT_CALL(*curveClient_, AioRead(_, _)).Times(0);
EXPECT_CALL(*curveClient_, AioRead(_, _, _)).Times(0);
ASSERT_EQ(-1, executor.AioRead(nebdFileIns, &aiotcx));
}

// 2. nebdFileIns中的fd<0, 异步读失败
{
auto curveFileIns = new CurveFileInstance();
curveFileIns->fd = -1;
EXPECT_CALL(*curveClient_, AioRead(_, _)).Times(0);
EXPECT_CALL(*curveClient_, AioRead(_, _, _)).Times(0);
ASSERT_EQ(-1, executor.AioRead(curveFileIns, &aiotcx));
}

Expand All @@ -288,7 +288,7 @@ TEST_F(TestReuqestExecutorCurve, test_AioRead) {
aiotcx.op = LIBAIO_OP::LIBAIO_OP_READ;
curveFileIns->fd = 1;
curveFileIns->fileName = curveFilename;
EXPECT_CALL(*curveClient_, AioRead(1, _))
EXPECT_CALL(*curveClient_, AioRead(1, _, _))
.WillOnce(Return(LIBCURVE_ERROR::FAILED));
ASSERT_EQ(-1, executor.AioRead(curveFileIns, &aiotcx));
}
Expand All @@ -299,7 +299,7 @@ TEST_F(TestReuqestExecutorCurve, test_AioRead) {
curveFileIns->fd = 1;
curveFileIns->fileName = curveFilename;
CurveAioContext* curveCtx;
EXPECT_CALL(*curveClient_, AioRead(1, _))
EXPECT_CALL(*curveClient_, AioRead(1, _, _))
.WillOnce(DoAll(SaveArg<1>(&curveCtx),
Return(LIBCURVE_ERROR::OK)));
ASSERT_EQ(0, executor.AioRead(curveFileIns, &aiotcx));
Expand All @@ -316,15 +316,15 @@ TEST_F(TestReuqestExecutorCurve, test_AioWrite) {
// 1. nebdFileIns不是CurveFileInstance类型, 异步写失败
{
auto nebdFileIns = new NebdFileInstance();
EXPECT_CALL(*curveClient_, AioWrite(_, _)).Times(0);
EXPECT_CALL(*curveClient_, AioWrite(_, _, _)).Times(0);
ASSERT_EQ(-1, executor.AioWrite(nebdFileIns, &aiotcx));
}

// 2. nebdFileIns中的fd<0, 异步写失败
{
auto curveFileIns = new CurveFileInstance();
curveFileIns->fd = -1;
EXPECT_CALL(*curveClient_, AioWrite(_, _)).Times(0);
EXPECT_CALL(*curveClient_, AioWrite(_, _, _)).Times(0);
ASSERT_EQ(-1, executor.AioWrite(curveFileIns, &aiotcx));
}

Expand All @@ -337,7 +337,7 @@ TEST_F(TestReuqestExecutorCurve, test_AioWrite) {
aiotcx.op = LIBAIO_OP::LIBAIO_OP_READ;
curveFileIns->fd = 1;
curveFileIns->fileName = curveFilename;
EXPECT_CALL(*curveClient_, AioWrite(1, _))
EXPECT_CALL(*curveClient_, AioWrite(1, _, _))
.WillOnce(Return(LIBCURVE_ERROR::FAILED));
ASSERT_EQ(-1, executor.AioWrite(curveFileIns, &aiotcx));
}
Expand All @@ -348,7 +348,7 @@ TEST_F(TestReuqestExecutorCurve, test_AioWrite) {
curveFileIns->fd = 1;
curveFileIns->fileName = curveFilename;
CurveAioContext* curveCtx;
EXPECT_CALL(*curveClient_, AioWrite(1, _))
EXPECT_CALL(*curveClient_, AioWrite(1, _, _))
.WillOnce(DoAll(SaveArg<1>(&curveCtx),
Return(LIBCURVE_ERROR::OK)));
ASSERT_EQ(0, executor.AioWrite(curveFileIns, &aiotcx));
Expand Down
12 changes: 4 additions & 8 deletions src/client/chunk_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ void ClientClosure::OnInvalidRequest() {

void WriteChunkClosure::SendRetryRequest() {
client_->WriteChunk(reqCtx_->idinfo_, reqCtx_->seq_,
reqCtx_->writeBuffer_,
reqCtx_->writeData_,
reqCtx_->offset_,
reqCtx_->rawlength_,
reqCtx_->sourceInfo_,
Expand Down Expand Up @@ -531,9 +531,7 @@ void ReadChunkClosure::SendRetryRequest() {
void ReadChunkClosure::OnSuccess() {
ClientClosure::OnSuccess();

cntl_->response_attachment().copy_to(
reqCtx_->readBuffer_,
cntl_->response_attachment().size());
reqCtx_->readData_ = cntl_->response_attachment();

metaCache_->UpdateAppliedIndex(
reqCtx_->idinfo_.lpid_,
Expand All @@ -545,7 +543,7 @@ void ReadChunkClosure::OnChunkNotExist() {
ClientClosure::OnChunkNotExist();

reqDone_->SetFailed(0);
memset(reqCtx_->readBuffer_, 0, reqCtx_->rawlength_);
reqCtx_->readData_.resize(reqCtx_->rawlength_, 0);
metaCache_->UpdateAppliedIndex(chunkIdInfo_.lpid_, chunkIdInfo_.cpid_,
response_->appliedindex());
}
Expand All @@ -560,9 +558,7 @@ void ReadChunkSnapClosure::SendRetryRequest() {
void ReadChunkSnapClosure::OnSuccess() {
ClientClosure::OnSuccess();

cntl_->response_attachment().copy_to(
reqCtx_->readBuffer_,
cntl_->response_attachment().size());
reqCtx_->readData_ = cntl_->response_attachment();
}

void DeleteChunkSnapClosure::SendRetryRequest() {
Expand Down
Loading

0 comments on commit 455cbec

Please sign in to comment.