From efa6b369bc00c6387368d84b8dedfbc3c096e0bc Mon Sep 17 00:00:00 2001
From: heyuchen <heyuchen@xiaomi.com>
Date: Fri, 22 May 2020 12:27:26 +0800
Subject: [PATCH 1/6] add after_download functions

---
 .../replication/common/replication_common.cpp |   1 +
 .../replication/common/replication_common.h   |   2 +-
 .../lib/bulk_load/replica_bulk_loader.cpp     | 113 +++++++++++++--
 .../lib/bulk_load/replica_bulk_loader.h       |  10 +-
 .../test/replica_bulk_loader_test.cpp         | 131 ++++++++++++++++++
 5 files changed, 246 insertions(+), 11 deletions(-)

diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp
index bc1da85f51..34f6e96a94 100644
--- a/src/dist/replication/common/replication_common.cpp
+++ b/src/dist/replication/common/replication_common.cpp
@@ -650,6 +650,7 @@ const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
 const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5;
 const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata");
 const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR(".bulk_load");
+const int32_t bulk_load_constant::PROGRESS_FINISHED = 100;
 
 namespace cold_backup {
 std::string get_policy_path(const std::string &root, const std::string &policy_name)
diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h
index 98d29606e7..bd8ef6b1e0 100644
--- a/src/dist/replication/common/replication_common.h
+++ b/src/dist/replication/common/replication_common.h
@@ -165,7 +165,7 @@ class bulk_load_constant
     static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
     static const std::string BULK_LOAD_METADATA;
     static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
-    // TODO(heyuchen): add more constant in further pr
+    static const int32_t PROGRESS_FINISHED;
 };
 
 namespace cold_backup {
diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
index d3065bed75..fcf46e1d11 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
@@ -275,7 +275,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
     // parse metadata
     const std::string &local_metadata_file_name =
         utils::filesystem::path_combine(local_dir, bulk_load_constant::BULK_LOAD_METADATA);
-    err = parse_bulk_load_metadata(local_metadata_file_name, _metadata);
+    err = parse_bulk_load_metadata(local_metadata_file_name);
     if (err != ERR_OK) {
         derror_replica("parse bulk load metadata failed, error = {}", err.to_string());
         return err;
@@ -309,19 +309,74 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION
-error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname,
-                                                         /*out*/ bulk_load_metadata &meta)
+error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname)
 {
-    // TODO(heyuchen): TBD
-    // read file and parse file content as bulk_load_metadata
+    if (!utils::filesystem::file_exists(fname)) {
+        derror_replica("file({}) doesn't exist", fname);
+        return ERR_FILE_OPERATION_FAILED;
+    }
+
+    int64_t file_sz = 0;
+    if (!utils::filesystem::file_size(fname, file_sz)) {
+        derror_replica("get file({}) size failed", fname);
+        return ERR_FILE_OPERATION_FAILED;
+    }
+
+    std::shared_ptr<char> buf = utils::make_shared_array<char>(file_sz + 1);
+    std::ifstream fin(fname, std::ifstream::in);
+    if (!fin.is_open()) {
+        derror_replica("open file({}) failed", fname);
+        return ERR_FILE_OPERATION_FAILED;
+    }
+    fin.read(buf.get(), file_sz);
+    dassert_replica(file_sz == fin.gcount(),
+                    "read file({}) failed, file_size = {} but read size = {}",
+                    fname,
+                    file_sz,
+                    fin.gcount());
+    fin.close();
+    buf.get()[fin.gcount()] = '\0';
+
+    blob bb;
+    bb.assign(std::move(buf), 0, file_sz);
+    if (!json::json_forwarder<bulk_load_metadata>::decode(bb, _metadata)) {
+        derror_replica("file({}) is damaged", fname);
+        return ERR_CORRUPTION;
+    }
+
+    if (_metadata.file_total_size <= 0) {
+        derror_replica("bulk_load_metadata has invalid file_total_size({})",
+                       _metadata.file_total_size);
+        return ERR_CORRUPTION;
+    }
+
     return ERR_OK;
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION_LONG
 bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::string &local_dir)
 {
-    // TODO(heyuchen): TBD
-    // compare sst file metadata calculated by file and parsed by metadata
+    std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name);
+    int64_t f_size = 0;
+    std::string md5;
+    if (!utils::filesystem::file_size(local_file, f_size)) {
+        derror_replica("verify file({}) failed, becaused failed to get file size", local_file);
+        return false;
+    }
+    if (utils::filesystem::md5sum(local_file, md5) != ERR_OK) {
+        derror_replica("verify file({}) failed, becaused failed to get file md5", local_file);
+        return false;
+    }
+    if (f_size != f_meta.size || md5 != f_meta.md5) {
+        derror_replica(
+            "verify file({}) failed, because file damaged, size: {} VS {}, md5: {} VS {}",
+            local_file,
+            f_size,
+            f_meta.size,
+            md5,
+            f_meta.md5);
+        return false;
+    }
     return true;
 }
 
