Skip to content

Commit

Permalink
block per kv checksum
Browse files Browse the repository at this point in the history
  • Loading branch information
cbi42 committed Mar 10, 2023
1 parent 969d4e1 commit f507a58
Show file tree
Hide file tree
Showing 46 changed files with 1,567 additions and 207 deletions.
3 changes: 2 additions & 1 deletion db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ Status BuildTable(
MaxFileSizeForL0MetaPin(mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key*/ nullptr,
/*allow_unprepared_value*/ false));
/*allow_unprepared_value*/ false,
mutable_cf_options.block_protection_bytes_per_key));
s = it->status();
if (s.ok() && paranoid_file_checks) {
OutputValidator file_validator(tboptions.internal_comparator,
Expand Down
6 changes: 6 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,12 @@ Status ColumnFamilyData::ValidateOptions(
"Memtable per key-value checksum protection only supports 0, 1, 2, 4 "
"or 8 bytes per key.");
}
if (std::find(supported.begin(), supported.end(),
cf_options.block_protection_bytes_per_key) == supported.end()) {
return Status::NotSupported(
"Block per key-value checksum protection only supports 0, 1, 2, 4 "
"or 8 bytes per key.");
}
return s;
}

Expand Down
8 changes: 6 additions & 2 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,9 @@ void CompactionJob::GenSubcompactionBoundaries() {
FileMetaData* f = flevel->files[i].file_metadata;
std::vector<TableReader::Anchor> my_anchors;
Status s = cfd->table_cache()->ApproximateKeyAnchors(
ReadOptions(), icomp, *f, my_anchors);
ReadOptions(), icomp, *f,
c->mutable_cf_options()->block_protection_bytes_per_key,
my_anchors);
if (!s.ok() || my_anchors.empty()) {
my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
}
Expand Down Expand Up @@ -736,7 +738,9 @@ Status CompactionJob::Run() {
*compact_->compaction->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false);
/*allow_unprepared_value=*/false,
compact_->compaction->mutable_cf_options()
->block_protection_bytes_per_key);
auto s = iter->status();

