Skip to content

Commit

Permalink
add benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Mar 7, 2024
1 parent b7e8fc0 commit 43ffaeb
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 47 deletions.
4 changes: 3 additions & 1 deletion cpp-ch/local-engine/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ if (ENABLE_TESTS)
target_compile_options(unit_tests_local_engine PRIVATE -Wno-unreachable-code)
target_link_libraries(unit_tests_local_engine PRIVATE gluten_clickhouse_backend_libs clickhouse_parsers loggers ch_contrib::gmock_all ch_contrib::gtest)
target_link_libraries(unit_tests_local_engine PRIVATE ch_parquet)
else()
endif()

if (ENABLE_BENCHMARKS)
include_directories(benchmark_local_engine SYSTEM PUBLIC
${ClickHouse_SOURCE_DIR}/utils/extern-local_engine
)
Expand Down
86 changes: 82 additions & 4 deletions cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
#include <parquet/arrow/reader.h>
#include <substrait/plan.pb.h>
#include <tests/gluten_test_util.h>
#include <Common/DebugUtils.h>

namespace
{

void BM_ParquetReadAllLocal(benchmark::State & state)
void BM_ColumnIndexRead_NoFilter(benchmark::State & state)
{
using namespace DB;

Expand All @@ -55,7 +56,7 @@ void BM_ParquetReadAllLocal(benchmark::State & state)
}
}

void BM_ParquetReadAllOld(benchmark::State & state)
void BM_ColumnIndexRead_Old(benchmark::State & state)
{
using namespace DB;

Expand Down Expand Up @@ -163,10 +164,87 @@ void BM_OptimizedParquetReadDate32(benchmark::State & state)
}
}
}

substrait::ReadRel::LocalFiles createLocalFiles(const std::string & filename, const bool use_local_format)
{
substrait::ReadRel::LocalFiles files;
substrait::ReadRel::LocalFiles::FileOrFiles * file_item = files.add_items();
file_item->set_uri_file("file://" + filename);
file_item->set_start(0);
file_item->set_length(std::filesystem::file_size(filename));
const substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format;
file_item->mutable_parquet()->CopyFrom(parquet_format);

auto config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
config->setBool("use_local_format", use_local_format);
local_engine::SerializedPlanParser::global_context->setConfig(config);

return files;
}

void doRead(const substrait::ReadRel::LocalFiles & files, const DB::ActionsDAGPtr & pushDown, const DB::Block & header)
{
const auto builder = std::make_unique<DB::QueryPipelineBuilder>();
const auto source
= std::make_shared<local_engine::SubstraitFileSource>(local_engine::SerializedPlanParser::global_context, header, files);
source->setKeyCondition(pushDown, local_engine::SerializedPlanParser::global_context);
builder->init(DB::Pipe(source));
auto pipeline = DB::QueryPipelineBuilder::getPipeline(std::move(*builder));
auto reader = DB::PullingPipelineExecutor(pipeline);
auto result = header.cloneEmpty();
size_t total_rows = 0;
while (reader.pull(result))
{
#ifndef NDEBUG
debug::headBlock(result);
#endif
total_rows += result.rows();
}
#ifndef NDEBUG
std::cerr << "rows:" << total_rows << std::endl;
#endif
}

void BM_ColumnIndexRead_Filter_ReturnAllResult(benchmark::State & state)
{
using namespace DB;

const std::string filename = local_engine::test::data_file(
"benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
const std::string filter1 = "l_shipdate is not null AND l_shipdate <= toDate32('1998-09-01')";
const substrait::ReadRel::LocalFiles files = createLocalFiles(filename, true);
const AnotherRowType schema = local_engine::test::readParquetSchema(filename);
const ActionsDAGPtr pushDown = local_engine::test::parseFilter(filter1, schema);
const Block header = {toBlockRowType(schema)};

for (auto _ : state)
doRead(files, pushDown, header);
local_engine::SerializedPlanParser::global_context->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration()));
}

void BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
{
using namespace DB;

const std::string filename = local_engine::test::data_file(
"benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
const std::string filter1 = "l_orderkey is not null AND l_orderkey > 300977829";
const substrait::ReadRel::LocalFiles files = createLocalFiles(filename, true);
const AnotherRowType schema = local_engine::test::readParquetSchema(filename);
const ActionsDAGPtr pushDown = local_engine::test::parseFilter(filter1, schema);
const Block header = {toBlockRowType(schema)};

for (auto _ : state)
doRead(files, pushDown, header);
local_engine::SerializedPlanParser::global_context->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration()));
}

}

