Skip to content

Commit

Permalink
[fix](be) Check MD5 when downloading snapshot in http
Browse files Browse the repository at this point in the history
Cherry-pick apache#36726
  • Loading branch information
w41ter committed Jul 8, 2024
1 parent bb912f9 commit 0757402
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 39 deletions.
21 changes: 9 additions & 12 deletions be/src/http/action/download_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

#include "http/action/download_action.h"

#include <algorithm>
#include <memory>
#include <sstream>
#include <string>
#include <utility>

Expand All @@ -34,10 +32,11 @@

namespace doris {
// File-local string constants used by the download HTTP action.
// NOTE(review): this span appears to be rendered from a diff without +/- markers:
// the four `static const` declarations look like the pre-change lines and the
// `const` declarations after them their replacements (plus the new
// ACQUIRE_MD5_PARAMETER) — confirm against the real file before compiling.
namespace {
static const std::string FILE_PARAMETER = "file";
static const std::string TOKEN_PARAMETER = "token";
static const std::string CHANNEL_PARAMETER = "channel";
static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
const std::string FILE_PARAMETER = "file";
const std::string TOKEN_PARAMETER = "token";
// Request parameter read in handle_normal to obtain the channel name.
const std::string CHANNEL_PARAMETER = "channel";
// Channel value that makes handle_normal pass _rate_limit_group to do_file_response.
const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
// Request parameter whose non-empty value sets is_acquire_md5 for do_file_response.
const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5";
} // namespace

DownloadAction::DownloadAction(ExecEnv* exec_env,
Expand All @@ -47,7 +46,7 @@ DownloadAction::DownloadAction(ExecEnv* exec_env,
_download_type(NORMAL),
_num_workers(num_workers),
_rate_limit_group(std::move(rate_limit_group)) {
for (auto& dir : allow_dirs) {
for (const auto& dir : allow_dirs) {
std::string p;
Status st = io::global_local_filesystem()->canonicalize(dir, &p);
if (!st.ok()) {
Expand Down Expand Up @@ -116,11 +115,9 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par
} else {
const auto& channel = req->param(CHANNEL_PARAMETER);
bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
if (ingest_binlog) {
do_file_response(file_param, req, _rate_limit_group.get());
} else {
do_file_response(file_param, req);
}
bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty();
auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : nullptr;
do_file_response(file_param, req, rate_limit_group, is_acquire_md5);
}
}

Expand Down
89 changes: 62 additions & 27 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ Status SnapshotLoader::remote_http_download(
// Step before, validate all remote

// Step 1: Validate local tablet snapshot paths
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
auto& path = remote_tablet_snapshot.local_snapshot_path;
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& path = remote_tablet_snapshot.local_snapshot_path;
bool res = true;
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res));
if (!res) {
Expand All @@ -433,10 +433,10 @@ Status SnapshotLoader::remote_http_download(
// Step 2: get all local files
// Per-file record kept for files already present in a local snapshot directory.
// Compared against the remote file's record to decide whether a download is needed.
struct LocalFileStat {
// Size of the local file, compared with the remote size before the MD5 check.
uint64_t size;
// TODO(Drogon): add md5sum
// MD5 of the local file as returned by the local filesystem's md5sum(); after a
// successful download it is set to the remote file's MD5 instead.
std::string md5;
};
std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map;
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
std::vector<std::string> local_files;
RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));
Expand All @@ -452,7 +452,14 @@ Status SnapshotLoader::remote_http_download(
return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
ec.message());
}
local_filestat[local_file] = {local_file_size};
std::string md5;
auto status = io::global_local_filesystem()->md5sum(local_file_path, &md5);
if (!status.ok()) {
LOG(WARNING) << "download file error, local file " << local_file_path
<< " md5sum: " << status.to_string();
return status;
}
local_filestat[local_file] = {local_file_size, md5};
}
}

Expand All @@ -465,22 +472,22 @@ Status SnapshotLoader::remote_http_download(
int total_num = remote_tablet_snapshots.size();
int finished_num = 0;
// Per-file record describing one file of a remote tablet snapshot, filled from
// an HTTP HEAD request against the remote BE.
struct RemoteFileStat {
// TODO(Drogon): Add md5sum
// Download URL of the file (includes token, file path and channel=ingest_binlog).
std::string url;
// MD5 obtained via HttpClient::get_content_md5(); may be empty, in which case the
// post-download MD5 comparison is skipped ("keep compatibility").
std::string md5;
// File length obtained via HttpClient::get_content_length().
uint64_t size;
};
std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>>
remote_files_map;
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
auto& remote_files = remote_files_map[remote_path];
const auto& token = remote_tablet_snapshot.remote_token;
const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;

// HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
std::string remote_url_prefix =
fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}",
remote_be_addr.hostname, remote_be_addr.port, token, remote_path);
std::string base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}",
remote_be_addr.hostname, remote_be_addr.port, token);
std::string remote_url_prefix = fmt::format("{}&file={}", base_url, remote_path);

