Skip to content

Commit

Permalink
[IngestExternalFile] Allow ingesting overlapping files
Browse files Browse the repository at this point in the history
Currently IngestExternalFile() fails when its input files' ranges overlap. This condition doesn't need to hold for files that are to be ingested in L0, though.

This commit allows overlapping files and forces their target level to L0.

Additionally, ingest job's completion is logged to EventLogger, analogous to flush and compaction jobs.
  • Loading branch information
igorcanadi committed Sep 13, 2019
1 parent 1a928c2 commit 6cf74b0
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 26 deletions.
24 changes: 13 additions & 11 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3762,9 +3762,9 @@ Status DBImpl::IngestExternalFiles(
std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
for (const auto& arg : args) {
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
ingestion_jobs.emplace_back(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
&snapshots_, arg.options, &directories_);
ingestion_jobs.emplace_back(
env_, versions_.get(), cfd, immutable_db_options_, env_options_,
&snapshots_, arg.options, &directories_, &event_logger_);
}
std::vector<std::pair<bool, Status>> exec_results;
for (size_t i = 0; i != num_cfs; ++i) {
Expand Down Expand Up @@ -3894,19 +3894,21 @@ Status DBImpl::IngestExternalFiles(
}
}
if (status.ok()) {
bool should_increment_last_seqno =
ingestion_jobs[0].ShouldIncrementLastSequence();
int consumed_seqno_count =
ingestion_jobs[0].ConsumedSequenceNumbersCount();
#ifndef NDEBUG
for (size_t i = 1; i != num_cfs; ++i) {
assert(should_increment_last_seqno ==
ingestion_jobs[i].ShouldIncrementLastSequence());
assert(!!consumed_seqno_count ==
!!ingestion_jobs[i].ConsumedSequenceNumbersCount());
consumed_seqno_count +=
ingestion_jobs[i].ConsumedSequenceNumbersCount();
}
#endif
if (should_increment_last_seqno) {
if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + 1);
versions_->SetLastPublishedSequence(last_seqno + 1);
versions_->SetLastSequence(last_seqno + 1);
versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastSequence(last_seqno + consumed_seqno_count);
}
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
Expand Down
41 changes: 41 additions & 0 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,47 @@ TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) {
} while (ChangeOptionsForFileIngestionTest());
}

TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) {
Options options = CurrentOptions();

std::vector<std::string> files;
{
SstFileWriter sst_file_writer(EnvOptions(), options);
std::string file1 = sst_files_dir_ + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
ASSERT_OK(sst_file_writer.Put("a", "z"));
ASSERT_OK(sst_file_writer.Put("i", "m"));
ExternalSstFileInfo file1_info;
ASSERT_OK(sst_file_writer.Finish(&file1_info));
files.push_back(std::move(file1));
}
{
SstFileWriter sst_file_writer(EnvOptions(), options);
std::string file2 = sst_files_dir_ + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
ASSERT_OK(sst_file_writer.Put("i", "k"));
ExternalSstFileInfo file2_info;
ASSERT_OK(sst_file_writer.Finish(&file2_info));
files.push_back(std::move(file2));
}

IngestExternalFileOptions ifo;
ASSERT_OK(db_->IngestExternalFile(files, ifo));
ASSERT_EQ(Get("a"), "z");
ASSERT_EQ(Get("i"), "k");

int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
total_keys++;
}
delete iter;
ASSERT_EQ(total_keys, 2);

ASSERT_EQ(2, NumTableFilesAtLevel(0));
}

INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),
Expand Down
51 changes: 41 additions & 10 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,16 @@ Status ExternalSstFileIngestionJob::Prepare(
for (size_t i = 0; i < num_files - 1; i++) {
if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
sorted_files[i + 1]->smallest_internal_key) >= 0) {
return Status::NotSupported("Files have overlapping ranges");
files_overlap_ = true;
break;
}
}
}

if (ingestion_options_.ingest_behind && files_overlap_) {
return Status::NotSupported("Files have overlapping ranges");
}

