Skip to content

Commit

Permalink
Merge branch 'master' into fix-extra_table_id_column-duration-lm
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored May 18, 2023
2 parents 2303b8d + a1cbb63 commit 225f237
Show file tree
Hide file tree
Showing 99 changed files with 2,615 additions and 975 deletions.
2 changes: 1 addition & 1 deletion contrib/aws
42 changes: 42 additions & 0 deletions dbms/src/DataStreams/AddExtraTableIDColumnInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/AddExtraTableIDColumnInputStream.h>

namespace DB
{

// Wraps `input` and sets up the action that adds the extra table-ID column.
// The action is built from the header of `input`, which is dereferenced here,
// so `input` must not be null.
AddExtraTableIDColumnInputStream::AddExtraTableIDColumnInputStream(
    BlockInputStreamPtr input,
    int extra_table_id_index,
    TableID physical_table_id)
    : action(input->getHeader(), extra_table_id_index, physical_table_id)
{
    // Register the wrapped stream as a child; readImpl() reads from children.back().
    children.emplace_back(input);
}

// Reads the next block from the child stream and runs it through the action.
// An empty block from the child is returned unchanged; if the action reports
// failure, a default-constructed (empty) block is returned instead.
Block AddExtraTableIDColumnInputStream::readImpl()
{
    Block block = children.back()->read();

    // Only non-empty blocks are transformed; a failed transform yields an empty block.
    if (block && !action.transform(block))
        return {};

    return block;
}

} // namespace DB
48 changes: 48 additions & 0 deletions dbms/src/DataStreams/AddExtraTableIDColumnInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/Transaction/Types.h>

namespace DB
{

/**
 * Input stream that passes every block read from its child stream through an
 * AddExtraTableIDColumnTransformAction, which adds an extra TableID column.
 */
class AddExtraTableIDColumnInputStream : public IProfilingBlockInputStream
{
    // Name returned by getName().
    static constexpr const char * NAME = "AddExtraTableIDColumn";

public:
    AddExtraTableIDColumnInputStream(
        BlockInputStreamPtr input,
        int extra_table_id_index,
        TableID physical_table_id);

    String getName() const override
    {
        return NAME;
    }

    // The header is the one computed by the transform action.
    Block getHeader() const override
    {
        return action.getHeader();
    }

protected:
    Block readImpl() override;

private:
    AddExtraTableIDColumnTransformAction action;
};

} // namespace DB
93 changes: 93 additions & 0 deletions dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>

namespace DB
{

// Builds the output header: an empty clone of `inner_header_`, with the extra
// table-ID column inserted at `extra_table_id_index` unless that index equals
// InvalidColumnID.
Block AddExtraTableIDColumnTransformAction::buildHeader(
    const Block & inner_header_,
    int extra_table_id_index)
{
    auto result = inner_header_.cloneEmpty();

    // No extra column requested: the header is just the empty clone.
    if (extra_table_id_index == InvalidColumnID)
        return result;

    const auto & define = DM::getExtraTableIDColumnDefine();
    ColumnWithTypeAndName table_id_col{
        define.type->createColumn(),
        define.type,
        define.name,
        define.id,
        define.default_value};
    result.insert(extra_table_id_index, table_id_col);
    return result;
}

// Overload taking column definitions: converts them to an empty block via
// toEmptyBlock() and delegates to the Block-based buildHeader().
Block AddExtraTableIDColumnTransformAction::buildHeader(
    const DM::ColumnDefines & columns_to_read_,
    int extra_table_id_index)
{
    return buildHeader(toEmptyBlock(columns_to_read_), extra_table_id_index);
}

// Constructs the action from an existing block header.
// The output header is computed once here via buildHeader(); the column
// position and the physical table id are stored for later use by transform().
AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction(
const Block & inner_header_,
int extra_table_id_index_,
TableID physical_table_id_)
: header(buildHeader(inner_header_, extra_table_id_index_))
, extra_table_id_index(extra_table_id_index_)
, physical_table_id(physical_table_id_)
{
}

// Constructs the action from a list of column definitions.
// The output header is computed once here via the ColumnDefines overload of
// buildHeader(); the column position and the physical table id are stored for
// later use by transform().
AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction(
const DM::ColumnDefines & columns_to_read_,
int extra_table_id_index_,
TableID physical_table_id_)
: header(buildHeader(columns_to_read_, extra_table_id_index_))
, extra_table_id_index(extra_table_id_index_)
, physical_table_id(physical_table_id_)
{
}

// Returns (by value) the header that was computed in the constructor.
Block AddExtraTableIDColumnTransformAction::getHeader() const
{
return header;
}

// Adds the extra table-ID column to `block` in place.
// - An empty block is left untouched and is not counted.
// - When extra_table_id_index is not InvalidColumnID, a constant column holding
//   physical_table_id (one value per row of the block) is inserted at that index.
// - The block's row count is added to total_rows.
// Every path visible here returns true.
bool AddExtraTableIDColumnTransformAction::transform(Block & block)
{
    if (unlikely(!block))
        return true;

    if (extra_table_id_index != InvalidColumnID)
    {
        const auto & define = DM::getExtraTableIDColumnDefine();
        // Row count is taken before the insertion so the constant column matches the block.
        const size_t rows = block.rows();
        ColumnWithTypeAndName table_id_col{
            define.type->createColumnConst(rows, Field(physical_table_id)),
            define.type,
            define.name,
            define.id};
        block.insert(extra_table_id_index, std::move(table_id_col));
    }

    total_rows += block.rows();
    return true;
}

} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,44 @@

