Skip to content

Commit

Permalink
feat(cold-backup): add rate limit for fds (#443)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhao liwei authored May 8, 2020
1 parent 6127676 commit 54b4dda
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 58 deletions.
216 changes: 163 additions & 53 deletions src/dist/block_service/fds/fds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
#include <memory>
#include <fstream>
#include <string.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/TokenBucket.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace dist {
Expand Down Expand Up @@ -95,10 +99,46 @@ DEFINE_THREAD_POOL_CODE(THREAD_POOL_FDS_SERVICE)
DEFINE_TASK_CODE(LPC_FDS_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_FDS_SERVICE)

const std::string fds_service::FILE_LENGTH_CUSTOM_KEY = "x-xiaomi-meta-content-length";
const std::string fds_service::FILE_LENGTH_KEY = "content-length";
const std::string fds_service::FILE_MD5_KEY = "content-md5";

fds_service::fds_service() {}
/// Builds the write and read token buckets that throttle traffic to fds.
///
/// The rate limits are read from the "replication" config section in Mb/s (megabits per
/// second) and converted to bytes per second before they are handed to the token buckets.
fds_service::fds_service()
{
    const int BYTE_TO_BIT = 8;

    /// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25]
    /// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25].
    /// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration,
    /// we must set max_sst_file_size to 4 * write_buffer_size * [0.75, 1.25], which is too big.
    /// So in this implementation, we don't take BULK_LOAD scenario into consideration.
    uint64_t target_file_size =
        dsn_config_get_value_uint64("pegasus.server",
                                    "rocksdb_target_file_size_base",
                                    64 * 1024 * 1024,
                                    "rocksdb options.target_file_size_base");
    uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server",
                                                             "rocksdb_write_buffer_size",
                                                             64 * 1024 * 1024,
                                                             "rocksdb options.write_buffer_size");
    // The cast must wrap the whole product: `(uint64_t)1.25 * x` converts 1.25 to 1 first
    // and silently drops the 1.25 factor.
    uint64_t max_sst_file_size =
        std::max(target_file_size, static_cast<uint64_t>(1.25 * write_buffer_size));

    uint32_t write_rate_limit = static_cast<uint32_t>(dsn_config_get_value_uint64(
        "replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)"));
    /// For write operation, we can't send a file in batches. Because putContent interface of fds
    /// will overwrite what was sent before for the same file. So we must send a file as a whole.
    /// If file size > burst size, the file will be rejected by the token bucket.
    /// Here we set burst_size = max_sst_file_size + 3MB, a little greater than max_sst_file_size
    // Kept in 64 bits: the burst size is derived from 64-bit file sizes and must not be
    // truncated to 32 bits.
    uint64_t burst_size = static_cast<uint64_t>(
        std::max(2 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6));
    _write_token_bucket.reset(
        new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));

    uint32_t read_rate_limit = static_cast<uint32_t>(dsn_config_get_value_uint64(
        "replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)"));
    // Reads are split into batches by the caller, so a burst of twice the per-second rate
    // is used here instead of a whole-file burst.
    burst_size = static_cast<uint64_t>(2 * read_rate_limit * 1e6 / BYTE_TO_BIT);
    _read_token_bucket.reset(
        new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));
}

fds_service::~fds_service() {}

