Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing: Add tracing_id to PageStorage snapshot #4330

Merged
merged 5 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ bool DeltaValueSpace::compact(DMContext & context)
LOG_FMT_DEBUG(log, "{} Nothing to compact", simpleInfo());
return true;
}
log_storage_snap = context.storage_pool.log()->getSnapshot();
log_storage_snap = context.storage_pool.log()->getSnapshot(/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo()));
}

// do compaction task
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
DeltaSnapshotPtr createSnapshot(const DMContext & context, bool for_update, CurrentMetrics::Metric type);
};

class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapshot>
class DeltaValueSnapshot
: public std::enable_shared_from_this<DeltaValueSnapshot>
, private boost::noncopyable
{
friend class DeltaValueSpace;
Expand All @@ -260,7 +261,7 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho
// We need a reference to original delta object, to release the "is_updating" lock.
DeltaValueSpacePtr _delta;

CurrentMetrics::Metric type;
const CurrentMetrics::Metric type;

public:
DeltaSnapshotPtr clone()
Expand All @@ -280,8 +281,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho
}

explicit DeltaValueSnapshot(CurrentMetrics::Metric type_)
: type(type_)
{
type = type_;
CurrentMetrics::add(type);
}

Expand Down Expand Up @@ -315,7 +316,6 @@ class DeltaValueSnapshot : public std::enable_shared_from_this<DeltaValueSnapsho

RowKeyRange getSquashDeleteRange() const;

const auto & getStorageSnapshot() { return persisted_files_snap->getStorageSnapshot(); }
const auto & getSharedDeltaIndex() { return shared_delta_index; }
};

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
snap->is_update = for_update;
snap->_delta = this->shared_from_this();

auto storage_snap = std::make_shared<StorageSnapshot>(context.storage_pool, context.getReadLimiter(), true);
// TODO: Add tracing_id from mpp task or background tasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can distinguish MPP tasks from background tasks using snap->is_update? For background tasks it's always true, and for MPP tasks it's always false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I will check it in a later PR.

auto storage_snap = std::make_shared<StorageSnapshot>(context.storage_pool, context.getReadLimiter(), /*tracing_id*/ "", true);
snap->persisted_files_snap = persisted_file_set->createSnapshot(storage_snap);
snap->shared_delta_index = delta_index;

Expand Down
38 changes: 21 additions & 17 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Columns/ColumnVector.h>
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/LogWithPrefix.h>
#include <Common/TiFlashMetrics.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
Expand Down Expand Up @@ -2260,12 +2261,14 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
stat.avg_pack_rows_in_stable = static_cast<Float64>(stat.total_stable_rows) / stat.total_pack_count_in_stable;
stat.avg_pack_size_in_stable = static_cast<Float64>(stat.total_stable_size) / stat.total_pack_count_in_stable;

