diff --git a/src/dist/replication/lib/log_block.cpp b/src/dist/replication/lib/log_block.cpp new file mode 100644 index 0000000000..6e81640805 --- /dev/null +++ b/src/dist/replication/lib/log_block.cpp @@ -0,0 +1,22 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "log_block.h" + +namespace dsn { +namespace replication { + +log_block::log_block() { init(); } + +void log_block::init() +{ + log_block_header hdr; + + binary_writer temp_writer; + temp_writer.write_pod(hdr); + add(temp_writer.get_buffer()); +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/log_block.h b/src/dist/replication/lib/log_block.h new file mode 100644 index 0000000000..7a2f686f53 --- /dev/null +++ b/src/dist/replication/lib/log_block.h @@ -0,0 +1,64 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include "mutation.h" + +namespace dsn { +namespace replication { + +// each block in log file has a log_block_header +struct log_block_header +{ + int32_t magic{static_cast(0xdeadbeef)}; // 0xdeadbeef + int32_t length{0}; // block data length (not including log_block_header) + int32_t body_crc{0}; // block data crc (not including log_block_header) + + // start offset of the block (including log_block_header) in this log file + // TODO(wutao1): this field is unusable. the value is always set, but not read. + uint32_t local_offset{0}; +}; + +// a memory structure holding data which belongs to one block. +class log_block +{ + std::vector _data; // the first blob is log_block_header + size_t _size{0}; // total data size of all blobs +public: + log_block(); + + // get all blobs in the block + const std::vector &data() const { return _data; } + + // get the first blob (which contains the log_block_header) from the block + // + // TODO(wutao1): refactor `front()` to `get_log_block_header()` + // ``` + // log_block_header *get_log_block_header() + // { + // return reinterpret_cast(const_cast(_data.front().data())); + // } + // ``` + blob &front() + { + dassert(!_data.empty(), "trying to get first blob out of an empty log block"); + return _data.front(); + } + + // add a blob into the block + void add(const blob &bb) + { + _size += bb.length(); + _data.push_back(bb); + } + + // return total data size in the block + size_t size() const { return _size; } + +private: + void init(); +}; +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/mutation.cpp b/src/dist/replication/lib/mutation.cpp index 3ab6a3b689..beab0db4f2 100644 --- a/src/dist/replication/lib/mutation.cpp +++ b/src/dist/replication/lib/mutation.cpp @@ -164,7 +164,7 @@ void mutation::add_client_request(task_code code, dsn::message_ex *request) dassert(client_requests.size() == data.updates.size(), "size must be equal"); } -void mutation::write_to(std::function inserter) const +void mutation::write_to(const std::function &inserter) const { binary_writer writer(1024); write_mutation_header(writer, data.header); diff --git a/src/dist/replication/lib/mutation.h b/src/dist/replication/lib/mutation.h index 2890b54d3b..af8134b945 100644 --- a/src/dist/replication/lib/mutation.h +++ b/src/dist/replication/lib/mutation.h @@ -126,7 +126,7 @@ class mutation : public ref_counter // because: // - the private log may be transfered to other node with different program // - the private/shared log may be replayed by different program when server restart - void write_to(std::function inserter) const; + void write_to(const std::function &inserter) const; void write_to(binary_writer &writer, dsn::message_ex *to) const; static mutation_ptr read_from(binary_reader &reader, dsn::message_ex *from); diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index 9a69e811c2..5d63a65870 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -54,7 +54,7 @@ ::dsn::task_ptr mutation_log_shared::append(mutation_ptr &mu, // init pending buffer if (nullptr == _pending_write) { - _pending_write.reset(log_file::prepare_log_block()); + _pending_write.reset(new log_block()); _pending_write_callbacks.reset(new callbacks()); _pending_write_mutations.reset(new mutations()); _pending_write_start_offset = mark_new_offset(0, true).second; @@ -241,7 +241,7 @@ ::dsn::task_ptr mutation_log_private::append(mutation_ptr &mu, // init pending buffer if (nullptr == _pending_write) { - _pending_write.reset(log_file::prepare_log_block()); + _pending_write.reset(new log_block()); _pending_write_mutations.reset(new mutations()); _pending_write_start_offset = mark_new_offset(0, true).second; _pending_write_start_time_ms = dsn_now_ms(); @@ -797,7 +797,7 @@ error_code mutation_log::create_new_log_file() header_len = logf->write_file_header(temp_writer, _shared_log_info_map); } - log_block *blk = logf->prepare_log_block(); + log_block *blk = new log_block(); blk->add(temp_writer.get_buffer()); _global_end_offset += blk->size(); @@ -2074,19 +2074,6 @@ error_code log_file::read_next_log_block(/*out*/ ::dsn::blob &bb) return ERR_OK; } -log_block *log_file::prepare_log_block() -{ - log_block_header hdr; - hdr.magic = 0xdeadbeef; - hdr.length = 0; - hdr.body_crc = 0; - hdr.local_offset = 0; - - binary_writer temp_writer; - temp_writer.write_pod(hdr); - return new log_block(temp_writer.get_buffer()); -} - aio_task_ptr log_file::commit_log_block(log_block &block, int64_t offset, dsn::task_code evt, diff --git a/src/dist/replication/lib/mutation_log.h b/src/dist/replication/lib/mutation_log.h index faadca4065..ee6246cf65 100644 --- a/src/dist/replication/lib/mutation_log.h +++ b/src/dist/replication/lib/mutation_log.h @@ -37,6 +37,7 @@ #include "dist/replication/common/replication_common.h" #include "dist/replication/lib/mutation.h" +#include "dist/replication/lib/log_block.h" #include #include @@ -73,16 +74,6 @@ struct replica_log_info typedef std::unordered_map replica_log_info_map; -// each block in log file has a log_block_header -struct log_block_header -{ - int32_t magic; // 0xdeadbeef - int32_t length; // block data length (not including log_block_header) - int32_t body_crc; // block data crc (not including log_block_header) - uint32_t - local_offset; // start offset of the block (including log_block_header) in this log file -}; - // each log file has a log_file_header stored at the beginning of the first block's data content struct log_file_header { @@ -92,32 +83,6 @@ struct log_file_header start_global_offset; // start offset in the global space, equals to the file name's postfix }; -// a memory structure holding data which belongs to one block. -class log_block /* : public ::dsn::transient_object*/ -{ - std::vector _data; // the first blob is log_block_header - size_t _size; // total data size of all blobs -public: - log_block() : _size(0) {} - log_block(blob &&init_blob) : _data({init_blob}), _size(init_blob.length()) {} - // get all blobs in the block - const std::vector &data() const { return _data; } - // get the first blob (which contains the log_block_header) from the block - blob &front() - { - dassert(!_data.empty(), "trying to get first blob out of an empty log block"); - return _data.front(); - } - // add a blob into the block - void add(const blob &bb) - { - _size += bb.length(); - _data.push_back(bb); - } - // return total data size in the block - size_t size() const { return _size; } -}; - // // manage a sequence of continuous mutation log files // each log file name is: log.{index}.{global_start_offset} @@ -651,10 +616,6 @@ class log_file : public ref_counter // write routines // - // prepare a log entry buffer, with block header reserved and inited - // always returns non-nullptr - static log_block *prepare_log_block(); - // async write log entry into the file // 'block' is the date to be written // 'offset' is start offset of the entry in the global space diff --git a/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp b/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp index 8891c27d61..644eb0ca92 100644 --- a/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp @@ -91,7 +91,7 @@ TEST(replication, log_file) ASSERT_EQ(offset, lf->start_offset()); ASSERT_EQ(offset, lf->end_offset()); for (int i = 0; i < 100; i++) { - auto writer = lf->prepare_log_block(); + auto writer = new log_block(); if (i == 0) { binary_writer temp_writer;