/**
Expand Down Expand Up @@ -231,6 +271,7 @@ dsn::task_ptr fds_service::list_dir(const ls_request &req,
return t;
}

// TODO(zhaoliwei): refactor this code, because the same logic is duplicated in get_file_meta()
dsn::task_ptr fds_service::create_file(const create_file_request &req,
dsn::task_code code,
const create_file_callback &cb,
Expand Down Expand Up @@ -484,82 +525,146 @@ fds_file_object::fds_file_object(fds_service *s,

fds_file_object::~fds_file_object() {}

dsn::error_code fds_file_object::get_content(uint64_t pos,
int64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes)
/// Fetches the metadata of the remote object `_fds_path` from fds and caches it.
///
/// On success `_size` and `_md5sum` are filled in and `_has_meta_synced` is set to true.
/// A metadata map without the length or md5 entry, or with an unparsable length, trips an
/// assertion.
///
/// \return ERR_OK on success, ERR_OBJECT_NOT_FOUND when fds answers HTTP 404, and
///         ERR_FS_INTERNAL for any other GalaxyFDSClientException.
error_code fds_file_object::get_file_meta()
{
    galaxy::fds::GalaxyFDSClient *c = _service->get_client();
    try {
        // NOTE(review): `meta` is taken by value; getObjectMetadata() appears to return a
        // temporary owner, so binding a reference here would presumably dangle — confirm
        // before changing this to a reference.
        auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata();

        // get file length
        auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY);
        dassert_f(iter != meta.end(),
                  "can't find {} in object({})'s metadata",
                  fds_service::FILE_LENGTH_CUSTOM_KEY.c_str(),
                  _fds_path.c_str());
        bool valid = dsn::buf2uint64(iter->second, _size);
        dassert_f(valid, "error to get file size");

        // get md5 key
        iter = meta.find(fds_service::FILE_MD5_KEY);
        dassert_f(iter != meta.end(),
                  "can't find {} in object({})'s metadata",
                  fds_service::FILE_MD5_KEY.c_str(),
                  _fds_path.c_str());
        _md5sum = iter->second;

        _has_meta_synced = true;
        return ERR_OK;
    } catch (const galaxy::fds::GalaxyFDSClientException &ex) {
        if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) {
            return ERR_OBJECT_NOT_FOUND;
        } else {
            derror_f("fds getObjectMetadata failed: parameter({}), code({}), msg({})",
                     _name.c_str(),
                     ex.code(),
                     ex.what());
            return ERR_FS_INTERNAL;
        }
    }
}

/// Reads a range of the remote file into `os` in batches of at most 1MB, taking tokens from
/// the service's read token bucket before every batch.
///
/// \param start             offset of the first byte to read
/// \param length            number of bytes to read; -1 means "up to the end of the file"
/// \param os                stream the downloaded bytes are written to
/// \param transfered_bytes  set to the total number of bytes reported by get_content()
/// \return ERR_OK on success (including a short or empty read at the end of the file), or the
///         first error returned by get_file_meta() / get_content()
error_code fds_file_object::get_content_in_batches(uint64_t start,
                                                   int64_t length,
                                                   /*out*/ std::ostream &os,
                                                   /*out*/ uint64_t &transfered_bytes)
{
    // the max batch size is 1MB
    const uint64_t BATCH_MAX = 1000 * 1000;
    error_code err = ERR_OK;
    transfered_bytes = 0;

    // get file meta if it is not synced
    if (!_has_meta_synced) {
        err = get_file_meta();
        if (ERR_OK != err) {
            return err;
        }
    }

    // Nothing can be read at or past the end of the file; return before any read token is
    // spent on bytes that do not exist.
    if (start >= _size) {
        return ERR_OK;
    }

    // Clamp the request to what the file actually holds after `start`. This keeps the token
    // accounting honest and guarantees `start + to_transfer_bytes` cannot wrap around.
    // if length = -1, it means we should transfer the rest of the file
    const uint64_t remaining_bytes = _size - start;
    const uint64_t to_transfer_bytes =
        (length == -1 ? remaining_bytes
                      : std::min(static_cast<uint64_t>(length), remaining_bytes));
    const uint64_t end = start + to_transfer_bytes;

    uint64_t pos = start;
    while (pos < end) {
        const uint64_t batch_len = std::min(BATCH_MAX, end - pos);
        // get tokens from token bucket
        // NOTE(review): the return value is ignored here, while put_content() treats a false
        // return as "rejected by the token bucket" — confirm whether a read batch can ever
        // exceed the configured read burst size.
        _service->_read_token_bucket->consumeWithBorrowAndWait(batch_len);

        uint64_t once_transfered_bytes = 0;
        err = get_content(pos, batch_len, os, once_transfered_bytes);
        transfered_bytes += once_transfered_bytes;
        // stop on error, or on a short read
        if (err != ERR_OK || once_transfered_bytes < batch_len) {
            return err;
        }
        pos += batch_len;
    }

    return ERR_OK;
}

