Skip to content

Commit

Permalink
Merge branch 'switch_rf_local' of github.com:xzhangxian1008/tiflash i…
Browse files Browse the repository at this point in the history
…nto switch_rf_local
  • Loading branch information
xzhangxian1008 committed Feb 7, 2023
2 parents c9eb9d2 + 7b3de3c commit 153a3b6
Show file tree
Hide file tree
Showing 105 changed files with 1,004 additions and 1,271 deletions.
7 changes: 6 additions & 1 deletion dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <AggregateFunctions/AggregateFunctionState.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnsCommon.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
Expand Down Expand Up @@ -143,7 +144,11 @@ ColumnPtr ColumnAggregateFunction::filter(const Filter & filter, ssize_t result_
auto & res_data = res->getData();

if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filter);
res_data.reserve(result_size_hint);
}

for (size_t i = 0; i < size; ++i)
if (filter[i])
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
void insertFrom(const IColumn & src_, size_t n) override;
void insertDefault() override;
void popBack(size_t n) override;
/// TODO: If result_size_hint < 0, makes reserve() using size of filtered column, not source column to avoid some OOM issues.
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,11 @@ ColumnPtr ColumnDecimal<T>::filter(const IColumn::Filter & filt, ssize_t result_
Container & res_data = res->getData();

if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filt);
res_data.reserve(result_size_hint);
}

const UInt8 * filt_pos = filt.data();
const UInt8 * filt_end = filt_pos + size;
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Columns/ColumnFixedString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsCommon.h>
#include <Common/Arena.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
Expand Down Expand Up @@ -205,7 +206,11 @@ ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result
auto res = ColumnFixedString::create(n);

if (result_size_hint)
res->chars.reserve(result_size_hint > 0 ? result_size_hint * n : chars.size());
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filt);
res->chars.reserve(result_size_hint * n);
}

const UInt8 * filt_pos = &filt[0];
const UInt8 * filt_end = filt_pos + col_size;
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Columns/ColumnVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Columns/ColumnVector.h>
#include <Columns/ColumnsCommon.h>
#include <Common/Arena.h>
#include <Common/Exception.h>
#include <Common/HashTable/Hash.h>
Expand Down Expand Up @@ -213,7 +214,11 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
Container & res_data = res->getData();

if (result_size_hint)
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
{
if (result_size_hint < 0)
result_size_hint = countBytesInFilter(filt);
res_data.reserve(result_size_hint);
}

const UInt8 * filt_pos = &filt[0];
const UInt8 * filt_end = filt_pos + size;
Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Columns/ColumnsCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ struct ResultOffsetsBuilder
: res_offsets(*res_offsets_)
{}

void reserve(ssize_t result_size_hint, size_t src_size)
void reserve(size_t result_size_hint)
{
res_offsets.reserve(result_size_hint > 0 ? result_size_hint : src_size);
res_offsets.reserve(result_size_hint);
}

void insertOne(size_t array_size)
Expand Down Expand Up @@ -191,7 +191,7 @@ struct ResultOffsetsBuilder
struct NoResultOffsetsBuilder
{
explicit NoResultOffsetsBuilder(IColumn::Offsets *) {}
void reserve(ssize_t, size_t) {}
void reserve(size_t) {}
void insertOne(size_t) {}

template <size_t SIMD_BYTES>
Expand Down Expand Up @@ -221,11 +221,12 @@ void filterArraysImplGeneric(

if (result_size_hint)
{
result_offsets_builder.reserve(result_size_hint, size);

if (result_size_hint < 0)
res_elems.reserve(src_elems.size());
else if (result_size_hint < 1000000000 && src_elems.size() < 1000000000) /// Avoid overflow.
result_size_hint = countBytesInFilter(filt);

result_offsets_builder.reserve(result_size_hint);

if (result_size_hint < 1000000000 && src_elems.size() < 1000000000) /// Avoid overflow.
res_elems.reserve((result_size_hint * src_elems.size() + size - 1) / size);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/IColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class IColumn : public COWPtr<IColumn>
* Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column;
* if 0, then don't makes reserve(),
* otherwise (i.e. < 0), makes reserve() using size of source column.
* otherwise (i.e. < 0), makes reserve() using size of filtered column.
*/
using Filter = PaddedPODArray<UInt8>;
virtual Ptr filter(const Filter & filt, ssize_t result_size_hint) const = 0;
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_during_mpp_root_task_run) \
M(exception_during_write_to_storage) \
M(force_set_sst_to_dtfile_block_size) \
M(force_set_sst_decode_rand) \
M(exception_before_page_file_write_sync) \
M(force_set_segment_ingest_packs_fail) \
M(segment_merge_after_ingest_packs) \
Expand Down
42 changes: 35 additions & 7 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,58 @@ namespace DB
LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input,
size_t limit_,
size_t offset_,
const String & req_id)
: log(Logger::get(req_id))
, action(input->getHeader(), limit_)
, limit(limit_)
, offset(offset_)
{
children.push_back(input);
}


Block LimitBlockInputStream::readImpl()
{
Block res = children.back()->read();
Block res;
size_t rows = 0;

if (action.transform(res))
if (pos >= offset + limit)
{
return res;
}
else

do
{
return {};
}
res = children.back()->read();
if (!res)
return res;
rows = res.rows();
pos += rows;
} while (pos <= offset);

/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
return res;

/// give away a piece of the block
UInt64 start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));

