Skip to content

Commit

Permalink
feat(slog): flush and remove all shared logs for garbage collection (a…
Browse files Browse the repository at this point in the history
…pache#1594)

apache#1593

In XiaoMi/rdsn#1019 we've written private logs as
WAL instead of shared logs, which also means shared log files would never
be appended with new mutations.

However, obsolete shared logs that had been applied to rocksdb were not
removed since then. There is at least 1 shared log file which is never removed.
We should change this policy of garbage collection, to delete all obsolete
shared log files.

To facilitate the garbage collection of shared log files, we also trigger checkpoints
which would flush rocksdb data to disk for each replica that has prevented shared
log files from being removed for garbage collection. It is necessary to limit the
number of submitted replicas that would be flushed at a time to avoid too much
consumption of I/O resources which might affect the processing of read/write
requests from clients.
  • Loading branch information
empiredan authored and wangdan committed Oct 16, 2023
1 parent be9119e commit 026f402
Show file tree
Hide file tree
Showing 20 changed files with 871 additions and 495 deletions.
14 changes: 8 additions & 6 deletions src/replica/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <iterator>
#include <map>
#include <utility>

#include "common/duplication_common.h"
Expand Down Expand Up @@ -57,11 +58,11 @@ bool load_from_private_log::will_fail_fast() const
// we try to list all files and select a new one to start (find_log_file_to_start).
bool load_from_private_log::switch_to_next_log_file()
{
auto file_map = _private_log->get_log_file_map();
auto next_file_it = file_map.find(_current->index() + 1);
const auto &file_map = _private_log->get_log_file_map();
const auto &next_file_it = file_map.find(_current->index() + 1);
if (next_file_it != file_map.end()) {
log_file_ptr file;
error_s es = log_utils::open_read(next_file_it->second->path(), file);
const auto &es = log_utils::open_read(next_file_it->second->path(), file);
if (!es.is_ok()) {
LOG_ERROR_PREFIX("{}", es);
_current = nullptr;
Expand Down Expand Up @@ -123,11 +124,11 @@ void load_from_private_log::run()
void load_from_private_log::find_log_file_to_start()
{
// `file_map` has already excluded the useless log files during replica init.
auto file_map = _private_log->get_log_file_map();
const auto &file_map = _private_log->get_log_file_map();

// Reopen the files. Because the internal file handle of `file_map`
// is cleared once WAL replay finished. They are unable to read.
std::map<int, log_file_ptr> new_file_map;
mutation_log::log_file_map_by_index new_file_map;
for (const auto &pr : file_map) {
log_file_ptr file;
error_s es = log_utils::open_read(pr.second->path(), file);
Expand All @@ -141,7 +142,8 @@ void load_from_private_log::find_log_file_to_start()
find_log_file_to_start(std::move(new_file_map));
}

void load_from_private_log::find_log_file_to_start(std::map<int, log_file_ptr> log_file_map)
void load_from_private_log::find_log_file_to_start(
const mutation_log::log_file_map_by_index &log_file_map)
{
_current = nullptr;
if (dsn_unlikely(log_file_map.empty())) {
Expand Down
3 changes: 1 addition & 2 deletions src/replica/duplication/load_from_private_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <stddef.h>
#include <stdint.h>
#include <chrono>
#include <map>

#include "common/replication_other_types.h"
#include "mutation_batch.h"
Expand Down Expand Up @@ -61,7 +60,7 @@ class load_from_private_log final : public replica_base,

/// Find the log file that contains `_start_decree`.
void find_log_file_to_start();
void find_log_file_to_start(std::map<int, log_file_ptr> log_files);
void find_log_file_to_start(const mutation_log::log_file_map_by_index &log_files);

void replay_log_block();

Expand Down
4 changes: 2 additions & 2 deletions src/replica/duplication/test/duplication_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ class duplication_test_base : public replica_test_base
return duplicator;
}

std::map<int, log_file_ptr> open_log_file_map(const std::string &log_dir)
mutation_log::log_file_map_by_index open_log_file_map(const std::string &log_dir)
{
std::map<int, log_file_ptr> log_file_map;
mutation_log::log_file_map_by_index log_file_map;
error_s err = log_utils::open_log_file_map(log_dir, log_file_map);
EXPECT_EQ(err, error_s::ok());
return log_file_map;
Expand Down
18 changes: 9 additions & 9 deletions src/replica/log_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ log_file::~log_file() { close(); }

log_file::log_file(
const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read)
: _is_read(is_read)
: _crc32(0),
_start_offset(start_offset),
_end_offset(start_offset),
_handle(handle),
_is_read(is_read),
_path(path),
_index(index),
_last_write_time(0)
{
_start_offset = start_offset;
_end_offset = start_offset;
_handle = handle;
_path = path;
_index = index;
_crc32 = 0;
_last_write_time = 0;
memset(&_header, 0, sizeof(_header));

if (is_read) {
Expand Down Expand Up @@ -357,7 +357,7 @@ void log_file::reset_stream(size_t offset /*default = 0*/)
}
}

decree log_file::previous_log_max_decree(const dsn::gpid &pid)
decree log_file::previous_log_max_decree(const dsn::gpid &pid) const
{
auto it = _previous_log_max_decrees.find(pid);
return it == _previous_log_max_decrees.end() ? 0 : it->second.max_decree;
Expand Down
19 changes: 11 additions & 8 deletions src/replica/log_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ struct log_file_header
// a structure to record replica's log info
struct replica_log_info
{
int64_t max_decree;
decree max_decree;
int64_t valid_start_offset; // valid start offset in global space
replica_log_info(int64_t d, int64_t o)
replica_log_info(decree d, int64_t o)
{
max_decree = d;
valid_start_offset = o;
Expand Down Expand Up @@ -184,11 +184,14 @@ class log_file : public ref_counter
// file path
const std::string &path() const { return _path; }
// previous decrees
const replica_log_info_map &previous_log_max_decrees() { return _previous_log_max_decrees; }
const replica_log_info_map &previous_log_max_decrees() const
{
return _previous_log_max_decrees;
}
// previous decree for speicified gpid
decree previous_log_max_decree(const gpid &pid);
decree previous_log_max_decree(const gpid &pid) const;
// file header
log_file_header &header() { return _header; }
const log_file_header &header() const { return _header; }

// read file header from reader, return byte count consumed
int read_file_header(binary_reader &reader);
Expand All @@ -213,16 +216,16 @@ class log_file : public ref_counter
friend class mock_log_file;

uint32_t _crc32;
int64_t _start_offset; // start offset in the global space
const int64_t _start_offset; // start offset in the global space
std::atomic<int64_t>
_end_offset; // end offset in the global space: end_offset = start_offset + file_size
class file_streamer;

std::unique_ptr<file_streamer> _stream;
disk_file *_handle; // file handle
const bool _is_read; // if opened for read or write
std::string _path; // file path
int _index; // file index
const std::string _path; // file path
const int _index; // file index
log_file_header _header; // file header
uint64_t _last_write_time; // seconds from epoch time

Expand Down
Loading

0 comments on commit 026f402

Please sign in to comment.