Skip to content

Commit

Permalink
[fix](be) Check MD5 of the downloaded files before ingesting binlog
Browse files Browse the repository at this point in the history
Cherry-pick apache#36621
  • Loading branch information
w41ter committed Jul 8, 2024
1 parent caa1869 commit bb912f9
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 6 deletions.
9 changes: 7 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const std::string kBinlogVersionParameter = "binlog_version";
const std::string kRowsetIdParameter = "rowset_id";
const std::string kSegmentIndexParameter = "segment_index";
const std::string kSegmentIndexIdParameter = "segment_index_id";
const std::string kAcquireMD5Parameter = "acquire_md5";

// get http param, if no value throw exception
const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
Expand Down Expand Up @@ -102,12 +103,14 @@ void handle_get_binlog_info(HttpRequest* req) {
void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) {
// Step 1: get download file path
std::string segment_file_path;
bool is_acquire_md5 = false;
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(tablet_id);
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
LOG(WARNING) << "get download file path failed, error: " << e.what();
Expand All @@ -128,14 +131,15 @@ void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rat
LOG(WARNING) << "file not exist, file path: " << segment_file_path;
return;
}
do_file_response(segment_file_path, req, rate_limit_group);
do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5);
}

/// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id
void handle_get_segment_index_file(HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group) {
// Step 1: get download file path
std::string segment_index_file_path;
bool is_acquire_md5 = false;
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
auto tablet = get_tablet(tablet_id);
Expand All @@ -144,6 +148,7 @@ void handle_get_segment_index_file(HttpRequest* req,
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
segment_index_file_path =
tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
LOG(WARNING) << "get download file path failed, error: " << e.what();
Expand All @@ -164,7 +169,7 @@ void handle_get_segment_index_file(HttpRequest* req,
LOG(WARNING) << "file not exist, file path: " << segment_index_file_path;
return;
}
do_file_response(segment_index_file_path, req, rate_limit_group);
do_file_response(segment_index_file_path, req, rate_limit_group, is_acquire_md5);
}

void handle_get_rowset_meta(HttpRequest* req) {
Expand Down
56 changes: 52 additions & 4 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
// Step 5.3: get all segment files
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
auto segment_file_size = segment_file_sizes[segment_index];
auto get_segment_file_url = segment_file_urls[segment_index];
auto get_segment_file_url =
fmt::format("{}&acquire_md5=true", segment_file_urls[segment_index]);

uint64_t estimate_timeout =
segment_file_size / config::download_low_speed_limit_kbps / 1024;
Expand All @@ -286,6 +287,12 @@ void _ingest_binlog(IngestBinlogArg* arg) {
RETURN_IF_ERROR(client->download(local_segment_path));
download_success_files.push_back(local_segment_path);

std::string remote_file_md5;
RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
LOG(INFO) << "download segment file to " << local_segment_path
<< ", remote md5: " << remote_file_md5
<< ", remote size: " << segment_file_size;

std::error_code ec;
// Check file length
uint64_t local_file_size = std::filesystem::file_size(local_segment_path, ec);
Expand All @@ -294,13 +301,32 @@ void _ingest_binlog(IngestBinlogArg* arg) {
return Status::IOError("can't retrive file_size of {}, due to {}",
local_segment_path, ec.message());
}

if (local_file_size != segment_file_size) {
LOG(WARNING) << "download file length error"
<< ", get_segment_file_url=" << get_segment_file_url
<< ", file_size=" << segment_file_size
<< ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not equal");
return Status::RuntimeError(
"downloaded file size is not equal, local={}, remote={}", local_file_size,
segment_file_size);
}

if (!remote_file_md5.empty()) { // keep compatibility
std::string local_file_md5;
RETURN_IF_ERROR(
io::global_local_filesystem()->md5sum(local_segment_path, &local_file_md5));
if (local_file_md5 != remote_file_md5) {
LOG(WARNING) << "download file md5 error"
<< ", get_segment_file_url=" << get_segment_file_url
<< ", remote_file_md5=" << remote_file_md5
<< ", local_file_md5=" << local_file_md5;
return Status::RuntimeError(
"download file md5 is not equal, local={}, remote={}", local_file_md5,
remote_file_md5);
}
}

return io::global_local_filesystem()->permission(local_segment_path,
io::LocalFileSystem::PERMS_OWNER_RW);
};
Expand Down Expand Up @@ -415,7 +441,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) {
auto segment_index_file_size = segment_index_file_sizes[i];
auto get_segment_index_file_url = segment_index_file_urls[i];
auto get_segment_index_file_url =
fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]);

uint64_t estimate_timeout =
segment_index_file_size / config::download_low_speed_limit_kbps / 1024;
Expand All @@ -434,6 +461,9 @@ void _ingest_binlog(IngestBinlogArg* arg) {
RETURN_IF_ERROR(client->download(local_segment_index_path));
download_success_files.push_back(local_segment_index_path);

std::string remote_file_md5;
RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));

std::error_code ec;
// Check file length
uint64_t local_index_file_size =
Expand All @@ -448,8 +478,26 @@ void _ingest_binlog(IngestBinlogArg* arg) {
<< ", get_segment_index_file_url=" << get_segment_index_file_url
<< ", index_file_size=" << segment_index_file_size
<< ", local_index_file_size=" << local_index_file_size;
return Status::InternalError("downloaded index file size is not equal");
return Status::RuntimeError(
"downloaded index file size is not equal, local={}, remote={}",
local_index_file_size, segment_index_file_size);
}

if (!remote_file_md5.empty()) { // keep compatibility
std::string local_file_md5;
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_segment_index_path,
&local_file_md5));
if (local_file_md5 != remote_file_md5) {
LOG(WARNING) << "download file md5 error"
<< ", get_segment_index_file_url=" << get_segment_index_file_url
<< ", remote_file_md5=" << remote_file_md5
<< ", local_file_md5=" << local_file_md5;
return Status::RuntimeError(
"download file md5 is not equal, local={}, remote={}", local_file_md5,
remote_file_md5);
}
}

return io::global_local_filesystem()->permission(local_segment_index_path,
io::LocalFileSystem::PERMS_OWNER_RW);
};
Expand Down

0 comments on commit bb912f9

Please sign in to comment.