Skip to content

Commit

Permalink
Merge branch 'master' into refactor-dmfile
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored Mar 26, 2024
2 parents ce612c3 + 12d7a96 commit 1f1c8e8
Show file tree
Hide file tree
Showing 53 changed files with 867 additions and 611 deletions.
13 changes: 13 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,19 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Bucketed snapshot total size", \
Histogram, \
F(type_approx_raft_snapshot, {{"type", "approx_raft_snapshot"}}, ExpBuckets{1024, 2, 24})) /* 16G */ \
M(tiflash_raft_learner_read_failures_count, \
"Raft learner read failure reason counter", \
Counter, \
F(type_not_found_tiflash, {{"type", "not_found_tiflash"}}), \
F(type_epoch_not_match, {{"type", "epoch_not_match"}}), \
F(type_not_leader, {{"type", "not_leader"}}), \
F(type_not_found_tikv, {{"type", "not_found_tikv"}}), \
F(type_bucket_epoch_not_match, {{"type", "bucket_epoch_not_match"}}), \
F(type_flashback, {{"type", "flashback"}}), \
F(type_key_not_in_region, {{"type", "key_not_in_region"}}), \
F(type_tikv_server_issue, {{"type", "tikv_server_issue"}}), \
F(type_tikv_lock, {{"type", "tikv_lock"}}), \
F(type_other, {{"type", "write"}})) \
/* required by DBaaS */ \
M(tiflash_server_info, \
"Indicate the tiflash server info, and the value is the start timestamp (s).", \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageDeltaMerge.h>

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Context.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/TableLockHolder.h>
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Disaggregated/WNFetchPagesStreamWriter.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <Interpreters/Settings.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
#include <Storages/DeltaMerge/Delta/DeltaValueSpace.h>
Expand Down Expand Up @@ -149,5 +150,17 @@ void WNFetchPagesStreamWriter::syncWrite()
send_pages_ns / 1000000);
}

WNFetchPagesStreamWriter::WNFetchPagesStreamWriter(
std::function<void(const disaggregated::PagesPacket &)> && sync_write_,
DM::SegmentReadTaskPtr seg_task_,
PageIdU64s read_page_ids_,
const Settings & settings_)
: sync_write(std::move(sync_write_))
, seg_task(std::move(seg_task_))
, read_page_ids(std::move(read_page_ids_))
, packet_limit_size(settings_.dt_fetch_pages_packet_limit_size)
, enable_fetch_memtableset(settings_.dt_enable_fetch_memtableset)
, mem_tracker_wrapper(fetch_pages_mem_tracker.get())
{}

} // namespace DB
12 changes: 4 additions & 8 deletions dbms/src/Flash/Disaggregated/WNFetchPagesStreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

namespace DB
{

struct Settings;

using SyncPagePacketWriter = grpc::ServerWriter<disaggregated::PagesPacket>;

class WNFetchPagesStreamWriter;
Expand All @@ -48,14 +51,7 @@ class WNFetchPagesStreamWriter
std::function<void(const disaggregated::PagesPacket &)> && sync_write_,
DM::SegmentReadTaskPtr seg_task_,
PageIdU64s read_page_ids_,
const Settings & settings_)
: sync_write(std::move(sync_write_))
, seg_task(std::move(seg_task_))
, read_page_ids(std::move(read_page_ids_))
, packet_limit_size(settings_.dt_fetch_pages_packet_limit_size)
, enable_fetch_memtableset(settings_.dt_enable_fetch_memtableset)
, mem_tracker_wrapper(fetch_pages_mem_tracker.get())
{}
const Settings & settings_);

