Skip to content

Commit

Permalink
refactor: move log_block class from mutation_log.h to separated file (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Mar 25, 2020
1 parent 7df14f0 commit 3b1133b
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 59 deletions.
22 changes: 22 additions & 0 deletions src/dist/replication/lib/log_block.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "log_block.h"

namespace dsn {
namespace replication {

log_block::log_block() { init(); }

void log_block::init()
{
log_block_header hdr;

binary_writer temp_writer;
temp_writer.write_pod(hdr);
add(temp_writer.get_buffer());
}

} // namespace replication
} // namespace dsn
64 changes: 64 additions & 0 deletions src/dist/replication/lib/log_block.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include "mutation.h"

namespace dsn {
namespace replication {

// each block in log file has a log_block_header
struct log_block_header
{
int32_t magic{static_cast<int32_t>(0xdeadbeef)}; // 0xdeadbeef
int32_t length{0}; // block data length (not including log_block_header)
int32_t body_crc{0}; // block data crc (not including log_block_header)

// start offset of the block (including log_block_header) in this log file
// TODO(wutao1): this field is unusable. the value is always set, but not read.
uint32_t local_offset{0};
};

// a memory structure holding data which belongs to one block.
class log_block
{
std::vector<blob> _data; // the first blob is log_block_header
size_t _size{0}; // total data size of all blobs
public:
log_block();

// get all blobs in the block
const std::vector<blob> &data() const { return _data; }

// get the first blob (which contains the log_block_header) from the block
//
// TODO(wutao1): refactor `front()` to `get_log_block_header()`
// ```
// log_block_header *get_log_block_header()
// {
// return reinterpret_cast<log_block_header *>(const_cast<char *>(_data.front().data()));
// }
// ```
blob &front()
{
dassert(!_data.empty(), "trying to get first blob out of an empty log block");
return _data.front();
}

// add a blob into the block
void add(const blob &bb)
{
_size += bb.length();
_data.push_back(bb);
}

// return total data size in the block
size_t size() const { return _size; }

private:
void init();
};
} // namespace replication
} // namespace dsn
2 changes: 1 addition & 1 deletion src/dist/replication/lib/mutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void mutation::add_client_request(task_code code, dsn::message_ex *request)
dassert(client_requests.size() == data.updates.size(), "size must be equal");
}

void mutation::write_to(std::function<void(const blob &)> inserter) const
void mutation::write_to(const std::function<void(const blob &)> &inserter) const
{
binary_writer writer(1024);
write_mutation_header(writer, data.header);
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class mutation : public ref_counter
// because:
// - the private log may be transfered to other node with different program
// - the private/shared log may be replayed by different program when server restart
void write_to(std::function<void(const blob &)> inserter) const;
void write_to(const std::function<void(const blob &)> &inserter) const;
void write_to(binary_writer &writer, dsn::message_ex *to) const;
static mutation_ptr read_from(binary_reader &reader, dsn::message_ex *from);

Expand Down
19 changes: 3 additions & 16 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ ::dsn::task_ptr mutation_log_shared::append(mutation_ptr &mu,

// init pending buffer
if (nullptr == _pending_write) {
_pending_write.reset(log_file::prepare_log_block());
_pending_write.reset(new log_block());
_pending_write_callbacks.reset(new callbacks());
_pending_write_mutations.reset(new mutations());
_pending_write_start_offset = mark_new_offset(0, true).second;
Expand Down Expand Up @@ -241,7 +241,7 @@ ::dsn::task_ptr mutation_log_private::append(mutation_ptr &mu,

// init pending buffer
if (nullptr == _pending_write) {
_pending_write.reset(log_file::prepare_log_block());
_pending_write.reset(new log_block());
_pending_write_mutations.reset(new mutations());
_pending_write_start_offset = mark_new_offset(0, true).second;
_pending_write_start_time_ms = dsn_now_ms();
Expand Down Expand Up @@ -797,7 +797,7 @@ error_code mutation_log::create_new_log_file()
header_len = logf->write_file_header(temp_writer, _shared_log_info_map);
}

log_block *blk = logf->prepare_log_block();
log_block *blk = new log_block();
blk->add(temp_writer.get_buffer());
_global_end_offset += blk->size();

Expand Down Expand Up @@ -2074,19 +2074,6 @@ error_code log_file::read_next_log_block(/*out*/ ::dsn::blob &bb)
return ERR_OK;
}

log_block *log_file::prepare_log_block()
{
log_block_header hdr;
hdr.magic = 0xdeadbeef;
hdr.length = 0;
hdr.body_crc = 0;
hdr.local_offset = 0;

binary_writer temp_writer;
temp_writer.write_pod(hdr);
return new log_block(temp_writer.get_buffer());
}

aio_task_ptr log_file::commit_log_block(log_block &block,
int64_t offset,
dsn::task_code evt,
Expand Down
41 changes: 1 addition & 40 deletions src/dist/replication/lib/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include "dist/replication/common/replication_common.h"
#include "dist/replication/lib/mutation.h"
#include "dist/replication/lib/log_block.h"

#include <atomic>
#include <dsn/tool-api/zlocks.h>
Expand Down Expand Up @@ -73,16 +74,6 @@ struct replica_log_info

typedef std::unordered_map<gpid, replica_log_info> replica_log_info_map;

// each block in log file has a log_block_header
struct log_block_header
{
int32_t magic; // 0xdeadbeef
int32_t length; // block data length (not including log_block_header)
int32_t body_crc; // block data crc (not including log_block_header)
uint32_t
local_offset; // start offset of the block (including log_block_header) in this log file
};

// each log file has a log_file_header stored at the beginning of the first block's data content
struct log_file_header
{
Expand All @@ -92,32 +83,6 @@ struct log_file_header
start_global_offset; // start offset in the global space, equals to the file name's postfix
};

// a memory structure holding data which belongs to one block.
class log_block /* : public ::dsn::transient_object*/
{
std::vector<blob> _data; // the first blob is log_block_header
size_t _size; // total data size of all blobs
public:
log_block() : _size(0) {}
log_block(blob &&init_blob) : _data({init_blob}), _size(init_blob.length()) {}
// get all blobs in the block
const std::vector<blob> &data() const { return _data; }
// get the first blob (which contains the log_block_header) from the block
blob &front()
{
dassert(!_data.empty(), "trying to get first blob out of an empty log block");
return _data.front();
}
// add a blob into the block
void add(const blob &bb)
{
_size += bb.length();
_data.push_back(bb);
}
// return total data size in the block
size_t size() const { return _size; }
};

//
// manage a sequence of continuous mutation log files
// each log file name is: log.{index}.{global_start_offset}
Expand Down Expand Up @@ -651,10 +616,6 @@ class log_file : public ref_counter
// write routines
//

// prepare a log entry buffer, with block header reserved and inited
// always returns non-nullptr
static log_block *prepare_log_block();

// async write log entry into the file
// 'block' is the date to be written
// 'offset' is start offset of the entry in the global space
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ TEST(replication, log_file)
ASSERT_EQ(offset, lf->start_offset());
ASSERT_EQ(offset, lf->end_offset());
for (int i = 0; i < 100; i++) {
auto writer = lf->prepare_log_block();
auto writer = new log_block();

if (i == 0) {
binary_writer temp_writer;
Expand Down

0 comments on commit 3b1133b

Please sign in to comment.