Skip to content

Commit

Permalink
Support extract duration (#6205)
Browse files Browse the repository at this point in the history
close #5636
  • Loading branch information
birdstorm authored Nov 11, 2022
1 parent f1ef741 commit fececd9
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 22 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Common/MyDuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ Int32 MyDuration::hours() const

Int32 MyDuration::minutes() const
{
return std::abs(nanos) / NANOS_PER_MINUTE % 60;
return (std::abs(nanos) / NANOS_PER_MINUTE) % 60;
}

Int32 MyDuration::seconds() const
{
return std::abs(nanos) / NANOS_PER_SECOND % 60;
return (std::abs(nanos) / NANOS_PER_SECOND) % 60;
}

Int32 MyDuration::microSecond() const
{
return std::abs(nanos) / NANOS_PER_MICRO % 1000000;
return (std::abs(nanos) / NANOS_PER_MICRO) % 1000000;
}

String MyDuration::toString() const
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Common/MyDuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,22 @@ class MyDuration
static constexpr Int64 MAX_NANOS = MAX_HOUR_PART * NANOS_PER_HOUR + MAX_MINUTE_PART * NANOS_PER_MINUTE + MAX_SECOND_PART * NANOS_PER_SECOND + MAX_MICRO_PART * NANOS_PER_MICRO;
static_assert(MAX_NANOS > 0);

static const int8_t DefaultFsp = 6;

Int64 nanos;
UInt8 fsp;

public:
MyDuration() = default;
explicit MyDuration(Int64 nanos_)
: nanos(nanos_)
, fsp(DefaultFsp)
{
if (nanos_ > MAX_NANOS || nanos_ < -MAX_NANOS)
{
throw Exception(fmt::format("nanos must >= {} and <= {}", -MAX_NANOS, MAX_NANOS), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
}
MyDuration(Int64 nanos_, UInt8 fsp_)
: nanos(nanos_)
, fsp(fsp_)
Expand All @@ -61,13 +72,13 @@ class MyDuration
{
throw Exception(fmt::format("nanos must >= {} and <= {}", -MAX_NANOS, MAX_NANOS), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (fsp > 6)
if (fsp > 6 || fsp < 0)
throw Exception("fsp must >= 0 and <= 6", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
MyDuration(Int32 neg, Int32 hour, Int32 minute, Int32 second, Int32 microsecond, UInt8 fsp)
: MyDuration(neg * (hour * NANOS_PER_HOUR + minute * NANOS_PER_MINUTE + second * NANOS_PER_SECOND + microsecond * NANOS_PER_MICRO), fsp)
{
if (fsp > 6)
if (fsp > 6 || fsp < 0)
throw Exception("fsp must >= 0 and <= 6", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (minute > MAX_MINUTE_PART || minute < 0)
throw Exception("minute must >= 0 and <= 59", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
{tipb::ScalarFuncSig::FromUnixTime1Arg, "fromUnixTime"},
{tipb::ScalarFuncSig::FromUnixTime2Arg, "fromUnixTime"},
{tipb::ScalarFuncSig::ExtractDatetime, "extractMyDateTime"},
//{tipb::ScalarFuncSig::ExtractDuration, "cast"},
{tipb::ScalarFuncSig::ExtractDuration, "extractMyDuration"},

//{tipb::ScalarFuncSig::AddDateStringString, "cast"},
{tipb::ScalarFuncSig::AddDateStringInt, "date_add"},
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/gtest_log_search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ inline Int64 getTimezoneAndOffset(int tz_sign, int tz_hour, int tz_min)

inline void getTimezoneString(char * tzs, int tz_sign, int tz_hour, int tz_min)
{
sprintf(tzs, "%c%02d:%02d", tz_sign > 0 ? '+' : '-', tz_hour, tz_min);
snprintf(tzs, 10, "%c%02d:%02d", tz_sign > 0 ? '+' : '-', tz_hour, tz_min);
}

TEST_F(LogSearchTest, LogSearch)
Expand Down
11 changes: 0 additions & 11 deletions dbms/src/Functions/FunctionsDateTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -2802,17 +2802,6 @@ class FunctionExtractMyDateTime : public IFunction
dispatch<ExtractMyDateTimeImpl::extractDayHour>(col_from, vec_to);
else if (unit == "year_month")
dispatch<ExtractMyDateTimeImpl::extractYearMonth>(col_from, vec_to);
/// TODO: support ExtractDuration
// else if (unit == "hour");
// else if (unit == "minute");
// else if (unit == "second");
// else if (unit == "microsecond");
// else if (unit == "second_microsecond");
// else if (unit == "minute_microsecond");
// else if (unit == "minute_second");
// else if (unit == "hour_microsecond");
// else if (unit == "hour_second");
// else if (unit == "hour_minute");
else
throw TiFlashException(fmt::format("Function {} does not support '{}' unit", getName(), unit), Errors::Coprocessor::BadRequest);

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Functions/FunctionsDuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,6 @@ void registerFunctionsDuration(FunctionFactory & factory)
factory.registerFunction<FunctionDurationMicroSecond>();

factory.registerFunction<FunctionToTiDBTimeToSec>();
factory.registerFunction<FunctionExtractMyDuration>();
}
} // namespace DB
197 changes: 195 additions & 2 deletions dbms/src/Functions/FunctionsDuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

#pragma once

#include <Columns/ColumnString.h>
#include <Common/MyDuration.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeMyDuration.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
Expand Down Expand Up @@ -88,4 +88,197 @@ class FunctionMyDurationToSec : public IFunction
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override;
};

} // namespace DB
struct ExtractMyDurationImpl
{
static Int64 signMultiplier(const MyDuration & duration)
{
return duration.isNeg() ? -1 : 1;
}

static Int64 extractHour(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * duration.hours();
}

static Int64 extractMinute(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * duration.minutes();
}

static Int64 extractSecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * duration.seconds();
}

static Int64 extractMicrosecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * duration.microSecond();
}

static Int64 extractSecondMicrosecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * (duration.seconds() * 1000000LL + duration.microSecond());
}

static Int64 extractMinuteMicrosecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * ((duration.minutes() * 100LL + duration.seconds()) * 1000000LL + duration.microSecond());
}

static Int64 extractMinuteSecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * (duration.minutes() * 100LL + duration.seconds());
}

static Int64 extractHourMicrosecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * ((duration.hours() * 10000LL + duration.minutes() * 100LL + duration.seconds()) * 1000000LL + duration.microSecond());
}

