Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract common parts of ApplySnapshot #8110

Merged
merged 16 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions dbms/src/Storages/DeltaMerge/BoundedSSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 PingCAP, Inc.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved out from SSTFilesToBlockInputStream, with some header files excluded.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Poco/File.h>
#include <Storages/DeltaMerge/DMVersionFilterBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/PKSquashingBlockInputStream.h>
#include <Storages/DeltaMerge/SSTFilesToBlockInputStream.h>
#include <Storages/KVStore/Decode/PartitionStreams.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/StorageDeltaMerge.h>
#include <common/logger_useful.h>

namespace DB
{
namespace DM
{
BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( //
SSTFilesToBlockInputStreamPtr child,
const ColId pk_column_id_,
const DecodingStorageSchemaSnapshotConstPtr & schema_snap)
: pk_column_id(pk_column_id_)
, _raw_child(std::move(child))
{
const bool is_common_handle = schema_snap->is_common_handle;
// Initlize `mvcc_compact_stream`
// First refine the boundary of blocks. Note that the rows decoded from SSTFiles are sorted by primary key asc, timestamp desc
// (https://github.com/tikv/tikv/blob/v5.0.1/components/txn_types/src/types.rs#L103-L108).
// While DMVersionFilter require rows sorted by primary key asc, timestamp asc, so we need an extra sort in PKSquashing.
auto stream = std::make_shared<PKSquashingBlockInputStream</*need_extra_sort=*/true>>(
_raw_child,
pk_column_id,
is_common_handle);
mvcc_compact_stream = std::make_unique<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
stream,
*(schema_snap->column_defines),
_raw_child->opts.gc_safepoint,
is_common_handle);
}

void BoundedSSTFilesToBlockInputStream::readPrefix()
{
mvcc_compact_stream->readPrefix();
}

void BoundedSSTFilesToBlockInputStream::readSuffix()
{
mvcc_compact_stream->readSuffix();
}

Block BoundedSSTFilesToBlockInputStream::read()
{
return mvcc_compact_stream->read();
}

SSTFilesToBlockInputStream::ProcessKeys BoundedSSTFilesToBlockInputStream::getProcessKeys() const
{
return _raw_child->process_keys;
}

RegionPtr BoundedSSTFilesToBlockInputStream::getRegion() const
{
return _raw_child->region;
}

std::tuple<size_t, size_t, size_t, UInt64> //
BoundedSSTFilesToBlockInputStream::getMvccStatistics() const
{
return std::make_tuple(
mvcc_compact_stream->getEffectiveNumRows(),
mvcc_compact_stream->getNotCleanRows(),
mvcc_compact_stream->getDeletedRows(),
mvcc_compact_stream->getGCHintVersion());
}

} // namespace DM
} // namespace DB
87 changes: 10 additions & 77 deletions dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,21 @@ extern const int ILLFORMAT_RAFT_ROW;
namespace DM
{
SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
const std::string & log_prefix_,
RegionPtr region_,
UInt64 snapshot_index_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
DecodingStorageSchemaSnapshotConstPtr schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_)
SSTFilesToBlockInputStreamOpts && opts_)
: region(std::move(region_))
, snapshot_index(snapshot_index_)
, snaps(snaps_)
, proxy_helper(proxy_helper_)
, schema_snap(std::move(schema_snap_))
, tmt(tmt_)
, gc_safepoint(gc_safepoint_)
, expected_size(expected_size_)
, log(Logger::get(log_prefix_))
, force_decode(force_decode_)
{}
, opts(std::move(opts_))
{
log = Logger::get(opts.log_prefix);
}

SSTFilesToBlockInputStream::~SSTFilesToBlockInputStream() = default;

Expand Down Expand Up @@ -170,14 +164,14 @@ Block SSTFilesToBlockInputStream::read()
region->insert(ColumnFamilyType::Write, TiKVKey(key.data, key.len), TiKVValue(value.data, value.len));
++process_keys.write_cf;
process_keys.write_cf_bytes += (key.len + value.len);
if (process_keys.write_cf % expected_size == 0)
if (process_keys.write_cf % opts.expected_size == 0)
{
loaded_write_cf_key.assign(key.data, key.len);
}
} // Notice: `key`, `value` are string-view-like object, should never use after `next` called
write_cf_reader->next();

if (process_keys.write_cf % expected_size == 0)
if (process_keys.write_cf % opts.expected_size == 0)
{
// If we should form a new block.
const DecodedTiKVKey rowkey = RecordKVFormat::decodeTiKVKey(TiKVKey(std::move(loaded_write_cf_key)));
Expand Down Expand Up @@ -297,7 +291,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
// Update the end offset.
// If there are no more key-value, the outer while loop will be break.
// Else continue to read next batch from current CF.
process_keys_offset_end += expected_size;
process_keys_offset_end += opts.expected_size;
}
}

Expand All @@ -310,7 +304,7 @@ Block SSTFilesToBlockInputStream::readCommitedBlock()
{
// Read block from `region`. If the schema has been updated, it will
// throw an exception with code `ErrorCodes::REGION_DATA_SCHEMA_UPDATED`
return GenRegionBlockDataWithSchema(region, schema_snap, gc_safepoint, force_decode, tmt);
return GenRegionBlockDataWithSchema(region, opts.schema_snap, opts.gc_safepoint, opts.force_decode, tmt);
}
catch (DB::Exception & e)
{
Expand Down Expand Up @@ -340,66 +334,5 @@ Block SSTFilesToBlockInputStream::readCommitedBlock()
throw;
}
}

/// Methods for BoundedSSTFilesToBlockInputStream

BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( //
SSTFilesToBlockInputStreamPtr child,
const ColId pk_column_id_,
const DecodingStorageSchemaSnapshotConstPtr & schema_snap)
: pk_column_id(pk_column_id_)
, _raw_child(std::move(child))
{
const bool is_common_handle = schema_snap->is_common_handle;
// Initlize `mvcc_compact_stream`
// First refine the boundary of blocks. Note that the rows decoded from SSTFiles are sorted by primary key asc, timestamp desc
// (https://github.com/tikv/tikv/blob/v5.0.1/components/txn_types/src/types.rs#L103-L108).
// While DMVersionFilter require rows sorted by primary key asc, timestamp asc, so we need an extra sort in PKSquashing.
auto stream = std::make_shared<PKSquashingBlockInputStream</*need_extra_sort=*/true>>(
_raw_child,
pk_column_id,
is_common_handle);
mvcc_compact_stream = std::make_unique<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
stream,
*(schema_snap->column_defines),
_raw_child->gc_safepoint,
is_common_handle);
}

void BoundedSSTFilesToBlockInputStream::readPrefix()
{
mvcc_compact_stream->readPrefix();
}

void BoundedSSTFilesToBlockInputStream::readSuffix()
{
mvcc_compact_stream->readSuffix();
}

Block BoundedSSTFilesToBlockInputStream::read()
{
return mvcc_compact_stream->read();
}

SSTFilesToBlockInputStream::ProcessKeys BoundedSSTFilesToBlockInputStream::getProcessKeys() const
{
return _raw_child->process_keys;
}

RegionPtr BoundedSSTFilesToBlockInputStream::getRegion() const
{
return _raw_child->region;
}

std::tuple<size_t, size_t, size_t, UInt64> //
BoundedSSTFilesToBlockInputStream::getMvccStatistics() const
{
return std::make_tuple(
mvcc_compact_stream->getEffectiveNumRows(),
mvcc_compact_stream->getNotCleanRows(),
mvcc_compact_stream->getDeletedRows(),
mvcc_compact_stream->getGCHintVersion());
}

} // namespace DM
} // namespace DB
} // namespace DB
24 changes: 14 additions & 10 deletions dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,33 @@ using SSTFilesToBlockInputStreamPtr = std::shared_ptr<SSTFilesToBlockInputStream
class BoundedSSTFilesToBlockInputStream;
using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr<BoundedSSTFilesToBlockInputStream>;

struct SSTFilesToBlockInputStreamOpts
{
std::string log_prefix;
DecodingStorageSchemaSnapshotConstPtr schema_snap;
Timestamp gc_safepoint;
// If we abort when meet error in decoding.
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
bool force_decode;
// The expected size of emitted `Block`.
size_t expected_size;
};

// Read blocks from TiKV's SSTFiles
class SSTFilesToBlockInputStream final : public IBlockInputStream
{
public:
SSTFilesToBlockInputStream( //
const std::string & log_prefix_,
RegionPtr region_,
UInt64 snapshot_index_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
DecodingStorageSchemaSnapshotConstPtr schema_snap_,
Timestamp gc_safepoint_,
bool force_decode_,
TMTContext & tmt_,
size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE);
SSTFilesToBlockInputStreamOpts && opts_);
~SSTFilesToBlockInputStream() override;

String getName() const override { return "SSTFilesToBlockInputStream"; }

Block getHeader() const override { return toEmptyBlock(*(schema_snap->column_defines)); }
Block getHeader() const override { return toEmptyBlock(*(opts.schema_snap->column_defines)); }

void readPrefix() override;
void readSuffix() override;
Expand Down Expand Up @@ -101,10 +108,8 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
UInt64 snapshot_index;
const SSTViewVec & snaps;
const TiFlashRaftProxyHelper * proxy_helper{nullptr};
DecodingStorageSchemaSnapshotConstPtr schema_snap;
TMTContext & tmt;
const Timestamp gc_safepoint;
size_t expected_size;
SSTFilesToBlockInputStreamOpts opts;
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
LoggerPtr log;

using SSTReaderPtr = std::unique_ptr<SSTReader>;
Expand All @@ -117,7 +122,6 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream

friend class BoundedSSTFilesToBlockInputStream;

const bool force_decode;
bool is_decode_cancelled = false;

ProcessKeys process_keys;
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include <Storages/KVStore/FFI/ProxyFFICommon.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointInfo.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerAsyncTasksImpl.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/Page/Config.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/PageDirectory.h>
Expand All @@ -49,7 +50,7 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count)
static constexpr int region_per_sec = 2;
thread_count = ffi_handle_sec * region_per_sec;
}
tasks_trace = std::make_shared<AsyncTasks>(thread_count);
tasks_trace = std::make_shared<FAPAsyncTasks>(thread_count);
}

ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context)
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#pragma once

#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>

namespace DB
{
struct AsyncTasks;
using FAPAsyncTasks = AsyncTasks<uint64_t, std::function<FastAddPeerRes()>, FastAddPeerRes>;

class FastAddPeerContext
{
Expand All @@ -32,7 +34,7 @@ class FastAddPeerContext
UInt64 required_seq);

public:
std::shared_ptr<AsyncTasks> tasks_trace;
std::shared_ptr<FAPAsyncTasks> tasks_trace;

private:
class CheckpointCacheElement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
#include <Encryption/PosixRandomAccessFile.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointInfo.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerAsyncTasksImpl.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/Universal/RaftDataReader.h>
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ namespace DB
class UniversalPageStorage;
using UniversalPageStoragePtr = std::shared_ptr<UniversalPageStorage>;

struct AsyncTasks;

// A mapping from segment end key to segment id,
// The main usage:
// auto lock = lock();
Expand Down
Loading