diff --git a/src/block_service/directio_writable_file.cpp b/src/block_service/directio_writable_file.cpp
deleted file mode 100644
index 2c74ae05d2..0000000000
--- a/src/block_service/directio_writable_file.cpp
+++ /dev/null
@@ -1,169 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "block_service/directio_writable_file.h"
-
-#include
-#include
-#include <stdlib.h> // posix_memalign
-#include
-#include <unistd.h> // getpagesize
-#include
-#include
-#include
-
-#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/safe_strerror_posix.h"
-
-namespace dsn {
-namespace dist {
-namespace block_service {
-
-DSN_DEFINE_uint32(replication,
-                  direct_io_buffer_pages,
-                  64,
-                  "Number of pages we need to set to direct io buffer");
-DSN_TAG_VARIABLE(direct_io_buffer_pages, FT_MUTABLE);
-
-DSN_DEFINE_bool(replication,
-                enable_direct_io,
-                false,
-                "Whether to enable direct I/O when download files");
-DSN_TAG_VARIABLE(enable_direct_io, FT_MUTABLE);
-
-const uint32_t g_page_size = getpagesize();
-
-direct_io_writable_file::direct_io_writable_file(const std::string &file_path)
-    : _file_path(file_path),
-      _fd(-1),
-      _file_size(0),
-      _buffer(nullptr),
-      _buffer_size(FLAGS_direct_io_buffer_pages * g_page_size),
-      _offset(0)
-{
-}
-
-direct_io_writable_file::~direct_io_writable_file()
-{
-    if (!_buffer || _fd < 0) {
-        return;
-    }
-    // Here is an ensurance, users shuold call finalize manually
-    CHECK_EQ_MSG(_offset, 0, "finalize() should be called before destructor");
-
-    ::free(_buffer);
-    CHECK_EQ_MSG(
-        0, ::close(_fd), "Failed to close {}, err = {}", _file_path, utils::safe_strerror(errno));
-}
-
-bool direct_io_writable_file::initialize()
-{
-    if (posix_memalign(&_buffer, g_page_size, _buffer_size) != 0) {
-        LOG_ERROR("Allocate memaligned buffer failed, err = {}", utils::safe_strerror(errno));
-        return false;
-    }
-
-    int flag = O_WRONLY | O_TRUNC | O_CREAT;
-#if !defined(__APPLE__)
-    flag |= O_DIRECT;
-#endif
-    // TODO(yingchun): there maybe serious error of the disk driver when these system call failed,
-    // maybe just terminate the process or mark the disk as failed would be better
-    _fd = ::open(_file_path.c_str(), flag, S_IRUSR | S_IWUSR | S_IRGRP);
-    if (_fd < 0) {
-        LOG_ERROR("Failed to open {} with flag {}, err = {}",
-                  _file_path,
-                  flag,
-                  utils::safe_strerror(errno));
-        ::free(_buffer);
-        _buffer = nullptr;
-        return false;
-    }
-    return true;
-}
-
-bool direct_io_writable_file::finalize()
-{
-    CHECK(_buffer && _fd >= 0, "Initialize the instance first");
-
-    if (_offset > 0) {
-        ssize_t written_bytes = ::write(_fd, _buffer, _buffer_size);
-        if (dsn_unlikely(written_bytes < 0)) {
-            LOG_ERROR("Failed to write the last chunk, file_path = {}, err = {}",
-                      _file_path,
-                      utils::safe_strerror(errno));
-            return false;
-        }
-        // TODO(yingchun): would better to retry
-        if (dsn_unlikely(written_bytes != _buffer_size)) {
-            LOG_ERROR("Failed to write the last chunk, file_path = {}, data bytes = {}, written "
-                      "bytes = {}",
-                      _file_path,
-                      _buffer_size,
-                      written_bytes);
-            return false;
-        }
-        _offset = 0;
-        if (::ftruncate(_fd, _file_size) < 0) {
-            LOG_ERROR("Failed to truncate {}, err = {}", _file_path, utils::safe_strerror(errno));
-            return false;
-        }
-    }
-    return true;
-}
-
-bool direct_io_writable_file::write(const char *s, size_t n)
-{
-    CHECK(_buffer && _fd >= 0, "Initialize the instance first");
-
-    uint32_t remaining = n;
-    while (remaining > 0) {
-        uint32_t bytes = std::min((_buffer_size - _offset), remaining);
-        memcpy((char *)_buffer + _offset, s, bytes);
-        _offset += bytes;
-        remaining -= bytes;
-        s += bytes;
-        // buffer is full, flush to file
-        if (_offset == _buffer_size) {
-            ssize_t written_bytes = ::write(_fd, _buffer, _buffer_size);
-            if (dsn_unlikely(written_bytes < 0)) {
-                LOG_ERROR("Failed to write chunk, file_path = {}, err = {}",
-                          _file_path,
-                          utils::safe_strerror(errno));
-                return false;
-            }
-            // TODO(yingchun): would better to retry
-            if (dsn_unlikely(written_bytes != _buffer_size)) {
-                LOG_ERROR(
-                    "Failed to write chunk, file_path = {}, data bytes = {}, written bytes = {}",
-                    _file_path,
-                    _buffer_size,
-                    written_bytes);
-                return false;
-            }
-            // reset offset
-            _offset = 0;
-        }
-    }
-    _file_size += n;
-    return true;
-}
-
-} // namespace block_service
-} // namespace dist
-} // namespace dsn
diff --git a/src/block_service/directio_writable_file.h b/src/block_service/directio_writable_file.h
deleted file mode 100644
index d4f99949ff..0000000000
--- a/src/block_service/directio_writable_file.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include
-#include
-#include
-
-#include "utils/ports.h"
-
-namespace dsn {
-namespace dist {
-namespace block_service {
-
-class direct_io_writable_file
-{
-public:
-    explicit direct_io_writable_file(const std::string &file_path);
-    ~direct_io_writable_file();
-
-    bool initialize();
-    bool write(const char *s, size_t n);
-    bool finalize();
-
-private:
-    DISALLOW_COPY_AND_ASSIGN(direct_io_writable_file);
-
-    std::string _file_path;
-    int _fd;
-    uint32_t _file_size;
-
-    // page size aligned buffer
-    void *_buffer;
-    // buffer size
-    uint32_t _buffer_size;
-    // buffer offset
-    uint32_t _offset;
-};
-
-} // namespace block_service
-} // namespace dist
-} // namespace dsn
diff --git a/src/block_service/hdfs/CMakeLists.txt b/src/block_service/hdfs/CMakeLists.txt
index 803e85bec3..2bba8b96bb 100644
--- a/src/block_service/hdfs/CMakeLists.txt
+++ b/src/block_service/hdfs/CMakeLists.txt
@@ -17,20 +17,16 @@
 
 set(MY_PROJ_NAME dsn.block_service.hdfs)
 
-set(DIRECTIO_SRC
-        ../directio_writable_file.cpp
-        )
-
 #Source files under CURRENT project directory will be automatically included.
 #You can manually set MY_PROJ_SRC to include source files under other directories.
-set(MY_PROJ_SRC "${DIRECTIO_SRC}")
+set(MY_PROJ_SRC "")
 
 #Search mode for source files under CURRENT project directory ?
 #"GLOB_RECURSE" for recursive search
 #"GLOB" for non - recursive search
 set(MY_SRC_SEARCH_MODE "GLOB")
 
-set(MY_PROJ_LIBS hdfs)
+set(MY_PROJ_LIBS hdfs rocksdb)
 
 #Extra files that will be installed
 set(MY_BINPLACES "")
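Note on the two deletions above: direct_io_writable_file existed only to stage page-aligned chunks into an O_DIRECT fd by hand. rocksdb::Env already implements that bookkeeping behind EnvOptions::use_direct_writes, which is why the class can go away and the hdfs target now links rocksdb. A minimal sketch of the replacement idiom, using only public rocksdb::Env APIs (the function name and error handling here are illustrative, not part of this patch):

    #include <memory>
    #include <string>

    #include <rocksdb/env.h>
    #include <rocksdb/slice.h>
    #include <rocksdb/status.h>

    // Write `data` to `path` through rocksdb::Env, optionally with direct I/O.
    // When use_direct_writes is set, the Env implementation handles the buffer
    // alignment and O_DIRECT bookkeeping that direct_io_writable_file did by hand.
    rocksdb::Status write_file_via_env(const std::string &path,
                                       const rocksdb::Slice &data,
                                       bool enable_direct_io)
    {
        rocksdb::EnvOptions env_options;
        env_options.use_direct_writes = enable_direct_io;

        std::unique_ptr<rocksdb::WritableFile> wfile;
        rocksdb::Status s = rocksdb::Env::Default()->NewWritableFile(path, &wfile, env_options);
        if (!s.ok()) {
            return s;
        }
        s = wfile->Append(data);
        if (!s.ok()) {
            return s;
        }
        // Fsync() flushes the application buffer and syncs to disk, matching
        // the Append + Fsync sequence the patch uses in the download path.
        return wfile->Fsync();
    }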
diff --git a/src/block_service/hdfs/hdfs_service.cpp b/src/block_service/hdfs/hdfs_service.cpp
index 459108e111..5909f40de0 100644
--- a/src/block_service/hdfs/hdfs_service.cpp
+++ b/src/block_service/hdfs/hdfs_service.cpp
@@ -17,19 +17,21 @@
 
 #include
 #include
+#include
 #include
-#include
 #include
 #include
 
-#include "block_service/directio_writable_file.h"
 #include "hdfs/hdfs.h"
 #include "hdfs_service.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task.h"
 #include "utils/TokenBucket.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
+#include "utils/env.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
@@ -37,6 +39,8 @@
 #include "utils/safe_strerror_posix.h"
 #include "utils/strings.h"
 
+DSN_DECLARE_bool(enable_direct_io);
+
 struct hdfsBuilder;
 
 namespace dsn {
@@ -65,8 +69,6 @@ DSN_DEFINE_uint64(replication,
                   "hdfs write batch size, the default value is 64MB");
 DSN_TAG_VARIABLE(hdfs_write_batch_size_bytes, FT_MUTABLE);
 
-DSN_DECLARE_bool(enable_direct_io);
-
 hdfs_service::hdfs_service()
 {
     _read_token_bucket.reset(new folly::DynamicTokenBucket());
@@ -108,12 +110,12 @@ error_code hdfs_service::create_fs()
     hdfsBuilderSetNameNode(builder, _hdfs_name_node.c_str());
     _fs = hdfsBuilderConnect(builder);
     if (!_fs) {
-        LOG_ERROR("Fail to connect hdfs name node {}, error: {}.",
+        LOG_ERROR("Fail to connect HDFS name node {}, error: {}.",
                   _hdfs_name_node,
                   utils::safe_strerror(errno));
        return ERR_FS_INTERNAL;
     }
-    LOG_INFO("Succeed to connect hdfs name node {}.", _hdfs_name_node);
+    LOG_INFO("Succeed to connect HDFS name node {}.", _hdfs_name_node);
     return ERR_OK;
 }
 
@@ -122,10 +124,10 @@ void hdfs_service::close()
     // This method should be carefully called.
     // Calls to hdfsDisconnect() by individual threads would terminate
     // all other connections handed out via hdfsConnect() to the same URI.
-    LOG_INFO("Try to disconnect hdfs.");
+    LOG_INFO("Try to disconnect HDFS.");
     int result = hdfsDisconnect(_fs);
     if (result == -1) {
-        LOG_ERROR("Fail to disconnect from the hdfs file system, error: {}.",
+        LOG_ERROR("Fail to disconnect from the HDFS file system, error: {}.",
                   utils::safe_strerror(errno));
     }
     // Even if there is an error, the resources associated with the hdfsFS will be freed.
@@ -134,7 +136,7 @@ void hdfs_service::close()
 
 std::string hdfs_service::get_hdfs_entry_name(const std::string &hdfs_path)
 {
-    // get exact file name from an hdfs path.
+    // get exact file name from an HDFS path.
     int pos = hdfs_path.find_last_of("/");
     return hdfs_path.substr(pos + 1);
 }
@@ -305,7 +307,7 @@ error_code hdfs_file_object::write_data_in_batches(const char *data,
     hdfsFile write_file =
         hdfsOpenFile(_service->get_fs(), file_name().c_str(), O_WRONLY | O_CREAT, 0, 0, 0);
     if (!write_file) {
-        LOG_ERROR("Failed to open hdfs file {} for writting, error: {}.",
+        LOG_ERROR("Failed to open HDFS file {} for writing, error: {}.",
                   file_name(),
                   utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
@@ -323,7 +325,7 @@ error_code hdfs_file_object::write_data_in_batches(const char *data,
                                   (void *)(data + cur_pos),
                                   static_cast<tSize>(write_len));
         if (num_written_bytes == -1) {
-            LOG_ERROR("Failed to write hdfs file {}, error: {}.",
+            LOG_ERROR("Failed to write HDFS file {}, error: {}.",
                       file_name(),
                       utils::safe_strerror(errno));
             hdfsCloseFile(_service->get_fs(), write_file);
@@ -333,18 +335,18 @@ error_code hdfs_file_object::write_data_in_batches(const char *data,
     }
     if (hdfsHFlush(_service->get_fs(), write_file) != 0) {
         LOG_ERROR(
-            "Failed to flush hdfs file {}, error: {}.", file_name(), utils::safe_strerror(errno));
+            "Failed to flush HDFS file {}, error: {}.", file_name(), utils::safe_strerror(errno));
         hdfsCloseFile(_service->get_fs(), write_file);
         return ERR_FS_INTERNAL;
     }
     written_size = cur_pos;
     if (hdfsCloseFile(_service->get_fs(), write_file) != 0) {
         LOG_ERROR(
-            "Failed to close hdfs file {}, error: {}", file_name(), utils::safe_strerror(errno));
+            "Failed to close HDFS file {}, error: {}", file_name(), utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
     }
 
-    LOG_INFO("start to synchronize meta data after successfully wrote data to hdfs");
+    LOG_INFO("start to synchronize metadata after successfully writing data to HDFS");
     return get_file_meta();
 }
 
@@ -376,23 +378,51 @@ dsn::task_ptr hdfs_file_object::upload(const upload_request &req,
     add_ref();
 
     auto upload_background = [this, req, t]() {
+        LOG_INFO("start to upload from '{}' to '{}'", req.input_local_name, file_name());
+
         upload_response resp;
-        resp.uploaded_size = 0;
-        std::ifstream is(req.input_local_name, std::ios::binary | std::ios::in);
-        if (is.is_open()) {
-            int64_t file_sz = 0;
-            dsn::utils::filesystem::file_size(req.input_local_name, file_sz);
-            std::unique_ptr<char[]> buffer(new char[file_sz]);
-            is.read(buffer.get(), file_sz);
-            is.close();
-            resp.err = write_data_in_batches(buffer.get(), file_sz, resp.uploaded_size);
-        } else {
-            LOG_ERROR("HDFS upload failed: open local file {} failed when upload to {}, error: {}",
-                      req.input_local_name,
-                      file_name(),
-                      utils::safe_strerror(errno));
-            resp.err = dsn::ERR_FILE_OPERATION_FAILED;
-        }
+        do {
+            rocksdb::EnvOptions env_options;
+            env_options.use_direct_reads = FLAGS_enable_direct_io;
+            std::unique_ptr<rocksdb::SequentialFile> rfile;
+            auto s = rocksdb::Env::Default()->NewSequentialFile(
+                req.input_local_name, &rfile, env_options);
+            if (!s.ok()) {
+                LOG_ERROR(
+                    "open local file '{}' failed, err = {}", req.input_local_name, s.ToString());
+                resp.err = ERR_FILE_OPERATION_FAILED;
+                break;
+            }
+
+            int64_t file_size;
+            if (!dsn::utils::filesystem::file_size(
+                    req.input_local_name, dsn::utils::FileDataType::kSensitive, file_size)) {
+                LOG_ERROR("get size of local file '{}' failed", req.input_local_name);
+                resp.err = ERR_FILE_OPERATION_FAILED;
+                break;
+            }
+
+            rocksdb::Slice result;
+            char scratch[file_size];
+            s = rfile->Read(file_size, &result, scratch);
+            if (!s.ok()) {
+                LOG_ERROR(
+                    "read local file '{}' failed, err = {}", req.input_local_name, s.ToString());
+                resp.err = ERR_FILE_OPERATION_FAILED;
+                break;
+            }
+
+            resp.err = write_data_in_batches(result.data(), result.size(), resp.uploaded_size);
+            if (resp.err != ERR_OK) {
+                LOG_ERROR("write data to remote '{}' failed, err = {}", file_name(), resp.err);
+                break;
+            }
+
+            LOG_INFO("finish to upload from '{}' to '{}', size = {}",
+                     req.input_local_name,
+                     file_name(),
+                     resp.uploaded_size);
+        } while (false);
         t->enqueue_with(resp);
         release_ref();
     };
@@ -417,7 +447,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t start_pos,
     hdfsFile read_file = hdfsOpenFile(_service->get_fs(), file_name().c_str(), O_RDONLY, 0, 0, 0);
     if (!read_file) {
-        LOG_ERROR("Failed to open hdfs file {} for reading, error: {}.",
+        LOG_ERROR("Failed to open HDFS file {} for reading, error: {}.",
                   file_name(),
                   utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
@@ -446,7 +476,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t start_pos,
             cur_pos += num_read_bytes;
             dst_buf += num_read_bytes;
         } else if (num_read_bytes == -1) {
-            LOG_ERROR("Failed to read hdfs file {}, error: {}.",
+            LOG_ERROR("Failed to read HDFS file {}, error: {}.",
                       file_name(),
                       utils::safe_strerror(errno));
             read_success = false;
@@ -455,7 +485,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t start_pos,
     }
     if (hdfsCloseFile(_service->get_fs(), read_file) != 0) {
         LOG_ERROR(
-            "Failed to close hdfs file {}, error: {}.", file_name(), utils::safe_strerror(errno));
+            "Failed to close HDFS file {}, error: {}.", file_name(), utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
     }
     if (read_success) {
@@ -504,48 +534,53 @@ dsn::task_ptr hdfs_file_object::download(const download_request &req,
     auto download_background = [this, req, t]() {
         download_response resp;
         resp.downloaded_size = 0;
-        std::string read_buffer;
-        size_t read_length = 0;
-        resp.err =
-            read_data_in_batches(req.remote_pos, req.remote_length, read_buffer, read_length);
-        if (resp.err == ERR_OK) {
-            bool write_succ = false;
-            if (FLAGS_enable_direct_io) {
-                auto dio_file = std::make_unique<direct_io_writable_file>(req.output_local_name);
-                do {
-                    if (!dio_file->initialize()) {
-                        break;
-                    }
-                    bool wr_ret = dio_file->write(read_buffer.c_str(), read_length);
-                    if (!wr_ret) {
-                        break;
-                    }
-                    if (dio_file->finalize()) {
-                        resp.downloaded_size = read_length;
-                        resp.file_md5 = utils::string_md5(read_buffer.c_str(), read_length);
-                        write_succ = true;
-                    }
-                } while (0);
-            } else {
-                std::ofstream out(req.output_local_name,
-                                  std::ios::binary | std::ios::out | std::ios::trunc);
-                if (out.is_open()) {
-                    out.write(read_buffer.c_str(), read_length);
-                    out.close();
-                    resp.downloaded_size = read_length;
-                    resp.file_md5 = utils::string_md5(read_buffer.c_str(), read_length);
-                    write_succ = true;
-                }
+        resp.err = ERR_OK;
+        bool write_succ = false;
+        std::string target_file = req.output_local_name;
+        do {
+            LOG_INFO("start to download from '{}' to '{}'", file_name(), target_file);
+
+            std::string read_buffer;
+            size_t read_length = 0;
+            resp.err =
+                read_data_in_batches(req.remote_pos, req.remote_length, read_buffer, read_length);
+            if (resp.err != ERR_OK) {
+                LOG_ERROR("read data from remote '{}' failed, err = {}", file_name(), resp.err);
+                break;
             }
-            if (!write_succ) {
-                LOG_ERROR("HDFS download failed: fail to open localfile {} when download {}, "
-                          "error: {}",
-                          req.output_local_name,
-                          file_name(),
-                          utils::safe_strerror(errno));
-                resp.err = ERR_FILE_OPERATION_FAILED;
-                resp.downloaded_size = 0;
+
+            rocksdb::EnvOptions env_options;
+            env_options.use_direct_writes = FLAGS_enable_direct_io;
+            std::unique_ptr<rocksdb::WritableFile> wfile;
+            auto s = rocksdb::Env::Default()->NewWritableFile(target_file, &wfile, env_options);
+            if (!s.ok()) {
+                LOG_ERROR("create local file '{}' failed, err = {}", target_file, s.ToString());
+                break;
+            }
+
+            s = wfile->Append(rocksdb::Slice(read_buffer.data(), read_length));
+            if (!s.ok()) {
+                LOG_ERROR("append local file '{}' failed, err = {}", target_file, s.ToString());
+                break;
+            }
+
+            s = wfile->Fsync();
+            if (!s.ok()) {
+                LOG_ERROR("fsync local file '{}' failed, err = {}", target_file, s.ToString());
+                break;
             }
+
+            resp.downloaded_size = read_length;
+            resp.file_md5 = utils::string_md5(read_buffer.c_str(), read_length);
+            write_succ = true;
+        } while (false);
+
+        if (!write_succ) {
+            LOG_ERROR("HDFS download failed: fail to write local file {} when download {}",
+                      target_file,
+                      file_name());
+            resp.err = ERR_FILE_OPERATION_FAILED;
+            resp.downloaded_size = 0;
         }
         t->enqueue_with(resp);
         release_ref();
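One caveat worth flagging in the new upload path above: `char scratch[file_size]` is a variable-length array, so the whole file is staged on the thread's stack and a large upload can overflow it. The same read works with a heap buffer. A sketch under that assumption (the helper name is hypothetical; the patch itself keeps the VLA):

    #include <memory>
    #include <string>

    #include <rocksdb/env.h>
    #include <rocksdb/slice.h>
    #include <rocksdb/status.h>

    // Read a whole local file through rocksdb::Env with an optional direct-I/O
    // read path, staging the bytes in a heap buffer instead of a VLA.
    rocksdb::Status read_file_via_env(const std::string &path,
                                      size_t file_size,
                                      bool enable_direct_io,
                                      std::string *out)
    {
        rocksdb::EnvOptions env_options;
        env_options.use_direct_reads = enable_direct_io;

        std::unique_ptr<rocksdb::SequentialFile> rfile;
        rocksdb::Status s = rocksdb::Env::Default()->NewSequentialFile(path, &rfile, env_options);
        if (!s.ok()) {
            return s;
        }

        std::unique_ptr<char[]> scratch(new char[file_size]);
        rocksdb::Slice result;
        s = rfile->Read(file_size, &result, scratch.get());
        if (!s.ok()) {
            return s;
        }
        // `result` may point into `scratch`, so copy the bytes out before the
        // buffer goes out of scope.
        out->assign(result.data(), result.size());
        return s;
    }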
diff --git a/src/block_service/test/config-test.ini b/src/block_service/test/config-test.ini
index c1996d5518..2acb86e2bb 100644
--- a/src/block_service/test/config-test.ini
+++ b/src/block_service/test/config-test.ini
@@ -53,7 +53,7 @@ max_size = 150
 worker_count = 8
 
 [hdfs_test]
-test_name_node =
-test_backup_path =
+test_name_node =
+test_backup_path =
 num_test_file_lines = 4096
 num_total_files_for_hdfs_concurrent_test = 64
diff --git a/src/block_service/test/hdfs_service_test.cpp b/src/block_service/test/hdfs_service_test.cpp
index 6cfcfb379c..e2f63fe9aa 100644
--- a/src/block_service/test/hdfs_service_test.cpp
+++ b/src/block_service/test/hdfs_service_test.cpp
@@ -15,15 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include
+#include
 // IWYU pragma: no_include
 // IWYU pragma: no_include
 #include
+#include
+#include
+#include
 #include
 #include
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -35,27 +40,23 @@
 #include "runtime/task/task.h"
 #include "runtime/task/task_code.h"
 #include "runtime/task/task_tracker.h"
+#include "test_util/test_util.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/enum_helper.h"
+#include "utils/env.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
-#include "utils/strings.h"
+#include "utils/fmt_logging.h"
+#include "utils/test_macros.h"
 #include "utils/threadpool_code.h"
 
 using namespace dsn;
 using namespace dsn::dist::block_service;
 
-static std::string example_name_node = "";
-static std::string example_backup_path = "";
-// Please modify following paras in 'config-test.ini' to enable hdfs_service_test,
-// or hdfs_service_test will be skipped and return true.
-DSN_DEFINE_string(hdfs_test, test_name_node, "", "hdfs name node");
-DSN_DEFINE_string(hdfs_test,
-                  test_backup_path,
-                  "",
-                  "path for uploading and downloading test files");
+DSN_DEFINE_string(hdfs_test, test_name_node, "", "hdfs name node");
+DSN_DEFINE_string(hdfs_test, test_backup_path, "", "path for uploading and downloading test files");
 
 DSN_DEFINE_uint32(hdfs_test, num_test_file_lines, 4096, "number of lines in test file");
 DSN_DEFINE_uint32(hdfs_test,
@@ -65,206 +66,243 @@ DSN_DEFINE_uint32(hdfs_test,
 
 DEFINE_TASK_CODE(LPC_TEST_HDFS, TASK_PRIORITY_HIGH, dsn::THREAD_POOL_DEFAULT)
 
-class HDFSClientTest : public testing::Test
+class HDFSClientTest : public pegasus::encrypt_data_test_base
 {
 protected:
-    virtual void SetUp() override;
-    virtual void TearDown() override;
-    void generate_test_file(const char *filename);
-    void write_test_files_async(task_tracker *tracker);
-    std::string name_node;
-    std::string backup_path;
-    std::string local_test_dir;
-    std::string test_data_str;
+    void generate_test_file(const std::string &filename);
+    void write_test_files_async(const std::string &local_test_path, task_tracker *tracker);
 };
 
-void HDFSClientTest::SetUp()
+void HDFSClientTest::generate_test_file(const std::string &filename)
 {
-    name_node = FLAGS_test_name_node;
-    backup_path = FLAGS_test_backup_path;
-    local_test_dir = "test_dir";
-    test_data_str = "";
-    for (int i = 0; i < FLAGS_num_test_file_lines; ++i) {
-        test_data_str += "test";
-    }
-}
-
-void HDFSClientTest::TearDown() {}
-
-void HDFSClientTest::generate_test_file(const char *filename)
-{
-    // generate a local test file.
     int lines = FLAGS_num_test_file_lines;
-    FILE *fp = fopen(filename, "wb");
+    std::unique_ptr<rocksdb::WritableFile> wfile;
+    auto s = rocksdb::Env::Default()->NewWritableFile(filename, &wfile, rocksdb::EnvOptions());
+    ASSERT_TRUE(s.ok()) << s.ToString();
     for (int i = 0; i < lines; ++i) {
-        fprintf(fp, "%04d_this_is_a_simple_test_file\n", i);
+        rocksdb::Slice data(fmt::format("{:04}_this_is_a_simple_test_file\n", i));
+        s = wfile->Append(data);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
-    fclose(fp);
+    s = wfile->Fsync();
+    ASSERT_TRUE(s.ok()) << s.ToString();
 }
 
-void HDFSClientTest::write_test_files_async(task_tracker *tracker)
+void HDFSClientTest::write_test_files_async(const std::string &local_test_path,
+                                            task_tracker *tracker)
 {
-    dsn::utils::filesystem::create_directory(local_test_dir);
+    dsn::utils::filesystem::create_directory(local_test_path);
+    std::string local_test_data;
+    for (int i = 0; i < FLAGS_num_test_file_lines; ++i) {
+        local_test_data += "test";
+    }
     for (int i = 0; i < 100; ++i) {
-        tasking::enqueue(LPC_TEST_HDFS, tracker, [this, i]() {
+        tasking::enqueue(LPC_TEST_HDFS, tracker, [&]() {
             // mock the writing process in hdfs_file_object::download().
-            std::string test_file_name = local_test_dir + "/test_file_" + std::to_string(i);
-            std::ofstream out(test_file_name, std::ios::binary | std::ios::out | std::ios::trunc);
-            if (out.is_open()) {
-                out.write(test_data_str.c_str(), test_data_str.length());
-            }
-            out.close();
+            std::string test_file_name = local_test_path + "/test_file_" + std::to_string(i);
+            auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
                                                rocksdb::Slice(local_test_data),
+                                                test_file_name,
+                                                /* should_sync */ true);
+            CHECK(s.ok(), "{}", s.ToString());
         });
     }
 }
 
-TEST_F(HDFSClientTest, test_basic_operation)
+// TODO(yingchun): ENCRYPTION: add enable encryption test.
+INSTANTIATE_TEST_CASE_P(, HDFSClientTest, ::testing::Values(false));
+
+TEST_P(HDFSClientTest, test_hdfs_read_write)
+{
+    if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) == 0) {
+        // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+        std::cout << "Set hdfs_test.* configs in config-test.ini to enable hdfs_service_test."
+                  << std::endl;
+        return;
+    }
+
+    auto s = std::make_shared<hdfs_service>();
+    ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node, FLAGS_test_backup_path}));
+
+    const std::string kRemoteTestPath = "hdfs_client_test";
+    const std::string kRemoteTestRWFile = kRemoteTestPath + "/test_write_file";
+    const std::string kTestBuffer = "write_hello_world_for_test";
+    const int kTestBufferLength = kTestBuffer.size();
+
+    // 1. clean up all old files in the remote test directory.
+    printf("clean up all old files.\n");
+    remove_path_response rem_resp;
+    s->remove_path(remove_path_request{kRemoteTestPath, true},
+                   LPC_TEST_HDFS,
+                   [&rem_resp](const remove_path_response &resp) { rem_resp = resp; },
+                   nullptr)
+        ->wait();
+    ASSERT_TRUE(dsn::ERR_OK == rem_resp.err || dsn::ERR_OBJECT_NOT_FOUND == rem_resp.err);
+
+    // 2. create file.
+    printf("test write operation.\n");
+    create_file_response cf_resp;
+    s->create_file(create_file_request{kRemoteTestRWFile, false},
+                   LPC_TEST_HDFS,
+                   [&cf_resp](const create_file_response &r) { cf_resp = r; },
+                   nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
+
+    // 3. write file.
+    dsn::blob bb(kTestBuffer.c_str(), 0, kTestBufferLength);
+    write_response w_resp;
+    cf_resp.file_handle
+        ->write(write_request{bb},
+                LPC_TEST_HDFS,
+                [&w_resp](const write_response &w) { w_resp = w; },
+                nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, w_resp.err);
+    ASSERT_EQ(kTestBufferLength, w_resp.written_size);
+    ASSERT_EQ(kTestBufferLength, cf_resp.file_handle->get_size());
+
+    // 4. read file.
+    printf("test read just written contents.\n");
+    read_response r_resp;
+    cf_resp.file_handle
+        ->read(read_request{0, -1},
+               LPC_TEST_HDFS,
+               [&r_resp](const read_response &r) { r_resp = r; },
+               nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
+    ASSERT_EQ(kTestBufferLength, r_resp.buffer.length());
+    ASSERT_EQ(kTestBuffer, r_resp.buffer.to_string());
+
+    // 5. partial read.
+    const uint64_t kOffset = 5;
+    const int64_t kSize = 10;
+    cf_resp.file_handle
+        ->read(read_request{kOffset, kSize},
+               LPC_TEST_HDFS,
+               [&r_resp](const read_response &r) { r_resp = r; },
+               nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
+    ASSERT_EQ(kSize, r_resp.buffer.length());
+    ASSERT_EQ(kTestBuffer.substr(kOffset, kSize), r_resp.buffer.to_string());
+}
+
+TEST_P(HDFSClientTest, test_upload_and_download)
 {
-    if (name_node == example_name_node || backup_path == example_backup_path) {
+    if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) == 0) {
+        // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+        std::cout << "Set hdfs_test.* configs in config-test.ini to enable hdfs_service_test."
+                  << std::endl;
         return;
     }
-    std::vector<std::string> args = {name_node, backup_path};
-    std::shared_ptr<hdfs_service> s = std::make_shared<hdfs_service>();
-    ASSERT_EQ(dsn::ERR_OK, s->initialize(args));
+    auto s = std::make_shared<hdfs_service>();
+    ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node, FLAGS_test_backup_path}));
 
-    std::string local_test_file = "test_file";
-    std::string remote_test_file = "hdfs_client_test/test_file";
-    int64_t test_file_size = 0;
+    const std::string kLocalFile = "test_file";
+    const std::string kRemoteTestPath = "hdfs_client_test";
+    const std::string kRemoteTestFile = kRemoteTestPath + "/test_file";
 
-    generate_test_file(local_test_file.c_str());
-    dsn::utils::filesystem::file_size(local_test_file, test_file_size);
+    // 0. generate test file.
+    NO_FATALS(generate_test_file(kLocalFile));
+    int64_t local_file_size = 0;
+    ASSERT_TRUE(dsn::utils::filesystem::file_size(
+        kLocalFile, dsn::utils::FileDataType::kSensitive, local_file_size));
+    std::string local_file_md5sum;
+    dsn::utils::filesystem::md5sum(kLocalFile, local_file_md5sum);
 
-    // fisrt clean up all old file in test directory.
+    // 1. clean up all old files in the remote test directory.
     printf("clean up all old files.\n");
     remove_path_response rem_resp;
-    s->remove_path(remove_path_request{"hdfs_client_test", true},
+    s->remove_path(remove_path_request{kRemoteTestPath, true},
                    LPC_TEST_HDFS,
                    [&rem_resp](const remove_path_response &resp) { rem_resp = resp; },
                    nullptr)
         ->wait();
     ASSERT_TRUE(dsn::ERR_OK == rem_resp.err || dsn::ERR_OBJECT_NOT_FOUND == rem_resp.err);
 
-    // test upload file.
-    printf("create and upload: %s.\n", remote_test_file.c_str());
+    // 2. create file.
+    printf("create and upload: %s.\n", kRemoteTestFile.c_str());
     create_file_response cf_resp;
-    s->create_file(create_file_request{remote_test_file, true},
+    s->create_file(create_file_request{kRemoteTestFile, true},
                    LPC_TEST_HDFS,
                    [&cf_resp](const create_file_response &r) { cf_resp = r; },
                    nullptr)
         ->wait();
-    ASSERT_EQ(cf_resp.err, dsn::ERR_OK);
+    ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
+
+    // 3. upload file.
     upload_response u_resp;
     cf_resp.file_handle
-        ->upload(upload_request{local_test_file},
+        ->upload(upload_request{kLocalFile},
                  LPC_TEST_HDFS,
                  [&u_resp](const upload_response &r) { u_resp = r; },
                  nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, u_resp.err);
-    ASSERT_EQ(test_file_size, cf_resp.file_handle->get_size());
+    ASSERT_EQ(local_file_size, cf_resp.file_handle->get_size());
 
-    // test list directory.
+    // 4. list directory.
     ls_response l_resp;
-    s->list_dir(ls_request{"hdfs_client_test"},
+    s->list_dir(ls_request{kRemoteTestPath},
                 LPC_TEST_HDFS,
                 [&l_resp](const ls_response &resp) { l_resp = resp; },
                 nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, l_resp.err);
     ASSERT_EQ(1, l_resp.entries->size());
-    ASSERT_EQ("test_file", l_resp.entries->at(0).entry_name);
+    ASSERT_EQ(kLocalFile, l_resp.entries->at(0).entry_name);
     ASSERT_EQ(false, l_resp.entries->at(0).is_directory);
 
-    // test download file.
+    // 5. download file.
     download_response d_resp;
-    printf("test download %s.\n", remote_test_file.c_str());
-    s->create_file(create_file_request{remote_test_file, false},
+    printf("test download %s.\n", kRemoteTestFile.c_str());
+    s->create_file(create_file_request{kRemoteTestFile, false},
                    LPC_TEST_HDFS,
                    [&cf_resp](const create_file_response &resp) { cf_resp = resp; },
                    nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
-    ASSERT_EQ(test_file_size, cf_resp.file_handle->get_size());
-    std::string local_file_for_download = "test_file_d";
+    ASSERT_EQ(local_file_size, cf_resp.file_handle->get_size());
+    std::string kLocalDownloadFile = "test_file_d";
     cf_resp.file_handle
-        ->download(download_request{local_file_for_download, 0, -1},
+        ->download(download_request{kLocalDownloadFile, 0, -1},
                    LPC_TEST_HDFS,
                    [&d_resp](const download_response &resp) { d_resp = resp; },
                    nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, d_resp.err);
-    ASSERT_EQ(test_file_size, d_resp.downloaded_size);
-
-    // compare local_test_file and local_file_for_download.
-    int64_t file_size = 0;
-    dsn::utils::filesystem::file_size(local_file_for_download, file_size);
-    ASSERT_EQ(test_file_size, file_size);
-    std::string test_file_md5sum;
-    dsn::utils::filesystem::md5sum(local_test_file, test_file_md5sum);
-    std::string downloaded_file_md5sum;
-    dsn::utils::filesystem::md5sum(local_file_for_download, downloaded_file_md5sum);
-    ASSERT_EQ(test_file_md5sum, downloaded_file_md5sum);
-
-    // test read and write.
-    printf("test read write operation.\n");
-    std::string test_write_file = "hdfs_client_test/test_write_file";
-    s->create_file(create_file_request{test_write_file, false},
-                   LPC_TEST_HDFS,
-                   [&cf_resp](const create_file_response &r) { cf_resp = r; },
-                   nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
-    const char *test_buffer = "write_hello_world_for_test";
-    int length = strlen(test_buffer);
-    dsn::blob bb(test_buffer, 0, length);
-    write_response w_resp;
-    cf_resp.file_handle
-        ->write(write_request{bb},
-                LPC_TEST_HDFS,
-                [&w_resp](const write_response &w) { w_resp = w; },
-                nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, w_resp.err);
-    ASSERT_EQ(length, w_resp.written_size);
-    ASSERT_EQ(length, cf_resp.file_handle->get_size());
-    printf("test read just written contents.\n");
-    read_response r_resp;
-    cf_resp.file_handle
-        ->read(read_request{0, -1},
-               LPC_TEST_HDFS,
-               [&r_resp](const read_response &r) { r_resp = r; },
-               nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
-    ASSERT_EQ(length, r_resp.buffer.length());
-    ASSERT_TRUE(dsn::utils::mequals(r_resp.buffer.data(), test_buffer, length));
-
-    // test partitial read.
-    cf_resp.file_handle
-        ->read(read_request{5, 10},
-               LPC_TEST_HDFS,
-               [&r_resp](const read_response &r) { r_resp = r; },
-               nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
-    ASSERT_EQ(10, r_resp.buffer.length());
-    ASSERT_TRUE(dsn::utils::mequals(r_resp.buffer.data(), test_buffer + 5, 10));
-
-    // clean up local test files.
-    utils::filesystem::remove_path(local_test_file);
-    utils::filesystem::remove_path(local_file_for_download);
+    ASSERT_EQ(local_file_size, d_resp.downloaded_size);
+
+    // 6. compare kLocalFile and kLocalDownloadFile.
+    // 6.1 check file size.
+    int64_t local_download_file_size = 0;
+    ASSERT_TRUE(dsn::utils::filesystem::file_size(
+        kLocalDownloadFile, dsn::utils::FileDataType::kSensitive, local_download_file_size));
+    ASSERT_EQ(local_file_size, local_download_file_size);
+    // 6.2 check file md5sum.
+    std::string local_download_file_md5sum;
+    dsn::utils::filesystem::md5sum(kLocalDownloadFile, local_download_file_md5sum);
+    ASSERT_EQ(local_file_md5sum, local_download_file_md5sum);
+
+    // 7. clean up local test files.
+    utils::filesystem::remove_path(kLocalFile);
+    utils::filesystem::remove_path(kLocalDownloadFile);
 }
 
-TEST_F(HDFSClientTest, test_concurrent_upload_download)
+TEST_P(HDFSClientTest, test_concurrent_upload_download)
 {
-    if (name_node == example_name_node || backup_path == example_backup_path) {
+    if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) == 0) {
+        // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+        std::cout << "Set hdfs_test.* configs in config-test.ini to enable hdfs_service_test."
+                  << std::endl;
         return;
     }
-    std::vector<std::string> args = {name_node, backup_path};
-    std::shared_ptr<hdfs_service> s = std::make_shared<hdfs_service>();
-    ASSERT_EQ(dsn::ERR_OK, s->initialize(args));
+    auto s = std::make_shared<hdfs_service>();
+    ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node, FLAGS_test_backup_path}));
 
     int total_files = FLAGS_num_total_files_for_hdfs_concurrent_test;
     std::vector<std::string> local_file_names;
@@ -282,11 +320,12 @@ TEST_P(HDFSClientTest, test_concurrent_upload_download)
     // generate test files.
     for (int i = 0; i < total_files; ++i) {
         std::string file_name = "randomfile" + std::to_string(i);
-        generate_test_file(file_name.c_str());
+        NO_FATALS(generate_test_file(file_name));
         int64_t file_size = 0;
-        dsn::utils::filesystem::file_size(file_name, file_size);
+        ASSERT_TRUE(dsn::utils::filesystem::file_size(
+            file_name, dsn::utils::FileDataType::kSensitive, file_size));
         std::string md5sum;
-        dsn::utils::filesystem::md5sum(file_name, md5sum);
+        ASSERT_EQ(ERR_OK, dsn::utils::filesystem::md5sum(file_name, md5sum));
 
         local_file_names.emplace_back(file_name);
         remote_file_names.emplace_back("hdfs_concurrent_test/" + file_name);
@@ -385,13 +424,16 @@ TEST_P(HDFSClientTest, test_concurrent_upload_download)
     }
 }
 
-TEST_F(HDFSClientTest, test_rename_path_while_writing)
+TEST_P(HDFSClientTest, test_rename_path_while_writing)
 {
+    std::string kLocalTestPath = "test_dir";
+
     task_tracker tracker;
-    write_test_files_async(&tracker);
+    write_test_files_async(kLocalTestPath, &tracker);
     usleep(100);
-    std::string rename_dir = "rename_dir." + std::to_string(dsn_now_ms());
+
+    std::string kLocalRenamedTestPath = "rename_dir." + std::to_string(dsn_now_ms());
     // rename succeed but writing failed.
-    ASSERT_TRUE(dsn::utils::filesystem::rename_path(local_test_dir, rename_dir));
+    ASSERT_TRUE(dsn::utils::filesystem::rename_path(kLocalTestPath, kLocalRenamedTestPath));
     tracker.cancel_outstanding_tasks();
 }
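The switch from TEST_F to TEST_P above hangs the whole suite off one bool parameter. How the pieces fit: pegasus::encrypt_data_test_base presumably derives from ::testing::TestWithParam<bool>, so Values(false) runs every TEST_P case once with encryption off, and widening it to Values(false, true) later covers the ENCRYPTION TODO without touching the test bodies. A self-contained stub of the same wiring (the stub names are illustrative, not the real fixture):

    #include <gtest/gtest.h>

    // Minimal shape of the fixture wiring used above. This stub only shows how
    // TEST_P and INSTANTIATE_TEST_CASE_P fit together; the real fixture would
    // toggle encryption based on the parameter.
    class encrypt_data_test_base_stub : public ::testing::TestWithParam<bool>
    {
    protected:
        bool encrypted() const { return GetParam(); }
    };

    TEST_P(encrypt_data_test_base_stub, runs_once_per_value)
    {
        // With Values(false) the suite runs exactly once, unencrypted. Adding
        // `true` later re-runs every TEST_P case with encryption enabled.
        EXPECT_FALSE(encrypted());
    }

    INSTANTIATE_TEST_CASE_P(, encrypt_data_test_base_stub, ::testing::Values(false));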
diff --git a/src/utils/env.cpp b/src/utils/env.cpp
index 329406117c..7e49d999a5 100644
--- a/src/utils/env.cpp
+++ b/src/utils/env.cpp
@@ -49,6 +49,12 @@ DSN_DEFINE_string(pegasus.server,
                   "The encryption method to use in the filesystem. Now "
                   "supports AES128CTR, AES192CTR, AES256CTR and SM4CTR.");
 
+DSN_DEFINE_bool(replication,
+                enable_direct_io,
+                false,
+                "Whether to enable direct I/O when downloading files");
+DSN_TAG_VARIABLE(enable_direct_io, FT_MUTABLE);
+
 namespace dsn {
 namespace utils {
 
@@ -89,6 +95,7 @@ rocksdb::Status do_copy_file(const std::string &src_fname,
                              uint64_t *total_size)
 {
     rocksdb::EnvOptions src_env_options;
+    src_env_options.use_direct_reads = FLAGS_enable_direct_io;
     std::unique_ptr<rocksdb::SequentialFile> src_file;
     auto s =
         dsn::utils::PegasusEnv(src_type)->NewSequentialFile(src_fname, &src_file, src_env_options);
@@ -105,6 +112,7 @@ rocksdb::Status do_copy_file(const std::string &src_fname,
     }
 
     rocksdb::EnvOptions dst_env_options;
+    dst_env_options.use_direct_writes = FLAGS_enable_direct_io;
     std::unique_ptr<rocksdb::WritableFile> dst_file;
     s = dsn::utils::PegasusEnv(dst_type)->NewWritableFile(dst_fname, &dst_file, dst_env_options);
     LOG_AND_RETURN_NOT_RDB_OK(WARNING, s, "failed to open file {} for writing", dst_fname);
diff --git a/src/utils/env.h b/src/utils/env.h
index 839bd81ab3..cdaf16aaa5 100644
--- a/src/utils/env.h
+++ b/src/utils/env.h
@@ -19,8 +19,8 @@
 
 #include
 #include
-#include
-#include
+#include
+#include
 #include
 
 namespace dsn {
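The effect of the two env.cpp hooks above: any copy that goes through do_copy_file now opens both ends via rocksdb::Env, and the single FLAGS_enable_direct_io flag flips direct I/O on for the read side and the write side together. Condensed into one self-contained function (the name and chunk size are illustrative; the real do_copy_file also routes through PegasusEnv for encryption handling and logs via LOG_AND_RETURN_NOT_RDB_OK):

    #include <memory>
    #include <string>

    #include <rocksdb/env.h>
    #include <rocksdb/slice.h>
    #include <rocksdb/status.h>

    // Copy `src` to `dst` through rocksdb::Env; one bool enables direct I/O
    // on both the sequential read and the writable file.
    rocksdb::Status copy_via_env(const std::string &src, const std::string &dst, bool direct_io)
    {
        rocksdb::Env *env = rocksdb::Env::Default();

        rocksdb::EnvOptions src_opts;
        src_opts.use_direct_reads = direct_io;
        std::unique_ptr<rocksdb::SequentialFile> in;
        rocksdb::Status s = env->NewSequentialFile(src, &in, src_opts);
        if (!s.ok()) {
            return s;
        }

        rocksdb::EnvOptions dst_opts;
        dst_opts.use_direct_writes = direct_io;
        std::unique_ptr<rocksdb::WritableFile> out;
        s = env->NewWritableFile(dst, &out, dst_opts);
        if (!s.ok()) {
            return s;
        }

        constexpr size_t kChunk = 4 << 20; // copy in 4MB chunks
        std::unique_ptr<char[]> scratch(new char[kChunk]);
        rocksdb::Slice result;
        do {
            s = in->Read(kChunk, &result, scratch.get());
            if (!s.ok()) {
                return s;
            }
            s = out->Append(result);
            if (!s.ok()) {
                return s;
            }
        } while (result.size() == kChunk); // a short read means EOF
        return out->Fsync();
    }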