UInt64 length = std::min(
static_cast<Int64>(limit),
std::min(
static_cast<Int64>(pos) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));

for (size_t i = 0; i < res.columns(); ++i)
res.safeGetByPosition(i).column = res.safeGetByPosition(i).column->cut(start, length);

return res;
}

void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", action.getLimit());
buffer.fmtAppend(", limit = {}", limit);
}
} // namespace DB
6 changes: 5 additions & 1 deletion dbms/src/DataStreams/LimitBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
LimitBlockInputStream(
const BlockInputStreamPtr & input,
size_t limit_,
size_t offset_,
const String & req_id);

String getName() const override { return NAME; }
Expand All @@ -46,7 +47,10 @@ class LimitBlockInputStream : public IProfilingBlockInputStream

private:
LoggerPtr log;
LocalLimitTransformAction action;
size_t limit;
size_t offset;
/// how many lines were read, including the last read block
size_t pos = 0;
};

} // namespace DB
17 changes: 0 additions & 17 deletions dbms/src/DataStreams/LimitTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,6 @@ void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos)
}
} // namespace

bool LocalLimitTransformAction::transform(Block & block)
{
if (unlikely(!block))
return true;

/// pos - how many lines were read, including the last read block
if (pos >= limit)
return false;

auto rows = block.rows();
pos += rows;
if (pos > limit)
cut(block, rows, limit, pos);
// for pos <= limit, give away the whole block
return true;
}

bool GlobalLimitTransformAction::transform(Block & block)
{
if (unlikely(!block))
Expand Down
22 changes: 0 additions & 22 deletions dbms/src/DataStreams/LimitTransformAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,6 @@

namespace DB
{
struct LocalLimitTransformAction
{
public:
LocalLimitTransformAction(
const Block & header_,
size_t limit_)
: header(header_)
, limit(limit_)
{
}

bool transform(Block & block);

Block getHeader() const { return header; }
size_t getLimit() const { return limit; }

private:
const Block header;
const size_t limit;
size_t pos = 0;
};

struct GlobalLimitTransformAction
{
public:
Expand Down
32 changes: 4 additions & 28 deletions dbms/src/DataStreams/MarkInCompressedFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

#pragma once

#include <tuple>

#include <Common/PODArray.h>
#include <Core/Types.h>
#include <IO/WriteHelpers.h>
#include <Common/PODArray.h>

#include <tuple>


namespace DB
Expand Down Expand Up @@ -50,28 +50,4 @@ struct MarkInCompressedFile

using MarksInCompressedFile = PODArray<MarkInCompressedFile>;
using MarksInCompressedFilePtr = std::shared_ptr<MarksInCompressedFile>;

struct MarkWithSizeInCompressedFile
{
MarkInCompressedFile mark;
size_t mark_size;

bool operator==(const MarkWithSizeInCompressedFile & rhs) const
{
return std::tie(mark, mark_size) == std::tie(rhs.mark, rhs.mark_size);
}
bool operator!=(const MarkWithSizeInCompressedFile & rhs) const
{
return !(*this == rhs);
}

String toString() const
{
return "(" + mark.toString() + "," + DB::toString(mark_size) + ")";
}
};

using MarkWithSizesInCompressedFile = PODArray<MarkWithSizeInCompressedFile>;
using MarkWithSizesInCompressedFilePtr = std::shared_ptr<MarkWithSizesInCompressedFile>;

}
} // namespace DB
5 changes: 0 additions & 5 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,6 @@ void MockRaftStoreProxy::snapshot(
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);

auto ori_snapshot_apply_method = kvs.snapshot_apply_method;
kvs.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single;
SCOPE_EXIT({
kvs.snapshot_apply_method = ori_snapshot_apply_method;
});
std::vector<SSTView> ssts;
for (auto & cf : cfs)
{
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ namespace DB
namespace FailPoints
{
extern const char force_set_sst_to_dtfile_block_size[];
extern const char force_set_sst_decode_rand[];
extern const char force_set_safepoint_when_decode_block[];
} // namespace FailPoints

Expand Down Expand Up @@ -422,7 +421,6 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG
auto & kvstore = tmt.getKVStore();
auto region = kvstore->getRegion(region_id);

FailPointHelper::enableFailPoint(FailPoints::force_set_sst_decode_rand);
// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore.get(), region);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,11 +712,11 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)
limit = query_block.limit_or_topn->limit().limit();
else
limit = query_block.limit_or_topn->topn().limit();
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, log->identifier()); });
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
if (pipeline.hasMoreThanOneStream())
{
executeUnion(pipeline, max_streams, log, false, "for partial limit");
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, log->identifier()); });
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
}
}

Expand Down
Loading

0 comments on commit 153a3b6

Please sign in to comment.