Skip to content

Commit

Permalink
Push down filter on timestamp type column to storage layer (#1875)
Browse files Browse the repository at this point in the history
* try push down timestamp filter to storage
  • Loading branch information
lidezhu authored May 14, 2021
1 parent b054f81 commit af45f02
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 25 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ DatabasePtr detachThenAttach(Context & ctx, const String & db_name, DatabasePtr
}

db = ctx.getDatabase(db_name);
return db;
return std::move(db);
}

TEST_F(DatabaseTiFlash_test, Tombstone)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Debug/DBGInvoker.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Debug/dbgFuncFailPoint.h>
#include <Debug/dbgFuncMisc.h>
#include <Debug/dbgFuncMockRaftCommand.h>
#include <Debug/dbgFuncMockTiDBData.h>
#include <Debug/dbgFuncMockTiDBTable.h>
Expand Down Expand Up @@ -103,6 +104,8 @@ DBGInvoker::DBGInvoker()
regSchemalessFunc("mapped_database", dbgFuncMappedDatabase);
regSchemalessFunc("mapped_table", dbgFuncMappedTable);
regSchemafulFunc("query_mapped", dbgFuncQueryMapped);

regSchemalessFunc("search_log_for_key", dbgFuncSearchLogForKey);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
Expand Down
41 changes: 41 additions & 0 deletions dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#include <Common/typeid_cast.h>
#include <Debug/dbgFuncMisc.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>

#include <fstream>
#include <regex>

namespace DB
{
// Debug helper: scan the server log (path taken from the `logger.log` config key) for the
// last line that contains `key` and print the first number that follows the key on that line.
//
// - args[0] must be a string literal holding the key; an Exception with BAD_ARGUMENTS is
//   thrown when no argument is given.
// - Lines that contain "DBGInvoke" are ignored, so the echo of this very command in the log
//   is never picked up as a match.
// - "Invalid" is printed when no line matches (this includes the case where the log file
//   cannot be opened, since nothing is read then) or when no number follows the key.
void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
    if (args.empty())
        throw Exception("Args not matched, should be: key", ErrorCodes::BAD_ARGUMENTS);

    const String key = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
    const auto log_path = context.getConfigRef().getString("logger.log");

    // Only the last matching line is needed, so remember just that one instead of
    // collecting a copy of every match; memory use no longer grows with the log size.
    std::ifstream file(log_path);
    String last_match;
    bool found = false; // separate flag: with an empty key an empty line is a valid match
    String line;
    while (std::getline(file, line))
    {
        if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos))
        {
            // Swap instead of copy; `line` is overwritten by the next getline anyway.
            last_match.swap(line);
            found = true;
        }
    }
    if (!found)
    {
        output("Invalid");
        return;
    }

    // Start searching right after the key so that digits contained in the key itself
    // are not mistaken for the value. find() cannot fail here: the line was selected
    // because it contains `key`.
    const auto sub_line = last_match.substr(last_match.find(key) + key.size());
    // Capture group 1 is the numeric part without any leading sign.
    const std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))");
    std::smatch m;
    if (std::regex_search(sub_line, m, rx))
        output(m[1]);
    else
        output("Invalid");
}
} // namespace DB
15 changes: 15 additions & 0 deletions dbms/src/Debug/dbgFuncMisc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <Debug/DBGInvoker.h>

namespace DB
{

class Context;

// Find the last line in the log file (path read from the `logger.log` config key) that
// contains `key`, ignoring lines that contain "DBGInvoke", and output the first number
// found in that line starting from the occurrence of `key`.
// Outputs "Invalid" when no such line exists or when no number is found.
// Throws an Exception (BAD_ARGUMENTS) when called without arguments.
// Usage:
//   ./storage-client.sh "DBGInvoke search_log_for_key(key)"
void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output);

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline &
SelectQueryInfo query_info;
/// to avoid null pointer exception
query_info.query = dummy_query;
query_info.dag_query = std::make_unique<DAGQueryInfo>(conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns());
query_info.dag_query = std::make_unique<DAGQueryInfo>(
conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo());
query_info.mvcc_query_info = std::move(mvcc_query_info);

FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read);
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ namespace DB
// be used to extract key conditions by storage engine
// Bundle of data taken from a DAG request: the filter expressions, the prepared sets,
// the source columns and the timezone info.
//
// Lifetime: `filters`, `source_columns` and `timezone_info` are reference members. They
// are bound to the constructor arguments and are NOT copied, so the referenced objects
// must stay alive for as long as this struct is used. Only `dag_sets` is held by value.
struct DAGQueryInfo
{
// NOTE(review): this 3-argument constructor does not bind the `timezone_info` reference
// member; it looks like the pre-change version shown next to its replacement below.
DAGQueryInfo(const std::vector<const tipb::Expr *> & filters_, DAGPreparedSets dag_sets_, const std::vector<NameAndTypePair> & source_columns_)
: filters(filters_), dag_sets(std::move(dag_sets_)), source_columns(source_columns_){};
// Binds the three reference members to the given arguments and moves `dag_sets_`
// (taken by value) into `dag_sets`.
DAGQueryInfo(const std::vector<const tipb::Expr *> & filters_, DAGPreparedSets dag_sets_,
const std::vector<NameAndTypePair> & source_columns_, const TimezoneInfo & timezone_info_)
: filters(filters_), dag_sets(std::move(dag_sets_)), source_columns(source_columns_), timezone_info(timezone_info_){};
// filters in dag request
const std::vector<const tipb::Expr *> & filters;
// Prepared sets extracted from dag request, which are used for indices
// by storage engine.
DAGPreparedSets dag_sets;
// Name/type pairs of the source columns (the table-scan caller passes the analyzer's
// current input columns here).
const std::vector<NameAndTypePair> & source_columns;

// Timezone info of the request (the table-scan caller passes context.getTimezoneInfo()).
// The DeltaMerge filter parser reads it to convert datetime literals compared against
// timestamp columns to UTC.
const TimezoneInfo & timezone_info;
};
} // namespace DB
54 changes: 41 additions & 13 deletions dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ inline bool isRoughSetFilterSupportType(const Int32 field_type)
case TiDB::TypeDate:
case TiDB::TypeTime:
case TiDB::TypeDatetime:
return true;
// For timestamp, should take time_zone into consideration. Disable it.
case TiDB::TypeTimestamp:
return false;
return true;
// For these types, should take collation into consideration. Disable them.
case TiDB::TypeVarchar:
case TiDB::TypeJSON:
Expand Down Expand Up @@ -89,6 +87,7 @@ inline RSOperatorPtr parseTiCompareExpr( //
const FilterParser::RSFilterType filter_type,
const ColumnDefines & columns_to_read,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info,
Poco::Logger * /* log */)
{
if (unlikely(expr.children_size() != 2))
Expand All @@ -101,10 +100,16 @@ inline RSOperatorPtr parseTiCompareExpr( //

Attr attr;
Field value;
UInt32 state = 0x0;
constexpr UInt32 state_has_column = 0x1;
constexpr UInt32 state_has_literal = 0x2;
constexpr UInt32 state_finish = state_has_column | state_has_literal;
UInt32 state = 0x0;
constexpr UInt32 state_has_column = 0x1;
constexpr UInt32 state_has_literal = 0x2;
constexpr UInt32 state_finish = state_has_column | state_has_literal;
bool is_timestamp_column = false;
for (const auto & child : expr.children())
{
if (isColumnExpr(child))
is_timestamp_column = (child.field_type().tp() == TiDB::TypeTimestamp);
}
for (const auto & child : expr.children())
{
if (isColumnExpr(child))
Expand All @@ -125,6 +130,27 @@ inline RSOperatorPtr parseTiCompareExpr( //
{
state |= state_has_literal;
value = decodeLiteral(child);
if (is_timestamp_column)
{
auto literal_type = child.field_type().tp();
if (unlikely(literal_type != TiDB::TypeTimestamp && literal_type != TiDB::TypeDatetime))
return createUnsupported(expr.ShortDebugString(),
"Compare timestamp column with literal type(" + DB::toString(literal_type)
+ ") is not supported",
false);
// convert literal value from timezone specified in cop request to UTC
if (literal_type == TiDB::TypeDatetime && !timezone_info.is_utc_timezone)
{
static const auto & time_zone_utc = DateLUT::instance("UTC");
UInt64 from_time = value.get<UInt64>();
UInt64 result_time = from_time;
if (timezone_info.is_name_based)
convertTimeZone(from_time, result_time, *timezone_info.timezone, time_zone_utc);
else if (timezone_info.timezone_offset != 0)
convertTimeZoneByOffset(from_time, result_time, -timezone_info.timezone_offset, time_zone_utc);
value = Field(result_time);
}
}
}
}

Expand Down Expand Up @@ -167,6 +193,7 @@ inline RSOperatorPtr parseTiCompareExpr( //
RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
const ColumnDefines & columns_to_read,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info,
Poco::Logger * log)
{
assert(isFunctionExpr(expr));
Expand All @@ -192,7 +219,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
{
const auto & child = expr.children(0);
if (likely(isFunctionExpr(child)))
op = createNot(parseTiExpr(child, columns_to_read, creator, log));
op = createNot(parseTiExpr(child, columns_to_read, creator, timezone_info, log));
else
op = createUnsupported(child.ShortDebugString(), "child of logical not is not function", false);
}
Expand All @@ -206,7 +233,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
{
const auto & child = expr.children(i);
if (likely(isFunctionExpr(child)))
children.emplace_back(parseTiExpr(child, columns_to_read, creator, log));
children.emplace_back(parseTiExpr(child, columns_to_read, creator, timezone_info, log));
else
children.emplace_back(createUnsupported(child.ShortDebugString(), "child of logical operator is not function", false));
}
Expand All @@ -223,7 +250,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
case FilterParser::RSFilterType::GreaterEqual:
case FilterParser::RSFilterType::Less:
case FilterParser::RSFilterType::LessEuqal:
op = parseTiCompareExpr(expr, filter_type, columns_to_read, creator, log);
op = parseTiCompareExpr(expr, filter_type, columns_to_read, creator, timezone_info, log);
break;

case FilterParser::RSFilterType::In:
Expand All @@ -246,10 +273,11 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
inline RSOperatorPtr tryParse(const tipb::Expr & filter,
const ColumnDefines & columns_to_read,
const FilterParser::AttrCreatorByColumnID & creator,
const TimezoneInfo & timezone_info,
Poco::Logger * log)
{
if (isFunctionExpr(filter))
return cop::parseTiExpr(filter, columns_to_read, creator, log);
return cop::parseTiExpr(filter, columns_to_read, creator, timezone_info, log);
else
return createUnsupported(filter.ShortDebugString(), "child of logical and is not function", false);
}
Expand All @@ -268,7 +296,7 @@ RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo &

if (dag_info.filters.size() == 1)
{
op = cop::tryParse(*dag_info.filters[0], columns_to_read, creator, log);
op = cop::tryParse(*dag_info.filters[0], columns_to_read, creator, dag_info.timezone_info, log);
}
else
{
Expand All @@ -277,7 +305,7 @@ RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo &
for (size_t i = 0; i < dag_info.filters.size(); ++i)
{
const auto & filter = *dag_info.filters[i];
children.emplace_back(cop::tryParse(filter, columns_to_read, creator, log));
children.emplace_back(cop::tryParse(filter, columns_to_read, creator, dag_info.timezone_info, log));
}
op = createAnd(children);
}
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ try
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "Date", "2020-09-27", createLessEqual(attr("Date"), Field((String) "2020-09-27"), 0)));
ASSERT_EQ(false, checkMatch(__FUNCTION__, *context, "Date", "2020-09-27", createLessEqual(attr("Date"), Field((String) "2020-09-26"), 0)));

ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createEqual(attr("MyDateTime"), parseMyDateTime("2020-09-27"))));
ASSERT_EQ(false, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createEqual(attr("MyDateTime"), parseMyDateTime("2020-09-28"))));
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createIn(attr("MyDateTime"), {parseMyDateTime("2020-09-27")})));
ASSERT_EQ(false, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createIn(attr("MyDateTime"), {parseMyDateTime("2020-09-28")})));
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createGreater(attr("MyDateTime"), parseMyDateTime("2020-09-26"), 0)));
ASSERT_EQ(false, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createGreater(attr("MyDateTime"), parseMyDateTime("2020-09-27"), 0)));
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createGreaterEqual(attr("MyDateTime"), parseMyDateTime("2020-09-27"), 0)));
ASSERT_EQ(false, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createGreaterEqual(attr("MyDateTime"), parseMyDateTime("2020-09-28"), 0)));
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createLess(attr("MyDateTime"), parseMyDateTime("2020-09-28"), 0)));
ASSERT_EQ(false, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createLess(attr("MyDateTime"), parseMyDateTime("2020-09-27"), 0)));
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createLessEqual(attr("MyDateTime"), parseMyDateTime("2020-09-27"), 0)));
ASSERT_EQ(false, checkMatch(__FUNCTION__, *context, "MyDateTime", "2020-09-27", createLessEqual(attr("MyDateTime"), parseMyDateTime("2020-09-26"), 0)));

/// Currently we don't do filtering for null values. i.e. if a pack contains any null values, then the pack will pass the filter.
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "Nullable(Int64)", {{"0", "0", "0", "100"}, {"1", "1", "0", "\\N"}}, createEqual(attr("Nullable(Int64)"), Field((Int64)101))));
ASSERT_EQ(true, checkMatch(__FUNCTION__, *context, "Nullable(Int64)", {{"0", "0", "0", "100"}, {"1", "1", "0", "\\N"}}, createIn(attr("Nullable(Int64)"), {Field((Int64)101)})));
Expand Down
Loading

0 comments on commit af45f02

Please sign in to comment.