Skip to content

Commit

Permalink
Cleanup reduant code
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Jul 8, 2024
1 parent 1ad8a21 commit 767b550
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
29 changes: 15 additions & 14 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader, Colum
return;
if (!reader->remained())
return;
if (prehandle_task->isAbort())
return;

// now the stream must be stopped by `soft_limit`, let's check the keys in reader
RUNTIME_CHECK_MSG(soft_limit.has_value(), "soft_limit.has_value(), cf={}", magic_enum::enum_name(cf));
Expand All @@ -163,9 +161,13 @@ void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader, Colum

void SSTFilesToBlockInputStream::readSuffix()
{
checkFinishedState(write_cf_reader, ColumnFamilyType::Write);
checkFinishedState(default_cf_reader, ColumnFamilyType::Default);
checkFinishedState(lock_cf_reader, ColumnFamilyType::Lock);
// For aborted task, we don't need to check the finish state
if (!prehandle_task->isAbort())
{
checkFinishedState(write_cf_reader, ColumnFamilyType::Write);
checkFinishedState(default_cf_reader, ColumnFamilyType::Default);
checkFinishedState(lock_cf_reader, ColumnFamilyType::Lock);
}

// reset all SSTReaders and return without writting blocks any more.
write_cf_reader.reset();
Expand All @@ -179,7 +181,7 @@ Block SSTFilesToBlockInputStream::read()

while (write_cf_reader && write_cf_reader->remained())
{
bool should_stop_advancing = maybeStopBySoftLimit(ColumnFamilyType::Write, write_cf_reader);
bool should_stop_advancing = maybeStopBySoftLimit(ColumnFamilyType::Write, write_cf_reader.get());
if (should_stop_advancing)
{
// Load the last batch
Expand Down Expand Up @@ -241,22 +243,19 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
const DecodedTiKVKey * const rowkey_to_be_included)
{
SSTReader * reader;
SSTReaderPtr * reader_ptr;
size_t * p_process_keys;
size_t * p_process_keys_bytes;
DecodedTiKVKey * last_loaded_rowkey;
if (cf == ColumnFamilyType::Default)
{
reader = default_cf_reader.get();
reader_ptr = &default_cf_reader;
p_process_keys = &process_keys.default_cf;
p_process_keys_bytes = &process_keys.default_cf_bytes;
last_loaded_rowkey = &default_last_loaded_rowkey;
}
else if (cf == ColumnFamilyType::Lock)
{
reader = lock_cf_reader.get();
reader_ptr = &lock_cf_reader;
p_process_keys = &process_keys.lock_cf;
p_process_keys_bytes = &process_keys.lock_cf_bytes;
last_loaded_rowkey = &lock_last_loaded_rowkey;
Expand All @@ -266,7 +265,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(

if (reader && reader->remained())
{
maybeSkipBySoftLimit(cf, *reader_ptr);
maybeSkipBySoftLimit(cf, reader);
}

Stopwatch sw;
Expand All @@ -276,7 +275,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
{
while (reader && reader->remained())
{
if (maybeStopBySoftLimit(cf, *reader_ptr))
if (maybeStopBySoftLimit(cf, reader))
{
break;
}
Expand Down Expand Up @@ -335,7 +334,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
// Let's try to load keys until process_keys_offset_end
while (reader && reader->remained() && *p_process_keys < process_keys_offset_end)
{
if (maybeStopBySoftLimit(cf, *reader_ptr))
if (maybeStopBySoftLimit(cf, reader))
{
break;
}
Expand Down Expand Up @@ -420,8 +419,9 @@ std::vector<std::string> SSTFilesToBlockInputStream::findSplitKeys(size_t splits

// Returning false means no skip is performed, the reader is intact.
// Returning true means skip is performed, must read from current value.
bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader)
bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReader * reader)
{
assert(reader != nullptr);
if (!soft_limit.has_value())
return false;
const auto & start_limit = soft_limit.value().getStartLimit();
Expand Down Expand Up @@ -504,8 +504,9 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
return false;
}

bool SSTFilesToBlockInputStream::maybeStopBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader)
bool SSTFilesToBlockInputStream::maybeStopBySoftLimit(ColumnFamilyType cf, SSTReader * reader)
{
assert(reader != nullptr);
if (!soft_limit.has_value())
return false;
const SSTScanSoftLimit & sl = soft_limit.value();
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
size_t lock_cf_bytes = 0;

inline size_t total() const { return default_cf + write_cf + lock_cf; }
inline size_t total_bytes() const { return default_cf_bytes + write_cf_bytes + lock_cf_bytes; }
inline size_t totalBytes() const { return default_cf_bytes + write_cf_bytes + lock_cf_bytes; }
};

const ProcessKeys & getProcessKeys() const { return process_keys; }
Expand All @@ -149,15 +149,15 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
}

using SSTReaderPtr = std::unique_ptr<SSTReader>;
bool maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader);
bool maybeSkipBySoftLimit() { return maybeSkipBySoftLimit(ColumnFamilyType::Write, write_cf_reader); }
bool maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReader * reader);
bool maybeSkipBySoftLimit() { return maybeSkipBySoftLimit(ColumnFamilyType::Write, write_cf_reader.get()); }

private:
void loadCFDataFromSST(ColumnFamilyType cf, const DecodedTiKVKey * rowkey_to_be_included);

// Emits data into block if the transaction to this key is committed.
Block readCommitedBlock();
bool maybeStopBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader);
bool maybeStopBySoftLimit(ColumnFamilyType cf, SSTReader * reader);
void checkFinishedState(SSTReaderPtr & reader, ColumnFamilyType cf);

private:
Expand Down Expand Up @@ -219,7 +219,7 @@ class BoundedSSTFilesToBlockInputStream final

// Note that we only keep _raw_child for getting ingest info / process key, etc. All block should be
// read from `mvcc_compact_stream`
const SSTFilesToBlockInputStreamPtr _raw_child;
const SSTFilesToBlockInputStreamPtr _raw_child; // NOLINT(readability-identifier-naming)
std::unique_ptr<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>> mvcc_compact_stream;
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ static inline std::tuple<ReadFromStreamResult, PrehandleResult> executeTransform
.ingest_ids = stream->outputFiles(),
.stats = PrehandleResult::Stats{
.parallels = 1,
.raft_snapshot_bytes = sst_stream->getProcessKeys().total_bytes(),
.raft_snapshot_bytes = sst_stream->getProcessKeys().totalBytes(),
.approx_raft_snapshot_size = 0,
.dt_disk_bytes = stream->getTotalBytesOnDisk(),
.dt_total_bytes = stream->getTotalCommittedBytes(),
Expand Down

0 comments on commit 767b550

Please sign in to comment.