@@ -329,8 +384,27 @@ bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::s
 void replica_bulk_loader::update_bulk_load_download_progress(uint64_t file_size,
                                                              const std::string &file_name)
 {
-    // TODO(heyuchen): TBD
-    // update download progress after downloading sst files succeed
+    if (_metadata.file_total_size <= 0) {
+        derror_replica("bulk_load_metadata has invalid file_total_size({})",
+                       _metadata.file_total_size);
+        return;
+    }
+
+    ddebug_replica("update progress after downloading file({})", file_name);
+    _cur_downloaded_size.fetch_add(file_size);
+    auto total_size = static_cast<double>(_metadata.file_total_size);
+    auto cur_downloaded_size = static_cast<double>(_cur_downloaded_size.load());
+    auto cur_progress = static_cast<int32_t>((cur_downloaded_size / total_size) * 100);
+    _download_progress.store(cur_progress);
+    ddebug_replica("total_size = {}, cur_downloaded_size = {}, progress = {}",
+                   total_size,
+                   cur_downloaded_size,
+                   cur_progress);
+
+    tasking::enqueue(LPC_REPLICATION_COMMON,
+                     tracker(),
+                     std::bind(&replica_bulk_loader::bulk_load_check_download_finish, this),
+                     get_gpid().thread_hash());
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG
@@ -342,6 +416,27 @@ void replica_bulk_loader::try_decrease_bulk_load_download_count()
                    _stub->_bulk_load_downloading_count.load());
 }
 
+// ThreadPool: THREAD_POOL_REPLICATION
+void replica_bulk_loader::bulk_load_check_download_finish()
+{
+    if (_download_progress.load() == bulk_load_constant::PROGRESS_FINISHED &&
+        _status == bulk_load_status::BLS_DOWNLOADING) {
+        ddebug_replica("download all files succeed");
+        _status = bulk_load_status::BLS_DOWNLOADED;
+        try_decrease_bulk_load_download_count();
+        cleanup_download_task();
+    }
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+void replica_bulk_loader::cleanup_download_task()
+{
+    for (auto &kv : _download_task) {
+        CLEANUP_TASK_ALWAYS(kv.second)
+    }
+    _download_task.clear();
+}
+
 void replica_bulk_loader::clear_bulk_load_states()
 {
     // TODO(heyuchen): TBD
diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
index 0bf9fded90..bf80d7aa86 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
@@ -51,12 +51,18 @@ class replica_bulk_loader : replica_base
 
     // \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed
     // \return ERR_CORRUPTION: parse failed
-    error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta);
+    error_code parse_bulk_load_metadata(const std::string &fname);
 
+    // compare sst file metadata calculated by file and parsed by metadata
     bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir);
+
+    // update download progress after downloading sst files succeed
     void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);
+
     void try_decrease_bulk_load_download_count();
+    void bulk_load_check_download_finish();
 
+    void cleanup_download_task();
     void clear_bulk_load_states();
 
     void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
@@ -102,6 +108,8 @@ class replica_bulk_loader : replica_base
 
     bulk_load_status::type _status{bulk_load_status::BLS_INVALID};
     bulk_load_metadata _metadata;
+    std::atomic<uint64_t> _cur_downloaded_size{0};
+    std::atomic<int32_t> _download_progress{0};
     std::atomic<error_code> _download_status{ERR_OK};
     // file_name -> downloading task
     std::map<std::string, task_ptr> _download_task;
diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
index e6970b6b1a..534c9eb848 100644
--- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
@@ -47,6 +47,20 @@ class replica_bulk_loader_test : public replica_test_base
         return _bulk_loader->bulk_load_start_download(APP_NAME, CLUSTER, PROVIDER);
     }
 
+    error_code test_parse_bulk_load_metadata(const std::string &file_path)
+    {
+        return _bulk_loader->parse_bulk_load_metadata(file_path);
+    }
+
+    bool test_verify_sst_files(int64_t size, const std::string &md5)
+    {
+        file_meta f_meta;
+        f_meta.name = FILE_NAME;
+        f_meta.size = size;
+        f_meta.md5 = md5;
+        return _bulk_loader->verify_sst_files(f_meta, LOCAL_DIR);
+    }
+
     /// mock structure functions
 
     void
@@ -101,15 +115,82 @@ class replica_bulk_loader_test : public replica_test_base
         config.secondaries.emplace_back(SECONDARY2);
     }
 
+    void create_local_file(const std::string &file_name)
+    {
+        std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
+        utils::filesystem::create_file(whole_name);
+        std::ofstream test_file;
+        test_file.open(whole_name);
+        test_file << "write some data.\n";
+        test_file.close();
+
+        _file_meta.name = whole_name;
+        utils::filesystem::md5sum(whole_name, _file_meta.md5);
+        utils::filesystem::file_size(whole_name, _file_meta.size);
+    }
+
+    error_code create_local_metadata_file()
+    {
+        create_local_file(FILE_NAME);
+        _metadata.files.emplace_back(_file_meta);
+        _metadata.file_total_size = _file_meta.size;
+
+        std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
+        utils::filesystem::create_file(whole_name);
+        std::ofstream os(whole_name.c_str(),
+                         (std::ofstream::out | std::ios::binary | std::ofstream::trunc));
+        if (!os.is_open()) {
+            derror("open file %s failed", whole_name.c_str());
+            return ERR_FILE_OPERATION_FAILED;
+        }
+
+        blob bb = json::json_forwarder<bulk_load_metadata>::encode(_metadata);
+        os.write((const char *)bb.data(), (std::streamsize)bb.length());
+        if (os.bad()) {
+            derror("write file %s failed", whole_name.c_str());
+            return ERR_FILE_OPERATION_FAILED;
+        }
+        os.close();
+
+        return ERR_OK;
+    }
+
+    bool validate_metadata()
+    {
+        auto target = _bulk_loader->_metadata;
+        if (target.file_total_size != _metadata.file_total_size) {
+            return false;
+        }
+        if (target.files.size() != _metadata.files.size()) {
+            return false;
+        }
+        for (int i = 0; i < target.files.size(); ++i) {
+            if (target.files[i].name != _metadata.files[i].name) {
+                return false;
+            }
+            if (target.files[i].size != _metadata.files[i].size) {
+                return false;
+            }
+            if (target.files[i].md5 != _metadata.files[i].md5) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     // helper functions
     bulk_load_status::type get_bulk_load_status() const { return _bulk_loader->_status; }
 
 public:
     std::unique_ptr<mock_replica> _replica;
     std::unique_ptr<replica_bulk_loader> _bulk_loader;
+
     bulk_load_request _req;
     group_bulk_load_request _group_req;
 
+    file_meta _file_meta;
+    bulk_load_metadata _metadata;
+
     std::string APP_NAME = "replica";
     std::string CLUSTER = "cluster";
     std::string PROVIDER = "local_service";
@@ -119,6 +200,9 @@ class replica_bulk_loader_test : public replica_test_base
     rpc_address SECONDARY = rpc_address("127.0.0.3", 34801);
     rpc_address SECONDARY2 = rpc_address("127.0.0.4", 34801);
     int32_t MAX_DOWNLOADING_COUNT = 5;
+    std::string LOCAL_DIR = bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR;
+    std::string METADATA = bulk_load_constant::BULK_LOAD_METADATA;
+    std::string FILE_NAME = "test_sst_file";
 };
 
 // on_bulk_load unit tests
@@ -196,5 +280,52 @@ TEST_F(replica_bulk_loader_test, start_downloading_test)
     }
 }
 