// Copies bytes of the remote object `_fds_path`, starting at offset `pos`, into `os`, and
// reports the number of bytes copied through `transfered_bytes`.
// Each loop iteration requests the part of the range that has not been copied yet and
// appends what copy_stream() returns to the running total.
// Error mapping visible here: HTTP 404 from fds -> ERR_OBJECT_NOT_FOUND (the cached size and
// md5 are reset as well), any other GalaxyFDSClientException -> ERR_FS_INTERNAL.
// FDS_EXCEPTION_HANDLE is defined outside this view and may set further error codes.
// NOTE(review): this span appears to interleave the pre-change and post-change lines of the
// commit diff (several statements occur twice, once with `dsn::ERR_*` and once with `ERR_*`);
// confirm against the repository which lines are live before relying on it.
error_code fds_file_object::get_content(uint64_t pos,
uint64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes)
{
error_code err = ERR_OK;
transfered_bytes = 0;
while (true) {
if (_has_meta_synced) {
// stop once enough bytes have been downloaded or the end of the file is reached
if ((length != -1 && (int64_t)transfered_bytes >= length) ||
transfered_bytes + pos >= _size) {
return dsn::ERR_OK;
}
// stop once enough bytes have been downloaded or the end of the file is reached
if (transfered_bytes >= length || transfered_bytes + pos >= _size) {
return ERR_OK;
}

try {
galaxy::fds::GalaxyFDSClient *c = _service->get_client();
std::shared_ptr<galaxy::fds::FDSObject> obj;
if (length == -1)
obj = c->getObject(_service->get_bucket_name(), _fds_path, pos + transfered_bytes);
else
obj = c->getObject(_service->get_bucket_name(),
_fds_path,
pos + transfered_bytes,
length - transfered_bytes);
// request only the part of the range that has not been copied yet
obj = c->getObject(_service->get_bucket_name(),
_fds_path,
pos + transfered_bytes,
length - transfered_bytes);
dinfo("get object from fds succeed, remote_file(%s)", _fds_path.c_str());
if (!_has_meta_synced) {
const std::map<std::string, std::string> &meta = obj->objectMetadata().metadata();
auto iter = meta.find(fds_service::FILE_MD5_KEY);
if (iter != meta.end()) {
_md5sum = iter->second;
iter = meta.find(fds_service::FILE_LENGTH_KEY);
dassert(iter != meta.end(),
"%s: can't get %s in getObject %s",
_name.c_str(),
fds_service::FILE_LENGTH_KEY.c_str(),
_fds_path.c_str());
_size = atoll(iter->second.c_str());
_has_meta_synced = true;
}
}
std::istream &is = obj->objectContent();
// append this response's body to `os` and add its size to the running total
transfered_bytes += utils::copy_stream(is, os, PIECE_SIZE);
err = dsn::ERR_OK;
err = ERR_OK;
} catch (const galaxy::fds::GalaxyFDSClientException &ex) {
derror("fds getObject error: remote_file(%s), code(%d), msg(%s)",
file_name().c_str(),
ex.code(),
ex.what());
if (!_has_meta_synced && ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) {
if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) {
// the object does not exist: cache that as "synced, empty file"
_has_meta_synced = true;
_md5sum = "";
_size = 0;
err = dsn::ERR_OBJECT_NOT_FOUND;
err = ERR_OBJECT_NOT_FOUND;
} else {
err = dsn::ERR_FS_INTERNAL;
err = ERR_FS_INTERNAL;
}
}
FDS_EXCEPTION_HANDLE(err, "getObject", file_name().c_str())

// any error ends the download; on success the loop re-checks the stop condition
if (err != dsn::ERR_OK) {
if (err != ERR_OK) {
return err;
}
}

return err;
}

dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
uint64_t &transfered_bytes)
error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
int64_t to_transfer_bytes,
uint64_t &transfered_bytes)
{
dsn::error_code err = dsn::ERR_OK;
error_code err = ERR_OK;
transfered_bytes = 0;
galaxy::fds::GalaxyFDSClient *c = _service->get_client();

// get tokens from token bucket
if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes)) {
ddebug_f("the transfer count({}) is greater than burst size({}), so it is rejected by "
"token bucket",
to_transfer_bytes,
_service->_write_token_bucket->burst());
return ERR_BUSY;
}

try {
c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata());
} catch (const galaxy::fds::GalaxyFDSClientException &ex) {
Expand All @@ -571,7 +676,7 @@ dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
}
FDS_EXCEPTION_HANDLE(err, "putObject", file_name().c_str())

