diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 8f104f707a9..4bcab93a867 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3763,9 +3763,9 @@ Status DBImpl::IngestExternalFiles( std::vector ingestion_jobs; for (const auto& arg : args) { auto* cfd = static_cast(arg.column_family)->cfd(); - ingestion_jobs.emplace_back(env_, versions_.get(), cfd, - immutable_db_options_, env_options_, - &snapshots_, arg.options, &directories_); + ingestion_jobs.emplace_back( + env_, versions_.get(), cfd, immutable_db_options_, env_options_, + &snapshots_, arg.options, &directories_, &event_logger_); } std::vector> exec_results; for (size_t i = 0; i != num_cfs; ++i) { @@ -3895,19 +3895,21 @@ Status DBImpl::IngestExternalFiles( } } if (status.ok()) { - bool should_increment_last_seqno = - ingestion_jobs[0].ShouldIncrementLastSequence(); + int consumed_seqno_count = + ingestion_jobs[0].ConsumedSequenceNumbersCount(); #ifndef NDEBUG for (size_t i = 1; i != num_cfs; ++i) { - assert(should_increment_last_seqno == - ingestion_jobs[i].ShouldIncrementLastSequence()); + assert(!!consumed_seqno_count == + !!ingestion_jobs[i].ConsumedSequenceNumbersCount()); + consumed_seqno_count += + ingestion_jobs[i].ConsumedSequenceNumbersCount(); } #endif - if (should_increment_last_seqno) { + if (consumed_seqno_count > 0) { const SequenceNumber last_seqno = versions_->LastSequence(); - versions_->SetLastAllocatedSequence(last_seqno + 1); - versions_->SetLastPublishedSequence(last_seqno + 1); - versions_->SetLastSequence(last_seqno + 1); + versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastSequence(last_seqno + consumed_seqno_count); } autovector cfds_to_commit; autovector mutable_cf_options_list; diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index c85d7394ea7..43a003a85cc 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -1070,6 +1070,47 @@ TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) { } while (ChangeOptionsForFileIngestionTest()); } +TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) { + Options options = CurrentOptions(); + + std::vector files; + { + SstFileWriter sst_file_writer(EnvOptions(), options); + std::string file1 = sst_files_dir_ + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + ASSERT_OK(sst_file_writer.Put("a", "z")); + ASSERT_OK(sst_file_writer.Put("i", "m")); + ExternalSstFileInfo file1_info; + ASSERT_OK(sst_file_writer.Finish(&file1_info)); + files.push_back(std::move(file1)); + } + { + SstFileWriter sst_file_writer(EnvOptions(), options); + std::string file2 = sst_files_dir_ + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + ASSERT_OK(sst_file_writer.Put("i", "k")); + ExternalSstFileInfo file2_info; + ASSERT_OK(sst_file_writer.Finish(&file2_info)); + files.push_back(std::move(file2)); + } + + IngestExternalFileOptions ifo; + ASSERT_OK(db_->IngestExternalFile(files, ifo)); + ASSERT_EQ(Get("a"), "z"); + ASSERT_EQ(Get("i"), "k"); + + int total_keys = 0; + Iterator* iter = db_->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + total_keys++; + } + delete iter; + ASSERT_EQ(total_keys, 2); + + ASSERT_EQ(2, NumTableFilesAtLevel(0)); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false), diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 03bcd424022..3926d7fa9ff 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -71,11 +71,16 @@ Status ExternalSstFileIngestionJob::Prepare( for (size_t i = 0; i < num_files - 1; i++) { if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key, sorted_files[i + 1]->smallest_internal_key) >= 0) { - return Status::NotSupported("Files have overlapping ranges"); + files_overlap_ = true; + break; } } } + if (ingestion_options_.ingest_behind && files_overlap_) { + return Status::NotSupported("Files have overlapping ranges"); + } + for (IngestedFileInfo& f : files_to_ingest_) { if (f.num_entries == 0 && f.num_range_deletions == 0) { return Status::InvalidArgument("File contain no entries"); @@ -212,7 +217,7 @@ Status ExternalSstFileIngestionJob::Run() { } // It is safe to use this instead of LastAllocatedSequence since we are // the only active writer, and hence they are equal - const SequenceNumber last_seqno = versions_->LastSequence(); + SequenceNumber last_seqno = versions_->LastSequence(); edit_.SetColumnFamily(cfd_->GetID()); // The levels that the files will be ingested into @@ -222,8 +227,8 @@ Status ExternalSstFileIngestionJob::Run() { status = CheckLevelForIngestedBehindFile(&f); } else { status = AssignLevelAndSeqnoForIngestedFile( - super_version, force_global_seqno, cfd_->ioptions()->compaction_style, - &f, &assigned_seqno); + super_version, force_global_seqno, cfd_->ioptions()->compaction_style, + last_seqno, &f, &assigned_seqno); } if (!status.ok()) { return status; @@ -231,8 +236,10 @@ Status ExternalSstFileIngestionJob::Run() { status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", &assigned_seqno); - if (assigned_seqno == last_seqno + 1) { - consumed_seqno_ = true; + if (assigned_seqno > last_seqno) { + assert(assigned_seqno == last_seqno + 1); + last_seqno = assigned_seqno; + ++consumed_seqno_count_; } if (!status.ok()) { return status; @@ -250,6 +257,13 @@ void ExternalSstFileIngestionJob::UpdateStats() { uint64_t total_keys = 0; uint64_t total_l0_files = 0; uint64_t total_time = env_->NowMicros() - job_start_time_; + + EventLoggerStream stream = event_logger_->Log(); + stream << "event" + << "ingest_finished"; + stream << "files_ingested"; + stream.StartArray(); + for (IngestedFileInfo& f : files_to_ingest_) { InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1); stats.micros = total_time; @@ -277,7 +291,18 @@ void ExternalSstFileIngestionJob::UpdateStats() { "(global_seqno=%" PRIu64 ")\n", f.external_file_path.c_str(), f.picked_level, f.internal_file_path.c_str(), f.assigned_seqno); + stream << "file" << f.internal_file_path << "level" << f.picked_level; } + stream.EndArray(); + + stream << "lsm_state"; + stream.StartArray(); + auto vstorage = cfd_->current()->storage_info(); + for (int level = 0; level < vstorage->num_levels(); ++level) { + stream << vstorage->NumLevelFiles(level); + } + stream.EndArray(); + cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL, total_keys); cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL, @@ -301,7 +326,8 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { f.internal_file_path.c_str(), s.ToString().c_str()); } } - consumed_seqno_ = false; + consumed_seqno_count_ = 0; + files_overlap_ = false; } else if (status.ok() && ingestion_options_.move_files) { // The files were moved and added successfully, remove original file links for (IngestedFileInfo& f : files_to_ingest_) { @@ -479,13 +505,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, - IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) { + SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest, + SequenceNumber* assigned_seqno) { Status status; *assigned_seqno = 0; - const SequenceNumber last_seqno = versions_->LastSequence(); if (force_global_seqno) { *assigned_seqno = last_seqno + 1; - if (compaction_style == kCompactionStyleUniversal) { + if (compaction_style == kCompactionStyleUniversal || files_overlap_) { file_to_ingest->picked_level = 0; return status; } @@ -547,6 +573,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( target_level = lvl; } } + // If files overlap, we have to ingest them at level 0 and assign the newest + // sequence number + if (files_overlap_) { + target_level = 0; + *assigned_seqno = last_seqno + 1; + } TEST_SYNC_POINT_CALLBACK( "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", &overlap_with_db); diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 4f9fac2416d..90b8326bbef 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/internal_stats.h" #include "db/snapshot_impl.h" +#include "logging/event_logger.h" #include "options/db_options.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -71,7 +72,7 @@ class ExternalSstFileIngestionJob { const ImmutableDBOptions& db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, const IngestExternalFileOptions& ingestion_options, - Directories* directories) + Directories* directories, EventLogger* event_logger) : env_(env), versions_(versions), cfd_(cfd), @@ -80,8 +81,9 @@ class ExternalSstFileIngestionJob { db_snapshots_(db_snapshots), ingestion_options_(ingestion_options), directories_(directories), + event_logger_(event_logger), job_start_time_(env_->NowMicros()), - consumed_seqno_(false) { + consumed_seqno_count_(0) { assert(directories != nullptr); } @@ -116,8 +118,8 @@ class ExternalSstFileIngestionJob { return files_to_ingest_; } - // Whether to increment VersionSet's seqno after this job runs - bool ShouldIncrementLastSequence() const { return consumed_seqno_; } + // How many sequence numbers did we consume as part of the ingest job? + int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; } private: // Open the external file and populate `file_to_ingest` with all the @@ -132,6 +134,7 @@ class ExternalSstFileIngestionJob { Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, + SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno); @@ -163,9 +166,13 @@ class ExternalSstFileIngestionJob { autovector files_to_ingest_; const IngestExternalFileOptions& ingestion_options_; Directories* directories_; + EventLogger* event_logger_; VersionEdit edit_; uint64_t job_start_time_; - bool consumed_seqno_; + int consumed_seqno_count_; + // Set in ExternalSstFileIngestionJob::Prepare(), if true all files are + // ingested in L0 + bool files_overlap_{false}; }; } // namespace rocksdb