for (IngestedFileInfo& f : files_to_ingest_) {
if (f.num_entries == 0 && f.num_range_deletions == 0) {
return Status::InvalidArgument("File contain no entries");
Expand Down Expand Up @@ -212,7 +217,7 @@ Status ExternalSstFileIngestionJob::Run() {
}
// It is safe to use this instead of LastAllocatedSequence since we are
// the only active writer, and hence they are equal
const SequenceNumber last_seqno = versions_->LastSequence();
SequenceNumber last_seqno = versions_->LastSequence();
edit_.SetColumnFamily(cfd_->GetID());
// The levels that the files will be ingested into

Expand All @@ -222,17 +227,19 @@ Status ExternalSstFileIngestionJob::Run() {
status = CheckLevelForIngestedBehindFile(&f);
} else {
status = AssignLevelAndSeqnoForIngestedFile(
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
&f, &assigned_seqno);
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
&f, last_seqno, &assigned_seqno);
}
if (!status.ok()) {
return status;
}
status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
&assigned_seqno);
if (assigned_seqno == last_seqno + 1) {
consumed_seqno_ = true;
if (assigned_seqno > last_seqno) {
assert(assigned_seqno == last_seqno + 1);
last_seqno = assigned_seqno;
consumed_seqno_count_ += 1;
}
if (!status.ok()) {
return status;
Expand All @@ -250,6 +257,13 @@ void ExternalSstFileIngestionJob::UpdateStats() {
uint64_t total_keys = 0;
uint64_t total_l0_files = 0;
uint64_t total_time = env_->NowMicros() - job_start_time_;

EventLoggerStream stream = event_logger_->Log();
stream << "event"
<< "ingest_finished";
stream << "files_ingested";
stream.StartArray();

for (IngestedFileInfo& f : files_to_ingest_) {
InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
stats.micros = total_time;
Expand Down Expand Up @@ -277,7 +291,18 @@ void ExternalSstFileIngestionJob::UpdateStats() {
"(global_seqno=%" PRIu64 ")\n",
f.external_file_path.c_str(), f.picked_level,
f.internal_file_path.c_str(), f.assigned_seqno);
stream << "file" << f.internal_file_path << "level" << f.picked_level;
}
stream.EndArray();

stream << "lsm_state";
stream.StartArray();
auto vstorage = cfd_->current()->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
stream << vstorage->NumLevelFiles(level);
}
stream.EndArray();

cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
total_keys);
cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
Expand All @@ -301,7 +326,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
f.internal_file_path.c_str(), s.ToString().c_str());
}
}
consumed_seqno_ = false;
consumed_seqno_count_ = 0;
} else if (status.ok() && ingestion_options_.move_files) {
// The files were moved and added successfully, remove original file links
for (IngestedFileInfo& f : files_to_ingest_) {
Expand Down Expand Up @@ -479,13 +504,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(

Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
IngestedFileInfo* file_to_ingest, SequenceNumber last_seqno,
SequenceNumber* assigned_seqno) {
Status status;
*assigned_seqno = 0;
const SequenceNumber last_seqno = versions_->LastSequence();
if (force_global_seqno) {
*assigned_seqno = last_seqno + 1;
if (compaction_style == kCompactionStyleUniversal) {
if (compaction_style == kCompactionStyleUniversal || files_overlap_) {
file_to_ingest->picked_level = 0;
return status;
}
Expand Down Expand Up @@ -547,6 +572,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
target_level = lvl;
}
}
// If files overlap, we have to ingest them at level 0 and assign the newest
// sequence number
if (files_overlap_) {
target_level = 0;
*assigned_seqno = last_seqno + 1;
}
TEST_SYNC_POINT_CALLBACK(
"ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
&overlap_with_db);
Expand Down
17 changes: 12 additions & 5 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "db/dbformat.h"
#include "db/internal_stats.h"
#include "db/snapshot_impl.h"
#include "logging/event_logger.h"
#include "options/db_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
Expand Down Expand Up @@ -71,7 +72,7 @@ class ExternalSstFileIngestionJob {
const ImmutableDBOptions& db_options, const EnvOptions& env_options,
SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options,
Directories* directories)
Directories* directories, EventLogger* event_logger)
: env_(env),
versions_(versions),
cfd_(cfd),
Expand All @@ -80,8 +81,9 @@ class ExternalSstFileIngestionJob {
db_snapshots_(db_snapshots),
ingestion_options_(ingestion_options),
directories_(directories),
event_logger_(event_logger),
job_start_time_(env_->NowMicros()),
consumed_seqno_(false) {
consumed_seqno_count_(0) {
assert(directories != nullptr);
}

Expand Down Expand Up @@ -116,8 +118,8 @@ class ExternalSstFileIngestionJob {
return files_to_ingest_;
}

// Whether to increment VersionSet's seqno after this job runs
bool ShouldIncrementLastSequence() const { return consumed_seqno_; }
// How many sequence numbers did we consume as part of the ingest job?
int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }

private:
// Open the external file and populate `file_to_ingest` with all the
Expand All @@ -133,6 +135,7 @@ class ExternalSstFileIngestionJob {
bool force_global_seqno,
CompactionStyle compaction_style,
IngestedFileInfo* file_to_ingest,
SequenceNumber last_seqno,
SequenceNumber* assigned_seqno);

// File that we want to ingest behind always goes to the lowest level;
Expand Down Expand Up @@ -163,9 +166,13 @@ class ExternalSstFileIngestionJob {
autovector<IngestedFileInfo> files_to_ingest_;
const IngestExternalFileOptions& ingestion_options_;
Directories* directories_;
EventLogger* event_logger_;
VersionEdit edit_;
uint64_t job_start_time_;
bool consumed_seqno_;
int consumed_seqno_count_;
// Set in ExternalSstFileIngestionJob::Prepare(), if true all files are
// ingested in L0
bool files_overlap_{false};
};

} // namespace rocksdb

0 comments on commit 6cf74b0

Please sign in to comment.