diff --git a/be/src/http/action/download_action.cpp b/be/src/http/action/download_action.cpp index f271b4f1916708..284314f421d207 100644 --- a/be/src/http/action/download_action.cpp +++ b/be/src/http/action/download_action.cpp @@ -17,9 +17,7 @@ #include "http/action/download_action.h" -#include #include -#include #include #include @@ -34,10 +32,11 @@ namespace doris { namespace { -static const std::string FILE_PARAMETER = "file"; -static const std::string TOKEN_PARAMETER = "token"; -static const std::string CHANNEL_PARAMETER = "channel"; -static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog"; +const std::string FILE_PARAMETER = "file"; +const std::string TOKEN_PARAMETER = "token"; +const std::string CHANNEL_PARAMETER = "channel"; +const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog"; +const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5"; } // namespace DownloadAction::DownloadAction(ExecEnv* exec_env, @@ -47,7 +46,7 @@ DownloadAction::DownloadAction(ExecEnv* exec_env, _download_type(NORMAL), _num_workers(num_workers), _rate_limit_group(std::move(rate_limit_group)) { - for (auto& dir : allow_dirs) { + for (const auto& dir : allow_dirs) { std::string p; Status st = io::global_local_filesystem()->canonicalize(dir, &p); if (!st.ok()) { @@ -116,11 +115,9 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par } else { const auto& channel = req->param(CHANNEL_PARAMETER); bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE); - if (ingest_binlog) { - do_file_response(file_param, req, _rate_limit_group.get()); - } else { - do_file_response(file_param, req); - } + bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty(); + auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : nullptr; + do_file_response(file_param, req, rate_limit_group, is_acquire_md5); } } diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index cab8edb1927205..a5061c4decfb03 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -417,8 +417,8 @@ Status SnapshotLoader::remote_http_download( // Step before, validate all remote // Step 1: Validate local tablet snapshot paths - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { - auto& path = remote_tablet_snapshot.local_snapshot_path; + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { + const auto& path = remote_tablet_snapshot.local_snapshot_path; bool res = true; RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); if (!res) { @@ -433,10 +433,10 @@ Status SnapshotLoader::remote_http_download( // Step 2: get all local files struct LocalFileStat { uint64_t size; - // TODO(Drogon): add md5sum + std::string md5; }; std::unordered_map> local_files_map; - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { const auto& local_path = remote_tablet_snapshot.local_snapshot_path; std::vector local_files; RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); @@ -452,7 +452,14 @@ Status SnapshotLoader::remote_http_download( return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path, ec.message()); } - local_filestat[local_file] = {local_file_size}; + std::string md5; + auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5); + if (!status.ok()) { + LOG(WARNING) << "download file error, local file " << local_file_path + << " md5sum: " << status.to_string(); + return status; + } + local_filestat[local_file] = {local_file_size, md5}; } } @@ -465,22 +472,22 @@ Status SnapshotLoader::remote_http_download( int total_num = remote_tablet_snapshots.size(); int finished_num = 0; struct RemoteFileStat { - // TODO(Drogon): Add md5sum std::string url; + std::string md5; uint64_t size; }; std::unordered_map> remote_files_map; - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; auto& remote_files = remote_files_map[remote_path]; const auto& token = remote_tablet_snapshot.remote_token; const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/ - std::string remote_url_prefix = - fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}", - remote_be_addr.hostname, remote_be_addr.port, token, remote_path); + std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}", + remote_be_addr.hostname, remote_be_addr.port, token); + std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path); string file_list_str; auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) { @@ -493,30 +500,31 @@ Status SnapshotLoader::remote_http_download( strings::Split(file_list_str, "\n", strings::SkipWhitespace()); for (const auto& filename : filename_list) { - std::string remote_file_url = fmt::format( - "http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog", - remote_tablet_snapshot.remote_be_addr.hostname, - remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token, - remote_tablet_snapshot.remote_snapshot_path, filename); + std::string remote_file_url = + fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url, + remote_tablet_snapshot.remote_snapshot_path, filename); // get file length uint64_t file_size = 0; - auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) { - RETURN_IF_ERROR(client->init(remote_file_url)); + std::string file_md5; + auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) { + std::string url = fmt::format("{}&acquire_md5=true", remote_file_url); + RETURN_IF_ERROR(client->init(url)); client->set_timeout_ms(kGetLengthTimeout * 1000); RETURN_IF_ERROR(client->head()); RETURN_IF_ERROR(client->get_content_length(&file_size)); + RETURN_IF_ERROR(client->get_content_md5(&file_md5)); return Status::OK(); }; RETURN_IF_ERROR( - HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb)); + HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb)); - remote_files[filename] = RemoteFileStat {remote_file_url, file_size}; + remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size}; } } // Step 4: Compare local and remote files && get all need download files - for (auto& remote_tablet_snapshot : remote_tablet_snapshots) { + for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, TTaskType::type::DOWNLOAD)); @@ -529,8 +537,8 @@ Status SnapshotLoader::remote_http_download( // get all need download files std::vector need_download_files; for (const auto& [remote_file, remote_filestat] : remote_files) { - LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file, - remote_filestat.size); + LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size + << ", md5: " << remote_filestat.md5; auto it = local_files.find(remote_file); if (it == local_files.end()) { need_download_files.emplace_back(remote_file); @@ -545,7 +553,11 @@ Status SnapshotLoader::remote_http_download( need_download_files.emplace_back(remote_file); continue; } - // TODO(Drogon): check by md5sum, if not match then download + + if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) { + need_download_files.emplace_back(remote_file); + continue; + } LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file); } @@ -569,6 +581,7 @@ Status SnapshotLoader::remote_http_download( auto& remote_filestat = remote_files[filename]; auto file_size = remote_filestat.size; auto& remote_file_url = remote_filestat.url; + auto& remote_file_md5 = remote_filestat.md5; // check disk capacity if (data_dir->reach_capacity_limit(file_size)) { @@ -591,8 +604,8 @@ Status SnapshotLoader::remote_http_download( << " to: " << local_file_path << ". size(B): " << file_size << ", timeout(s): " << estimate_timeout; - auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path, - file_size](HttpClient* client) { + auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, + &local_file_path, file_size](HttpClient* client) { RETURN_IF_ERROR(client->init(remote_file_url)); client->set_timeout_ms(estimate_timeout * 1000); RETURN_IF_ERROR(client->download(local_file_path)); @@ -612,13 +625,35 @@ Status SnapshotLoader::remote_http_download( << ", local_file_size=" << local_file_size; return Status::InternalError("downloaded file size is not equal"); } + + if (!remote_file_md5.empty()) { // keep compatibility + std::string local_file_md5; + RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path, + &local_file_md5)); + if (local_file_md5 != remote_file_md5) { + LOG(WARNING) << "download file md5 error" + << ", remote_file_url=" << remote_file_url + << ", local_file_path=" << local_file_path + << ", remote_file_md5=" << remote_file_md5 + << ", local_file_md5=" << local_file_md5; + return Status::RuntimeError( + "download file {} md5 is not equal, local={}, remote={}", + remote_file_url, local_file_md5, remote_file_md5); + } + } + return io::global_local_filesystem()->permission( local_file_path, io::LocalFileSystem::PERMS_OWNER_RW); }; - RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb)); + auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to download file from " << remote_file_url + << ", status: " << status.to_string(); + return status; + } // local_files always keep the updated local files - local_files[filename] = LocalFileStat {file_size}; + local_files[filename] = LocalFileStat {file_size, remote_file_md5}; } uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;