void syncWrite();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ struct Settings
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingDouble, dt_page_gc_threshold_raft_data, 0.05, "Max valid rate of deciding to do a GC for BlobFile storing PageData in PageStorage") \
M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests, include those sharing blocks in block queue. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "Deprecated") \
M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Operators/DMSegmentThreadSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/FailPoint.h>
#include <Interpreters/Context.h>
#include <Operators/DMSegmentThreadSourceOp.h>
#include <Storages/DeltaMerge/DMContext.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/FormatVersion.h>
#include <Storages/KVStore/TMTContext.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include <Storages/DeltaMerge/ReadThread/ColumnSharingCache.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/FormatVersion.h>
#include <Storages/IManageableStorage.h>
#include <Storages/KVStore/FFI/FileEncryption.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace DB
{
Expand Down Expand Up @@ -80,6 +81,11 @@ std::pair<size_t, size_t> findColumnFile(const ColumnFiles & column_files, size_
return {column_file_index, 0};
}

ColumnFileSetReader::ColumnFileSetReader(const DMContext & context_)
: context(context_)
, lac_bytes_collector(context_.scan_context ? context_.scan_context->resource_group_name : "")
{}

ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
Expand Down Expand Up @@ -341,5 +347,6 @@ bool ColumnFileSetReader::shouldPlace(
return false;
}


} // namespace DM
} // namespace DB
8 changes: 2 additions & 6 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>

namespace DB
Expand Down Expand Up @@ -45,10 +44,7 @@ class ColumnFileSetReader
LACBytesCollector lac_bytes_collector;

private:
explicit ColumnFileSetReader(const DMContext & context_)
: context(context_)
, lac_bytes_collector(context_.scan_context ? context_.scan_context->resource_group_name : "")
{}
explicit ColumnFileSetReader(const DMContext & context_);

Block readPKVersion(size_t offset, size_t limit);

Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,5 +405,21 @@ ColumnFileReaderPtr ColumnFileTinyReader::createNewReader(const ColumnDefinesPtr
return std::make_shared<ColumnFileTinyReader>(tiny_file, data_provider, new_col_defs, cols_data_cache);
}

ColumnFileTiny::ColumnFileTiny(
const ColumnFileSchemaPtr & schema_,
UInt64 rows_,
UInt64 bytes_,
PageIdU64 data_page_id_,
const DMContext & dm_context,
const CachePtr & cache_)
: schema(schema_)
, rows(rows_)
, bytes(bytes_)
, data_page_id(data_page_id_)
, keyspace_id(dm_context.keyspace_id)
, file_provider(dm_context.global_context.getFileProvider())
, cache(cache_)
{}

} // namespace DM
} // namespace DB
13 changes: 3 additions & 10 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

#pragma once

#include <IO/FileProvider/FileProvider_fwd.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFile.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>
#include <Storages/Page/PageStorage_fwd.h>

Expand Down Expand Up @@ -87,15 +88,7 @@ class ColumnFileTiny : public ColumnFilePersisted
UInt64 bytes_,
PageIdU64 data_page_id_,
const DMContext & dm_context,
const CachePtr & cache_ = nullptr)
: schema(schema_)
, rows(rows_)
, bytes(bytes_)
, data_page_id(data_page_id_)
, keyspace_id(dm_context.keyspace_id)
, file_provider(dm_context.global_context.getFileProvider())
, cache(cache_)
{}
const CachePtr & cache_ = nullptr);

Type getType() const override { return Type::TINY_FILE; }

Expand Down
42 changes: 42 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,58 @@

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace DB::DM
{

WriteLimiterPtr DMContext::getWriteLimiter() const
{
return global_context.getWriteLimiter();
}

ReadLimiterPtr DMContext::getReadLimiter() const
{
return global_context.getReadLimiter();
}

DMContext::DMContext(
const Context & session_context_,
const StoragePathPoolPtr & path_pool_,
const StoragePoolPtr & storage_pool_,
const DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
const ScanContextPtr & scan_context_,
const String & tracing_id_)
: global_context(session_context_.getGlobalContext()) // always save the global context
, path_pool(path_pool_)
, storage_pool(storage_pool_)
, min_version(min_version_)
, keyspace_id(keyspace_id_)
, physical_table_id(physical_table_id_)
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, segment_limit_rows(settings.dt_segment_limit_rows)
, segment_limit_bytes(settings.dt_segment_limit_size)
, segment_force_split_bytes(settings.dt_segment_force_split_size)
, delta_limit_rows(settings.dt_segment_delta_limit_rows)
, delta_limit_bytes(settings.dt_segment_delta_limit_size)
, delta_cache_limit_rows(settings.dt_segment_delta_cache_limit_rows)
, delta_cache_limit_bytes(settings.dt_segment_delta_cache_limit_size)
, delta_small_column_file_rows(settings.dt_segment_delta_small_column_file_rows)
, delta_small_column_file_bytes(settings.dt_segment_delta_small_column_file_size)
, stable_pack_rows(settings.dt_segment_stable_pack_rows)
, enable_logical_split(settings.dt_enable_logical_split)
, read_delta_only(settings.dt_read_delta_only)
, read_stable_only(settings.dt_read_stable_only)
, enable_relevant_place(settings.dt_enable_relevant_place)
, enable_skippable_place(settings.dt_enable_skippable_place)
, tracing_id(tracing_id_)
, scan_context(scan_context_ ? scan_context_ : std::make_shared<ScanContext>())
{}

} // namespace DB::DM
37 changes: 8 additions & 29 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@

