Skip to content

Commit

Permalink
Merge branch 'hongyunyan_add_comment' of https://github.com/hongyunya…
Browse files Browse the repository at this point in the history
…n/tiflash into hongyunyan_add_comment
  • Loading branch information
hongyunyan committed Jun 14, 2022
2 parents 0b5b619 + 65cee6a commit b593b97
Show file tree
Hide file tree
Showing 102 changed files with 1,690 additions and 1,804 deletions.
3 changes: 0 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,6 @@ else (ENABLE_FAILPOINTS)
message (STATUS "Failpoints are disabled")
endif (ENABLE_FAILPOINTS)

# Enable PageStorage V3 test.
option (ENABLE_V3_PAGESTORAGE "Enables V3 PageStorage" ON)

# Flags for test coverage
option (TEST_COVERAGE "Enables flags for test coverage" OFF)
option (TEST_COVERAGE_XML "Output XML report for test coverage" OFF)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ TiFlash repository is based on [ClickHouse](https://github.com/ClickHouse/ClickH

### Start with TiDB Cloud

Quickly explore TiFlash with [a free trial of TiDB Cloud](https://tidbcloud.com/signup).
Quickly explore TiFlash with [a free trial of TiDB Cloud](https://tidbcloud.com/free-trial).

See [TiDB Cloud Quick Start Guide](https://docs.pingcap.com/tidbcloud/tidb-cloud-quickstart).

Expand Down
13 changes: 7 additions & 6 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,10 @@ add_headers_and_sources(dbms src/Storages/Page/V2/VersionSet)
add_headers_and_sources(dbms src/Storages/Page/V2/gc)
add_headers_and_sources(dbms src/WindowFunctions)
add_headers_and_sources(dbms src/TiDB/Schema)
if (ENABLE_V3_PAGESTORAGE)
add_headers_and_sources(dbms src/Storages/Page/V3)
add_headers_and_sources(dbms src/Storages/Page/V3/LogFile)
add_headers_and_sources(dbms src/Storages/Page/V3/WAL)
add_headers_and_sources(dbms src/Storages/Page/V3/spacemap)
endif()
add_headers_and_sources(dbms src/Storages/Page/V3)
add_headers_and_sources(dbms src/Storages/Page/V3/LogFile)
add_headers_and_sources(dbms src/Storages/Page/V3/WAL)
add_headers_and_sources(dbms src/Storages/Page/V3/spacemap)
add_headers_and_sources(dbms src/Storages/Page/)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
Expand Down Expand Up @@ -323,6 +321,9 @@ if (ENABLE_TESTS)
if (ENABLE_TIFLASH_DTWORKLOAD)
target_link_libraries(bench_dbms dt-workload-lib)
endif ()
if (ENABLE_TIFLASH_PAGEWORKLOAD)
target_link_libraries(bench_dbms page-workload-lib)
endif ()

add_check(bench_dbms)
endif ()
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/getNumberOfPhysicalCPUCores.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
#pragma once

/// Get number of CPU cores without hyper-threading.
/// Note: do not support environment under resource isolation mechanism like Docker, CGroup.
unsigned getNumberOfPhysicalCPUCores();
18 changes: 18 additions & 0 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Common/Arena.h>
#include <DataStreams/WindowBlockInputStream.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/WindowDescription.h>
#include <Interpreters/convertFieldToType.h>

namespace DB
Expand Down Expand Up @@ -574,4 +575,21 @@ void WindowBlockInputStream::tryCalculate()
peer_group_number = 1;
}
}

void WindowBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.append(", function: {");
buffer.joinStr(
window_description.window_functions_descriptions.begin(),
window_description.window_functions_descriptions.end(),
[&](const auto & func, FmtBuffer & b) {
b.append(func.window_function->getName());
},
", ");
buffer.fmtAppend(
"}}, frame: {{type: {}, boundary_begin: {}, boundary_end: {}}}",
frameTypeToString(window_description.frame.type),
boundaryTypeToString(window_description.frame.begin_type),
boundaryTypeToString(window_description.frame.end_type));
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/FmtUtils.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/WindowDescription.h>
Expand Down Expand Up @@ -169,6 +170,7 @@ class WindowBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

LoggerPtr log;

Expand Down
199 changes: 197 additions & 2 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTSelectQuery.h>
#include <Poco/StringTokenizer.h>
#include <common/logger_useful.h>

namespace DB
{
using ASTPartitionByElement = ASTOrderByElement;
void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & val_field, tipb::Expr * expr, Int32 collator_id)
{
*(expr->mutable_field_type()) = columnInfoToFieldType(ci);
Expand Down Expand Up @@ -190,6 +190,12 @@ std::unordered_map<String, tipb::ExprType> agg_func_name_to_sig({
{"group_concat", tipb::ExprType::GroupConcat},
});

std::unordered_map<String, tipb::ExprType> window_func_name_to_sig({
{"RowNumber", tipb::ExprType::RowNumber},
{"Rank", tipb::ExprType::Rank},
{"DenseRank", tipb::ExprType::DenseRank},
});

DAGColumnInfo toNullableDAGColumnInfo(const DAGColumnInfo & input)
{
DAGColumnInfo output = input;
Expand Down Expand Up @@ -1343,6 +1349,105 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie
exchange_map[left_exchange_receiver->name] = std::make_pair(left_exchange_receiver, left_exchange_sender);
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
}

bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeWindow);
tipb_executor->set_executor_id(name);
tipb::Window * window = tipb_executor->mutable_window();
auto & input_schema = children[0]->output_schema;
for (const auto & expr : func_descs)
{
tipb::Expr * window_expr = window->add_func_desc();
const auto * window_func = typeid_cast<const ASTFunction *>(expr.get());
for (const auto & arg : window_func->arguments->children)
{
tipb::Expr * func = window_expr->add_children();
astToPB(input_schema, arg, func, collator_id, context);
}
auto window_sig_it = window_func_name_to_sig.find(window_func->name);
if (window_sig_it == window_func_name_to_sig.end())
throw Exception(fmt::format("Unsupported window function {}", window_func->name), ErrorCodes::LOGICAL_ERROR);
auto window_sig = window_sig_it->second;
window_expr->set_tp(window_sig);
auto * ft = window_expr->mutable_field_type();
// TODO: Maybe more window functions with different field type.
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagBinary);
ft->set_collate(collator_id);
ft->set_flen(21);
ft->set_decimal(-1);
}

for (const auto & child : order_by_exprs)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = window->add_order_by();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
astToPB(children[0]->output_schema, elem->children[0], expr, collator_id, context);
}