namespace DB
{
// NOTE(review): this span looks like a rendered diff hunk without +/- markers:
// lines of the removed SegmentReadTransformAction declaration appear to be
// interleaved with the new AddExtraTableIDColumnTransformAction declaration.
// Confirm against the real header before relying on the exact text below.
//
// Transform action that adds an extra table-ID column to blocks and keeps a
// running count of the rows it has processed.
struct SegmentReadTransformAction
struct AddExtraTableIDColumnTransformAction
{
public:
SegmentReadTransformAction(
const Block & header_,
// Builds the output header from an inner block header plus the position of
// the extra table-ID column.
static Block buildHeader(
const Block & inner_header_,
int extra_table_id_index_);

// Same as above, starting from column definitions instead of a block header.
static Block buildHeader(
const DM::ColumnDefines & columns_to_read_,
int extra_table_id_index_);

AddExtraTableIDColumnTransformAction(
const Block & inner_header_,
int extra_table_id_index_,
TableID physical_table_id_)
: header(header_)
, extra_table_id_index(extra_table_id_index_)
, physical_table_id(physical_table_id_)
{
}
TableID physical_table_id_);

AddExtraTableIDColumnTransformAction(
const DM::ColumnDefines & columns_to_read_,
int extra_table_id_index_,
TableID physical_table_id_);

// Applies the action to `block` in place; the bool result signals success.
bool transform(Block & block);

Block getHeader() const;

// Value of the total_rows counter.
size_t totalRows() const
{
return total_rows;
}


private:
Block header;
// Position of the ExtraPhysTblID column within the column_names parameter of StorageDeltaMerge::read.
const int extra_table_id_index;
const TableID physical_table_id;

size_t total_rows = 0;
};

} // namespace DB
45 changes: 0 additions & 45 deletions dbms/src/DataStreams/SegmentReadTransformAction.cpp

This file was deleted.

37 changes: 28 additions & 9 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,22 +647,41 @@ void WindowBlockInputStream::appendInfo(FmtBuffer & buffer) const
action.appendInfo(buffer);
}

// NOTE(review): this span looks like a rendered diff hunk without +/- markers:
// each statement appears twice, once with the old parameter name `x` and once
// with the new name `row_num`. Confirm against the real file.
//
// Advances the row number by one row. If the incremented row is still inside
// the current block, only `row` changes; otherwise `row` is reset to 0 and
// `block` is incremented.
void WindowTransformAction::advanceRowNumber(RowNumber & x) const
void WindowTransformAction::advanceRowNumber(RowNumber & row_num) const
{
// Preconditions: the row number refers to a block currently held in window_blocks.
assert(x.block >= first_block_number);
assert(x.block - first_block_number < window_blocks.size());
assert(row_num.block >= first_block_number);
assert(row_num.block - first_block_number < window_blocks.size());

const auto block_rows = blockAt(x).rows;
assert(x.row < block_rows);
const auto block_rows = blockAt(row_num).rows;
assert(row_num.row < block_rows);

++x.row;
if (x.row < block_rows)
++row_num.row;
if (row_num.row < block_rows)
{
return;
}

// Moved past the last row of the block: continue at the start of the next block.
x.row = 0;
++x.block;
row_num.row = 0;
++row_num.block;
}