+// parse_bulk_load_metadata unit tests
+TEST_F(replica_bulk_loader_test, bulk_load_metadata_not_exist)
+{
+    ASSERT_EQ(test_parse_bulk_load_metadata("path_not_exist"), ERR_FILE_OPERATION_FAILED);
+}
+
+TEST_F(replica_bulk_loader_test, bulk_load_metadata_corrupt)
+{
+    // create file can not parse as bulk_load_metadata structure
+    utils::filesystem::create_directory(LOCAL_DIR);
+    create_local_file(METADATA);
+    std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
+    error_code ec = test_parse_bulk_load_metadata(metadata_file_name);
+    ASSERT_EQ(ec, ERR_CORRUPTION);
+    utils::filesystem::remove_path(LOCAL_DIR);
+}
+
+TEST_F(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
+{
+    utils::filesystem::create_directory(LOCAL_DIR);
+    error_code ec = create_local_metadata_file();
+    ASSERT_EQ(ec, ERR_OK);
+
+    std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
+    ec = test_parse_bulk_load_metadata(metadata_file_name);
+    ASSERT_EQ(ec, ERR_OK);
+    ASSERT_TRUE(validate_metadata());
+    utils::filesystem::remove_path(LOCAL_DIR);
+}
+
+// verify_sst_files unit tests
+TEST_F(replica_bulk_loader_test, verify_file_failed)
+{
+    utils::filesystem::create_directory(LOCAL_DIR);
+    create_local_file(FILE_NAME);
+    ASSERT_FALSE(test_verify_sst_files(_file_meta.size, "wrong_md5"));
+    utils::filesystem::remove_path(LOCAL_DIR);
+}
+
+TEST_F(replica_bulk_loader_test, verify_file_succeed)
+{
+    utils::filesystem::create_directory(LOCAL_DIR);
+    create_local_file(FILE_NAME);
+    ASSERT_TRUE(test_verify_sst_files(_file_meta.size, _file_meta.md5));
+    utils::filesystem::remove_path(LOCAL_DIR);
+}
+
 } // namespace replication
 } // namespace dsn

From ea83443dcd593dba325b3fad7f3412df1bd6c1fc Mon Sep 17 00:00:00 2001
From: heyuchen <heyuchen@xiaomi.com>
Date: Wed, 27 May 2020 12:05:46 +0800
Subject: [PATCH 2/6] fix by review

---
 include/dsn/utility/filesystem.h              |  2 ++
 src/core/core/filesystem.cpp                  | 32 +++++++++++++++++
 .../lib/bulk_load/replica_bulk_loader.cpp     | 36 +++++--------------
 .../lib/bulk_load/replica_bulk_loader.h       |  2 +-
 4 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/include/dsn/utility/filesystem.h b/include/dsn/utility/filesystem.h
index 9da8a4954d..85524e727a 100644
--- a/include/dsn/utility/filesystem.h
+++ b/include/dsn/utility/filesystem.h
@@ -129,6 +129,8 @@ error_code md5sum(const std::string &file_path, /*out*/ std::string &result);
 //          B is represent wheter the directory is empty, true means empty, otherwise false
 std::pair<error_code, bool> is_directory_empty(const std::string &dirname);
 
+error_code read_file(const std::string &fname, /*out*/ std::string &buf);
+
 } // namespace filesystem
 } // namespace utils
 } // namespace dsn
diff --git a/src/core/core/filesystem.cpp b/src/core/core/filesystem.cpp
index 4acc443229..cfb22596e8 100644
--- a/src/core/core/filesystem.cpp
+++ b/src/core/core/filesystem.cpp
@@ -33,6 +33,8 @@
  *     xxxx-xx-xx, author, fix bug about xxx
  */
 
+#include <fstream>
+
 #include <dsn/c/api_utilities.h>
 #include <dsn/utility/filesystem.h>
 #include <dsn/utility/utils.h>
@@ -763,6 +765,36 @@ std::pair<error_code, bool> is_directory_empty(const std::string &dirname)
     }
     return res;
 }
