From bb912f913d174f7d1801fa2b457cd8863a98c2c5 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 21 Jun 2024 13:51:44 +0800 Subject: [PATCH] [fix](be) Check MD5 of the downloaded files before ingesting binlog Cherry-pick #36621 --- be/src/http/action/download_binlog_action.cpp | 9 ++- be/src/service/backend_service.cpp | 56 +++++++++++++++++-- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp index dbe2880d3b4c02..61d65ca9756bbb 100644 --- a/be/src/http/action/download_binlog_action.cpp +++ b/be/src/http/action/download_binlog_action.cpp @@ -48,6 +48,7 @@ const std::string kBinlogVersionParameter = "binlog_version"; const std::string kRowsetIdParameter = "rowset_id"; const std::string kSegmentIndexParameter = "segment_index"; const std::string kSegmentIndexIdParameter = "segment_index_id"; +const std::string kAcquireMD5Parameter = "acquire_md5"; // get http param, if no value throw exception const auto& get_http_param(HttpRequest* req, const std::string& param_name) { @@ -102,12 +103,14 @@ void handle_get_binlog_info(HttpRequest* req) { void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) { // Step 1: get download file path std::string segment_file_path; + bool is_acquire_md5 = false; try { const auto& tablet_id = get_http_param(req, kTabletIdParameter); auto tablet = get_tablet(tablet_id); const auto& rowset_id = get_http_param(req, kRowsetIdParameter); const auto& segment_index = get_http_param(req, kSegmentIndexParameter); segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index); + is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty(); } catch (const std::exception& e) { HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); LOG(WARNING) << "get download file path failed, error: " << e.what(); @@ -128,7 +131,7 @@ void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rat LOG(WARNING) << "file not exist, file path: " << segment_file_path; return; } - do_file_response(segment_file_path, req, rate_limit_group); + do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5); } /// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id @@ -136,6 +139,7 @@ void handle_get_segment_index_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) { // Step 1: get download file path std::string segment_index_file_path; + bool is_acquire_md5 = false; try { const auto& tablet_id = get_http_param(req, kTabletIdParameter); auto tablet = get_tablet(tablet_id); @@ -144,6 +148,7 @@ void handle_get_segment_index_file(HttpRequest* req, const auto& segment_index_id = req->param(kSegmentIndexIdParameter); segment_index_file_path = tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id); + is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty(); } catch (const std::exception& e) { HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); LOG(WARNING) << "get download file path failed, error: " << e.what(); @@ -164,7 +169,7 @@ void handle_get_segment_index_file(HttpRequest* req, LOG(WARNING) << "file not exist, file path: " << segment_index_file_path; return; } - do_file_response(segment_index_file_path, req, rate_limit_group); + do_file_response(segment_index_file_path, req, rate_limit_group, is_acquire_md5); } void handle_get_rowset_meta(HttpRequest* req) { diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index c4ccaa7281b882..324c21d91aeb00 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -267,7 +267,8 @@ void _ingest_binlog(IngestBinlogArg* arg) { // Step 5.3: get all segment files for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { auto segment_file_size = segment_file_sizes[segment_index]; - auto get_segment_file_url = segment_file_urls[segment_index]; + auto get_segment_file_url = + fmt::format("{}&acquire_md5=true", segment_file_urls[segment_index]); uint64_t estimate_timeout = segment_file_size / config::download_low_speed_limit_kbps / 1024; @@ -286,6 +287,12 @@ void _ingest_binlog(IngestBinlogArg* arg) { RETURN_IF_ERROR(client->download(local_segment_path)); download_success_files.push_back(local_segment_path); + std::string remote_file_md5; + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + LOG(INFO) << "download segment file to " << local_segment_path + << ", remote md5: " << remote_file_md5 + << ", remote size: " << segment_file_size; + std::error_code ec; // Check file length uint64_t local_file_size = std::filesystem::file_size(local_segment_path, ec); @@ -294,13 +301,32 @@ void _ingest_binlog(IngestBinlogArg* arg) { return Status::IOError("can't retrive file_size of {}, due to {}", local_segment_path, ec.message()); } + if (local_file_size != segment_file_size) { LOG(WARNING) << "download file length error" << ", get_segment_file_url=" << get_segment_file_url << ", file_size=" << segment_file_size << ", local_file_size=" << local_file_size; - return Status::InternalError("downloaded file size is not equal"); + return Status::RuntimeError( + "downloaded file size is not equal, local={}, remote={}", local_file_size, + segment_file_size); + } + + if (!remote_file_md5.empty()) { // keep compatibility + std::string local_file_md5; + RETURN_IF_ERROR( + io::global_local_filesystem()->md5sum(local_segment_path, &local_file_md5)); + if (local_file_md5 != remote_file_md5) { + LOG(WARNING) << "download file md5 error" + << ", get_segment_file_url=" << get_segment_file_url + << ", remote_file_md5=" << remote_file_md5 + << ", local_file_md5=" << local_file_md5; + return Status::RuntimeError( + "download file md5 is not equal, local={}, remote={}", local_file_md5, + remote_file_md5); + } } + return io::global_local_filesystem()->permission(local_segment_path, io::LocalFileSystem::PERMS_OWNER_RW); }; @@ -415,7 +441,8 @@ void _ingest_binlog(IngestBinlogArg* arg) { DCHECK(segment_index_file_names.size() == segment_index_file_urls.size()); for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) { auto segment_index_file_size = segment_index_file_sizes[i]; - auto get_segment_index_file_url = segment_index_file_urls[i]; + auto get_segment_index_file_url = + fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]); uint64_t estimate_timeout = segment_index_file_size / config::download_low_speed_limit_kbps / 1024; @@ -434,6 +461,9 @@ void _ingest_binlog(IngestBinlogArg* arg) { RETURN_IF_ERROR(client->download(local_segment_index_path)); download_success_files.push_back(local_segment_index_path); + std::string remote_file_md5; + RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5)); + std::error_code ec; // Check file length uint64_t local_index_file_size = @@ -448,8 +478,26 @@ void _ingest_binlog(IngestBinlogArg* arg) { << ", get_segment_index_file_url=" << get_segment_index_file_url << ", index_file_size=" << segment_index_file_size << ", local_index_file_size=" << local_index_file_size; - return Status::InternalError("downloaded index file size is not equal"); + return Status::RuntimeError( + "downloaded index file size is not equal, local={}, remote={}", + local_index_file_size, segment_index_file_size); } + + if (!remote_file_md5.empty()) { // keep compatibility + std::string local_file_md5; + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_segment_index_path, + &local_file_md5)); + if (local_file_md5 != remote_file_md5) { + LOG(WARNING) << "download file md5 error" + << ", get_segment_index_file_url=" << get_segment_index_file_url + << ", remote_file_md5=" << remote_file_md5 + << ", local_file_md5=" << local_file_md5; + return Status::RuntimeError( + "download file md5 is not equal, local={}, remote={}", local_file_md5, + remote_file_md5); + } + } + return io::global_local_filesystem()->permission(local_segment_index_path, io::LocalFileSystem::PERMS_OWNER_RW); };