for (const auto & child : partition_by_exprs)
{
auto * elem = typeid_cast<ASTPartitionByElement *>(child.get());
if (!elem)
throw Exception("Invalid partition by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = window->add_partition_by();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
astToPB(children[0]->output_schema, elem->children[0], expr, collator_id, context);
}

if (frame.type.has_value())
{
tipb::WindowFrame * mut_frame = window->mutable_frame();
mut_frame->set_type(frame.type.value());
if (frame.start.has_value())
{
auto * start = mut_frame->mutable_start();
start->set_offset(std::get<2>(frame.start.value()));
start->set_unbounded(std::get<1>(frame.start.value()));
start->set_type(std::get<0>(frame.start.value()));
}

if (frame.end.has_value())
{
auto * end = mut_frame->mutable_end();
end->set_offset(std::get<2>(frame.end.value()));
end->set_unbounded(std::get<1>(frame.end.value()));
end->set_type(std::get<0>(frame.end.value()));
}
}

auto * children_executor = window->mutable_child();
return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
}

bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeSort);
tipb_executor->set_executor_id(name);
tipb::Sort * sort = tipb_executor->mutable_sort();
sort->set_ispartialsort(is_partial_sort);

for (const auto & child : by_exprs)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
tipb::ByItem * by = sort->add_byitems();
by->set_desc(elem->direction < 0);
tipb::Expr * expr = by->mutable_expr();
astToPB(children[0]->output_schema, elem->children[0], expr, collator_id, context);
}

auto * children_executor = sort->mutable_child();
return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
}

} // namespace mock

ExecutorPtr compileTableScan(size_t & executor_index, TableInfo & table_info, String & table_alias, bool append_pk_column)
Expand Down Expand Up @@ -1561,11 +1666,101 @@ ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, ti
return exchange_sender;
}


ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema)
{
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema);
return exchange_receiver;
}

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame)
{
std::vector<ASTPtr> partition_columns;
if (partition_by_expr_list != nullptr)
{
for (const auto & child : partition_by_expr_list->children)
{
auto * elem = typeid_cast<ASTPartitionByElement *>(child.get());
if (!elem)
throw Exception("Invalid partition by element", ErrorCodes::LOGICAL_ERROR);
partition_columns.push_back(child);
compileExpr(input->output_schema, elem->children[0]);
}
}

std::vector<ASTPtr> order_columns;
if (order_by_expr_list != nullptr)
{
for (const auto & child : order_by_expr_list->children)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
order_columns.push_back(child);
compileExpr(input->output_schema, elem->children[0]);
}
}

DAGSchema output_schema;
output_schema.insert(output_schema.end(), input->output_schema.begin(), input->output_schema.end());

std::vector<ASTPtr> window_exprs;
if (func_desc_list != nullptr)
{
for (const auto & expr : func_desc_list->children)
{
const auto * func = typeid_cast<const ASTFunction *>(expr.get());
window_exprs.push_back(expr);
std::vector<TiDB::ColumnInfo> children_ci;
for (const auto & arg : func->arguments->children)
{
children_ci.push_back(compileExpr(input->output_schema, arg));
}
// TODO: add more window functions
TiDB::ColumnInfo ci;
switch (window_func_name_to_sig[func->name])
{
case tipb::ExprType::RowNumber:
case tipb::ExprType::Rank:
case tipb::ExprType::DenseRank:
{
ci.tp = TiDB::TypeLongLong;
ci.flag = TiDB::ColumnFlagBinary;
break;
}
default:
throw Exception(fmt::format("Unsupported window function {}", func->name), ErrorCodes::LOGICAL_ERROR);
}
output_schema.emplace_back(std::make_pair(func->getColumnName(), ci));
}
}

ExecutorPtr window = std::make_shared<mock::Window>(
executor_index,
output_schema,
window_exprs,
std::move(partition_columns),
std::move(order_columns),
frame);
window->children.push_back(input);
return window;
}

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort)
{
std::vector<ASTPtr> order_columns;
if (order_by_expr_list != nullptr)
{
for (const auto & child : order_by_expr_list->children)
{
auto * elem = typeid_cast<ASTOrderByElement *>(child.get());
if (!elem)
throw Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR);
order_columns.push_back(child);
compileExpr(input->output_schema, elem->children[0]);
}
}
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort);
sort->children.push_back(input);
return sort;
}
} // namespace DB
Loading

0 comments on commit b593b97

Please sign in to comment.