Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IngestExternalFile] Allow ingesting overlapping files #5539

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3762,9 +3762,9 @@ Status DBImpl::IngestExternalFiles(
std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
for (const auto& arg : args) {
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
ingestion_jobs.emplace_back(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
&snapshots_, arg.options, &directories_);
ingestion_jobs.emplace_back(
env_, versions_.get(), cfd, immutable_db_options_, env_options_,
&snapshots_, arg.options, &directories_, &event_logger_);
}
std::vector<std::pair<bool, Status>> exec_results;
for (size_t i = 0; i != num_cfs; ++i) {
Expand Down Expand Up @@ -3894,19 +3894,21 @@ Status DBImpl::IngestExternalFiles(
}
}
if (status.ok()) {
bool should_increment_last_seqno =
ingestion_jobs[0].ShouldIncrementLastSequence();
int consumed_seqno_count =
ingestion_jobs[0].ConsumedSequenceNumbersCount();
#ifndef NDEBUG
for (size_t i = 1; i != num_cfs; ++i) {
assert(should_increment_last_seqno ==
ingestion_jobs[i].ShouldIncrementLastSequence());
assert(!!consumed_seqno_count ==
!!ingestion_jobs[i].ConsumedSequenceNumbersCount());
consumed_seqno_count +=
ingestion_jobs[i].ConsumedSequenceNumbersCount();
Copy link

@fredzqm fredzqm Mar 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line makes debug build and production build different. Is this a bug?

        consumed_seqno_count +=
            ingestion_jobs[i].ConsumedSequenceNumbersCount();

We managed to hit the assertion here in tests due to some but not all sstable file overlap with db.

        assert(!!consumed_seqno_count ==
               !!ingestion_jobs[i].ConsumedSequenceNumbersCount());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the !! supposed to do? That is a new operator to me...

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think !! force converts a integer to a boolean. 0 => 0, others => 1

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also posted the question here: https://groups.google.com/g/rocksdb/c/g5LACzCltWI
Thanks in advance!

}
#endif
if (should_increment_last_seqno) {
if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + 1);
versions_->SetLastPublishedSequence(last_seqno + 1);
versions_->SetLastSequence(last_seqno + 1);
versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastSequence(last_seqno + consumed_seqno_count);
}
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
Expand Down
41 changes: 41 additions & 0 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,47 @@ TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) {
} while (ChangeOptionsForFileIngestionTest());
}

TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
Options options = CurrentOptions();

std::vector<std::string> files;
{
SstFileWriter sst_file_writer(EnvOptions(), options);
std::string file1 = sst_files_dir_ + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
ASSERT_OK(sst_file_writer.Put("a", "z"));
ASSERT_OK(sst_file_writer.Put("i", "m"));
ExternalSstFileInfo file1_info;
ASSERT_OK(sst_file_writer.Finish(&file1_info));
files.push_back(std::move(file1));
}
{
SstFileWriter sst_file_writer(EnvOptions(), options);
std::string file2 = sst_files_dir_ + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
ASSERT_OK(sst_file_writer.Put("i", "k"));
ExternalSstFileInfo file2_info;
ASSERT_OK(sst_file_writer.Finish(&file2_info));
files.push_back(std::move(file2));
}

IngestExternalFileOptions ifo;
ASSERT_OK(db_->IngestExternalFile(files, ifo));
ASSERT_EQ(Get("a"), "z");
ASSERT_EQ(Get("i"), "k");

int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
total_keys++;
}
delete iter;
ASSERT_EQ(total_keys, 2);

ASSERT_EQ(2, NumTableFilesAtLevel(0));
}

INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),
Expand Down
52 changes: 42 additions & 10 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,16 @@ Status ExternalSstFileIngestionJob::Prepare(
for (size_t i = 0; i < num_files - 1; i++) {
if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
sorted_files[i + 1]->smallest_internal_key) >= 0) {
return Status::NotSupported("Files have overlapping ranges");
files_overlap_ = true;
break;
}
}
}

if (ingestion_options_.ingest_behind && files_overlap_) {
return Status::NotSupported("Files have overlapping ranges");
}

