Skip to content

Commit

Permalink
Merge branch 'master' into split_execution_summary
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Oct 10, 2022
2 parents 59d2ffd + 778d643 commit f4455da
Show file tree
Hide file tree
Showing 17 changed files with 340 additions and 103 deletions.
3 changes: 0 additions & 3 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
M(gc_skip_update_safe_point) \
M(gc_skip_merge_delta) \
M(gc_skip_merge) \
M(force_set_page_file_write_errno) \
M(force_split_io_size_4k) \
M(minimum_block_size_for_cross_join) \
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotWithData(Context & context, const AST

// Mock to apply a snapshot with data in `region`
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplySnapshot<RegionPtrWithBlock>(region, tmt);
context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithBlock>(region, tmt);
output(fmt::format("put region #{}, range{} to table #{} with {} records", region_id, range_string, table_id, cnt));
}

Expand Down Expand Up @@ -573,7 +573,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotApplyBlock(Context & context, const A
auto region_id = static_cast<RegionID>(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args.front()).value));
auto [region, block_cache] = GLOBAL_REGION_MAP.popRegionCache("__snap_" + std::to_string(region_id));
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplySnapshot<RegionPtrWithBlock>({region, std::move(block_cache)}, tmt);
context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithBlock>({region, std::move(block_cache)}, tmt);

output(fmt::format("success apply {} with block cache", region->id()));
}
Expand Down Expand Up @@ -789,7 +789,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotApplyDTFiles(Context & context, const
const auto region_name = "__snap_snap_" + std::to_string(region_id);
auto [new_region, external_files] = GLOBAL_REGION_MAP.popRegionSnap(region_name);
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplySnapshot<RegionPtrWithSnapshotFiles>(
context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithSnapshotFiles>(
RegionPtrWithSnapshotFiles{new_region, std::move(external_files)},
tmt);

Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,23 +424,26 @@ TEST_F(TestMPPTunnel, WriteError)

TEST_F(TestMPPTunnel, WriteAfterFinished)
{
std::unique_ptr<PacketWriter> writer_ptr = nullptr;
MPPTunnelPtr mpp_tunnel_ptr = nullptr;
try
{
auto mpp_tunnel_ptr = constructRemoteSyncTunnel();
std::unique_ptr<PacketWriter> writer_ptr = std::make_unique<MockWriter>();
mpp_tunnel_ptr = constructRemoteSyncTunnel();
writer_ptr = std::make_unique<MockWriter>();
mpp_tunnel_ptr->connect(writer_ptr.get());
GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true);
mpp_tunnel_ptr->close("Canceled", false);
auto data_packet_ptr = std::make_unique<mpp::MPPDataPacket>();
data_packet_ptr->set_data("First");
mpp_tunnel_ptr->write(*data_packet_ptr);
mpp_tunnel_ptr->waitForFinish();
GTEST_FAIL();
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,");
}
if (mpp_tunnel_ptr != nullptr)
mpp_tunnel_ptr->waitForFinish();
}

/// Test Local MPPTunnel
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ using DMContextPtr = std::shared_ptr<DMContext>;
using NotCompress = std::unordered_set<ColId>;
using SegmentIdSet = std::unordered_set<UInt64>;
struct ExternalDTFileInfo;
struct GCOptions;

inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1;

Expand Down Expand Up @@ -340,7 +341,7 @@ class DeltaMergeStore : private boost::noncopyable
void compact(const Context & context, const RowKeyRange & range);

/// Iterator over all segments and apply gc jobs.
UInt64 onSyncGc(Int64 limit);
UInt64 onSyncGc(Int64 limit, const GCOptions & gc_options);

/**
* Try to merge the segment in the current thread as the GC operation.
Expand Down
32 changes: 8 additions & 24 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/GCOptions.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/Transaction/TMTContext.h>

Expand All @@ -29,9 +30,6 @@ namespace DB
{
namespace FailPoints
{
extern const char gc_skip_update_safe_point[];
extern const char gc_skip_merge_delta[];
extern const char gc_skip_merge[];
extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
} // namespace FailPoints
Expand Down Expand Up @@ -517,10 +515,6 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte

SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, const SegmentPtr & segment)
{
fiu_do_on(FailPoints::gc_skip_merge, {
return {};
});

auto segment_rows = segment->getEstimatedRows();
auto segment_bytes = segment->getEstimatedBytes();
if (segment_rows >= dm_context->small_segment_rows || segment_bytes >= dm_context->small_segment_bytes)
Expand Down Expand Up @@ -560,10 +554,6 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c

SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_context, const SegmentPtr & segment, const SegmentPtr & prev_segment, const SegmentPtr & next_segment, DB::Timestamp gc_safe_point)
{
fiu_do_on(FailPoints::gc_skip_merge_delta, {
return {};
});

SegmentSnapshotPtr segment_snap;
{
std::shared_lock lock(read_write_mutex);
Expand Down Expand Up @@ -684,20 +674,13 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_conte
return new_segment;
}

UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)
{
if (shutdown_called.load(std::memory_order_relaxed))
return 0;

bool skip_update_safe_point = false;
fiu_do_on(FailPoints::gc_skip_update_safe_point, {
skip_update_safe_point = true;
});
if (!skip_update_safe_point)
{
if (!updateGCSafePoint())
return 0;
}
if (gc_options.update_safe_point && !updateGCSafePoint())
return 0;

{
std::shared_lock lock(read_write_mutex);
Expand All @@ -712,9 +695,10 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)

DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire);
LOG_TRACE(log,
"GC on table {} start with key: {}, gc_safe_point: {}, max gc limit: {}",
"GC on table start, table={} check_key={} options={} gc_safe_point={} max_gc_limit={}",
table_name,
next_gc_check_key.toDebugString(),
gc_options.toString(),
gc_safe_point,
limit);

Expand Down Expand Up @@ -764,9 +748,9 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
try
{
SegmentPtr new_seg = nullptr;
if (!new_seg)
if (!new_seg && gc_options.do_merge)
new_seg = gcTrySegmentMerge(dm_context, segment);
if (!new_seg)
if (!new_seg && gc_options.do_merge_delta)
new_seg = gcTrySegmentMergeDelta(dm_context, segment, prev_segment, next_segment, gc_safe_point);

if (!new_seg)
Expand Down
66 changes: 66 additions & 0 deletions dbms/src/Storages/DeltaMerge/GCOptions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <fmt/format.h>

#include <string>

#pragma once

namespace DB
{
namespace DM
{

struct GCOptions
{
bool do_merge = true;
bool do_merge_delta = true;
bool update_safe_point = true;

static GCOptions newAll()
{
return GCOptions{
.do_merge = true,
.do_merge_delta = true,
.update_safe_point = true,
};
}

static GCOptions newNoneForTest()
{
return GCOptions{
.do_merge = false,
.do_merge_delta = false,
.update_safe_point = false,
};
}

static GCOptions newAllForTest()
{
return GCOptions{
.do_merge = true,
.do_merge_delta = true,
.update_safe_point = false,
};
}

std::string toString() const
{
return fmt::format("<merge={} merge_delta={} update_safe_point={}>", do_merge, do_merge_delta, update_safe_point);
}
};

} // namespace DM
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ namespace DB
{
namespace FailPoints
{
extern const char gc_skip_update_safe_point[];
extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
extern const char force_triggle_background_merge_delta[];
Expand Down
Loading

0 comments on commit f4455da

Please sign in to comment.