// Returns the row number immediately before `row_num`.
// Within a block this simply decrements `row`; at row 0 it steps back to the
// last row of the preceding block. The asserts require that `row_num` is not
// (block 0, row 0) and that the preceding block is still held in window_blocks.
RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num) const
{
    assert(row_num.block >= first_block_number);
    assert(!(row_num.block == 0 && row_num.row == 0));

    RowNumber previous = row_num;
    if (row_num.row > 0)
    {
        // Still inside the same block.
        --previous.row;
    }
    else
    {
        // First row of a block: move to the last row of the previous block.
        --previous.block;
        assert(previous.block - first_block_number < window_blocks.size());
        const auto previous_block_rows = blockAt(previous).rows;
        previous.row = previous_block_rows - 1;
    }
    return previous;
}

bool WindowTransformAction::lead(RowNumber & x, size_t offset) const
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ struct WindowTransformAction
return window_blocks[x.block - first_block_number].output_columns;
}

void advanceRowNumber(RowNumber & x) const;
void advanceRowNumber(RowNumber & row_num) const;

RowNumber getPreviousRowNumber(const RowNumber & row_num) const;

bool lead(RowNumber & x, size_t offset) const;

Expand Down
19 changes: 13 additions & 6 deletions dbms/src/Debug/MockExecutor/AstToPBUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,19 @@ ColumnName splitQualifiedName(const String & s)
// NOTE(review): this span looks like a rendered diff hunk without +/- markers:
// the old lambda body (the first six statements) appears to be followed by the
// new try/catch version of the same logic. Confirm against the real file.
//
// Returns an iterator to the first schema entry matching `checked_column`, or
// input.end() if there is none. Both names are split with splitQualifiedName();
// when the checked name has no table part only column names are compared,
// otherwise table and column names must both match.
DAGSchema::const_iterator checkSchema(const DAGSchema & input, const String & checked_column)
{
auto ft = std::find_if(input.begin(), input.end(), [&checked_column](const auto & field) {
auto [checked_db_name, checked_table_name, checked_column_name] = splitQualifiedName(checked_column);
auto [db_name, table_name, column_name] = splitQualifiedName(field.first);
if (checked_table_name.empty())
return column_name == checked_column_name;
else
return table_name == checked_table_name && column_name == checked_column_name;
try
{
auto [checked_db_name, checked_table_name, checked_column_name] = splitQualifiedName(checked_column);
auto [db_name, table_name, column_name] = splitQualifiedName(field.first);
if (checked_table_name.empty())
return column_name == checked_column_name;
else
return table_name == checked_table_name && column_name == checked_column_name;
}
// Any exception raised while splitting or comparing names is treated as "no match".
catch (...)
{
return false;
}
});
return ft;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/MockExecutor/FuncSigMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,6 @@ std::unordered_map<String, tipb::ExprType> window_func_name_to_sig({
{"Lead", tipb::ExprType::Lead},
{"Lag", tipb::ExprType::Lag},
{"FirstValue", tipb::ExprType::FirstValue},
{"LastValue", tipb::ExprType::LastValue},
});
} // namespace DB::tests
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockExecutor/WindowBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ bool WindowBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collat
break;
}
case tipb::ExprType::FirstValue:
case tipb::ExprType::LastValue:
{
assert(window_expr->children_size() == 1);
const auto arg_type = window_expr->children(0).field_type();
Expand Down Expand Up @@ -212,6 +213,7 @@ ExecutorBinderPtr compileWindow(ExecutorBinderPtr input, size_t & executor_index
break;
}
case tipb::ExprType::FirstValue:
case tipb::ExprType::LastValue:
{
ci = children_ci[0];
break;
Expand Down
Loading

0 comments on commit 225f237

Please sign in to comment.