BENCHMARK(BM_ParquetReadAllOld)->Unit(benchmark::kMillisecond)->Iterations(50);
BENCHMARK(BM_ParquetReadAllLocal)->Unit(benchmark::kMillisecond)->Iterations(50);
BENCHMARK(BM_ColumnIndexRead_Old)->Unit(benchmark::kMillisecond)->Iterations(20);
BENCHMARK(BM_ColumnIndexRead_NoFilter)->Unit(benchmark::kMillisecond)->Iterations(20);
BENCHMARK(BM_ColumnIndexRead_Filter_ReturnAllResult)->Unit(benchmark::kMillisecond)->Iterations(20);
BENCHMARK(BM_ColumnIndexRead_Filter_ReturnHalfResult)->Unit(benchmark::kMillisecond)->Iterations(20);
BENCHMARK(BM_ParquetReadDate32)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_OptimizedParquetReadString)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_OptimizedParquetReadDate32)->Unit(benchmark::kMillisecond)->Iterations(200);
38 changes: 38 additions & 0 deletions cpp-ch/local-engine/tests/gluten_test_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFile.h>

#include <Interpreters/ActionsVisitor.h>
#include <Parser/SerializedPlanParser.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <Common/Exception.h>
Expand All @@ -34,6 +39,39 @@ extern const int LOGICAL_ERROR;

namespace local_engine::test
{
using namespace DB;
ActionsDAGPtr parseFilter(const std::string & filter, const AnotherRowType & name_and_types)
{
using namespace DB;

std::unordered_map<std::string, ColumnWithTypeAndName> node_name_to_input_column;
std::ranges::transform(
name_and_types,
std::inserter(node_name_to_input_column, node_name_to_input_column.end()),
[](const auto & name_and_type) { return std::make_pair(name_and_type.name, toBlockFieldType(name_and_type)); });

NamesAndTypesList aggregation_keys;
ColumnNumbersList aggregation_keys_indexes_list;
const AggregationKeysInfo info(aggregation_keys, aggregation_keys_indexes_list, GroupByKind::NONE);
constexpr SizeLimits size_limits_for_set;
ParserExpression parser2;
const ASTPtr ast_exp = parseQuery(parser2, filter.data(), filter.data() + filter.size(), "", 0, 0);
const auto prepared_sets = std::make_shared<PreparedSets>();
ActionsMatcher::Data visitor_data(
SerializedPlanParser::global_context,
size_limits_for_set,
static_cast<size_t>(0),
name_and_types,
std::make_shared<ActionsDAG>(name_and_types),
prepared_sets /* prepared_sets */,
false /* no_subqueries */,
false /* no_makeset */,
false /* only_consts */,
info);
ActionsVisitor(visitor_data).visit(ast_exp);
return ActionsDAG::buildFilterActionsDAG({visitor_data.getActions()->getOutputs().back()}, node_name_to_input_column);
}

const char * get_data_dir()
{
const auto * const result = std::getenv("PARQUET_TEST_DATA");
Expand Down
13 changes: 9 additions & 4 deletions cpp-ch/local-engine/tests/gluten_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <string>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/ActionsDAG.h>
#include <parquet/schema.h>

using BlockRowType = DB::ColumnsWithTypeAndName;
Expand Down Expand Up @@ -56,6 +58,8 @@ DB::DataTypePtr toDataType(const parquet::ColumnDescriptor & type);

AnotherRowType readParquetSchema(const std::string & file);

DB::ActionsDAGPtr parseFilter(const std::string & filter, const AnotherRowType & name_and_types);

}

inline DB::DataTypePtr BIGINT()
Expand Down Expand Up @@ -101,6 +105,11 @@ inline DB::DataTypePtr STRING()
return std::make_shared<DB::DataTypeString>();
}

inline DB::DataTypePtr DATE()
{
return std::make_shared<DB::DataTypeDate32>();
}