static Int64 extractHourSecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * (duration.hours() * 10000LL + duration.minutes() * 100LL + duration.seconds());
}

static Int64 extractHourMinute(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * (duration.hours() * 100LL + duration.minutes());
}

static Int64 extractDayMicrosecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * ((duration.hours() * 10000LL + duration.minutes() * 100LL + duration.seconds()) * 1000000LL + duration.microSecond());
}

static Int64 extractDaySecond(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * (duration.hours() * 10000LL + duration.minutes() * 100LL + duration.seconds());
}

static Int64 extractDayMinute(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * (duration.hours() * 100LL + duration.minutes());
}

static Int64 extractDayHour(Int64 nano)
{
MyDuration duration(nano);
return signMultiplier(duration) * duration.hours();
}
};

class FunctionExtractMyDuration : public IFunction
{
public:
static constexpr auto name = "extractMyDuration";

static FunctionPtr create(const Context &) { return std::make_shared<FunctionExtractMyDuration>(); };

String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 2; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!arguments[0]->isString())
throw TiFlashException(fmt::format("First argument for function {} (unit) must be String", getName()), Errors::Coprocessor::BadRequest);

if (!arguments[1]->isMyTime())
throw TiFlashException(
fmt::format("Illegal type {} of second argument of function {}. Must be Duration.", arguments[1]->getName(), getName()),
Errors::Coprocessor::BadRequest);

return std::make_shared<DataTypeInt64>();
}

bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
{
const auto * unit_column = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
if (!unit_column)
throw TiFlashException(
fmt::format("First argument for function {} must be constant String", getName()),
Errors::Coprocessor::BadRequest);

String unit = Poco::toLower(unit_column->getValue<String>());

auto col_from = block.getByPosition(arguments[1]).column;

size_t rows = block.rows();
auto col_to = ColumnInt64::create(rows);
auto & vec_to = col_to->getData();

if (unit == "hour")
dispatch<ExtractMyDurationImpl::extractHour>(col_from, vec_to);
else if (unit == "minute")
dispatch<ExtractMyDurationImpl::extractMinute>(col_from, vec_to);
else if (unit == "second")
dispatch<ExtractMyDurationImpl::extractSecond>(col_from, vec_to);
else if (unit == "microsecond")
dispatch<ExtractMyDurationImpl::extractMicrosecond>(col_from, vec_to);
else if (unit == "second_microsecond")
dispatch<ExtractMyDurationImpl::extractSecondMicrosecond>(col_from, vec_to);
else if (unit == "minute_microsecond")
dispatch<ExtractMyDurationImpl::extractMinuteMicrosecond>(col_from, vec_to);
else if (unit == "minute_second")
dispatch<ExtractMyDurationImpl::extractMinuteSecond>(col_from, vec_to);
else if (unit == "hour_microsecond")
dispatch<ExtractMyDurationImpl::extractHourMicrosecond>(col_from, vec_to);
else if (unit == "hour_second")
dispatch<ExtractMyDurationImpl::extractHourSecond>(col_from, vec_to);
else if (unit == "hour_minute")
dispatch<ExtractMyDurationImpl::extractHourMinute>(col_from, vec_to);
else if (unit == "day_microsecond")
dispatch<ExtractMyDurationImpl::extractDayMicrosecond>(col_from, vec_to);
else if (unit == "day_second")
dispatch<ExtractMyDurationImpl::extractDaySecond>(col_from, vec_to);
else if (unit == "day_minute")
dispatch<ExtractMyDurationImpl::extractDayMinute>(col_from, vec_to);
else if (unit == "day_hour")
dispatch<ExtractMyDurationImpl::extractDayHour>(col_from, vec_to);
else
throw TiFlashException(fmt::format("Function {} does not support '{}' unit", getName(), unit), Errors::Coprocessor::BadRequest);

block.getByPosition(result).column = std::move(col_to);
}

private:
using Func = Int64 (*)(Int64);

template <Func F>
static void dispatch(const ColumnPtr col_from, PaddedPODArray<Int64> & vec_to)
{
if (const auto * from = checkAndGetColumn<ColumnInt64>(col_from.get()); from)
{
const auto & data = from->getData();
vectorDuration<F>(data, vec_to);
}
}

template <Func F>
static void vectorDuration(const ColumnInt64::Container & vec_from, PaddedPODArray<Int64> & vec_to)
{
vec_to.resize(vec_from.size());
for (size_t i = 0; i < vec_from.size(); i++)
{
vec_to[i] = F(vec_from[i]);
}
}
};

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Functions/tests/gtest_datetime_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ try
size_t res_col_idx = 2;
func_builder_ptr->build({unit_ctn, datetime_ctn})->execute(block, arg_cols_idx, res_col_idx);
const IColumn * ctn_res = block.getByPosition(res_col_idx).column.get();
const ColumnInt64 * col_res = checkAndGetColumn<ColumnInt64>(ctn_res);
const auto * col_res = checkAndGetColumn<ColumnInt64>(ctn_res);

Field res_field;
col_res->get(0, res_field);
Expand Down Expand Up @@ -149,7 +149,7 @@ try
size_t res_col_idx = 2;
func_builder_ptr->build({unit_ctn, datetime_ctn})->execute(block, arg_cols_idx, res_col_idx);
const IColumn * ctn_res = block.getByPosition(res_col_idx).column.get();
const ColumnInt64 * col_res = checkAndGetColumn<ColumnInt64>(ctn_res);
const auto * col_res = checkAndGetColumn<ColumnInt64>(ctn_res);

Field res_field;
col_res->get(0, res_field);
Expand Down
Loading

0 comments on commit fececd9

Please sign in to comment.