for (IngestedFileInfo& f : files_to_ingest_) {
if (f.num_entries == 0 && f.num_range_deletions == 0) {
return Status::InvalidArgument("File contain no entries");
Expand Down Expand Up @@ -212,7 +217,7 @@ Status ExternalSstFileIngestionJob::Run() {
}
// It is safe to use this instead of LastAllocatedSequence since we are
// the only active writer, and hence they are equal
const SequenceNumber last_seqno = versions_->LastSequence();
SequenceNumber last_seqno = versions_->LastSequence();
edit_.SetColumnFamily(cfd_->GetID());
// The levels that the files will be ingested into

Expand All @@ -222,17 +227,19 @@ Status ExternalSstFileIngestionJob::Run() {
status = CheckLevelForIngestedBehindFile(&f);
} else {
status = AssignLevelAndSeqnoForIngestedFile(
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
&f, &assigned_seqno);
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
last_seqno, &f, &assigned_seqno);
}
if (!status.ok()) {
return status;
}
status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
&assigned_seqno);
if (assigned_seqno == last_seqno + 1) {
consumed_seqno_ = true;
if (assigned_seqno > last_seqno) {
assert(assigned_seqno == last_seqno + 1);
last_seqno = assigned_seqno;
++consumed_seqno_count_;
}
if (!status.ok()) {
return status;
Expand All @@ -250,6 +257,13 @@ void ExternalSstFileIngestionJob::UpdateStats() {
uint64_t total_keys = 0;
uint64_t total_l0_files = 0;
uint64_t total_time = env_->NowMicros() - job_start_time_;

EventLoggerStream stream = event_logger_->Log();
stream << "event"
<< "ingest_finished";
stream << "files_ingested";
stream.StartArray();

for (IngestedFileInfo& f : files_to_ingest_) {
InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
stats.micros = total_time;
Expand Down Expand Up @@ -277,7 +291,18 @@ void ExternalSstFileIngestionJob::UpdateStats() {
"(global_seqno=%" PRIu64 ")\n",
f.external_file_path.c_str(), f.picked_level,
f.internal_file_path.c_str(), f.assigned_seqno);
stream << "file" << f.internal_file_path << "level" << f.picked_level;
}
stream.EndArray();

stream << "lsm_state";
stream.StartArray();
auto vstorage = cfd_->current()->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();

cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
total_keys);
cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
Expand All @@ -301,7 +326,8 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
f.internal_file_path.c_str(), s.ToString().c_str());
}
}
consumed_seqno_ = false;
consumed_seqno_count_ = 0;
files_overlap_ = false;
} else if (status.ok() && ingestion_options_.move_files) {
// The files were moved and added successfully, remove original file links
for (IngestedFileInfo& f : files_to_ingest_) {
Expand Down Expand Up @@ -479,13 +505,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(

Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
SequenceNumber* assigned_seqno) {
Status status;
*assigned_seqno = 0;
const SequenceNumber last_seqno = versions_->LastSequence();
if (force_global_seqno) {
*assigned_seqno = last_seqno + 1;
if (compaction_style == kCompactionStyleUniversal) {
if (compaction_style == kCompactionStyleUniversal || files_overlap_) {
file_to_ingest->picked_level = 0;
return status;
}
Expand Down Expand Up @@ -547,6 +573,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
target_level = lvl;
}
}
// If files overlap, we have to ingest them at level 0 and assign the newest
// sequence number
if (files_overlap_) {
target_level = 0;
*assigned_seqno = last_seqno + 1;
}
TEST_SYNC_POINT_CALLBACK(
"ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
&overlap_with_db);
Expand Down
17 changes: 12 additions & 5 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "db/dbformat.h"
#include "db/internal_stats.h"
#include "db/snapshot_impl.h"
#include "logging/event_logger.h"
#include "options/db_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
Expand Down Expand Up @@ -71,7 +72,7 @@ class ExternalSstFileIngestionJob {
const ImmutableDBOptions& db_options, const EnvOptions& env_options,
SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options,
Directories* directories)
Directories* directories, EventLogger* event_logger)
: env_(env),
versions_(versions),
cfd_(cfd),
Expand All @@ -80,8 +81,9 @@ class ExternalSstFileIngestionJob {
db_snapshots_(db_snapshots),
ingestion_options_(ingestion_options),
directories_(directories),
event_logger_(event_logger),
job_start_time_(env_->NowMicros()),
consumed_seqno_(false) {
consumed_seqno_count_(0) {
assert(directories != nullptr);
}

Expand Down Expand Up @@ -116,8 +118,8 @@ class ExternalSstFileIngestionJob {
return files_to_ingest_;
}

// Whether to increment VersionSet's seqno after this job runs
bool ShouldIncrementLastSequence() const { return consumed_seqno_; }
// How many sequence numbers did we consume as part of the ingest job?
int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }

private:
// Open the external file and populate `file_to_ingest` with all the
Expand All @@ -132,6 +134,7 @@ class ExternalSstFileIngestionJob {
Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv,
bool force_global_seqno,
CompactionStyle compaction_style,
SequenceNumber last_seqno,
IngestedFileInfo* file_to_ingest,
SequenceNumber* assigned_seqno);

Expand Down Expand Up @@ -163,9 +166,13 @@ class ExternalSstFileIngestionJob {
autovector<IngestedFileInfo> files_to_ingest_;
const IngestExternalFileOptions& ingestion_options_;
Directories* directories_;
EventLogger* event_logger_;
VersionEdit edit_;
uint64_t job_start_time_;
bool consumed_seqno_;
int consumed_seqno_count_;
// Set in ExternalSstFileIngestionJob::Prepare(), if true all files are
// ingested in L0
bool files_overlap_{false};
};

} // namespace rocksdb