string file_list_str;
auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* client) {
Expand All @@ -493,30 +500,31 @@ Status SnapshotLoader::remote_http_download(
strings::Split(file_list_str, "\n", strings::SkipWhitespace());

for (const auto& filename : filename_list) {
std::string remote_file_url = fmt::format(
"http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog",
remote_tablet_snapshot.remote_be_addr.hostname,
remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token,
remote_tablet_snapshot.remote_snapshot_path, filename);
std::string remote_file_url =
fmt::format("{}&file={}/{}&channel=ingest_binlog", base_url,
remote_tablet_snapshot.remote_snapshot_path, filename);

// get file length
uint64_t file_size = 0;
auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_file_url));
std::string file_md5;
auto get_file_stat_cb = [&remote_file_url, &file_size, &file_md5](HttpClient* client) {
std::string url = fmt::format("{}&acquire_md5=true", remote_file_url);
RETURN_IF_ERROR(client->init(url));
client->set_timeout_ms(kGetLengthTimeout * 1000);
RETURN_IF_ERROR(client->head());
RETURN_IF_ERROR(client->get_content_length(&file_size));
RETURN_IF_ERROR(client->get_content_md5(&file_md5));
return Status::OK();
};
RETURN_IF_ERROR(
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_size_cb));
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, get_file_stat_cb));

remote_files[filename] = RemoteFileStat {remote_file_url, file_size};
remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size};
}
}

// Step 4: Compare local and remote files && get all need download files
for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));

Expand All @@ -529,8 +537,8 @@ Status SnapshotLoader::remote_http_download(
// get all need download files
std::vector<std::string> need_download_files;
for (const auto& [remote_file, remote_filestat] : remote_files) {
LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
remote_filestat.size);
LOG(INFO) << "remote file: " << remote_file << ", size: " << remote_filestat.size
<< ", md5: " << remote_filestat.md5;
auto it = local_files.find(remote_file);
if (it == local_files.end()) {
need_download_files.emplace_back(remote_file);
Expand All @@ -545,7 +553,11 @@ Status SnapshotLoader::remote_http_download(
need_download_files.emplace_back(remote_file);
continue;
}
// TODO(Drogon): check by md5sum, if not match then download

if (auto& local_filestat = it->second; local_filestat.md5 != remote_filestat.md5) {
need_download_files.emplace_back(remote_file);
continue;
}

LOG(INFO) << fmt::format("file {} already exists, skip download", remote_file);
}
Expand All @@ -569,6 +581,7 @@ Status SnapshotLoader::remote_http_download(
auto& remote_filestat = remote_files[filename];
auto file_size = remote_filestat.size;
auto& remote_file_url = remote_filestat.url;
auto& remote_file_md5 = remote_filestat.md5;

// check disk capacity
if (data_dir->reach_capacity_limit(file_size)) {
Expand All @@ -591,8 +604,8 @@ Status SnapshotLoader::remote_http_download(
<< " to: " << local_file_path << ". size(B): " << file_size
<< ", timeout(s): " << estimate_timeout;

auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
file_size](HttpClient* client) {
auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout,
&local_file_path, file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_file_url));
client->set_timeout_ms(estimate_timeout * 1000);
RETURN_IF_ERROR(client->download(local_file_path));
Expand All @@ -612,13 +625,35 @@ Status SnapshotLoader::remote_http_download(
<< ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not equal");
}

if (!remote_file_md5.empty()) { // keep compatibility
std::string local_file_md5;
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path,
&local_file_md5));
if (local_file_md5 != remote_file_md5) {
LOG(WARNING) << "download file md5 error"
<< ", remote_file_url=" << remote_file_url
<< ", local_file_path=" << local_file_path
<< ", remote_file_md5=" << remote_file_md5
<< ", local_file_md5=" << local_file_md5;
return Status::RuntimeError(
"download file {} md5 is not equal, local={}, remote={}",
remote_file_url, local_file_md5, remote_file_md5);
}
}

return io::global_local_filesystem()->permission(
local_file_path, io::LocalFileSystem::PERMS_OWNER_RW);
};
RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb));
auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to download file from " << remote_file_url
<< ", status: " << status.to_string();
return status;
}

// local_files always keep the updated local files
local_files[filename] = LocalFileStat {file_size};
local_files[filename] = LocalFileStat {file_size, remote_file_md5};
}

uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
Expand Down

0 comments on commit 0757402

Please sign in to comment.