inline BlockFieldType toBlockFieldType(const AnotherFieldType & type)
{
return BlockFieldType(type.type, type.name);
Expand All @@ -116,15 +125,11 @@ inline BlockRowType toBlockRowType(const AnotherRowType & type, const bool rever
BlockRowType result;
result.reserve(type.size());
if (reverse)
{
for (auto it = type.rbegin(); it != type.rend(); ++it)
result.emplace_back(toBlockFieldType(*it));
}
else
{
for (const auto & field : type)
result.emplace_back(toBlockFieldType(field));
}
return result;
}

Expand Down
46 changes: 8 additions & 38 deletions cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
#include <string>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/ExpressionActions.h>
#include <Parser/SerializedPlanParser.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>

#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Storages/Parquet/ArrowUtils.h>
#include <Storages/Parquet/ColumnIndexFilter.h>
Expand Down Expand Up @@ -55,38 +54,6 @@ using namespace DB;

namespace test_utils
{
ActionsDAGPtr parseFilter(const std::string & filter, const AnotherRowType & name_and_types)
{
using namespace DB;

std::unordered_map<std::string, ColumnWithTypeAndName> node_name_to_input_column;
std::ranges::transform(
name_and_types,
std::inserter(node_name_to_input_column, node_name_to_input_column.end()),
[](const auto & name_and_type) { return std::make_pair(name_and_type.name, toBlockFieldType(name_and_type)); });

NamesAndTypesList aggregation_keys;
ColumnNumbersList aggregation_keys_indexes_list;
const AggregationKeysInfo info(aggregation_keys, aggregation_keys_indexes_list, GroupByKind::NONE);
constexpr SizeLimits size_limits_for_set;
ParserExpression parser2;
const ASTPtr ast_exp = parseQuery(parser2, filter.data(), filter.data() + filter.size(), "", 0, 0);
const auto prepared_sets = std::make_shared<PreparedSets>();
ActionsMatcher::Data visitor_data(
local_engine::SerializedPlanParser::global_context,
size_limits_for_set,
static_cast<size_t>(0),
name_and_types,
std::make_shared<ActionsDAG>(name_and_types),
prepared_sets /* prepared_sets */,
false /* no_subqueries */,
false /* no_makeset */,
false /* only_consts */,
info);
ActionsVisitor(visitor_data).visit(ast_exp);
return ActionsDAG::buildFilterActionsDAG({visitor_data.getActions()->getOutputs().back()}, node_name_to_input_column);
}

class PrimitiveNodeBuilder
{
parquet::Repetition::type repetition_ = parquet::Repetition::UNDEFINED;
Expand Down Expand Up @@ -373,7 +340,8 @@ void testCondition(const std::string & exp, const std::vector<size_t> & expected
{
static const AnotherRowType name_and_types = buildTestRowType();
static const local_engine::ColumnIndexStore column_index_store = buildTestColumnIndexStore();
const local_engine::ColumnIndexFilter filter(parseFilter(exp, name_and_types), local_engine::SerializedPlanParser::global_context);
const local_engine::ColumnIndexFilter filter(
local_engine::test::parseFilter(exp, name_and_types), local_engine::SerializedPlanParser::global_context);
assertRows(filter.calculateRowRanges(column_index_store, TOTALSIZE), expectedRows);
}

Expand Down Expand Up @@ -491,7 +459,8 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
// COLUMN5 is not found in the column_index_store,
const AnotherRowType upper_name_and_types{{"COLUMN5", BIGINT()}};
const local_engine::ColumnIndexFilter filter_upper(
parseFilter("COLUMN5 in (7, 20)", upper_name_and_types), local_engine::SerializedPlanParser::global_context);
local_engine::test::parseFilter("COLUMN5 in (7, 20)", upper_name_and_types),
local_engine::SerializedPlanParser::global_context);
assertRows(
filter_upper.calculateRowRanges(column_index_store, TOTALSIZE),
std::vector(boost::counting_iterator<size_t>(0), boost::counting_iterator<size_t>(TOTALSIZE)));
Expand All @@ -500,7 +469,8 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
{
const AnotherRowType lower_name_and_types{{"column5", BIGINT()}};
const local_engine::ColumnIndexFilter filter_lower(
parseFilter("column5 in (7, 20)", lower_name_and_types), local_engine::SerializedPlanParser::global_context);
local_engine::test::parseFilter("column5 in (7, 20)", lower_name_and_types),
local_engine::SerializedPlanParser::global_context);
assertRows(filter_lower.calculateRowRanges(column_index_store, TOTALSIZE), {});
}
}
Expand Down Expand Up @@ -1004,7 +974,7 @@ TEST(ColumnIndex, VectorizedParquetRecordReader)
auto arrow_file = local_engine::test::asArrowFileForParquet(in, format_settings);

static const AnotherRowType name_and_types{{"11", BIGINT()}};
const auto filterAction = parseFilter("`11` = 10 or `11` = 50", name_and_types);
const auto filterAction = local_engine::test::parseFilter("`11` = 10 or `11` = 50", name_and_types);
auto column_index_filter
= std::make_shared<local_engine::ColumnIndexFilter>(filterAction, local_engine::SerializedPlanParser::global_context);

Expand Down

0 comments on commit 43ffaeb

Please sign in to comment.