Support Json unquote function #8407

Merged · 29 commits · Nov 28, 2023

Commits
aa7e702
Save work
yibin87 Nov 14, 2023
0423a18
Save work for json scanner
yibin87 Nov 15, 2023
cfd932b
Complete valid string check logic
yibin87 Nov 16, 2023
b523aee
Complete dev work
yibin87 Nov 20, 2023
757def7
Add json_unquote support
yibin87 Nov 22, 2023
b8f4649
Fix format issue
yibin87 Nov 23, 2023
bd85cb5
Little fix
yibin87 Nov 23, 2023
efbffc9
Fix to test failures
yibin87 Nov 23, 2023
890fef7
fix ut and a little refact
yibin87 Nov 24, 2023
ee1e017
Little refact
yibin87 Nov 24, 2023
1c52415
Update test case
yibin87 Nov 24, 2023
e81918e
fix test case
yibin87 Nov 24, 2023
e5f0d5e
update test case
yibin87 Nov 24, 2023
0068d15
Little refact
yibin87 Nov 24, 2023
fa71aa7
Update dbms/src/Functions/FunctionsJson.h
yibin87 Nov 27, 2023
e9f9977
Update dbms/src/Functions/FunctionsJson.h
yibin87 Nov 27, 2023
d1d6fd7
Merge latest code from master
yibin87 Nov 28, 2023
1c010d5
Refact
yibin87 Nov 28, 2023
de525af
Little change
yibin87 Nov 28, 2023
e00030d
Little fix
yibin87 Nov 28, 2023
f276079
Formatting
yibin87 Nov 28, 2023
e5ec942
Fix gtest compilation error
yibin87 Nov 28, 2023
de65bef
fix code error
yibin87 Nov 28, 2023
402b16c
Little fix
yibin87 Nov 28, 2023
9b3e744
Fix format issue
yibin87 Nov 28, 2023
28a6233
Address comments
yibin87 Nov 28, 2023
b7752df
Address comments to throw exception when invalid utf8 code encountered
yibin87 Nov 28, 2023
d8cbc35
Merge branch 'master' into json_unquote
ti-chi-bot[bot] Nov 28, 2023
572fea9
Merge branch 'master' into json_unquote
ti-chi-bot[bot] Nov 28, 2023
43 changes: 30 additions & 13 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp
@@ -281,27 +281,30 @@ String DAGExpressionAnalyzerHelper::buildCastFunction(
return buildCastFunctionInternal(analyzer, {name, type_expr_name}, false, expr.field_type(), actions);
}

String DAGExpressionAnalyzerHelper::buildCastAsJsonWithInputTiDBField(
String DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions(
DAGExpressionAnalyzer * analyzer,
const tipb::Expr & expr,
const ExpressionActionsPtr & actions)
{
auto func_name = getFunctionName(expr);
if unlikely (expr.children_size() != 1)
throw TiFlashException("Cast function only support one argument", Errors::Coprocessor::BadRequest);
throw TiFlashException(
fmt::format("{} function only support one argument", func_name),
Errors::Coprocessor::BadRequest);
if unlikely (!exprHasValidFieldType(expr))
throw TiFlashException("CAST function without valid field type", Errors::Coprocessor::BadRequest);
throw TiFlashException(
fmt::format("{} function without valid field type", func_name),
Errors::Coprocessor::BadRequest);

const auto & input_expr = expr.children(0);
auto func_name = getFunctionName(expr);

String arg = analyzer->getActions(input_expr, actions);
const auto & collator = getCollatorFromExpr(expr);
String result_name = genFuncString(func_name, {arg}, {collator});
if (actions->getSampleBlock().has(result_name))
return result_name;

const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, analyzer->getContext());
auto * function_build_ptr = function_builder.get();
const FunctionBuilderPtr & ifunction_builder = FunctionFactory::instance().get(func_name, analyzer->getContext());
auto * function_build_ptr = ifunction_builder.get();
if (auto * function_builder = dynamic_cast<DefaultFunctionBuilder *>(function_build_ptr); function_builder)
{
auto * function_impl = function_builder->getFunctionImpl().get();
@@ -321,17 +324,29 @@ String DAGExpressionAnalyzerHelper::buildCastAsJsonWithInputTiDBField(
{
function_cast_time_as_json->setInputTiDBFieldType(input_expr.field_type());
}
else if (auto * function_json_unquote = dynamic_cast<FunctionJsonUnquote *>(function_impl);
function_json_unquote)
{
bool valid_check
= !(isScalarFunctionExpr(input_expr) && input_expr.sig() == tipb::ScalarFuncSig::CastJsonAsString);
function_json_unquote->setNeedValidCheck(valid_check);
}
else if (auto * function_cast_json_as_string = dynamic_cast<FunctionCastJsonAsString *>(function_impl);
function_cast_json_as_string)
{
function_cast_json_as_string->setOutputTiDBFieldType(expr.field_type());
}
else
{
throw Exception(fmt::format("Unexpected func {} in buildCastAsJsonWithInputTiDBField", func_name));
throw Exception(fmt::format("Unexpected func {} in buildSingleParamJsonRelatedFunctions", func_name));
}
}
else
{
throw Exception(fmt::format("Unexpected func {} in buildCastAsJsonWithInputTiDBField", func_name));
throw Exception(fmt::format("Unexpected func {} in buildSingleParamJsonRelatedFunctions", func_name));
}

const ExpressionAction & action = ExpressionAction::applyFunction(function_builder, {arg}, result_name, collator);
const ExpressionAction & action = ExpressionAction::applyFunction(ifunction_builder, {arg}, result_name, collator);
actions->add(action);
return result_name;
}
@@ -534,9 +549,11 @@ DAGExpressionAnalyzerHelper::FunctionBuilderMap DAGExpressionAnalyzerHelper::fun
{"ifNull", DAGExpressionAnalyzerHelper::buildIfNullFunction},
{"multiIf", DAGExpressionAnalyzerHelper::buildMultiIfFunction},
{"tidb_cast", DAGExpressionAnalyzerHelper::buildCastFunction},
{"cast_int_as_json", DAGExpressionAnalyzerHelper::buildCastAsJsonWithInputTiDBField},
{"cast_string_as_json", DAGExpressionAnalyzerHelper::buildCastAsJsonWithInputTiDBField},
{"cast_time_as_json", DAGExpressionAnalyzerHelper::buildCastAsJsonWithInputTiDBField},
{"cast_int_as_json", DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions},
{"cast_string_as_json", DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions},
{"cast_time_as_json", DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions},
{"cast_json_as_string", DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions},
{"json_unquote", DAGExpressionAnalyzerHelper::buildSingleParamJsonRelatedFunctions},
{"and", DAGExpressionAnalyzerHelper::buildLogicalFunction},
{"or", DAGExpressionAnalyzerHelper::buildLogicalFunction},
{"xor", DAGExpressionAnalyzerHelper::buildLogicalFunction},
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.h
@@ -71,7 +71,7 @@ class DAGExpressionAnalyzerHelper
const tipb::Expr & expr,
const ExpressionActionsPtr & actions);

static String buildCastAsJsonWithInputTiDBField(
static String buildSingleParamJsonRelatedFunctions(
DAGExpressionAnalyzer * analyzer,
const tipb::Expr & expr,
const ExpressionActionsPtr & actions);
135 changes: 109 additions & 26 deletions dbms/src/Functions/FunctionsJson.h
@@ -27,11 +27,14 @@
#include <DataTypes/DataTypesNumber.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionsTiDBConversion.h>
#include <Functions/GatherUtils/Sources.h>
#include <Functions/IFunction.h>
#include <Functions/castTypeToEither.h>
#include <Interpreters/Context.h>
#include <TiDB/Decode/JsonBinary.h>
#include <TiDB/Decode/JsonPathExprRef.h>
#include <TiDB/Decode/JsonScanner.h>
#include <TiDB/Schema/TiDB.h>
#include <common/JSON.h>
#include <simdjson.h>
@@ -301,6 +304,7 @@ class FunctionJsonUnquote : public IFunction

size_t getNumberOfArguments() const override { return 1; }

void setNeedValidCheck(bool need_valid_check_) { need_valid_check = need_valid_check_; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
@@ -327,16 +331,10 @@
offsets_to.resize(rows);
ColumnUInt8::MutablePtr col_null_map = ColumnUInt8::create(rows, 0);
JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to);
size_t current_offset = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
size_t next_offset = offsets_from[i];
size_t data_length = next_offset - current_offset - 1;
JsonBinary::unquoteStringInBuffer(StringRef(&data_from[current_offset], data_length), write_buffer);
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
current_offset = next_offset;
}
if (need_valid_check)
doUnquote<true>(block, data_from, offsets_from, offsets_to, write_buffer);
else
doUnquote<false>(block, data_from, offsets_from, offsets_to, write_buffer);
data_to.resize(write_buffer.count());
block.getByPosition(result).column = ColumnNullable::create(std::move(col_to), std::move(col_null_map));
}
@@ -345,21 +343,69 @@
fmt::format("Illegal column {} of argument of function {}", column->getName(), getName()),
ErrorCodes::ILLEGAL_COLUMN);
}

template <bool validCheck>
void doUnquote(
const Block & block,
const ColumnString::Chars_t & data_from,
const IColumn::Offsets & offsets_from,
IColumn::Offsets & offsets_to,
JsonBinary::JsonBinaryWriteBuffer & write_buffer) const
{
size_t current_offset = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
size_t next_offset = offsets_from[i];
size_t data_length = next_offset - current_offset - 1;
if constexpr (validCheck)
{
// TODO(hyb): use SIMDJson to check when SIMDJson is proved in practice
if (data_length >= 2 && data_from[current_offset] == '"' && data_from[next_offset - 2] == '"'
&& unlikely(
!checkJsonValid(reinterpret_cast<const char *>(&data_from[current_offset]), data_length)))
{
throw Exception(
"Invalid JSON text: The document root must not be followed by other values.",
ErrorCodes::ILLEGAL_COLUMN);
}
}
JsonBinary::unquoteStringInBuffer(StringRef(&data_from[current_offset], data_length), write_buffer);
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
current_offset = next_offset;
}
}

private:
bool need_valid_check = false;
};


class FunctionCastJsonAsString : public IFunction
{
public:
static constexpr auto name = "cast_json_as_string";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionCastJsonAsString>(); }
static FunctionPtr create(const Context & context)
{
if (!context.getDAGContext())
{
throw Exception("DAGContext should not be nullptr.", ErrorCodes::LOGICAL_ERROR);
}
return std::make_shared<FunctionCastJsonAsString>(context);
}

explicit FunctionCastJsonAsString(const Context & context)
: context(context)
{}

String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 1; }

bool useDefaultImplementationForConstants() const override { return true; }

void setOutputTiDBFieldType(const tipb::FieldType & tidb_tp_) { tidb_tp = &tidb_tp_; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if unlikely (!arguments[0]->isString())
@@ -386,25 +432,59 @@
ColumnUInt8::MutablePtr col_null_map = ColumnUInt8::create(rows, 0);
ColumnUInt8::Container & vec_null_map = col_null_map->getData();
JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to);
size_t current_offset = 0;
for (size_t i = 0; i < block.rows(); ++i)
if likely (tidb_tp->flen() < 0)
{
size_t next_offset = offsets_from[i];
size_t json_length = next_offset - current_offset - 1;
if unlikely (isNullJsonBinary(json_length))
size_t current_offset = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
vec_null_map[i] = 1;
size_t next_offset = offsets_from[i];
size_t json_length = next_offset - current_offset - 1;
if unlikely (isNullJsonBinary(json_length))
vec_null_map[i] = 1;
else
{
JsonBinary json_binary(
data_from[current_offset],
StringRef(&data_from[current_offset + 1], json_length - 1));
json_binary.toStringInBuffer(write_buffer);
}
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
current_offset = next_offset;
}
else
}
else
{
ColumnString::Chars_t container_per_element;
size_t current_offset = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
JsonBinary json_binary(
data_from[current_offset],
StringRef(&data_from[current_offset + 1], json_length - 1));
json_binary.toStringInBuffer(write_buffer);
size_t next_offset = offsets_from[i];
size_t json_length = next_offset - current_offset - 1;
if unlikely (isNullJsonBinary(json_length))
vec_null_map[i] = 1;
else
{
JsonBinary::JsonBinaryWriteBuffer element_write_buffer(container_per_element);
JsonBinary json_binary(
data_from[current_offset],
StringRef(&data_from[current_offset + 1], json_length - 1));
json_binary.toStringInBuffer(element_write_buffer);
size_t orig_length = element_write_buffer.count();
auto byte_length = charLengthToByteLengthFromUTF8(
reinterpret_cast<char *>(container_per_element.data()),
orig_length,
tidb_tp->flen());
byte_length = std::min(byte_length, orig_length);
Comment from a Contributor:
Looks like this is not necessary since charLengthToByteLengthFromUTF8 should ensure that the return value is less than orig_length?
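As a side note for readers: the helper in question is shown further down in this diff (it is moved out of TiDBConvertToString in FunctionsTiDBConversion.h). The standalone sketch below simply copies that implementation and exercises it; the observation that the result can exceed `length` when the input ends in the middle of a multi-byte sequence is my reading of that code, not a statement from the PR discussion.

#include <cstdint>
#include <cstdio>

// Standalone copy of the helper from this PR, for illustration only.
static size_t charLengthToByteLengthFromUTF8(const char * data, size_t length, size_t char_length)
{
    size_t ret = 0;
    for (size_t char_index = 0; char_index < char_length && ret < length; char_index++)
    {
        uint8_t c = data[ret];
        if (c < 0x80)
            ret += 1;
        else if (c < 0xE0)
            ret += 2;
        else if (c < 0xF0)
            ret += 3;
        else
            ret += 4;
    }
    return ret;
}

int main()
{
    // Well-formed input: "€1" is 4 bytes (3-byte euro sign + '1').
    const char euro[] = "\xE2\x82\xAC1";
    printf("%zu\n", charLengthToByteLengthFromUTF8(euro, 4, 1)); // 3: first character only
    printf("%zu\n", charLengthToByteLengthFromUTF8(euro, 4, 2)); // 4: both characters

    // Truncated input: a lone 4-byte lead byte. length == 1, but the loop still
    // adds the full character width, so the result (4) exceeds length.
    const char truncated[] = "\xF0";
    printf("%zu\n", charLengthToByteLengthFromUTF8(truncated, 1, 1)); // 4 > 1
    return 0;
}

If that reading is right, the std::min clamp acts as a cheap guard against malformed or truncated UTF-8 rather than being dead code.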

if (byte_length < element_write_buffer.count())
context.getDAGContext()->handleTruncateError("Data Too Long");
write_buffer.write(reinterpret_cast<char *>(container_per_element.data()), byte_length);
Comment from @SeaRise (Contributor), Nov 28, 2023:
how about

Suggested change — replace:
    JsonBinary::JsonBinaryWriteBuffer element_write_buffer(container_per_element);
    JsonBinary json_binary(
        data_from[current_offset],
        StringRef(&data_from[current_offset + 1], json_length - 1));
    json_binary.toStringInBuffer(element_write_buffer);
    size_t orig_length = element_write_buffer.count();
    auto byte_length = charLengthToByteLengthFromUTF8(
        reinterpret_cast<char *>(container_per_element.data()),
        orig_length,
        tidb_tp->flen());
    byte_length = std::min(byte_length, orig_length);
    if (byte_length < element_write_buffer.count())
        context.getDAGContext()->handleTruncateError("Data Too Long");
    write_buffer.write(reinterpret_cast<char *>(container_per_element.data()), byte_length);
with:
    auto start_pos = write_buffer.offset();
    JsonBinary json_binary(
        data_from[current_offset],
        StringRef(&data_from[current_offset + 1], json_length - 1));
    json_binary.toStringInBuffer(write_buffer);
    auto end_pos = write_buffer.offset();
    auto orig_length = end_pos - start_pos;
    auto byte_length = charLengthToByteLengthFromUTF8(
        reinterpret_cast<char *>(write_buffer.data() + start_offset),
        orig_length,
        tidb_tp->flen());
    byte_length = std::min(byte_length, orig_length);
    if (byte_length < orig_length)
    {
        context.getDAGContext()->handleTruncateError("Data Too Long");
        write_buffer.setOffset(start_pos + byte_length);
    }
?

Comment from a Contributor:
To avoid one more memcpy.

Reply from @yibin87 (Contributor, Author), Nov 28, 2023:
Yeah, you're right. I just think this code path is not commonly used (casting JSON to a fixed-length char is valid but unusual), and even if it is used the performance won't drop significantly, so I chose the temporary buffer here to keep the code simpler.
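For readers skimming the thread, the sketch below contrasts the two strategies in a repository-independent way: the merged code's per-element scratch buffer that is copied into the output, versus the reviewer's write-then-rewind approach. std::string stands in for the JsonBinaryWriteBuffer and `limit` for tidb_tp->flen(); the names and types here are illustrative only.

#include <algorithm>
#include <cstdio>
#include <string>

// (a) Temporary buffer per element, then copy the truncated prefix (one extra memcpy).
void append_via_temp(std::string & out, const std::string & element, size_t limit)
{
    std::string tmp = element;                 // per-element scratch buffer
    size_t keep = std::min(tmp.size(), limit); // truncate if too long
    out.append(tmp.data(), keep);              // extra copy into the output
}

// (b) Write directly into the output, then rewind on truncation (no extra copy).
void append_in_place(std::string & out, const std::string & element, size_t limit)
{
    size_t start_pos = out.size();
    out += element;                            // serialize straight into the output
    size_t orig_length = out.size() - start_pos;
    if (orig_length > limit)
        out.resize(start_pos + limit);         // rewind, analogous to setOffset()
}

int main()
{
    std::string a, b;
    append_via_temp(a, "hello world", 5);
    append_in_place(b, "hello world", 5);
    printf("%s | %s\n", a.c_str(), b.c_str()); // hello | hello
    return 0;
}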

}

writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
current_offset = next_offset;
}
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
current_offset = next_offset;
}
data_to.resize(write_buffer.count());
block.getByPosition(result).column = ColumnNullable::create(std::move(col_to), std::move(col_null_map));
@@ -414,8 +494,11 @@ class FunctionCastJsonAsString : public IFunction
fmt::format("Illegal column {} of argument of function {}", column->getName(), getName()),
ErrorCodes::ILLEGAL_COLUMN);
}
};

private:
const tipb::FieldType * tidb_tp;
const Context & context;
};

class FunctionJsonLength : public IFunction
{
42 changes: 21 additions & 21 deletions dbms/src/Functions/FunctionsTiDBConversion.h
@@ -77,30 +77,30 @@ namespace
constexpr static Int64 pow10[] = {1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000};
}

ALWAYS_INLINE inline size_t charLengthToByteLengthFromUTF8(const char * data, size_t length, size_t char_length)
{
size_t ret = 0;
for (size_t char_index = 0; char_index < char_length && ret < length; char_index++)
{
uint8_t c = data[ret];
if (c < 0x80)
ret += 1;
else if (c < 0xE0)
ret += 2;
else if (c < 0xF0)
ret += 3;
else
ret += 4;
}
return ret;
}

/// cast int/real/decimal/time as string
template <typename FromDataType, bool return_nullable>
struct TiDBConvertToString
{
using FromFieldType = typename FromDataType::FieldType;

static size_t charLengthToByteLengthFromUTF8(const char * data, size_t length, size_t char_length)
{
size_t ret = 0;
for (size_t char_index = 0; char_index < char_length && ret < length; char_index++)
{
uint8_t c = data[ret];
if (c < 0x80)
ret += 1;
else if (c < 0xE0)
ret += 2;
else if (c < 0xF0)
ret += 3;
else
ret += 4;
}
return ret;
}

static void execute(
Block & block,
const ColumnNumbers & arguments,
@@ -148,7 +148,7 @@ struct TiDBConvertToString
size_t next_offset = (*offsets_from)[i];
size_t org_length = next_offset - current_offset - 1;
size_t byte_length = org_length;
if (tp.flen() > 0)
if (tp.flen() >= 0)
{
byte_length = tp.flen();
if (tp.charset() == "utf8" || tp.charset() == "utf8mb4")
@@ -189,7 +189,7 @@ struct TiDBConvertToString
WriteBufferFromVector<ColumnString::Chars_t> element_write_buffer(container_per_element);
FormatImpl<FromDataType>::execute(vec_from[i], element_write_buffer, &type, nullptr);
size_t byte_length = element_write_buffer.count();
if (tp.flen() > 0)
if (tp.flen() >= 0)
Comment from a Contributor:
Is it a bug fix here?

Reply from @yibin87 (Contributor, Author):
Yes, it is an existing bug.
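A small illustration of the boundary case being fixed, with made-up values: flen() == 0 corresponds to a zero-length target such as CHAR(0). Under the old `> 0` check the produced string was kept unchanged; under the new `>= 0` check it is truncated and the "Data Too Long" path is taken. The helper below is only a sketch of that comparison, not the PR's code.

#include <algorithm>
#include <cstdio>

// Stand-in for the truncation check in TiDBConvertToString; `flen` mimics tp.flen().
static size_t truncated_length(size_t produced_bytes, int flen, bool inclusive_zero)
{
    size_t byte_length = produced_bytes;
    bool apply_limit = inclusive_zero ? (flen >= 0) : (flen > 0);
    if (apply_limit)
        byte_length = std::min(byte_length, static_cast<size_t>(flen));
    return byte_length;
}

int main()
{
    // CAST(... AS CHAR(0)): flen == 0, and the formatted value is 5 bytes long.
    printf("old (flen >  0): keep %zu bytes\n", truncated_length(5, 0, false)); // 5 -> never truncated
    printf("new (flen >= 0): keep %zu bytes\n", truncated_length(5, 0, true));  // 0 -> truncated, "Data Too Long"
    return 0;
}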

byte_length = std::min(byte_length, tp.flen());
if (byte_length < element_write_buffer.count())
context.getDAGContext()->handleTruncateError("Data Too Long");
@@ -235,7 +235,7 @@ struct TiDBConvertToString
WriteBufferFromVector<ColumnString::Chars_t> element_write_buffer(container_per_element);
FormatImpl<FromDataType>::execute(vec_from[i], element_write_buffer, &type, nullptr);
size_t byte_length = element_write_buffer.count();
if (tp.flen() > 0)
if (tp.flen() >= 0)
byte_length = std::min(byte_length, tp.flen());
if (byte_length < element_write_buffer.count())
context.getDAGContext()->handleTruncateError("Data Too Long");
6 changes: 5 additions & 1 deletion dbms/src/Functions/tests/gtest_cast_as_json.cpp
@@ -51,7 +51,11 @@ class TestCastAsJson : public DB::tests::FunctionTest
json_column = executeFunction(func_name, columns);
}
// The `json_binary` should be cast as a string to improve readability.
return executeFunction("cast_json_as_string", {json_column});
tipb::FieldType field_type;
field_type.set_flen(-1);
field_type.set_collate(TiDB::ITiDBCollator::BINARY);
field_type.set_tp(TiDB::TypeString);
return executeCastJsonAsStringFunction(json_column, field_type);
}

template <typename Input, bool is_raw = false>