Skip to content

Commit

Permalink
Storage: Refine the snapshot creation on DeltaValueSpace (#9611)
Browse files Browse the repository at this point in the history
ref #6233
  • Loading branch information
JaySon-Huang authored Nov 14, 2024
1 parent c140fa4 commit b901c23
Show file tree
Hide file tree
Showing 27 changed files with 182 additions and 155 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Columns/IColumn.h>
#include <Interpreters/Set.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Filter/RSOperator_fwd.h>
#include <tipb/executor.pb.h>

namespace DB
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Operators/DMSegmentThreadSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

namespace DB
{
class RSOperator;
using RSOperatorPtr = std::shared_ptr<RSOperator>;

class DMSegmentThreadSourceOp : public SourceOp
{
Expand Down
47 changes: 32 additions & 15 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ class ColumnFileSetSnapshot
friend struct Remote::Serializer;

private:
ColumnFiles column_files;
size_t rows{0};
size_t bytes{0};
size_t deletes{0};
const ColumnFiles column_files;
const size_t rows{0};
const size_t bytes{0};
const size_t deletes{0};

public:
/// This field is public writeable intentionally. It allows us to build a snapshot first,
Expand All @@ -72,23 +72,40 @@ class ColumnFileSetSnapshot
/// Why we don't know the data provider at that time? Because when we have remote proto, data is not yet received.
IColumnFileDataProviderPtr data_provider = nullptr;

explicit ColumnFileSetSnapshot(const IColumnFileDataProviderPtr & data_provider_)
: data_provider{data_provider_}
ColumnFileSetSnapshot(
const IColumnFileDataProviderPtr & data_provider_,
ColumnFiles && column_files_,
size_t rows_,
size_t bytes_,
size_t deletes_)
: column_files(std::move(column_files_))
, rows(rows_)
, bytes(bytes_)
, deletes(deletes_)
, data_provider{data_provider_}
{}

static ColumnFileSetSnapshotPtr buildFromColumnFiles(
const IColumnFileDataProviderPtr & data_provider_,
ColumnFiles && column_files_)
{
size_t rows = 0, bytes = 0, deletes = 0;
for (const auto & column_file : column_files_)
{
rows += column_file->getRows();
bytes += column_file->getBytes();
deletes += column_file->getDeletes();
}
return std::make_shared<ColumnFileSetSnapshot>(data_provider_, std::move(column_files_), rows, bytes, deletes);
}

ColumnFileSetSnapshotPtr clone()
{
auto c = std::make_shared<ColumnFileSetSnapshot>(data_provider);
c->data_provider = data_provider;
c->column_files = column_files;
c->rows = rows;
c->bytes = bytes;
c->deletes = deletes;

return c;
ColumnFiles cf_copy = column_files;
return std::make_shared<ColumnFileSetSnapshot>(data_provider, std::move(cf_copy), rows, bytes, deletes);
}

ColumnFiles & getColumnFiles() { return column_files; }
const ColumnFiles & getColumnFiles() const { return column_files; }

size_t getColumnFileCount() const { return column_files.size(); }
size_t getRows() const { return rows; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre
std::vector<VectorIndexViewer::Key> sorted_results;
std::vector<ColumnFileTinyVectorIndexReaderPtr> tiny_readers;

ColumnFiles & column_files;
const ColumnFiles & column_files;

const Block header;
IColumn::Filter filter;
Expand Down
21 changes: 10 additions & 11 deletions dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ ConcatSkippableBlockInputStream<need_row_id>::ConcatSkippableBlockInputStream(
, scan_context(scan_context_)
, lac_bytes_collector(scan_context_ ? scan_context_->resource_group_name : "")
{
assert(inputs_.size() == 1); // otherwise the `rows` is not correct
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
}
Expand All @@ -43,6 +44,7 @@ ConcatSkippableBlockInputStream<need_row_id>::ConcatSkippableBlockInputStream(
, scan_context(scan_context_)
, lac_bytes_collector(scan_context_ ? scan_context_->resource_group_name : "")
{
assert(rows.size() == inputs_.size());
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
}
Expand Down Expand Up @@ -87,10 +89,9 @@ size_t ConcatSkippableBlockInputStream<need_row_id>::skipNextBlock()
{
while (current_stream != children.end())
{
auto * skippable_stream = dynamic_cast<SkippableBlockInputStream *>((*current_stream).get());
auto * skippable_stream = dynamic_cast<SkippableBlockInputStream *>(current_stream->get());

size_t skipped_rows = skippable_stream->skipNextBlock();

if (skipped_rows > 0)
{
return skipped_rows;
Expand All @@ -112,9 +113,8 @@ Block ConcatSkippableBlockInputStream<need_row_id>::readWithFilter(const IColumn

while (current_stream != children.end())
{
auto * skippable_stream = dynamic_cast<SkippableBlockInputStream *>((*current_stream).get());
auto * skippable_stream = dynamic_cast<SkippableBlockInputStream *>(current_stream->get());
res = skippable_stream->readWithFilter(filter);

if (res)
{
res.setStartOffset(res.startOffset() + precede_stream_rows);
Expand All @@ -139,7 +139,6 @@ Block ConcatSkippableBlockInputStream<need_row_id>::read(FilterPtr & res_filter,
while (current_stream != children.end())
{
res = (*current_stream)->read(res_filter, return_filter);

if (res)
{
res.setStartOffset(res.startOffset() + precede_stream_rows);
Expand Down Expand Up @@ -196,6 +195,8 @@ void ConcatVectorIndexBlockInputStream::load()
return;

UInt32 precedes_rows = 0;
// otherwise the `row.key` of the search result is not correct
assert(stream->children.size() == index_streams.size());
std::vector<VectorIndexViewer::SearchResult> search_results;
for (size_t i = 0; i < stream->children.size(); ++i)
{
Expand All @@ -210,16 +211,14 @@ void ConcatVectorIndexBlockInputStream::load()
}

// Keep the top k minimum distances rows.
auto select_size = search_results.size() > topk ? topk : search_results.size();
const auto select_size = std::min(search_results.size(), topk);
auto top_k_end = search_results.begin() + select_size;
std::nth_element(search_results.begin(), top_k_end, search_results.end(), [](const auto & lhs, const auto & rhs) {
return lhs.distance < rhs.distance;
});
search_results.resize(select_size);
std::vector<UInt32> selected_rows;
selected_rows.reserve(search_results.size());
for (const auto & row : search_results)
selected_rows.push_back(row.key);
std::vector<UInt32> selected_rows(select_size);
for (size_t i = 0; i < select_size; ++i)
selected_rows[i] = search_results[i].key;
// Sort by key again.
std::sort(selected_rows.begin(), selected_rows.end());

Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ extern const char pause_when_reading_from_dt_stream[];

namespace DM
{
class RSOperator;
using RSOperatorPtr = std::shared_ptr<RSOperator>;

class DMSegmentThreadInputStream : public IProfilingBlockInputStream
{
Expand Down
22 changes: 9 additions & 13 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@

#include <ext/scope_guard.h>

namespace DB
{
namespace DM
namespace DB::DM
{

inline UInt64 serializeColumnFilePersisteds(WriteBuffer & buf, const ColumnFilePersisteds & persisted_files)
{
serializeSavedColumnFiles(buf, persisted_files);
Expand Down Expand Up @@ -403,24 +402,21 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &

ColumnFileSetSnapshotPtr ColumnFilePersistedSet::createSnapshot(const IColumnFileDataProviderPtr & data_provider)
{
auto snap = std::make_shared<ColumnFileSetSnapshot>(data_provider);
snap->rows = rows;
snap->bytes = bytes;
snap->deletes = deletes;

size_t total_rows = 0;
size_t total_deletes = 0;
ColumnFiles column_files;
column_files.reserve(persisted_files.size());
for (const auto & file : persisted_files)
{
if (auto * t = file->tryToTinyFile(); (t && t->getCache()))
{
// Compact threads could update the value of ColumnTinyFile::cache,
// and since ColumnFile is not multi-threads safe, we should create a new column file object.
snap->column_files.push_back(std::make_shared<ColumnFileTiny>(*t));
column_files.push_back(std::make_shared<ColumnFileTiny>(*t));
}
else
{
snap->column_files.push_back(file);
column_files.push_back(file);
}
total_rows += file->getRows();
total_deletes += file->getDeletes();
Expand All @@ -438,7 +434,7 @@ ColumnFileSetSnapshotPtr ColumnFilePersistedSet::createSnapshot(const IColumnFil
throw Exception("Rows and deletes check failed.", ErrorCodes::LOGICAL_ERROR);
}

return snap;
return std::make_shared<ColumnFileSetSnapshot>(data_provider, std::move(column_files), rows, bytes, deletes);
}
} // namespace DM
} // namespace DB

} // namespace DB::DM
52 changes: 30 additions & 22 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,30 +348,24 @@ struct CloneColumnFilesHelper
WriteBatches & wbs);
};

class DeltaValueSnapshot
: public std::enable_shared_from_this<DeltaValueSnapshot>
, private boost::noncopyable
class DeltaValueSnapshot : private boost::noncopyable
{
friend class DeltaValueSpace;
friend struct DB::DM::Remote::Serializer;

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
const bool is_update{false};

// The delta index of cached.
DeltaIndexPtr shared_delta_index;
UInt64 delta_index_epoch = 0;
const DeltaIndexPtr shared_delta_index;
const UInt64 delta_index_epoch;

// mem-table may not be ready when the snapshot is creating under disagg arch, so it is not "const"
ColumnFileSetSnapshotPtr mem_table_snap;

ColumnFileSetSnapshotPtr persisted_files_snap;
const ColumnFileSetSnapshotPtr persisted_files_snap;

// We need a reference to original delta object, to release the "is_updating" lock.
DeltaValueSpacePtr delta;
const DeltaValueSpacePtr delta;

const CurrentMetrics::Metric type;

Expand All @@ -381,19 +375,30 @@ class DeltaValueSnapshot
// We only allow one for_update snapshots to exist, so it cannot be cloned.
RUNTIME_CHECK(!is_update);

auto c = std::make_shared<DeltaValueSnapshot>(type, is_update);
c->shared_delta_index = shared_delta_index;
c->delta_index_epoch = delta_index_epoch;
c->mem_table_snap = mem_table_snap->clone();
c->persisted_files_snap = persisted_files_snap->clone();

c->delta = delta;

return c;
return std::make_shared<DeltaValueSnapshot>(
type,
is_update,
mem_table_snap->clone(),
persisted_files_snap->clone(),
delta,
shared_delta_index,
delta_index_epoch);
}

explicit DeltaValueSnapshot(CurrentMetrics::Metric type_, bool update_)
DeltaValueSnapshot(
CurrentMetrics::Metric type_,
bool update_,
ColumnFileSetSnapshotPtr mem_snap,
const ColumnFileSetSnapshotPtr persisted_snap,
DeltaValueSpacePtr delta_vs,
DeltaIndexPtr delta_index,
UInt64 index_epoch)
: is_update(update_)
, shared_delta_index(std::move(delta_index))
, delta_index_epoch(index_epoch)
, mem_table_snap(std::move(mem_snap))
, persisted_files_snap(std::move(persisted_snap))
, delta(std::move(delta_vs))
, type(type_)
{
CurrentMetrics::add(type);
Expand Down Expand Up @@ -434,6 +439,9 @@ class DeltaValueSnapshot

bool isForUpdate() const { return is_update; }

// Semantically speaking this is a "hack" for the "snapshot".
// But we need this because under disagg arch, we fetch the mem-table by streaming way.
// After all mem-table fetched, we set the mem-table-set snapshot to the DeltaValueSnapshot.
void setMemTableSetSnapshot(const ColumnFileSetSnapshotPtr & mem_table_snap_) { mem_table_snap = mem_table_snap_; }
};

Expand Down
22 changes: 8 additions & 14 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/PathPool.h>

namespace DB
{
namespace DM
namespace DB::DM
{

void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
{
if (!column_files.empty())
Expand Down Expand Up @@ -243,14 +242,10 @@ ColumnFileSetSnapshotPtr MemTableSet::createSnapshot(
if (disable_sharing && !column_files.empty() && column_files.back()->isAppendable())
column_files.back()->disableAppend();

auto snap = std::make_shared<ColumnFileSetSnapshot>(data_provider);
snap->rows = rows;
snap->bytes = bytes;
snap->deletes = deletes;
snap->column_files.reserve(column_files.size());

size_t total_rows = 0;
size_t total_deletes = 0;
ColumnFiles column_files_snap;
column_files_snap.reserve(column_files.size());
for (const auto & file : column_files)
{
// ColumnFile is not a thread-safe object, but only ColumnFileInMemory may be appendable after its creation.
Expand All @@ -260,11 +255,11 @@ ColumnFileSetSnapshotPtr MemTableSet::createSnapshot(
// Compact threads could update the value of ColumnFileInMemory,
// and since ColumnFile is not multi-threads safe, we should create a new column file object.
// TODO: When `disable_sharing == true`, may be we can safely use the same ptr without the clone.
snap->column_files.push_back(m->clone());
column_files_snap.emplace_back(m->clone());
}
else
{
snap->column_files.push_back(file);
column_files_snap.emplace_back(file);
}
total_rows += file->getRows();
total_deletes += file->getDeletes();
Expand All @@ -279,7 +274,7 @@ ColumnFileSetSnapshotPtr MemTableSet::createSnapshot(
total_deletes,
deletes.load());

return snap;
return std::make_shared<ColumnFileSetSnapshot>(data_provider, std::move(column_files_snap), rows, bytes, deletes);
}

ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(
Expand Down Expand Up @@ -362,5 +357,4 @@ void MemTableSet::removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush
}


} // namespace DM
} // namespace DB
} // namespace DB::DM
Loading

0 comments on commit b901c23

Please sign in to comment.