if (err != dsn::ERR_OK) {
if (err != ERR_OK) {
return err;
}

Expand Down Expand Up @@ -621,7 +726,7 @@ dsn::task_ptr fds_file_object::write(const write_request &req,
write_response resp;
std::istringstream is;
is.str(std::string(req.buffer.data(), req.buffer.length()));
resp.err = put_content(is, resp.written_size);
resp.err = put_content(is, req.buffer.length(), resp.written_size);

t->enqueue_with(resp);
release_ref();
Expand All @@ -643,6 +748,10 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req,
add_ref();
auto upload_background = [this, req, t]() {
const std::string &local_file = req.input_local_name;
// get file size
int64_t file_sz = 0;
dsn::utils::filesystem::file_size(local_file, file_sz);

upload_response resp;
// TODO: we can cache the whole file in buffer, then upload the buffer rather than the
// ifstream, because if ifstream read file beyond 60s, fds-server will reset the session,
Expand All @@ -658,7 +767,7 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req,
ptr);
resp.err = dsn::ERR_FILE_OPERATION_FAILED;
} else {
resp.err = put_content(is, resp.uploaded_size);
resp.err = put_content(is, file_sz, resp.uploaded_size);
is.close();
}

Expand Down Expand Up @@ -691,7 +800,7 @@ dsn::task_ptr fds_file_object::read(const read_request &req,
read_response resp;
std::ostringstream os;
uint64_t transferd_size;
resp.err = get_content(req.remote_pos, req.remote_length, os, transferd_size);
resp.err = get_content_in_batches(req.remote_pos, req.remote_length, os, transferd_size);
if (os.tellp() > 0) {
std::string *output = new std::string();
*output = os.str();
Expand Down Expand Up @@ -743,7 +852,8 @@ dsn::task_ptr fds_file_object::download(const download_request &req,
auto download_background = [this, req, handle, t]() {
download_response resp;
uint64_t transfered_size;
resp.err = get_content(req.remote_pos, req.remote_length, *handle, transfered_size);
resp.err =
get_content_in_batches(req.remote_pos, req.remote_length, *handle, transfered_size);
resp.downloaded_size = 0;
if (handle->tellp() != -1)
resp.downloaded_size = handle->tellp();
Expand All @@ -755,6 +865,6 @@ dsn::task_ptr fds_file_object::download(const download_request &req,
dsn::tasking::enqueue(LPC_FDS_CALL, nullptr, download_background);
return t;
}
}
}
}
} // namespace block_service
} // namespace dist
} // namespace dsn
29 changes: 24 additions & 5 deletions src/dist/block_service/fds/fds_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

#include <dsn/dist/block_service.h>

// Forward declaration of the token bucket template, so this header can declare
// std::unique_ptr<folly::TokenBucket> members without including the token bucket header
// itself (fds_service.cpp includes <dsn/utility/TokenBucket.h>).
// NOTE(review): std::chrono::steady_clock needs <chrono>, and no such include is visible in
// this part of the header — confirm it is pulled in elsewhere.
namespace folly {
template <typename Clock>
class BasicTokenBucket;

// The token bucket type used by fds_service: BasicTokenBucket on the steady clock.
using TokenBucket = BasicTokenBucket<std::chrono::steady_clock>;
} // namespace folly

namespace galaxy {
namespace fds {
class GalaxyFDSClient;
Expand Down Expand Up @@ -64,6 +71,10 @@ class fds_service : public block_filesystem
private:
std::shared_ptr<galaxy::fds::GalaxyFDSClient> _client;
std::string _bucket_name;
std::unique_ptr<folly::TokenBucket> _read_token_bucket;
std::unique_ptr<folly::TokenBucket> _write_token_bucket;

friend class fds_file_object;
};

class fds_file_object : public block_file
Expand Down Expand Up @@ -101,11 +112,19 @@ class fds_file_object : public block_file
dsn::task_tracker *tracker) override;

private:
dsn::error_code get_content(uint64_t pos,
int64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes);
dsn::error_code put_content(/*in-out*/ std::istream &is, /*out*/ uint64_t &transfered_bytes);
error_code get_content_in_batches(uint64_t start,
int64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes);
error_code get_content(uint64_t pos,
uint64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes);
error_code put_content(/*in-out*/ std::istream &is,
/*int*/ int64_t to_transfer_bytes,
/*out*/ uint64_t &transfered_bytes);
error_code get_file_meta();

fds_service *_service;
std::string _fds_path;
std::string _md5sum;
Expand Down

0 comments on commit 54b4dda

Please sign in to comment.