#include <Common/Logger.h>
#include <Core/Types.h>
#include <Interpreters/Context.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/Settings.h>
#include <Storages/DeltaMerge/DMChecksumConfig.h>
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>

#include <memory>

namespace DB
{

class StoragePathPool;
using StoragePathPoolPtr = std::shared_ptr<StoragePathPool>;

class WriteLimiter;
using WriteLimiterPtr = std::shared_ptr<WriteLimiter>;
class ReadLimiter;
using ReadLimiterPtr = std::shared_ptr<ReadLimiter>;

namespace DM
{
Expand Down Expand Up @@ -161,33 +166,7 @@ struct DMContext : private boost::noncopyable
size_t rowkey_column_size_,
const DB::Settings & settings,
const ScanContextPtr & scan_context_ = nullptr,
const String & tracing_id_ = "")
: global_context(session_context_.getGlobalContext()) // always save the global context
, path_pool(path_pool_)
, storage_pool(storage_pool_)
, min_version(min_version_)
, keyspace_id(keyspace_id_)
, physical_table_id(physical_table_id_)
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, segment_limit_rows(settings.dt_segment_limit_rows)
, segment_limit_bytes(settings.dt_segment_limit_size)
, segment_force_split_bytes(settings.dt_segment_force_split_size)
, delta_limit_rows(settings.dt_segment_delta_limit_rows)
, delta_limit_bytes(settings.dt_segment_delta_limit_size)
, delta_cache_limit_rows(settings.dt_segment_delta_cache_limit_rows)
, delta_cache_limit_bytes(settings.dt_segment_delta_cache_limit_size)
, delta_small_column_file_rows(settings.dt_segment_delta_small_column_file_rows)
, delta_small_column_file_bytes(settings.dt_segment_delta_small_column_file_size)
, stable_pack_rows(settings.dt_segment_stable_pack_rows)
, enable_logical_split(settings.dt_enable_logical_split)
, read_delta_only(settings.dt_read_delta_only)
, read_stable_only(settings.dt_read_stable_only)
, enable_relevant_place(settings.dt_enable_relevant_place)
, enable_skippable_place(settings.dt_enable_skippable_place)
, tracing_id(tracing_id_)
, scan_context(scan_context_ ? scan_context_ : std::make_shared<ScanContext>())
{}
const String & tracing_id_ = "");
};

} // namespace DM
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Common/FailPoint.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Storages/DeltaMerge/DMVersionFilterBlockInputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace ProfileEvents
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <DataStreams/SelectionByColumnIdTransformAction.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <common/logger_useful.h>

namespace DB
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/UnorderedInputStream.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/SchemaUpdate.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
namespace DB
{

struct Settings;

class Logger;
using LoggerPtr = std::shared_ptr<Logger>;
struct CheckpointInfo;
Expand Down Expand Up @@ -762,7 +764,7 @@ class DeltaMergeStore : private boost::noncopyable
* This may be called from multiple threads, e.g. at the foreground write moment, or in background threads.
* A `thread_type` should be specified indicating the type of the thread calling this function.
* Depend on the thread type, the "update" to do may be varied.
*
*
* It returns a bool which indicates whether a flush of KVStore is recommended.
*/
bool checkSegmentUpdate(
Expand Down
Loading

0 comments on commit 1f1c8e8

Please sign in to comment.