diff --git a/src/dist/replication/lib/log_file.cpp b/src/dist/replication/lib/log_file.cpp
new file mode 100644
index 0000000000..63e83a4684
--- /dev/null
+++ b/src/dist/replication/lib/log_file.cpp
@@ -0,0 +1,405 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2015 Microsoft Corporation
+ *
+ * -=- Robust Distributed System Nucleus (rDSN) -=-
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "log_file.h"
+#include "log_file_stream.h"
+
+#include
+#include
+#include
+
+namespace dsn {
+namespace replication {
+
+log_file::~log_file() { close(); }
+/*static */ log_file_ptr log_file::open_read(const char *path, /*out*/ error_code &err)
+{
+    char splitters[] = {'\\', '/', 0};
+    std::string name = utils::get_last_component(std::string(path), splitters);
+
+    // log.index.start_offset
+    if (name.length() < strlen("log.") || name.substr(0, strlen("log.")) != std::string("log.")) {
+        err = ERR_INVALID_PARAMETERS;
+        dwarn("invalid log path %s", path);
+        return nullptr;
+    }
+
+    auto pos = name.find_first_of('.');
+    dassert(pos != std::string::npos, "invalid log_file, name = %s", name.c_str());
+    auto pos2 = name.find_first_of('.', pos + 1);
+    if (pos2 == std::string::npos) {
+        err = ERR_INVALID_PARAMETERS;
+        dwarn("invalid log path %s", path);
+        return nullptr;
+    }
+
+    /* so the log file name format is log.{index}.{start_offset} */
+    std::string index_str = name.substr(pos + 1, pos2 - pos - 1);
+    std::string start_offset_str = name.substr(pos2 + 1);
+    if (index_str.empty() || start_offset_str.empty()) {
+        err = ERR_INVALID_PARAMETERS;
+        dwarn("invalid log path %s", path);
+        return nullptr;
+    }
+
+    char *p = nullptr;
+    int index = static_cast<int>(strtol(index_str.c_str(), &p, 10));
+    if (*p != 0) {
+        err = ERR_INVALID_PARAMETERS;
+        dwarn("invalid log path %s", path);
+        return nullptr;
+    }
+    int64_t start_offset = static_cast<int64_t>(strtoll(start_offset_str.c_str(), &p, 10));
+    if (*p != 0) {
+        err = ERR_INVALID_PARAMETERS;
+        dwarn("invalid log path %s", path);
+        return nullptr;
+    }
+
+    disk_file *hfile = file::open(path, O_RDONLY | O_BINARY, 0);
+    if (!hfile) {
+        err = ERR_FILE_OPERATION_FAILED;
+        dwarn("open log file %s failed", path);
+        return nullptr;
+    }
+
+    auto lf = new log_file(path, hfile, index, start_offset, true);
+    lf->reset_stream();
+    blob hdr_blob;
+    err = lf->read_next_log_block(hdr_blob);
+    if (err == ERR_INVALID_DATA || err == ERR_INCOMPLETE_DATA || err == ERR_HANDLE_EOF ||
+        err == ERR_FILE_OPERATION_FAILED) {
+        std::string removed = std::string(path) + ".removed";
+        derror("read first log entry of file %s failed, err = %s. Rename the file to %s",
+               path,
+               err.to_string(),
+               removed.c_str());
+        delete lf;
+        lf = nullptr;
+
+        // rename file on failure
+        dsn::utils::filesystem::rename_path(path, removed);
+
+        return nullptr;
+    }
+
+    binary_reader reader(std::move(hdr_blob));
+    lf->read_file_header(reader);
+    if (!lf->is_right_header()) {
+        std::string removed = std::string(path) + ".removed";
+        derror("invalid log file header of file %s. Rename the file to %s", path, removed.c_str());
+        delete lf;
+        lf = nullptr;
+
+        // rename file on failure
+        dsn::utils::filesystem::rename_path(path, removed);
+
+        err = ERR_INVALID_DATA;
+        return nullptr;
+    }
+
+    err = ERR_OK;
+    return lf;
+}
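+
+// Note on the naming convention parsed above: file names follow the pattern
+// log.{index}.{start_offset}. For example, "log.2.8192" is the second log file
+// and starts at global offset 8192, i.e. earlier files together hold bytes
+// [0, 8192) of the global log space. create_write() below composes names the
+// same way ("%s/log.%d.%" PRId64), so a parse/compose round trip is lossless.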
+
+/*static*/ log_file_ptr log_file::create_write(const char *dir, int index, int64_t start_offset)
+{
+    char path[512];
+    sprintf(path, "%s/log.%d.%" PRId64, dir, index, start_offset);
+
+    if (dsn::utils::filesystem::path_exists(std::string(path))) {
+        dwarn("log file %s already exists", path);
+        return nullptr;
+    }
+
+    disk_file *hfile = file::open(path, O_RDWR | O_CREAT | O_BINARY, 0666);
+    if (!hfile) {
+        dwarn("create log %s failed", path);
+        return nullptr;
+    }
+
+    return new log_file(path, hfile, index, start_offset, false);
+}
+
+log_file::log_file(
+    const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read)
+    : _is_read(is_read)
+{
+    _start_offset = start_offset;
+    _end_offset = start_offset;
+    _handle = handle;
+    _path = path;
+    _index = index;
+    _crc32 = 0;
+    _last_write_time = 0;
+    memset(&_header, 0, sizeof(_header));
+
+    if (is_read) {
+        int64_t sz;
+        if (!dsn::utils::filesystem::file_size(_path, sz)) {
+            dassert(false, "fail to get file size of %s.", _path.c_str());
+        }
+        _end_offset += sz;
+    }
+}
+
+void log_file::close()
+{
+    zauto_lock lock(_write_lock);
+
+    // _stream implicitly refers to _handle, so it must be cleaned up first.
+    // TODO: we need a better abstraction to avoid this manual ordering.
+    _stream.reset(nullptr);
+    if (_handle) {
+        error_code err = file::close(_handle);
+        dassert(err == ERR_OK, "file::close failed, err = %s", err.to_string());
+
+        _handle = nullptr;
+    }
+}
+
+void log_file::flush() const
+{
+    dassert(!_is_read, "log file must be of write mode");
+    zauto_lock lock(_write_lock);
+
+    if (_handle) {
+        error_code err = file::flush(_handle);
+        dassert(err == ERR_OK, "file::flush failed, err = %s", err.to_string());
+    }
+}
+
+error_code log_file::read_next_log_block(/*out*/ ::dsn::blob &bb)
+{
+    dassert(_is_read, "log file must be of read mode");
+    auto err = _stream->read_next(sizeof(log_block_header), bb);
+    if (err != ERR_OK || bb.length() != sizeof(log_block_header)) {
+        if (err == ERR_OK || err == ERR_HANDLE_EOF) {
+            // if read_count is 0, then we have reached the end of the file
+            err = (bb.length() == 0 ? ERR_HANDLE_EOF : ERR_INCOMPLETE_DATA);
+        } else {
+            derror("read data block header failed, size = %d vs %d, err = %s",
+                   bb.length(),
+                   (int)sizeof(log_block_header),
+                   err.to_string());
+        }
+
+        return err;
+    }
+    log_block_header hdr = *reinterpret_cast<const log_block_header *>(bb.data());
+
+    if (hdr.magic != 0xdeadbeef) {
+        derror("invalid data header magic: 0x%x", hdr.magic);
+        return ERR_INVALID_DATA;
+    }
+
+    err = _stream->read_next(hdr.length, bb);
+    if (err != ERR_OK || hdr.length != bb.length()) {
+        derror("read data block body failed, size = %d vs %d, err = %s",
+               bb.length(),
+               (int)hdr.length,
+               err.to_string());
+
+        if (err == ERR_OK || err == ERR_HANDLE_EOF) {
+            // the log_block_header was already read above, so the body must be incomplete
+            err = ERR_INCOMPLETE_DATA;
+        }
+
+        return err;
+    }
+
+    auto crc = dsn::utils::crc32_calc(
+        static_cast<const void *>(bb.data()), static_cast<size_t>(hdr.length), _crc32);
+    if (crc != hdr.body_crc) {
+        derror("crc checking failed");
+        return ERR_INVALID_DATA;
+    }
+    _crc32 = crc;
+
+    return ERR_OK;
+}
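+
+// A minimal read-side sketch (illustration only; error handling elided):
+// open_read() already consumes and validates the header block, then callers
+// pull blocks until EOF.
+//
+//   error_code err;
+//   log_file_ptr lf = log_file::open_read(path, err);
+//   while (lf != nullptr) {
+//       blob bb;
+//       err = lf->read_next_log_block(bb);
+//       if (err == ERR_HANDLE_EOF)
+//           break; // clean end of file
+//       if (err != ERR_OK)
+//           break; // ERR_INCOMPLETE_DATA / ERR_INVALID_DATA: truncated or corrupted tail
+//       // decode mutations from bb ...
+//   }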
+
+aio_task_ptr log_file::commit_log_block(log_block &block,
+                                        int64_t offset,
+                                        dsn::task_code evt,
+                                        dsn::task_tracker *tracker,
+                                        aio_handler &&callback,
+                                        int hash)
+{
+    log_appender pending(offset, block);
+    return commit_log_blocks(pending, evt, tracker, std::move(callback), hash);
+}
+aio_task_ptr log_file::commit_log_blocks(log_appender &pending,
+                                         dsn::task_code evt,
+                                         dsn::task_tracker *tracker,
+                                         aio_handler &&callback,
+                                         int hash)
+{
+    dassert(!_is_read, "log file must be of write mode");
+    dcheck_gt(pending.size(), 0);
+
+    zauto_lock lock(_write_lock);
+    if (!_handle) {
+        return nullptr;
+    }
+
+    auto size = (long long)pending.size();
+    size_t vec_size = pending.blob_count();
+    std::vector<dsn_file_buffer_t> buffer_vector(vec_size);
+    int buffer_idx = 0;
+    for (log_block &block : pending.all_blocks()) {
+        int64_t local_offset = block.start_offset() - start_offset();
+        auto hdr = reinterpret_cast<log_block_header *>(const_cast<char *>(block.front().data()));
+
+        dassert(hdr->magic == 0xdeadbeef, "");
+        hdr->local_offset = local_offset;
+        hdr->length = static_cast<int32_t>(block.size() - sizeof(log_block_header));
+        hdr->body_crc = _crc32;
+
+        for (int i = 0; i < block.data().size(); i++) {
+            auto &blk = block.data()[i];
+            buffer_vector[buffer_idx].buffer =
+                reinterpret_cast<void *>(const_cast<char *>(blk.data()));
+            buffer_vector[buffer_idx].size = blk.length();
+
+            // skip block header
+            if (i > 0) {
+                hdr->body_crc = dsn::utils::crc32_calc(static_cast<const void *>(blk.data()),
+                                                       static_cast<size_t>(blk.length()),
+                                                       hdr->body_crc);
+            }
+            buffer_idx++;
+        }
+        _crc32 = hdr->body_crc;
+    }
+
+    aio_task_ptr tsk;
+    int64_t local_offset = pending.start_offset() - start_offset();
+    if (callback) {
+        tsk = file::write_vector(_handle,
+                                 buffer_vector.data(),
+                                 vec_size,
+                                 static_cast<uint64_t>(local_offset),
+                                 evt,
+                                 tracker,
+                                 std::forward<aio_handler>(callback),
+                                 hash);
+    } else {
+        tsk = file::write_vector(_handle,
+                                 buffer_vector.data(),
+                                 vec_size,
+                                 static_cast<uint64_t>(local_offset),
+                                 evt,
+                                 tracker,
+                                 nullptr,
+                                 hash);
+    }
+
+    _end_offset.fetch_add(size);
+    return tsk;
+}
+
+void log_file::reset_stream(size_t offset /*default = 0*/)
+{
+    if (_stream == nullptr) {
+        _stream.reset(new file_streamer(_handle, offset));
+    } else {
+        _stream->reset(offset);
+    }
+    if (offset == 0) {
+        _crc32 = 0;
+    }
+}
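+
+// Note on integrity checking: the body_crc stored in each log_block_header is a
+// chained CRC. When a block is committed, its body_crc starts from the running
+// _crc32 of everything written before it and is folded over the block's payload
+// blobs; read_next_log_block() recomputes the same chain, so blocks can only be
+// verified in file order. A sketch of the invariant (illustration only):
+//
+//   uint32_t crc = 0; // reset_stream(0) above restarts the chain from 0
+//   for each block body, in file order:
+//       crc = dsn::utils::crc32_calc(body.data(), body.length(), crc);
+//       // crc must now equal that block's log_block_header::body_crc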
+
+decree log_file::previous_log_max_decree(const dsn::gpid &pid)
+{
+    auto it = _previous_log_max_decrees.find(pid);
+    return it == _previous_log_max_decrees.end() ? 0 : it->second.max_decree;
+}
+
+int log_file::read_file_header(binary_reader &reader)
+{
+    /*
+     * the log file header structure:
+     *   log_file_header +
+     *   count + count * (gpid + replica_log_info)
+     */
+    reader.read_pod(_header);
+
+    int count;
+    reader.read(count);
+    for (int i = 0; i < count; i++) {
+        gpid gpid;
+        replica_log_info info;
+
+        reader.read_pod(gpid);
+        reader.read_pod(info);
+
+        _previous_log_max_decrees[gpid] = info;
+    }
+
+    return get_file_header_size();
+}
+
+int log_file::get_file_header_size() const
+{
+    int count = static_cast<int>(_previous_log_max_decrees.size());
+    return static_cast<int>(sizeof(log_file_header) + sizeof(count) +
+                            (sizeof(gpid) + sizeof(replica_log_info)) * count);
+}
+
+bool log_file::is_right_header() const
+{
+    return _header.magic == 0xdeadbeef && _header.start_global_offset == _start_offset;
+}
+
+int log_file::write_file_header(binary_writer &writer, const replica_log_info_map &init_max_decrees)
+{
+    /*
+     * the log file header structure:
+     *   log_file_header +
+     *   count + count * (gpid + replica_log_info)
+     */
+    _previous_log_max_decrees = init_max_decrees;
+
+    _header.magic = 0xdeadbeef;
+    _header.version = 0x1;
+    _header.start_global_offset = start_offset();
+
+    writer.write_pod(_header);
+
+    int count = static_cast<int>(_previous_log_max_decrees.size());
+    writer.write(count);
+    for (auto &kv : _previous_log_max_decrees) {
+        writer.write_pod(kv.first);
+        writer.write_pod(kv.second);
+    }
+
+    return get_file_header_size();
+}
+
+} // namespace replication
+} // namespace dsn
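A quick worked example of get_file_header_size(): assuming the usual field layouts and no struct padding (log_file_header = 4 + 4 + 8 = 16 bytes, the count field = 4 bytes, gpid = 8 bytes, replica_log_info = 2 * 8 = 16 bytes; these sizes are assumptions, not taken from the patch), a header covering n replicas serializes to 20 + 24 * n bytes, e.g. 68 bytes when two replicas share the log. A minimal sketch under those assumptions:

    #include <cstdio>

    int main()
    {
        const int n = 2; // replicas recorded in the header (hypothetical)
        const int header_bytes =
            16 /*log_file_header*/ + 4 /*count*/ + n * (8 /*gpid*/ + 16 /*replica_log_info*/);
        std::printf("serialized file header for %d replicas: %d bytes\n", n, header_bytes); // 68
        return 0;
    }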

diff --git a/src/dist/replication/lib/log_file.h b/src/dist/replication/lib/log_file.h
new file mode 100644
index 0000000000..6219b5dcc6
--- /dev/null
+++ b/src/dist/replication/lib/log_file.h
@@ -0,0 +1,216 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2015 Microsoft Corporation
+ *
+ * -=- Robust Distributed System Nucleus (rDSN) -=-
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#pragma once
+
+#include "log_block.h"
+
+#include
+
+namespace dsn {
+namespace replication {
+
+// each log file has a log_file_header stored at the beginning of the first block's data content
+struct log_file_header
+{
+    int32_t magic;   // 0xdeadbeef
+    int32_t version; // current 0x1
+    int64_t
+        start_global_offset; // start offset in the global space, equal to the file name's suffix
+};
+
+// a structure to record a replica's log info
+struct replica_log_info
+{
+    int64_t max_decree;
+    int64_t valid_start_offset; // valid start offset in global space
+    replica_log_info(int64_t d, int64_t o)
+    {
+        max_decree = d;
+        valid_start_offset = o;
+    }
+    replica_log_info()
+    {
+        max_decree = 0;
+        valid_start_offset = 0;
+    }
+    bool operator==(const replica_log_info &o) const
+    {
+        return max_decree == o.max_decree && valid_start_offset == o.valid_start_offset;
+    }
+};
+
+typedef std::unordered_map<gpid, replica_log_info> replica_log_info_map;
+
+class log_file;
+typedef dsn::ref_ptr<log_file> log_file_ptr;
+
+//
+// the log file is structured with sequences of log_blocks,
+// each block consists of the log_block_header + log_content,
+// and the first block contains the log_file_header at the beginning
+//
+// the class is not thread safe
+//
+class log_file : public ref_counter
+{
+public:
+    ~log_file();
+
+    //
+    // file operations
+    //
+
+    // open the log file for read
+    // 'path' should be in the format of log.{index}.{start_offset}, where:
+    // - index: the index of the log file, starting from 1
+    // - start_offset: start offset in the global space
+    // returns:
+    // - non-null if open succeeded
+    // - null if open failed
+    static log_file_ptr open_read(const char *path, /*out*/ error_code &err);
+
+    // open the log file for write
+    // the file path is '{dir}/log.{index}.{start_offset}'
+    // returns:
+    // - non-null if open succeeded
+    // - null if open failed
+    static log_file_ptr create_write(const char *dir, int index, int64_t start_offset);
+
+    // close the log file
+    void close();
+
+    // flush the log file
+    void flush() const;
+
+    //
+    // read routines
+    //
+
+    // sync read the next log entry from the file
+    // the entry data starts from the 'local_offset' of the file
+    // the result is passed out via 'bb', not including the log_block_header
+    // return error codes:
+    // - ERR_OK
+    // - ERR_HANDLE_EOF
+    // - ERR_INCOMPLETE_DATA
+    // - ERR_INVALID_DATA
+    // - other io errors caused by file read operations
+    error_code read_next_log_block(/*out*/ ::dsn::blob &bb);
+
+    //
+    // write routines
+    //
+
+    // async write log entry into the file
+    // 'block' is the data to be written
+    // 'offset' is the start offset of the entry in the global space
+    // 'evt' indicates which thread pool executes the callback
+    // 'tracker' is used to track the lifetime of the io task
+    // 'callback' is the callback handler
+    // 'hash' helps to choose which thread in the thread pool executes the callback
+    // returns:
+    // - non-null if the io task is pending
+    // - null on error
+    dsn::aio_task_ptr commit_log_block(log_block &block,
+                                       int64_t offset,
+                                       dsn::task_code evt,
+                                       dsn::task_tracker *tracker,
+                                       aio_handler &&callback,
+                                       int hash);
+    dsn::aio_task_ptr commit_log_blocks(log_appender &pending,
+                                        dsn::task_code evt,
+                                        dsn::task_tracker *tracker,
+                                        aio_handler &&callback,
+                                        int hash);
+
+    //
+    // others
+    //
+
+    // Reset file_streamer to point to `offset`.
+    // offset=0 means the start of this log file.
+    void reset_stream(size_t offset = 0);
+    // end offset in the global space: end_offset = start_offset + file_size
+    int64_t end_offset() const { return _end_offset.load(); }
+    // start offset in the global space
+    int64_t start_offset() const { return _start_offset; }
+    // file index
+    int index() const { return _index; }
+    // file path
+    const std::string &path() const { return _path; }
+    // previous decrees
+    const replica_log_info_map &previous_log_max_decrees() { return _previous_log_max_decrees; }
+    // previous decree for the specified gpid
+    decree previous_log_max_decree(const gpid &pid);
+    // file header
+    log_file_header &header() { return _header; }
+
+    // read file header from reader, return byte count consumed
+    int read_file_header(binary_reader &reader);
+    // write file header to writer, return byte count written
+    int write_file_header(binary_writer &writer, const replica_log_info_map &init_max_decrees);
+    // get serialized size of the current file header
+    int get_file_header_size() const;
+    // whether the file header is valid
+    bool is_right_header() const;
+
+    // set & get last write time, used for gc
+    void set_last_write_time(uint64_t last_write_time) { _last_write_time = last_write_time; }
+    uint64_t last_write_time() const { return _last_write_time; }
+
+    const disk_file *file_handle() const { return _handle; }
+
+private:
+    // make private, user should create log_file through open_read() or create_write()
+    log_file(const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read);
+
+private:
+    friend class mock_log_file;
+
+    uint32_t _crc32;
+    int64_t _start_offset; // start offset in the global space
+    std::atomic<int64_t>
+        _end_offset; // end offset in the global space: end_offset = start_offset + file_size
+    class file_streamer;
+    std::unique_ptr<file_streamer> _stream;
+    disk_file *_handle;        // file handle
+    const bool _is_read;       // if opened for read or write
+    std::string _path;         // file path
+    int _index;                // file index
+    log_file_header _header;   // file header
+    uint64_t _last_write_time; // seconds from epoch time
+
+    mutable zlock _write_lock;
+
+    // this data is used for garbage collection, and is part of the file header.
+    // for read, the value is read from the file header.
+    // for write, the value is set by write_file_header().
+    replica_log_info_map _previous_log_max_decrees;
+};
+
+} // namespace replication
+} // namespace dsn
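The next new file holds the definition of log_file::file_streamer, a double-buffered read-ahead wrapper around the asynchronous file API: while the caller drains one 1 MB buffer, the read for the other buffer is already in flight, and reset() keeps a fast path when the requested offset still lies inside the current buffer. A minimal sketch of how log_file drives it, mirroring reset_stream() and read_next_log_block() above (illustration only):

    // inside log_file, where _handle is the already-opened disk_file
    _stream.reset(new file_streamer(_handle, 0)); // start prefetching at offset 0
    blob header_bytes;
    error_code ec = _stream->read_next(sizeof(log_block_header), header_bytes);
    // ec is ERR_OK, ERR_HANDLE_EOF or ERR_FILE_OPERATION_FAILED, as documented below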
diff --git a/src/dist/replication/lib/log_file_stream.h b/src/dist/replication/lib/log_file_stream.h
new file mode 100644
index 0000000000..cb9c6d19e9
--- /dev/null
+++ b/src/dist/replication/lib/log_file_stream.h
@@ -0,0 +1,196 @@
+
+
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2015 Microsoft Corporation
+ *
+ * -=- Robust Distributed System Nucleus (rDSN) -=-
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#pragma once
+
+#include "log_file.h"
+
+namespace dsn {
+namespace replication {
+
+// log_file::file_streamer
+class log_file::file_streamer
+{
+public:
+    explicit file_streamer(disk_file *fd, size_t file_offset)
+        : _file_dispatched_bytes(file_offset), _file_handle(fd)
+    {
+        _current_buffer = _buffers + 0;
+        _next_buffer = _buffers + 1;
+        fill_buffers();
+    }
+    ~file_streamer()
+    {
+        _current_buffer->wait_ongoing_task();
+        _next_buffer->wait_ongoing_task();
+    }
+    // try to reset file_offset
+    void reset(size_t file_offset)
+    {
+        _current_buffer->wait_ongoing_task();
+        _next_buffer->wait_ongoing_task();
+        // fast path if we can just move the cursor
+        if (_current_buffer->_file_offset_of_buffer <= file_offset &&
+            _current_buffer->_file_offset_of_buffer + _current_buffer->_end > file_offset) {
+            _current_buffer->_begin = file_offset - _current_buffer->_file_offset_of_buffer;
+        } else {
+            _current_buffer->_begin = _current_buffer->_end = _next_buffer->_begin =
+                _next_buffer->_end = 0;
+            _file_dispatched_bytes = file_offset;
+        }
+        fill_buffers();
+    }
+
+    // TODO(wutao1): use string_view instead of using blob.
+    // WARNING: the resulting blob is not guaranteed to be reference counted.
+    // possible error_code:
+    //   ERR_OK                    result is always sized as expected
+    //   ERR_HANDLE_EOF            not enough data in the file; result is still
+    //                             filled with whatever data is available
+    //   ERR_FILE_OPERATION_FAILED filesystem failure
+    error_code read_next(size_t size, /*out*/ blob &result)
+    {
+        binary_writer writer(size);
+
+#define TRY(x)                                                                                     \
+    do {                                                                                           \
+        auto _x = (x);                                                                             \
+        if (_x != ERR_OK) {                                                                        \
+            result = writer.get_current_buffer();                                                  \
+            return _x;                                                                             \
+        }                                                                                          \
+    } while (0)
+
+        TRY(_current_buffer->wait_ongoing_task());
+        if (size < _current_buffer->length()) {
+            result.assign(_current_buffer->_buffer.get(), _current_buffer->_begin, size);
+            _current_buffer->_begin += size;
+        } else {
+            _current_buffer->drain(writer);
+            // we can now assign result since writer must have allocated a buffer.
+            dassert(writer.total_size() != 0, "writer.total_size = %d", writer.total_size());
+            if (size > writer.total_size()) {
+                TRY(_next_buffer->wait_ongoing_task());
+                _next_buffer->consume(writer,
+                                      std::min(size - writer.total_size(), _next_buffer->length()));
+                // We hope that this never happens, as it would degrade performance
+                if (size > writer.total_size()) {
+                    auto task =
+                        file::read(_file_handle,
+                                   writer.get_current_buffer().buffer().get() + writer.total_size(),
+                                   size - writer.total_size(),
+                                   _file_dispatched_bytes,
+                                   LPC_AIO_IMMEDIATE_CALLBACK,
+                                   nullptr,
+                                   nullptr);
+                    task->wait();
+                    writer.write_empty(task->get_transferred_size());
+                    _file_dispatched_bytes += task->get_transferred_size();
+                    TRY(task->error());
+                }
+            }
+            result = writer.get_current_buffer();
+        }
+        fill_buffers();
+        return ERR_OK;
+#undef TRY
+    }
+
+private:
+    void fill_buffers()
+    {
+        while (!_current_buffer->_have_ongoing_task && _current_buffer->empty()) {
+            _current_buffer->_begin = _current_buffer->_end = 0;
+            _current_buffer->_file_offset_of_buffer = _file_dispatched_bytes;
+            _current_buffer->_have_ongoing_task = true;
+            _current_buffer->_task = file::read(_file_handle,
+                                                _current_buffer->_buffer.get(),
+                                                block_size_bytes,
+                                                _file_dispatched_bytes,
+                                                LPC_AIO_IMMEDIATE_CALLBACK,
+                                                nullptr,
+                                                nullptr);
+            _file_dispatched_bytes += block_size_bytes;
+            std::swap(_current_buffer, _next_buffer);
+        }
+    }
+
+    // buffer size, in bytes
+    // TODO(wutao1): call it BLOCK_BYTES_SIZE
+    static constexpr size_t block_size_bytes = 1024 * 1024; // 1MB
+    struct buffer_t
+    {
+        std::unique_ptr<char[]> _buffer; // with block_size
+        size_t _begin, _end;             // [buffer[begin]..buffer[end]) contains unconsumed data
+        size_t _file_offset_of_buffer;   // file offset projected to buffer[0]
+        bool _have_ongoing_task;
+        aio_task_ptr _task;
+
+        buffer_t()
+            : _buffer(new char[block_size_bytes]),
+              _begin(0),
+              _end(0),
+              _file_offset_of_buffer(0),
+              _have_ongoing_task(false)
+        {
+        }
+        size_t length() const { return _end - _begin; }
+        bool empty() const { return length() == 0; }
+        void consume(binary_writer &dest, size_t len)
+        {
+            dest.write(_buffer.get() + _begin, len);
+            _begin += len;
+        }
+        size_t drain(binary_writer &dest)
+        {
+            auto len = length();
+            consume(dest, len);
+            return len;
+        }
+        error_code wait_ongoing_task()
+        {
+            if (_have_ongoing_task) {
+                _task->wait();
+                _have_ongoing_task = false;
+                _end += _task->get_transferred_size();
+                dassert(_end <= block_size_bytes, "invalid io_size.");
+                return _task->error();
+            } else {
+                return ERR_OK;
+            }
+        }
+    } _buffers[2];
+    buffer_t *_current_buffer, *_next_buffer;
+
+    // number of bytes we have issued read operations for
+    size_t _file_dispatched_bytes;
+    disk_file *_file_handle;
+};
+
+} // namespace replication
+} // namespace dsn
diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp
index c1f7f9ab7c..75ad106928 100644
--- a/src/dist/replication/lib/mutation_log.cpp
+++ b/src/dist/replication/lib/mutation_log.cpp
@@ -1649,531 +1649,5 @@ std::map<int, log_file_ptr> mutation_log::get_log_file_map() const
     return _log_files;
 }
 
-// log_file::file_streamer
-class log_file::file_streamer
-{
-public:
-    explicit file_streamer(disk_file *fd, size_t file_offset)
-        : _file_dispatched_bytes(file_offset), _file_handle(fd)
-    {
-        _current_buffer = _buffers + 0;
-        _next_buffer = _buffers + 1;
-        fill_buffers();
-    }
-    ~file_streamer()
-    {
-        _current_buffer->wait_ongoing_task();
-        _next_buffer->wait_ongoing_task();
-    }
-    // try to reset file_offset
-    void reset(size_t file_offset)
-    {
-
_current_buffer->wait_ongoing_task(); - _next_buffer->wait_ongoing_task(); - // fast path if we can just move the cursor - if (_current_buffer->_file_offset_of_buffer <= file_offset && - _current_buffer->_file_offset_of_buffer + _current_buffer->_end > file_offset) { - _current_buffer->_begin = file_offset - _current_buffer->_file_offset_of_buffer; - } else { - _current_buffer->_begin = _current_buffer->_end = _next_buffer->_begin = - _next_buffer->_end = 0; - _file_dispatched_bytes = file_offset; - } - fill_buffers(); - } - - // TODO(wutao1): use string_view instead of using blob. - // WARNING: the resulted blob is not guaranteed to be reference counted. - // possible error_code: - // ERR_OK result would always size as expected - // ERR_HANDLE_EOF if there are not enough data in file. result would still be - // filled with possible data - // ERR_FILE_OPERATION_FAILED filesystem failure - error_code read_next(size_t size, /*out*/ blob &result) - { - binary_writer writer(size); - -#define TRY(x) \ - do { \ - auto _x = (x); \ - if (_x != ERR_OK) { \ - result = writer.get_current_buffer(); \ - return _x; \ - } \ - } while (0) - - TRY(_current_buffer->wait_ongoing_task()); - if (size < _current_buffer->length()) { - result.assign(_current_buffer->_buffer.get(), _current_buffer->_begin, size); - _current_buffer->_begin += size; - } else { - _current_buffer->drain(writer); - // we can now assign result since writer must have allocated a buffer. - dassert(writer.total_size() != 0, "writer.total_size = %d", writer.total_size()); - if (size > writer.total_size()) { - TRY(_next_buffer->wait_ongoing_task()); - _next_buffer->consume(writer, - std::min(size - writer.total_size(), _next_buffer->length())); - // We hope that this never happens, it would deteriorate performance - if (size > writer.total_size()) { - auto task = - file::read(_file_handle, - writer.get_current_buffer().buffer().get() + writer.total_size(), - size - writer.total_size(), - _file_dispatched_bytes, - LPC_AIO_IMMEDIATE_CALLBACK, - nullptr, - nullptr); - task->wait(); - writer.write_empty(task->get_transferred_size()); - _file_dispatched_bytes += task->get_transferred_size(); - TRY(task->error()); - } - } - result = writer.get_current_buffer(); - } - fill_buffers(); - return ERR_OK; -#undef TRY - } - -private: - void fill_buffers() - { - while (!_current_buffer->_have_ongoing_task && _current_buffer->empty()) { - _current_buffer->_begin = _current_buffer->_end = 0; - _current_buffer->_file_offset_of_buffer = _file_dispatched_bytes; - _current_buffer->_have_ongoing_task = true; - _current_buffer->_task = file::read(_file_handle, - _current_buffer->_buffer.get(), - block_size_bytes, - _file_dispatched_bytes, - LPC_AIO_IMMEDIATE_CALLBACK, - nullptr, - nullptr); - _file_dispatched_bytes += block_size_bytes; - std::swap(_current_buffer, _next_buffer); - } - } - - // buffer size, in bytes - // TODO(wutao1): call it BLOCK_BYTES_SIZE - static constexpr size_t block_size_bytes = 1024 * 1024; // 1MB - struct buffer_t - { - std::unique_ptr _buffer; // with block_size - size_t _begin, _end; // [buffer[begin]..buffer[end]) contains unconsumed_data - size_t _file_offset_of_buffer; // file offset projected to buffer[0] - bool _have_ongoing_task; - aio_task_ptr _task; - - buffer_t() - : _buffer(new char[block_size_bytes]), - _begin(0), - _end(0), - _file_offset_of_buffer(0), - _have_ongoing_task(false) - { - } - size_t length() const { return _end - _begin; } - bool empty() const { return length() == 0; } - void consume(binary_writer &dest, 
size_t len) - { - dest.write(_buffer.get() + _begin, len); - _begin += len; - } - size_t drain(binary_writer &dest) - { - auto len = length(); - consume(dest, len); - return len; - } - error_code wait_ongoing_task() - { - if (_have_ongoing_task) { - _task->wait(); - _have_ongoing_task = false; - _end += _task->get_transferred_size(); - dassert(_end <= block_size_bytes, "invalid io_size."); - return _task->error(); - } else { - return ERR_OK; - } - } - } _buffers[2]; - buffer_t *_current_buffer, *_next_buffer; - - // number of bytes we have issued read operations - size_t _file_dispatched_bytes; - disk_file *_file_handle; -}; - -//------------------- log_file -------------------------- -log_file::~log_file() { close(); } -/*static */ log_file_ptr log_file::open_read(const char *path, /*out*/ error_code &err) -{ - char splitters[] = {'\\', '/', 0}; - std::string name = utils::get_last_component(std::string(path), splitters); - - // log.index.start_offset - if (name.length() < strlen("log.") || name.substr(0, strlen("log.")) != std::string("log.")) { - err = ERR_INVALID_PARAMETERS; - dwarn("invalid log path %s", path); - return nullptr; - } - - auto pos = name.find_first_of('.'); - dassert(pos != std::string::npos, "invalid log_file, name = %s", name.c_str()); - auto pos2 = name.find_first_of('.', pos + 1); - if (pos2 == std::string::npos) { - err = ERR_INVALID_PARAMETERS; - dwarn("invalid log path %s", path); - return nullptr; - } - - /* so the log file format is log.index_str.start_offset_str */ - std::string index_str = name.substr(pos + 1, pos2 - pos - 1); - std::string start_offset_str = name.substr(pos2 + 1); - if (index_str.empty() || start_offset_str.empty()) { - err = ERR_INVALID_PARAMETERS; - dwarn("invalid log path %s", path); - return nullptr; - } - - char *p = nullptr; - int index = static_cast(strtol(index_str.c_str(), &p, 10)); - if (*p != 0) { - err = ERR_INVALID_PARAMETERS; - dwarn("invalid log path %s", path); - return nullptr; - } - int64_t start_offset = static_cast(strtoll(start_offset_str.c_str(), &p, 10)); - if (*p != 0) { - err = ERR_INVALID_PARAMETERS; - dwarn("invalid log path %s", path); - return nullptr; - } - - disk_file *hfile = file::open(path, O_RDONLY | O_BINARY, 0); - if (!hfile) { - err = ERR_FILE_OPERATION_FAILED; - dwarn("open log file %s failed", path); - return nullptr; - } - - auto lf = new log_file(path, hfile, index, start_offset, true); - lf->reset_stream(); - blob hdr_blob; - err = lf->read_next_log_block(hdr_blob); - if (err == ERR_INVALID_DATA || err == ERR_INCOMPLETE_DATA || err == ERR_HANDLE_EOF || - err == ERR_FILE_OPERATION_FAILED) { - std::string removed = std::string(path) + ".removed"; - derror("read first log entry of file %s failed, err = %s. Rename the file to %s", - path, - err.to_string(), - removed.c_str()); - delete lf; - lf = nullptr; - - // rename file on failure - dsn::utils::filesystem::rename_path(path, removed); - - return nullptr; - } - - binary_reader reader(std::move(hdr_blob)); - lf->read_file_header(reader); - if (!lf->is_right_header()) { - std::string removed = std::string(path) + ".removed"; - derror("invalid log file header of file %s. 
Rename the file to %s", path, removed.c_str()); - delete lf; - lf = nullptr; - - // rename file on failure - dsn::utils::filesystem::rename_path(path, removed); - - err = ERR_INVALID_DATA; - return nullptr; - } - - err = ERR_OK; - return lf; -} - -/*static*/ log_file_ptr log_file::create_write(const char *dir, int index, int64_t start_offset) -{ - char path[512]; - sprintf(path, "%s/log.%d.%" PRId64, dir, index, start_offset); - - if (dsn::utils::filesystem::path_exists(std::string(path))) { - dwarn("log file %s already exist", path); - return nullptr; - } - - disk_file *hfile = file::open(path, O_RDWR | O_CREAT | O_BINARY, 0666); - if (!hfile) { - dwarn("create log %s failed", path); - return nullptr; - } - - return new log_file(path, hfile, index, start_offset, false); -} - -log_file::log_file( - const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read) - : _is_read(is_read) -{ - _start_offset = start_offset; - _end_offset = start_offset; - _handle = handle; - _path = path; - _index = index; - _crc32 = 0; - _last_write_time = 0; - memset(&_header, 0, sizeof(_header)); - - if (is_read) { - int64_t sz; - if (!dsn::utils::filesystem::file_size(_path, sz)) { - dassert(false, "fail to get file size of %s.", _path.c_str()); - } - _end_offset += sz; - } -} - -void log_file::close() -{ - zauto_lock lock(_write_lock); - - //_stream implicitly refer to _handle so it needs to be cleaned up first. - // TODO: We need better abstraction to avoid those manual stuffs.. - _stream.reset(nullptr); - if (_handle) { - error_code err = file::close(_handle); - dassert(err == ERR_OK, "file::close failed, err = %s", err.to_string()); - - _handle = nullptr; - } -} - -void log_file::flush() const -{ - dassert(!_is_read, "log file must be of write mode"); - zauto_lock lock(_write_lock); - - if (_handle) { - error_code err = file::flush(_handle); - dassert(err == ERR_OK, "file::flush failed, err = %s", err.to_string()); - } -} - -error_code log_file::read_next_log_block(/*out*/ ::dsn::blob &bb) -{ - dassert(_is_read, "log file must be of read mode"); - auto err = _stream->read_next(sizeof(log_block_header), bb); - if (err != ERR_OK || bb.length() != sizeof(log_block_header)) { - if (err == ERR_OK || err == ERR_HANDLE_EOF) { - // if read_count is 0, then we meet the end of file - err = (bb.length() == 0 ? 
ERR_HANDLE_EOF : ERR_INCOMPLETE_DATA); - } else { - derror("read data block header failed, size = %d vs %d, err = %s", - bb.length(), - (int)sizeof(log_block_header), - err.to_string()); - } - - return err; - } - log_block_header hdr = *reinterpret_cast(bb.data()); - - if (hdr.magic != 0xdeadbeef) { - derror("invalid data header magic: 0x%x", hdr.magic); - return ERR_INVALID_DATA; - } - - err = _stream->read_next(hdr.length, bb); - if (err != ERR_OK || hdr.length != bb.length()) { - derror("read data block body failed, size = %d vs %d, err = %s", - bb.length(), - (int)hdr.length, - err.to_string()); - - if (err == ERR_OK || err == ERR_HANDLE_EOF) { - // because already read log_block_header above, so here must be imcomplete data - err = ERR_INCOMPLETE_DATA; - } - - return err; - } - - auto crc = dsn::utils::crc32_calc( - static_cast(bb.data()), static_cast(hdr.length), _crc32); - if (crc != hdr.body_crc) { - derror("crc checking failed"); - return ERR_INVALID_DATA; - } - _crc32 = crc; - - return ERR_OK; -} - -aio_task_ptr log_file::commit_log_block(log_block &block, - int64_t offset, - dsn::task_code evt, - dsn::task_tracker *tracker, - aio_handler &&callback, - int hash) -{ - log_appender pending(offset, block); - return commit_log_blocks(pending, evt, tracker, std::move(callback), hash); -} -aio_task_ptr log_file::commit_log_blocks(log_appender &pending, - dsn::task_code evt, - dsn::task_tracker *tracker, - aio_handler &&callback, - int hash) -{ - dassert(!_is_read, "log file must be of write mode"); - dcheck_gt(pending.size(), 0); - - zauto_lock lock(_write_lock); - if (!_handle) { - return nullptr; - } - - auto size = (long long)pending.size(); - size_t vec_size = pending.blob_count(); - std::vector buffer_vector(vec_size); - int buffer_idx = 0; - for (log_block &block : pending.all_blocks()) { - int64_t local_offset = block.start_offset() - start_offset(); - auto hdr = reinterpret_cast(const_cast(block.front().data())); - - dassert(hdr->magic == 0xdeadbeef, ""); - hdr->local_offset = local_offset; - hdr->length = static_cast(block.size() - sizeof(log_block_header)); - hdr->body_crc = _crc32; - - for (int i = 0; i < block.data().size(); i++) { - auto &blk = block.data()[i]; - buffer_vector[buffer_idx].buffer = - reinterpret_cast(const_cast(blk.data())); - buffer_vector[buffer_idx].size = blk.length(); - - // skip block header - if (i > 0) { - hdr->body_crc = dsn::utils::crc32_calc(static_cast(blk.data()), - static_cast(blk.length()), - hdr->body_crc); - } - buffer_idx++; - } - _crc32 = hdr->body_crc; - } - - aio_task_ptr tsk; - int64_t local_offset = pending.start_offset() - start_offset(); - if (callback) { - tsk = file::write_vector(_handle, - buffer_vector.data(), - vec_size, - static_cast(local_offset), - evt, - tracker, - std::forward(callback), - hash); - } else { - tsk = file::write_vector(_handle, - buffer_vector.data(), - vec_size, - static_cast(local_offset), - evt, - tracker, - nullptr, - hash); - } - - _end_offset.fetch_add(size); - return tsk; -} - -void log_file::reset_stream(size_t offset /*default = 0*/) -{ - if (_stream == nullptr) { - _stream.reset(new file_streamer(_handle, offset)); - } else { - _stream->reset(offset); - } - if (offset == 0) { - _crc32 = 0; - } -} - -decree log_file::previous_log_max_decree(const dsn::gpid &pid) -{ - auto it = _previous_log_max_decrees.find(pid); - return it == _previous_log_max_decrees.end() ? 
0 : it->second.max_decree; -} - -int log_file::read_file_header(binary_reader &reader) -{ - /* - * the log file header structure: - * log_file_header + - * count + count * (gpid + replica_log_info) - */ - reader.read_pod(_header); - - int count; - reader.read(count); - for (int i = 0; i < count; i++) { - gpid gpid; - replica_log_info info; - - reader.read_pod(gpid); - reader.read_pod(info); - - _previous_log_max_decrees[gpid] = info; - } - - return get_file_header_size(); -} - -int log_file::get_file_header_size() const -{ - int count = static_cast(_previous_log_max_decrees.size()); - return static_cast(sizeof(log_file_header) + sizeof(count) + - (sizeof(gpid) + sizeof(replica_log_info)) * count); -} - -bool log_file::is_right_header() const -{ - return _header.magic == 0xdeadbeef && _header.start_global_offset == _start_offset; -} - -int log_file::write_file_header(binary_writer &writer, const replica_log_info_map &init_max_decrees) -{ - /* - * the log file header structure: - * log_file_header + - * count + count * (gpid + replica_log_info) - */ - _previous_log_max_decrees = init_max_decrees; - - _header.magic = 0xdeadbeef; - _header.version = 0x1; - _header.start_global_offset = start_offset(); - - writer.write_pod(_header); - - int count = static_cast(_previous_log_max_decrees.size()); - writer.write(count); - for (auto &kv : _previous_log_max_decrees) { - writer.write_pod(kv.first); - writer.write_pod(kv.second); - } - - return get_file_header_size(); -} } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/mutation_log.h b/src/dist/replication/lib/mutation_log.h index 116b00faf5..db95edffd5 100644 --- a/src/dist/replication/lib/mutation_log.h +++ b/src/dist/replication/lib/mutation_log.h @@ -24,20 +24,12 @@ * THE SOFTWARE. */ -/* - * Description: - * Mutation log read and write. 
- * - * Revision history: - * Mar., 2015, @imzhenyu (Zhenyu Guo), first version - * Dec., 2015, @qinzuoyan (Zuoyan Qin), refactor and add comments - */ - #pragma once #include "dist/replication/common/replication_common.h" -#include "dist/replication/lib/mutation.h" -#include "dist/replication/lib/log_block.h" +#include "mutation.h" +#include "log_block.h" +#include "log_file.h" #include #include @@ -48,41 +40,6 @@ namespace dsn { namespace replication { -class log_file; -typedef dsn::ref_ptr log_file_ptr; - -// a structure to record replica's log info -struct replica_log_info -{ - int64_t max_decree; - int64_t valid_start_offset; // valid start offset in global space - replica_log_info(int64_t d, int64_t o) - { - max_decree = d; - valid_start_offset = o; - } - replica_log_info() - { - max_decree = 0; - valid_start_offset = 0; - } - bool operator==(const replica_log_info &o) const - { - return max_decree == o.max_decree && valid_start_offset == o.valid_start_offset; - } -}; - -typedef std::unordered_map replica_log_info_map; - -// each log file has a log_file_header stored at the beginning of the first block's data content -struct log_file_header -{ - int32_t magic; // 0xdeadbeef - int32_t version; // current 0x1 - int64_t - start_global_offset; // start offset in the global space, equals to the file name's postfix -}; - // // manage a sequence of continuous mutation log files // each log file name is: log.{index}.{global_start_offset} @@ -561,148 +518,5 @@ class mutation_log_private : public mutation_log, private replica_base uint64_t _batch_buffer_flush_interval_ms; }; -// -// the log file is structured with sequences of log_blocks, -// each block consists of the log_block_header + log_content, -// and the first block contains the log_file_header at the beginning -// -// the class is not thread safe -// -class log_file : public ref_counter -{ -public: - ~log_file(); - - // - // file operations - // - - // open the log file for read - // 'path' should be in format of log.{index}.{start_offset}, where: - // - index: the index of the log file, start from 1 - // - start_offset: start offset in the global space - // returns: - // - non-null if open succeed - // - null if open failed - static log_file_ptr open_read(const char *path, /*out*/ error_code &err); - - // open the log file for write - // the file path is '{dir}/log.{index}.{start_offset}' - // returns: - // - non-null if open succeed - // - null if open failed - static log_file_ptr create_write(const char *dir, int index, int64_t start_offset); - - // close the log file - void close(); - - // flush the log file - void flush() const; - - // - // read routines - // - - // sync read the next log entry from the file - // the entry data is start from the 'local_offset' of the file - // the result is passed out by 'bb', not including the log_block_header - // return error codes: - // - ERR_OK - // - ERR_HANDLE_EOF - // - ERR_INCOMPLETE_DATA - // - ERR_INVALID_DATA - // - other io errors caused by file read operator - error_code read_next_log_block(/*out*/ ::dsn::blob &bb); - - // - // write routines - // - - // async write log entry into the file - // 'block' is the date to be written - // 'offset' is start offset of the entry in the global space - // 'evt' is to indicate which thread pool to execute the callback - // 'callback_host' is used to get tracer - // 'callback' is to indicate the callback handler - // 'hash' helps to choose which thread in the thread pool to execute the callback - // returns: - // - non-null if io task is in 
pending - // - null if error - dsn::aio_task_ptr commit_log_block(log_block &block, - int64_t offset, - dsn::task_code evt, - dsn::task_tracker *tracker, - aio_handler &&callback, - int hash); - dsn::aio_task_ptr commit_log_blocks(log_appender &pending, - dsn::task_code evt, - dsn::task_tracker *tracker, - aio_handler &&callback, - int hash); - - // - // others - // - - // Reset file_streamer to point to `offset`. - // offset=0 means the start of this log file. - void reset_stream(size_t offset = 0); - // end offset in the global space: end_offset = start_offset + file_size - int64_t end_offset() const { return _end_offset.load(); } - // start offset in the global space - int64_t start_offset() const { return _start_offset; } - // file index - int index() const { return _index; } - // file path - const std::string &path() const { return _path; } - // previous decrees - const replica_log_info_map &previous_log_max_decrees() { return _previous_log_max_decrees; } - // previous decree for speicified gpid - decree previous_log_max_decree(const gpid &pid); - // file header - log_file_header &header() { return _header; } - - // read file header from reader, return byte count consumed - int read_file_header(binary_reader &reader); - // write file header to writer, return byte count written - int write_file_header(binary_writer &writer, const replica_log_info_map &init_max_decrees); - // get serialized size of current file header - int get_file_header_size() const; - // if the file header is valid - bool is_right_header() const; - - // set & get last write time, used for gc - void set_last_write_time(uint64_t last_write_time) { _last_write_time = last_write_time; } - uint64_t last_write_time() const { return _last_write_time; } - - const disk_file *file_handle() const { return _handle; } - -private: - // make private, user should create log_file through open_read() or open_write() - log_file(const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read); - -private: - friend class mock_log_file; - - uint32_t _crc32; - int64_t _start_offset; // start offset in the global space - std::atomic - _end_offset; // end offset in the global space: end_offset = start_offset + file_size - class file_streamer; - std::unique_ptr _stream; - disk_file *_handle; // file handle - const bool _is_read; // if opened for read or write - std::string _path; // file path - int _index; // file index - log_file_header _header; // file header - uint64_t _last_write_time; // seconds from epoch time - - mutable zlock _write_lock; - - // this data is used for garbage collection, and is part of file header. - // for read, the value is read from file header. - // for write, the value is set by write_file_header(). - replica_log_info_map _previous_log_max_decrees; -}; } // namespace replication } // namespace dsn
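
Since mutation_log.h now only pulls these declarations in via log_file.h, callers such as mutation_log::open still work roughly the same way: list the log directory, open each log.{index}.{start_offset} file, and replay them in index order. A rough sketch (illustration only; it assumes a dsn::utils::filesystem::get_subfiles helper and elides error handling):

    std::vector<std::string> files;
    dsn::utils::filesystem::get_subfiles(log_dir, files, /*recursive*/ false);
    std::map<int, log_file_ptr> ordered; // sorted by file index
    for (const std::string &f : files) {
        error_code ec;
        log_file_ptr lf = log_file::open_read(f.c_str(), ec);
        if (lf != nullptr)
            ordered[lf->index()] = lf; // skip renamed *.removed files and other failures
    }
    // replay: iterate `ordered` and call read_next_log_block() on each file until ERR_HANDLE_EOF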