+
+error_code read_file(const std::string &fname, std::string &buf)
+{
+    if (!file_exists(fname)) {
+        derror("file(%s) doesn't exist", fname.c_str());
+        return ERR_FILE_OPERATION_FAILED;
+    }
+
+    int64_t file_sz = 0;
+    if (!utils::filesystem::file_size(fname, file_sz)) {
+        derror("get file(%s) size failed", fname.c_str());
+        return ERR_FILE_OPERATION_FAILED;
+    }
+
+    buf.resize(file_sz);
+    std::ifstream fin(fname, std::ifstream::in);
+    if (!fin.is_open()) {
+        derror("open file(%s) failed", fname.c_str());
+        return ERR_FILE_OPERATION_FAILED;
+    }
+    fin.read(&buf[0], file_sz);
+    dassert(file_sz == fin.gcount(),
+            "read file(%s) failed, file_size =  %" PRIx64 " but read size = %" PRIx64,
+            fname.c_str(),
+            file_sz,
+            fin.gcount());
+    fin.close();
+    return ERR_OK;
+}
+
 } // namespace filesystem
 } // namespace utils
 } // namespace dsn
diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
index fcf46e1d11..3ac967d9f3 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
@@ -4,7 +4,7 @@
 
 #include "replica_bulk_loader.h"
 
-#include <fstream>
+// #include <fstream>
 
 #include <dsn/dist/block_service.h>
 #include <dsn/dist/fmt_logging.h>