static const String useless_tracing_id("DeltaMergeStore::getStat");
{
std::tie(stat.storage_stable_num_snapshots, //
stat.storage_stable_oldest_snapshot_lifetime,
stat.storage_stable_oldest_snapshot_thread_id)
= storage_pool.data()->getSnapshotsStat();
PageStorage::SnapshotPtr stable_snapshot = storage_pool.data()->getSnapshot();
auto snaps_stat = storage_pool.data()->getSnapshotsStat();
stat.storage_stable_num_snapshots = snaps_stat.num_snapshots;
stat.storage_stable_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_stable_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_stable_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
PageStorage::SnapshotPtr stable_snapshot = storage_pool.data()->getSnapshot(useless_tracing_id);
const auto * concrete_snap = toConcreteSnapshot(stable_snapshot);
if (const auto * const version = concrete_snap->version(); version != nullptr)
{
Expand All @@ -2279,11 +2282,12 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
}
}
{
std::tie(stat.storage_delta_num_snapshots, //
stat.storage_delta_oldest_snapshot_lifetime,
stat.storage_delta_oldest_snapshot_thread_id)
= storage_pool.log()->getSnapshotsStat();
PageStorage::SnapshotPtr log_snapshot = storage_pool.log()->getSnapshot();
auto snaps_stat = storage_pool.log()->getSnapshotsStat();
stat.storage_delta_num_snapshots = snaps_stat.num_snapshots;
stat.storage_delta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_delta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_delta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
PageStorage::SnapshotPtr log_snapshot = storage_pool.log()->getSnapshot(useless_tracing_id);
const auto * concrete_snap = toConcreteSnapshot(log_snapshot);
if (const auto * const version = concrete_snap->version(); version != nullptr)
{
Expand All @@ -2297,11 +2301,12 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
}
}
{
std::tie(stat.storage_meta_num_snapshots, //
stat.storage_meta_oldest_snapshot_lifetime,
stat.storage_meta_oldest_snapshot_thread_id)
= storage_pool.meta()->getSnapshotsStat();
PageStorage::SnapshotPtr meta_snapshot = storage_pool.meta()->getSnapshot();
auto snaps_stat = storage_pool.meta()->getSnapshotsStat();
stat.storage_meta_num_snapshots = snaps_stat.num_snapshots;
stat.storage_meta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds;
stat.storage_meta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id;
stat.storage_meta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id;
PageStorage::SnapshotPtr meta_snapshot = storage_pool.meta()->getSnapshot(useless_tracing_id);
const auto * concrete_snap = toConcreteSnapshot(meta_snapshot);
if (const auto * const version = concrete_snap->version(); version != nullptr)
{
Expand Down Expand Up @@ -2431,8 +2436,7 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(

LOG_FMT_DEBUG(
log,
"{} [sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]",
__FUNCTION__,
"[sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]",
sorted_ranges.size(),
tasks.size(),
result_tasks.size(),
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

namespace DB
{
class LogWithPrefix;
using LogWithPrefixPtr = std::shared_ptr<LogWithPrefix>;

namespace DM
{
class Segment;
Expand Down Expand Up @@ -118,20 +121,23 @@ struct DeltaMergeStoreStat
UInt64 storage_stable_num_snapshots = 0;
Float64 storage_stable_oldest_snapshot_lifetime = 0.0;
UInt64 storage_stable_oldest_snapshot_thread_id = 0;
String storage_stable_oldest_snapshot_tracing_id;
UInt64 storage_stable_num_pages = 0;
UInt64 storage_stable_num_normal_pages = 0;
UInt64 storage_stable_max_page_id = 0;

UInt64 storage_delta_num_snapshots = 0;
Float64 storage_delta_oldest_snapshot_lifetime = 0.0;
UInt64 storage_delta_oldest_snapshot_thread_id = 0;
String storage_delta_oldest_snapshot_tracing_id;
UInt64 storage_delta_num_pages = 0;
UInt64 storage_delta_num_normal_pages = 0;
UInt64 storage_delta_max_page_id = 0;

UInt64 storage_meta_num_snapshots = 0;
Float64 storage_meta_oldest_snapshot_lifetime = 0.0;
UInt64 storage_meta_oldest_snapshot_thread_id = 0;
String storage_meta_oldest_snapshot_tracing_id;
UInt64 storage_meta_num_pages = 0;
UInt64 storage_meta_num_normal_pages = 0;
UInt64 storage_meta_max_page_id = 0;
Expand Down Expand Up @@ -369,7 +375,7 @@ class DeltaMergeStore : private boost::noncopyable
/// Do merge delta for all segments. Only used for debug.
void mergeDeltaAll(const Context & context);

/// Compact fregment packs into bigger one.
/// Compact fragment column files into bigger one.
void compact(const Context & context, const RowKeyRange & range);

/// Iterator over all segments and apply gc jobs.
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ class StoragePool : private boost::noncopyable
PageReader & dataReader() { return data_storage_reader; }
PageReader & metaReader() { return meta_storage_reader; }

PageReader newLogReader(ReadLimiterPtr read_limiter, bool snapshot_read)
PageReader newLogReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id)
{
return PageReader(ns_id, log_storage, snapshot_read ? log_storage->getSnapshot() : nullptr, read_limiter);
return PageReader(ns_id, log_storage, snapshot_read ? log_storage->getSnapshot(tracing_id) : nullptr, read_limiter);
}
PageReader newDataReader(ReadLimiterPtr read_limiter, bool snapshot_read)
PageReader newDataReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id)
{
return PageReader(ns_id, data_storage, snapshot_read ? data_storage->getSnapshot() : nullptr, read_limiter);
return PageReader(ns_id, data_storage, snapshot_read ? data_storage->getSnapshot(tracing_id) : nullptr, read_limiter);
}
PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read)
PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about const String & tracing_id = ""?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to keep it without a default value, to let the caller and reviewer think about whether we need to add a tracing id to it or not.

{
return PageReader(ns_id, meta_storage, snapshot_read ? meta_storage->getSnapshot() : nullptr, read_limiter);
return PageReader(ns_id, meta_storage, snapshot_read ? meta_storage->getSnapshot(tracing_id) : nullptr, read_limiter);
}

// Caller must cancel gc tasks before drop
Expand Down Expand Up @@ -112,10 +112,10 @@ class PageIdGenerator : private boost::noncopyable

struct StorageSnapshot : private boost::noncopyable
{
StorageSnapshot(StoragePool & storage, ReadLimiterPtr read_limiter, bool snapshot_read = true)
: log_reader(storage.newLogReader(read_limiter, snapshot_read))
, data_reader(storage.newDataReader(read_limiter, snapshot_read))
, meta_reader(storage.newMetaReader(read_limiter, snapshot_read))
StorageSnapshot(StoragePool & storage, ReadLimiterPtr read_limiter, const String & tracing_id, bool snapshot_read)
: log_reader(storage.newLogReader(read_limiter, snapshot_read, tracing_id))
, data_reader(storage.newDataReader(read_limiter, snapshot_read, tracing_id))
, meta_reader(storage.newMetaReader(read_limiter, snapshot_read, tracing_id))
{}

PageReader log_reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ TEST_F(DeltaValueSpaceTest, MinorCompaction)
// build compaction task and finish prepare stage
MinorCompactionPtr compaction_task;
{
PageReader reader = dmContext().storage_pool.newLogReader(dmContext().getReadLimiter(), true);
PageReader reader = dmContext().storage_pool.newLogReader(dmContext().getReadLimiter(), true, "");
compaction_task = persisted_file_set->pickUpMinorCompaction(dmContext());
ASSERT_EQ(compaction_task->getCompactionSourceLevel(), 0);
// There should be two compaction sub_tasks.
Expand Down Expand Up @@ -392,7 +392,7 @@ TEST_F(DeltaValueSpaceTest, MinorCompaction)
delta->flush(dmContext());
while (true)
{
PageReader reader = dmContext().storage_pool.newLogReader(dmContext().getReadLimiter(), true);
PageReader reader = dmContext().storage_pool.newLogReader(dmContext().getReadLimiter(), true, "");
auto minor_compaction_task = persisted_file_set->pickUpMinorCompaction(dmContext());
if (!minor_compaction_task)
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/LogWithPrefix.h>
#include <Core/BlockGen.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/convertFieldToType.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Config/TOMLConfiguration.h>
#include <Common/LogWithPrefix.h>
#include <Poco/Logger.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,10 @@ class PageStorage : private boost::noncopyable

virtual PageId getMaxId(NamespaceId ns_id) = 0;

virtual SnapshotPtr getSnapshot() = 0;
virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0;

// Get some statistics of all living snapshots and the oldest living snapshot.
// Return < num of snapshots,
// living time(seconds) of the oldest snapshot,
// created thread id of the oldest snapshot >
virtual std::tuple<size_t, double, unsigned> getSnapshotsStat() const = 0;
virtual SnapshotsStatistics getSnapshotsStat() const = 0;

virtual void write(WriteBatch && write_batch, const WriteLimiterPtr & write_limiter = nullptr) = 0;

Expand Down Expand Up @@ -222,9 +219,7 @@ class PageReader : private boost::noncopyable
explicit PageReader(NamespaceId ns_id_, PageStoragePtr storage_, ReadLimiterPtr read_limiter_)
: ns_id(ns_id_)
, storage(storage_)
, snap()
, read_limiter(read_limiter_)

{}
/// Snapshot read.
PageReader(NamespaceId ns_id_, PageStoragePtr storage_, const PageStorage::SnapshotPtr & snap_, ReadLimiterPtr read_limiter_)
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/Page/Snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@
// limitations under the License.

#pragma once
#include <common/types.h>

#include <memory>

namespace DB
{
/// Statistics about all living snapshots and the oldest living snapshot,
/// as returned by `getSnapshotsStat()`.
struct SnapshotsStatistics
{
    /// Number of snapshots.
    size_t num_snapshots{0};
    /// Living time, in seconds, of the oldest snapshot.
    double longest_living_seconds{0.0};
    /// Id of the thread that created the oldest snapshot.
    unsigned longest_living_from_thread_id{0};
    /// Tracing id of the oldest snapshot; empty by default.
    /// NOTE(review): presumably the `tracing_id` passed to `getSnapshot()` when
    /// that snapshot was created -- confirm against the implementation.
    String longest_living_from_tracing_id;
};
class PageStorageSnapshot
{
public:
Expand Down
27 changes: 14 additions & 13 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,14 +358,14 @@ void PageStorage::restore()
PageId PageStorage::getMaxId(NamespaceId /*ns_id*/)
{
std::lock_guard<std::mutex> write_lock(write_mutex);
return versioned_page_entries.getSnapshot()->version()->maxId();
return versioned_page_entries.getSnapshot("")->version()->maxId();
}

PageId PageStorage::getNormalPageId(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot)
{
if (!snapshot)
{
snapshot = this->getSnapshot();
snapshot = this->getSnapshot("");
}

auto [is_ref_id, normal_page_id] = toConcreteSnapshot(snapshot)->version()->isRefId(page_id);
Expand All @@ -376,7 +376,7 @@ DB::PageEntry PageStorage::getEntry(NamespaceId /*ns_id*/, PageId page_id, Snaps
{
if (!snapshot)
{
snapshot = this->getSnapshot();
snapshot = this->getSnapshot("");
}

try
Expand Down Expand Up @@ -576,18 +576,18 @@ void PageStorage::write(DB::WriteBatch && wb, const WriteLimiterPtr & write_limi
}
}

DB::PageStorage::SnapshotPtr PageStorage::getSnapshot()
DB::PageStorage::SnapshotPtr PageStorage::getSnapshot(const String & tracing_id)
{
return versioned_page_entries.getSnapshot();
return versioned_page_entries.getSnapshot(tracing_id);
}

PageStorage::VersionedPageEntries::SnapshotPtr
PageStorage::getConcreteSnapshot()
{
return versioned_page_entries.getSnapshot();
return versioned_page_entries.getSnapshot(/*tracing_id*/ "");
}

std::tuple<size_t, double, unsigned> PageStorage::getSnapshotsStat() const
SnapshotsStatistics PageStorage::getSnapshotsStat() const
{
return versioned_page_entries.getSnapshotsStat();
}
Expand All @@ -596,7 +596,7 @@ DB::Page PageStorage::read(NamespaceId /*ns_id*/, PageId page_id, const ReadLimi
{
if (!snapshot)
{
snapshot = this->getSnapshot();
snapshot = this->getSnapshot("");
}

const auto page_entry = toConcreteSnapshot(snapshot)->version()->find(page_id);
Expand All @@ -612,7 +612,7 @@ PageMap PageStorage::read(NamespaceId /*ns_id*/, const std::vector<PageId> & pag
{
if (!snapshot)
{
snapshot = this->getSnapshot();
snapshot = this->getSnapshot("");
}

std::map<PageFileIdAndLevel, std::pair<PageIdAndEntries, ReaderPtr>> file_read_infos;
Expand Down Expand Up @@ -655,7 +655,7 @@ void PageStorage::read(NamespaceId /*ns_id*/, const std::vector<PageId> & page_i
{
if (!snapshot)
{
snapshot = this->getSnapshot();
snapshot = this->getSnapshot("");
}

std::map<PageFileIdAndLevel, std::pair<PageIdAndEntries, ReaderPtr>> file_read_infos;
Expand Down Expand Up @@ -694,8 +694,9 @@ void PageStorage::read(NamespaceId /*ns_id*/, const std::vector<PageId> & page_i
PageMap PageStorage::read(NamespaceId /*ns_id*/, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
{
if (!snapshot)
snapshot = this->getSnapshot();

{
snapshot = this->getSnapshot("");
}

std::map<PageFileIdAndLevel, std::pair<ReaderPtr, PageFile::Reader::FieldReadInfos>> file_read_infos;
for (const auto & [page_id, field_indices] : page_fields)
Expand Down Expand Up @@ -737,7 +738,7 @@ void PageStorage::traverse(const std::function<void(const DB::Page & page)> & ac
{
if (!snapshot)
{
snapshot = this->getSnapshot();
snapshot = this->getSnapshot("");
}

std::map<PageFileIdAndLevel, PageIds> file_and_pages;
Expand Down
Loading