From 6cf74b0b72278216ea1d4737533f93f3a07db87b Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 4 Jul 2019 13:24:05 +0000 Subject: [PATCH] [IngestExternalFile] Allow ingesting overlapping files Currently IngestExternalFile() fails when its input files' ranges overlap. This condition doesn't need to hold for files that are to be ingested in L0, though. This commit allows overlapping files and forces their target level to L0. Additionally, ingest job's completion is logged to EventLogger, analogous to flush and compaction jobs. --- db/db_impl/db_impl.cc | 24 +++++++------ db/external_sst_file_basic_test.cc | 41 +++++++++++++++++++++ db/external_sst_file_ingestion_job.cc | 51 +++++++++++++++++++++------ db/external_sst_file_ingestion_job.h | 17 ++++++--- 4 files changed, 107 insertions(+), 26 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index fd1f0b45c43..aa817f3a095 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3762,9 +3762,9 @@ Status DBImpl::IngestExternalFiles( std::vector ingestion_jobs; for (const auto& arg : args) { auto* cfd = static_cast(arg.column_family)->cfd(); - ingestion_jobs.emplace_back(env_, versions_.get(), cfd, - immutable_db_options_, env_options_, - &snapshots_, arg.options, &directories_); + ingestion_jobs.emplace_back( + env_, versions_.get(), cfd, immutable_db_options_, env_options_, + &snapshots_, arg.options, &directories_, &event_logger_); } std::vector> exec_results; for (size_t i = 0; i != num_cfs; ++i) { @@ -3894,19 +3894,21 @@ Status DBImpl::IngestExternalFiles( } } if (status.ok()) { - bool should_increment_last_seqno = - ingestion_jobs[0].ShouldIncrementLastSequence(); + int consumed_seqno_count = + ingestion_jobs[0].ConsumedSequenceNumbersCount(); #ifndef NDEBUG for (size_t i = 1; i != num_cfs; ++i) { - assert(should_increment_last_seqno == - ingestion_jobs[i].ShouldIncrementLastSequence()); + assert(!!consumed_seqno_count == + !!ingestion_jobs[i].ConsumedSequenceNumbersCount()); + consumed_seqno_count += + ingestion_jobs[i].ConsumedSequenceNumbersCount(); } #endif - if (should_increment_last_seqno) { + if (consumed_seqno_count > 0) { const SequenceNumber last_seqno = versions_->LastSequence(); - versions_->SetLastAllocatedSequence(last_seqno + 1); - versions_->SetLastPublishedSequence(last_seqno + 1); - versions_->SetLastSequence(last_seqno + 1); + versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastSequence(last_seqno + consumed_seqno_count); } autovector cfds_to_commit; autovector mutable_cf_options_list; diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index c85d7394ea7..43a003a85cc 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -1070,6 +1070,47 @@ TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) { } while (ChangeOptionsForFileIngestionTest()); } +TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) { + Options options = CurrentOptions(); + + std::vector files; + { + SstFileWriter sst_file_writer(EnvOptions(), options); + std::string file1 = sst_files_dir_ + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + ASSERT_OK(sst_file_writer.Put("a", "z")); + ASSERT_OK(sst_file_writer.Put("i", "m")); + ExternalSstFileInfo file1_info; + ASSERT_OK(sst_file_writer.Finish(&file1_info)); + files.push_back(std::move(file1)); + } + { + SstFileWriter sst_file_writer(EnvOptions(), options); + std::string file2 = sst_files_dir_ + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + ASSERT_OK(sst_file_writer.Put("i", "k")); + ExternalSstFileInfo file2_info; + ASSERT_OK(sst_file_writer.Finish(&file2_info)); + files.push_back(std::move(file2)); + } + + IngestExternalFileOptions ifo; + ASSERT_OK(db_->IngestExternalFile(files, ifo)); + ASSERT_EQ(Get("a"), "z"); + ASSERT_EQ(Get("i"), "k"); + + int total_keys = 0; + Iterator* iter = db_->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + total_keys++; + } + delete iter; + ASSERT_EQ(total_keys, 2); + + ASSERT_EQ(2, NumTableFilesAtLevel(0)); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false), diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 03bcd424022..f49b4d59ab1 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -71,11 +71,16 @@ Status ExternalSstFileIngestionJob::Prepare( for (size_t i = 0; i < num_files - 1; i++) { if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key, sorted_files[i + 1]->smallest_internal_key) >= 0) { - return Status::NotSupported("Files have overlapping ranges"); + files_overlap_ = true; + break; } } } + if (ingestion_options_.ingest_behind && files_overlap_) { + return Status::NotSupported("Files have overlapping ranges"); + } + for (IngestedFileInfo& f : files_to_ingest_) { if (f.num_entries == 0 && f.num_range_deletions == 0) { return Status::InvalidArgument("File contain no entries"); @@ -212,7 +217,7 @@ Status ExternalSstFileIngestionJob::Run() { } // It is safe to use this instead of LastAllocatedSequence since we are // the only active writer, and hence they are equal - const SequenceNumber last_seqno = versions_->LastSequence(); + SequenceNumber last_seqno = versions_->LastSequence(); edit_.SetColumnFamily(cfd_->GetID()); // The levels that the files will be ingested into @@ -222,8 +227,8 @@ Status ExternalSstFileIngestionJob::Run() { status = CheckLevelForIngestedBehindFile(&f); } else { status = AssignLevelAndSeqnoForIngestedFile( - super_version, force_global_seqno, cfd_->ioptions()->compaction_style, - &f, &assigned_seqno); + super_version, force_global_seqno, cfd_->ioptions()->compaction_style, + &f, last_seqno, &assigned_seqno); } if (!status.ok()) { return status; @@ -231,8 +236,10 @@ Status ExternalSstFileIngestionJob::Run() { status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", &assigned_seqno); - if (assigned_seqno == last_seqno + 1) { - consumed_seqno_ = true; + if (assigned_seqno > last_seqno) { + assert(assigned_seqno == last_seqno + 1); + last_seqno = assigned_seqno; + consumed_seqno_count_ += 1; } if (!status.ok()) { return status; @@ -250,6 +257,13 @@ void ExternalSstFileIngestionJob::UpdateStats() { uint64_t total_keys = 0; uint64_t total_l0_files = 0; uint64_t total_time = env_->NowMicros() - job_start_time_; + + EventLoggerStream stream = event_logger_->Log(); + stream << "event" + << "ingest_finished"; + stream << "files_ingested"; + stream.StartArray(); + for (IngestedFileInfo& f : files_to_ingest_) { InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1); stats.micros = total_time; @@ -277,7 +291,18 @@ void ExternalSstFileIngestionJob::UpdateStats() { "(global_seqno=%" PRIu64 ")\n", f.external_file_path.c_str(), f.picked_level, f.internal_file_path.c_str(), f.assigned_seqno); + stream << "file" << f.internal_file_path << "level" << f.picked_level; } + stream.EndArray(); + + stream << "lsm_state"; + stream.StartArray(); + auto vstorage = cfd_->current()->storage_info(); + for (int level = 0; level < vstorage->num_levels(); ++level) { + stream << vstorage->NumLevelFiles(level); + } + stream.EndArray(); + cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL, total_keys); cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL, @@ -301,7 +326,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { f.internal_file_path.c_str(), s.ToString().c_str()); } } - consumed_seqno_ = false; + consumed_seqno_count_ = 0; } else if (status.ok() && ingestion_options_.move_files) { // The files were moved and added successfully, remove original file links for (IngestedFileInfo& f : files_to_ingest_) { @@ -479,13 +504,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, - IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) { + IngestedFileInfo* file_to_ingest, SequenceNumber last_seqno, + SequenceNumber* assigned_seqno) { Status status; *assigned_seqno = 0; - const SequenceNumber last_seqno = versions_->LastSequence(); if (force_global_seqno) { *assigned_seqno = last_seqno + 1; - if (compaction_style == kCompactionStyleUniversal) { + if (compaction_style == kCompactionStyleUniversal || files_overlap_) { file_to_ingest->picked_level = 0; return status; } @@ -547,6 +572,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( target_level = lvl; } } + // If files overlap, we have to ingest them at level 0 and assign the newest + // sequence number + if (files_overlap_) { + target_level = 0; + *assigned_seqno = last_seqno + 1; + } TEST_SYNC_POINT_CALLBACK( "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", &overlap_with_db); diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 4f9fac2416d..4f84d33a8d4 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/internal_stats.h" #include "db/snapshot_impl.h" +#include "logging/event_logger.h" #include "options/db_options.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -71,7 +72,7 @@ class ExternalSstFileIngestionJob { const ImmutableDBOptions& db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, const IngestExternalFileOptions& ingestion_options, - Directories* directories) + Directories* directories, EventLogger* event_logger) : env_(env), versions_(versions), cfd_(cfd), @@ -80,8 +81,9 @@ class ExternalSstFileIngestionJob { db_snapshots_(db_snapshots), ingestion_options_(ingestion_options), directories_(directories), + event_logger_(event_logger), job_start_time_(env_->NowMicros()), - consumed_seqno_(false) { + consumed_seqno_count_(0) { assert(directories != nullptr); } @@ -116,8 +118,8 @@ class ExternalSstFileIngestionJob { return files_to_ingest_; } - // Whether to increment VersionSet's seqno after this job runs - bool ShouldIncrementLastSequence() const { return consumed_seqno_; } + // How many sequence numbers did we consume as part of the ingest job? + int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; } private: // Open the external file and populate `file_to_ingest` with all the @@ -133,6 +135,7 @@ class ExternalSstFileIngestionJob { bool force_global_seqno, CompactionStyle compaction_style, IngestedFileInfo* file_to_ingest, + SequenceNumber last_seqno, SequenceNumber* assigned_seqno); // File that we want to ingest behind always goes to the lowest level; @@ -163,9 +166,13 @@ class ExternalSstFileIngestionJob { autovector files_to_ingest_; const IngestExternalFileOptions& ingestion_options_; Directories* directories_; + EventLogger* event_logger_; VersionEdit edit_; uint64_t job_start_time_; - bool consumed_seqno_; + int consumed_seqno_count_; + // Set in ExternalSstFileIngestionJob::Prepare(), if true all files are + // ingested in L0 + bool files_overlap_{false}; }; } // namespace rocksdb