@@ -311,34 +311,14 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
 // ThreadPool: THREAD_POOL_REPLICATION
 error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname)
 {
-    if (!utils::filesystem::file_exists(fname)) {
-        derror_replica("file({}) doesn't exist", fname);
-        return ERR_FILE_OPERATION_FAILED;
-    }
-
-    int64_t file_sz = 0;
-    if (!utils::filesystem::file_size(fname, file_sz)) {
-        derror_replica("get file({}) size failed", fname);
-        return ERR_FILE_OPERATION_FAILED;
+    std::string buf;
+    error_code ec = utils::filesystem::read_file(fname, buf);
+    if (ec != ERR_OK) {
+        derror_replica("read file {} failed, error = {}", fname, ec.to_string());
+        return ec;
     }
 
-    std::shared_ptr<char> buf = utils::make_shared_array<char>(file_sz + 1);
-    std::ifstream fin(fname, std::ifstream::in);
-    if (!fin.is_open()) {
-        derror_replica("open file({}) failed", fname);
-        return ERR_FILE_OPERATION_FAILED;
-    }
-    fin.read(buf.get(), file_sz);
-    dassert_replica(file_sz == fin.gcount(),
-                    "read file({}) failed, file_size = {} but read size = {}",
-                    fname,
-                    file_sz,
-                    fin.gcount());
-    fin.close();
-    buf.get()[fin.gcount()] = '\0';
-
-    blob bb;
-    bb.assign(std::move(buf), 0, file_sz);
+    blob bb = blob::create_from_bytes(std::move(buf));
     if (!json::json_forwarder<bulk_load_metadata>::decode(bb, _metadata)) {
         derror_replica("file({}) is damaged", fname);
         return ERR_CORRUPTION;
@@ -403,7 +383,7 @@ void replica_bulk_loader::update_bulk_load_download_progress(uint64_t file_size,
 
     tasking::enqueue(LPC_REPLICATION_COMMON,
                      tracker(),
-                     std::bind(&replica_bulk_loader::bulk_load_check_download_finish, this),
+                     std::bind(&replica_bulk_loader::check_download_finish, this),
                      get_gpid().thread_hash());
 }
 
diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
index bf80d7aa86..256a8422c5 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
@@ -60,7 +60,7 @@ class replica_bulk_loader : replica_base
     void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);
 
     void try_decrease_bulk_load_download_count();
-    void bulk_load_check_download_finish();
+    void check_download_finish();
 
     void cleanup_download_task();
     void clear_bulk_load_states();

From 906968ac91b1e4f97888936180660ca55f9cef01 Mon Sep 17 00:00:00 2001
From: heyuchen <heyuchen@xiaomi.com>
Date: Wed, 27 May 2020 13:06:38 +0800
Subject: [PATCH 3/6] small fix

---
 src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
index 3ac967d9f3..2fbfdbe8c5 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
@@ -397,7 +397,7 @@ void replica_bulk_loader::try_decrease_bulk_load_download_count()
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION
-void replica_bulk_loader::bulk_load_check_download_finish()
+void replica_bulk_loader::check_download_finish()
 {
     if (_download_progress.load() == bulk_load_constant::PROGRESS_FINISHED &&
         _status == bulk_load_status::BLS_DOWNLOADING) {

From 6a860c1894eb0492cb22962902664a1d8edb503c Mon Sep 17 00:00:00 2001
From: heyuchen <heyuchen@xiaomi.com>
Date: Wed, 27 May 2020 14:39:43 +0800
Subject: [PATCH 4/6] fix by review

---
 src/core/core/filesystem.cpp                    | 17 +++++++++--------
 .../lib/bulk_load/replica_bulk_loader.cpp       |  8 +++-----
 .../lib/bulk_load/replica_bulk_loader.h         |  5 +++--
 .../bulk_load/test/replica_bulk_loader_test.cpp | 11 ++++++-----
 4 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/src/core/core/filesystem.cpp b/src/core/core/filesystem.cpp
index cfb22596e8..26fcbc04f5 100644
--- a/src/core/core/filesystem.cpp
+++ b/src/core/core/filesystem.cpp
@@ -36,6 +36,7 @@
 #include <fstream>
 
 #include <dsn/c/api_utilities.h>
+#include <dsn/dist/fmt_logging.h>
 #include <dsn/utility/filesystem.h>
 #include <dsn/utility/utils.h>
 #include <dsn/utility/safe_strerror_posix.h>
@@ -769,28 +770,28 @@ std::pair<error_code, bool> is_directory_empty(const std::string &dirname)
 error_code read_file(const std::string &fname, std::string &buf)
 {
     if (!file_exists(fname)) {
-        derror("file(%s) doesn't exist", fname.c_str());
+        derror_f("file({}) doesn't exist", fname);
         return ERR_FILE_OPERATION_FAILED;
     }
 
     int64_t file_sz = 0;
     if (!utils::filesystem::file_size(fname, file_sz)) {
-        derror("get file(%s) size failed", fname.c_str());
+        derror_f("get file({}) size failed", fname);
         return ERR_FILE_OPERATION_FAILED;
     }
 
     buf.resize(file_sz);
     std::ifstream fin(fname, std::ifstream::in);
     if (!fin.is_open()) {
-        derror("open file(%s) failed", fname.c_str());
+        derror_f("open file({}) failed", fname);
         return ERR_FILE_OPERATION_FAILED;
     }
     fin.read(&buf[0], file_sz);
-    dassert(file_sz == fin.gcount(),
-            "read file(%s) failed, file_size =  %" PRIx64 " but read size = %" PRIx64,
-            fname.c_str(),
-            file_sz,
-            fin.gcount());
+    dassert_f(file_sz == fin.gcount(),
+              "read file({}) failed, file_size = {} but read size = {}",
+              fname,
+              file_sz,
+              fin.gcount());
     fin.close();
     return ERR_OK;
 }
diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
index 2fbfdbe8c5..0283ec3122 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
@@ -4,8 +4,6 @@
 
 #include "replica_bulk_loader.h"
 
-// #include <fstream>
-
 #include <dsn/dist/block_service.h>
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/dist/replication/replication_app_base.h>
@@ -288,7 +286,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
                 uint64_t f_size = 0;
                 error_code ec =
                     _replica->do_download(remote_dir, local_dir, f_meta.name, fs, f_size);
-                if (ec == ERR_OK && !verify_sst_files(f_meta, local_dir)) {
+                if (ec == ERR_OK && !verify_file(f_meta, local_dir)) {
                     ec = ERR_CORRUPTION;
                 }
                 if (ec != ERR_OK) {
@@ -334,9 +332,9 @@ error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fnam
 }
 
 // ThreadPool: THREAD_POOL_REPLICATION_LONG
-bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::string &local_dir)
+bool replica_bulk_loader::verify_file(const file_meta &f_meta, const std::string &local_dir)
 {
-    std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name);
+    const std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name);
     int64_t f_size = 0;
     std::string md5;
     if (!utils::filesystem::file_size(local_file, f_size)) {
diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
index 256a8422c5..1bab20a55a 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h
@@ -53,8 +53,9 @@ class replica_bulk_loader : replica_base
     // \return ERR_CORRUPTION: parse failed
     error_code parse_bulk_load_metadata(const std::string &fname);
 
-    // compare sst file metadata calculated by file and parsed by metadata
-    bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir);
+    // TODO(heyuchen): move this function into block service manager, also used by restore
+    // compare file metadata calculated by file and parsed by metadata
+    bool verify_file(const file_meta &f_meta, const std::string &local_dir);
 
     // update download progress after downloading sst files succeed
     void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);
diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
index 534c9eb848..0b77fb8793 100644
--- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
@@ -52,13 +52,13 @@ class replica_bulk_loader_test : public replica_test_base
         return _bulk_loader->parse_bulk_load_metadata(file_path);
     }
 
-    bool test_verify_sst_files(int64_t size, const std::string &md5)
+    bool test_verify_file(int64_t size, const std::string &md5)
     {
         file_meta f_meta;
         f_meta.name = FILE_NAME;
         f_meta.size = size;
         f_meta.md5 = md5;
-        return _bulk_loader->verify_sst_files(f_meta, LOCAL_DIR);
+        return _bulk_loader->verify_file(f_meta, LOCAL_DIR);
     }
 
     /// mock structure functions
@@ -310,12 +310,13 @@ TEST_F(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
     utils::filesystem::remove_path(LOCAL_DIR);
 }
 
-// verify_sst_files unit tests
+// TODO(heyuchen): move tests into block service
+// verify_file unit tests
 TEST_F(replica_bulk_loader_test, verify_file_failed)
 {
     utils::filesystem::create_directory(LOCAL_DIR);
     create_local_file(FILE_NAME);
-    ASSERT_FALSE(test_verify_sst_files(_file_meta.size, "wrong_md5"));
+    ASSERT_FALSE(test_verify_file(_file_meta.size, "wrong_md5"));
     utils::filesystem::remove_path(LOCAL_DIR);
 }
 
@@ -323,7 +324,7 @@ TEST_F(replica_bulk_loader_test, verify_file_succeed)
 {
     utils::filesystem::create_directory(LOCAL_DIR);
     create_local_file(FILE_NAME);
-    ASSERT_TRUE(test_verify_sst_files(_file_meta.size, _file_meta.md5));
+    ASSERT_TRUE(test_verify_file(_file_meta.size, _file_meta.md5));
     utils::filesystem::remove_path(LOCAL_DIR);
 }
 

From bfd3e935f3ed461adbd37d11e08b72bc8eef219e Mon Sep 17 00:00:00 2001
From: heyuchen <heyuchen@xiaomi.com>
Date: Wed, 27 May 2020 17:45:05 +0800
Subject: [PATCH 5/6] small fix

---
 src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
index 0283ec3122..b0b93d6b68 100644
--- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
+++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
@@ -312,7 +312,7 @@ error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fnam
     std::string buf;
     error_code ec = utils::filesystem::read_file(fname, buf);
     if (ec != ERR_OK) {
-        derror_replica("read file {} failed, error = {}", fname, ec.to_string());
+        derror_replica("read file {} failed, error = {}", fname, ec);
         return ec;
     }
 
@@ -336,11 +336,11 @@ bool replica_bulk_loader::verify_file(const file_meta &f_meta, const std::string
 {
     const std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name);
     int64_t f_size = 0;
-    std::string md5;
     if (!utils::filesystem::file_size(local_file, f_size)) {
         derror_replica("verify file({}) failed, becaused failed to get file size", local_file);
         return false;
     }
+    std::string md5;
     if (utils::filesystem::md5sum(local_file, md5) != ERR_OK) {
         derror_replica("verify file({}) failed, becaused failed to get file md5", local_file);
         return false;

From 1573c314dd2c4e0aca9bc9bb7e1559975eb5ea75 Mon Sep 17 00:00:00 2001
From: heyuchen <heyuchen@xiaomi.com>
Date: Thu, 28 May 2020 09:23:43 +0800
Subject: [PATCH 6/6] small fix

---
 src/core/core/filesystem.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core/core/filesystem.cpp b/src/core/core/filesystem.cpp
index 26fcbc04f5..5e658a9958 100644
--- a/src/core/core/filesystem.cpp
+++ b/src/core/core/filesystem.cpp
@@ -775,7 +775,7 @@ error_code read_file(const std::string &fname, std::string &buf)
     }
 
     int64_t file_sz = 0;
-    if (!utils::filesystem::file_size(fname, file_sz)) {
+    if (!file_size(fname, file_sz)) {
         derror_f("get file({}) size failed", fname);
         return ERR_FILE_OPERATION_FAILED;
     }