if (s.ok() && paranoid_file_checks_) {
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ class CompactionJobTestBase : public testing::Test {
Status s = cf_options_.table_factory->NewTableReader(
read_opts,
TableReaderOptions(*cfd->ioptions(), nullptr, FileOptions(),
cfd_->internal_comparator()),
cfd_->internal_comparator(),
0 /* block_protection_bytes_per_key */),
std::move(freader), file_size, &table_reader, false);
ASSERT_OK(s);
assert(table_reader);
Expand Down
6 changes: 4 additions & 2 deletions db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ Status VerifySstFileChecksum(const Options& options,
const bool kImmortal = true;
auto reader_options = TableReaderOptions(
ioptions, options.prefix_extractor, env_options, internal_comparator,
false /* skip_filters */, !kImmortal, false /* force_direct_prefetch */,
-1 /* level */);
options.block_protection_bytes_per_key, false /* skip_filters */,
!kImmortal, false /* force_direct_prefetch */, -1 /* level */);
reader_options.largest_seqno = largest_seqno;
reader_options.block_protection_bytes_per_key =
options.block_protection_bytes_per_key;
s = ioptions.table_factory->NewTableReader(
reader_options, std::move(file_reader), file_size, &table_reader,
false /* prefetch_index_and_filter_in_cache */);
Expand Down
4 changes: 3 additions & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,11 +678,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
sv->mutable_cf_options.block_protection_bytes_per_key,
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
/*cur_file_num*/ new_file_number, /*unique_id*/ {},
/*largest_seqno*/ 0),
std::move(sst_file_reader), file_to_ingest->file_size, &table_reader);
if (!status.ok()) {
return status;
Expand Down
21 changes: 14 additions & 7 deletions db/forward_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ForwardLevelIterator : public InternalIterator {
const ColumnFamilyData* const cfd, const ReadOptions& read_options,
const std::vector<FileMetaData*>& files,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
bool allow_unprepared_value)
bool allow_unprepared_value, uint8_t block_protection_bytes_per_key)
: cfd_(cfd),
read_options_(read_options),
files_(files),
Expand All @@ -45,7 +45,8 @@ class ForwardLevelIterator : public InternalIterator {
file_iter_(nullptr),
pinned_iters_mgr_(nullptr),
prefix_extractor_(prefix_extractor),
allow_unprepared_value_(allow_unprepared_value) {
allow_unprepared_value_(allow_unprepared_value),
block_protection_bytes_per_key_(block_protection_bytes_per_key) {
status_.PermitUncheckedError(); // Allow uninitialized status through
}

Expand Down Expand Up @@ -87,7 +88,8 @@ class ForwardLevelIterator : public InternalIterator {
/*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
/*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_);
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
block_protection_bytes_per_key_);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
valid_ = false;
if (!range_del_agg.IsEmpty()) {
Expand Down Expand Up @@ -211,6 +213,7 @@ class ForwardLevelIterator : public InternalIterator {
// Kept alive by ForwardIterator::sv_->mutable_cf_options
const std::shared_ptr<const SliceTransform>& prefix_extractor_;
const bool allow_unprepared_value_;
const uint8_t block_protection_bytes_per_key_;
};

ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
Expand Down Expand Up @@ -735,7 +738,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_));
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
sv_->mutable_cf_options.block_protection_bytes_per_key));
}
BuildLevelIterators(vstorage, sv_);
current_ = nullptr;
Expand Down Expand Up @@ -816,7 +820,8 @@ void ForwardIterator::RenewIterators() {
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_));
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
svnew->mutable_cf_options.block_protection_bytes_per_key));
}

for (auto* f : l0_iters_) {
Expand Down Expand Up @@ -860,7 +865,8 @@ void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
} else {
level_iters_.push_back(new ForwardLevelIterator(
cfd_, read_options_, level_files,
sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_));
sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_,
sv->mutable_cf_options.block_protection_bytes_per_key));
}
}
}
Expand All @@ -882,7 +888,8 @@ void ForwardIterator::ResetIncompleteIterators() {
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_);
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
sv_->mutable_cf_options.block_protection_bytes_per_key);
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
}

Expand Down
1 change: 1 addition & 0 deletions db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
sv->mutable_cf_options.block_protection_bytes_per_key,
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*block_cache_tracer*/ nullptr,
Expand Down
69 changes: 69 additions & 0 deletions db/kv_checksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ template <typename T>
class ProtectionInfoKVOC;
template <typename T>
class ProtectionInfoKVOS;
template <typename T>
class ProtectionInfoKV;

// Aliases for 64-bit protection infos.
using ProtectionInfo64 = ProtectionInfo<uint64_t>;
Expand All @@ -64,13 +66,15 @@ class ProtectionInfo {
ProtectionInfoKVO<T> ProtectKVO(const SliceParts& key,
const SliceParts& value,
ValueType op_type) const;
ProtectionInfoKV<T> ProtectKV(const Slice& key, const Slice& value) const;

T GetVal() const { return val_; }

private:
friend class ProtectionInfoKVO<T>;
friend class ProtectionInfoKVOS<T>;
friend class ProtectionInfoKVOC<T>;
friend class ProtectionInfoKV<T>;

// Each field is hashed with an independent value so we can catch fields being
// swapped. Per the `NPHash64()` docs, using consecutive seeds is a pitfall,
Expand Down Expand Up @@ -207,6 +211,23 @@ class ProtectionInfoKVOS {
ProtectionInfoKVO<T> kvo_;
};

// Protection info that covers a key and a value only. Instances holding a
// real checksum are produced by `ProtectionInfo<T>::ProtectKV()`, which is why
// the value-taking constructor is private and `ProtectionInfo<T>` is a friend.
template <typename T>
class ProtectionInfoKV {
 public:
  ProtectionInfoKV() = default;

  // Returns the accumulated checksum value held by the wrapped
  // `ProtectionInfo<T>`.
  T GetVal() const { return info_.GetVal(); }

 private:
  // `ProtectionInfo<T>::ProtectKV()` needs access to the private constructor.
  friend class ProtectionInfo<T>;

  explicit ProtectionInfoKV(T val) : info_(val) {
    // The wrapper must not add any storage on top of the raw checksum.
    static_assert(sizeof(ProtectionInfoKV<T>) == sizeof(T),
                  "ProtectionInfoKV<T> must be exactly as large as T");
  }

  // The only data member; stores the checksum itself.
  ProtectionInfo<T> info_;
};

template <typename T>
Status ProtectionInfo<T>::GetStatus() const {
if (val_ != 0) {
Expand Down Expand Up @@ -244,6 +265,16 @@ ProtectionInfoKVO<T> ProtectionInfo<T>::ProtectKVO(const SliceParts& key,
return ProtectionInfoKVO<T>(val);
}

// Folds a key and a value into the current protection value and returns the
// result as a `ProtectionInfoKV<T>`.
//
// The key and the value are hashed with distinct seeds (`kSeedK` and
// `kSeedV`), each hash is truncated to `T`, and both are XOR-ed into the
// value returned by `GetVal()`. `*this` is not modified.
template <typename T>
ProtectionInfoKV<T> ProtectionInfo<T>::ProtectKV(const Slice& key,
                                                 const Slice& value) const {
  const T key_hash =
      static_cast<T>(GetSliceNPHash64(key, ProtectionInfo<T>::kSeedK));
  const T value_hash =
      static_cast<T>(GetSliceNPHash64(value, ProtectionInfo<T>::kSeedV));
  // XOR is associative, so combining everything in one expression yields the
  // same value as folding the hashes in one at a time.
  return ProtectionInfoKV<T>(static_cast<T>(GetVal() ^ key_hash ^ value_hash));
}

template <typename T>
void ProtectionInfoKVO<T>::UpdateK(const Slice& old_key, const Slice& new_key) {
T val = GetVal();
Expand Down Expand Up @@ -395,4 +426,42 @@ void ProtectionInfoKVOS<T>::UpdateS(SequenceNumber old_sequence_number,
SetVal(val);
}

// Stores `checksum`, truncated to `protection_bytes_per_key` bytes, at `dst`.
//
// Supported widths are 1, 2, 4 and 8 bytes. The one-byte case writes `dst[0]`
// directly; the wider cases go through `EncodeFixed16/32/64`. Any other width
// trips an assertion and writes nothing, so callers are expected to have
// filtered out unsupported values (including 0) beforehand.
inline void EncodeKVChecksum(uint64_t checksum,
                             uint32_t protection_bytes_per_key, char* dst) {
  if (protection_bytes_per_key == 1) {
    dst[0] = static_cast<uint8_t>(checksum);
  } else if (protection_bytes_per_key == 2) {
    EncodeFixed16(dst, static_cast<uint16_t>(checksum));
  } else if (protection_bytes_per_key == 4) {
    EncodeFixed32(dst, static_cast<uint32_t>(checksum));
  } else if (protection_bytes_per_key == 8) {
    EncodeFixed64(dst, checksum);
  } else {
    // Unsupported checksum width.
    assert(false);
  }
}

// Compares the `checksum_len`-byte checksum stored at `checksum_ptr` with
// `expected` truncated to the same width.
//
// Returns true when the two match. Supported widths are 1, 2, 4 and 8 bytes;
// the stored value is read directly for one byte and via `DecodeFixed16/32/64`
// otherwise. Any other width trips an assertion and, when assertions are
// compiled out, is reported as a mismatch.
inline bool VerifyKVChecksum(uint8_t checksum_len, const char* checksum_ptr,
                             uint64_t expected) {
  if (checksum_len == 1) {
    const uint8_t stored = static_cast<uint8_t>(checksum_ptr[0]);
    return stored == static_cast<uint8_t>(expected);
  }
  if (checksum_len == 2) {
    return DecodeFixed16(checksum_ptr) == static_cast<uint16_t>(expected);
  }
  if (checksum_len == 4) {
    return DecodeFixed32(checksum_ptr) == static_cast<uint32_t>(expected);
  }
  if (checksum_len == 8) {
    return DecodeFixed64(checksum_ptr) == expected;
  }
  // Unsupported checksum width.
  assert(false);
  return false;
}

} // namespace ROCKSDB_NAMESPACE
43 changes: 6 additions & 37 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ void MemTable::UpdateOldestKeyTime() {
}

Status MemTable::VerifyEntryChecksum(const char* entry,
size_t protection_bytes_per_key,
uint32_t protection_bytes_per_key,
bool allow_data_in_errors) {
if (protection_bytes_per_key == 0) {
return Status::OK();
Expand Down Expand Up @@ -289,24 +289,8 @@ Status MemTable::VerifyEntryChecksum(const char* entry,
.ProtectKVO(user_key, value, type)
.ProtectS(seq)
.GetVal();
bool match = true;
switch (protection_bytes_per_key) {
case 1:
match = static_cast<uint8_t>(checksum_ptr[0]) ==
static_cast<uint8_t>(expected);
break;
case 2:
match = DecodeFixed16(checksum_ptr) == static_cast<uint16_t>(expected);
break;
case 4:
match = DecodeFixed32(checksum_ptr) == static_cast<uint32_t>(expected);
break;
case 8:
match = DecodeFixed64(checksum_ptr) == expected;
break;
default:
assert(false);
}
bool match = VerifyKVChecksum(static_cast<uint8_t>(protection_bytes_per_key),
checksum_ptr, expected);
if (!match) {
std::string msg(
"Corrupted memtable entry, per key-value checksum verification "
Expand Down Expand Up @@ -526,7 +510,7 @@ class MemTableIterator : public InternalIterator {
bool valid_;
bool arena_mode_;
bool value_pinned_;
size_t protection_bytes_per_key_;
uint32_t protection_bytes_per_key_;
Status status_;
Logger* logger_;

Expand Down Expand Up @@ -690,22 +674,7 @@ void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
} else {
checksum = kv_prot_info->GetVal();
}
switch (moptions_.protection_bytes_per_key) {
case 1:
checksum_ptr[0] = static_cast<uint8_t>(checksum);
break;
case 2:
EncodeFixed16(checksum_ptr, static_cast<uint16_t>(checksum));
break;
case 4:
EncodeFixed32(checksum_ptr, static_cast<uint32_t>(checksum));
break;
case 8:
EncodeFixed64(checksum_ptr, checksum);
break;
default:
assert(false);
}
EncodeKVChecksum(checksum, moptions_.protection_bytes_per_key, checksum_ptr);
}

Status MemTable::Add(SequenceNumber s, ValueType type,
Expand Down Expand Up @@ -901,7 +870,7 @@ struct Saver {
ReadCallback* callback_;
bool* is_blob_index;
bool allow_data_in_errors;
size_t protection_bytes_per_key;
uint32_t protection_bytes_per_key;
bool CheckCallback(SequenceNumber _seq) {
if (callback_) {
return callback_->IsVisible(_seq);
Expand Down
2 changes: 1 addition & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class MemTable {

// Returns Corruption status if verification fails.
static Status VerifyEntryChecksum(const char* entry,
size_t protection_bytes_per_key,
uint32_t protection_bytes_per_key,
bool allow_data_in_errors = false);

private:
Expand Down
12 changes: 8 additions & 4 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,9 @@ class Repairer {
file_size);
std::shared_ptr<const TableProperties> props;
if (status.ok()) {
status = table_cache_->GetTableProperties(file_options_, icmp_, t->meta,
&props);
status = table_cache_->GetTableProperties(
file_options_, icmp_, t->meta, &props,
0 /* block_protection_bytes_per_key */);
}
if (status.ok()) {
auto s =
Expand Down Expand Up @@ -567,7 +568,8 @@ class Repairer {
/*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false);
/*allow_unprepared_value=*/false,
cfd->GetLatestMutableCFOptions()->block_protection_bytes_per_key);
ParsedInternalKey parsed;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
Expand Down Expand Up @@ -606,7 +608,9 @@ class Repairer {
ReadOptions ropts;
std::unique_ptr<FragmentedRangeTombstoneIterator> r_iter;
status = table_cache_->GetRangeTombstoneIterator(
ropts, cfd->internal_comparator(), t->meta, &r_iter);
ropts, cfd->internal_comparator(), t->meta,
cfd->GetLatestMutableCFOptions()->block_protection_bytes_per_key,
&r_iter);

if (r_iter) {
r_iter->SeekToFirst();
Expand Down
Loading

0 comments on commit f507a58

Please sign in to comment.