Skip to content

Commit

Permalink
Support json array (#8329)
Browse files Browse the repository at this point in the history
close #8308
  • Loading branch information
SeaRise authored Nov 8, 2023
1 parent 4ee89b1 commit 1bdd6ad
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 63 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::JsonRemoveSig, "cast"},
//{tipb::ScalarFuncSig::JsonMergeSig, "cast"},
//{tipb::ScalarFuncSig::JsonObjectSig, "cast"},
//{tipb::ScalarFuncSig::JsonArraySig, "cast"},
{tipb::ScalarFuncSig::JsonArraySig, "json_array"},
//{tipb::ScalarFuncSig::JsonValidJsonSig, "cast"},
//{tipb::ScalarFuncSig::JsonContainsSig, "cast"},
//{tipb::ScalarFuncSig::JsonArrayAppendSig, "cast"},
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Functions/FunctionsJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ void registerFunctionsJson(FunctionFactory & factory)
factory.registerFunction<FunctionsJsonExtract>();
factory.registerFunction<FunctionsJsonUnquote>();
factory.registerFunction<FunctionsCastJsonAsString>();
factory.registerFunction<FunctionJsonLength>();
factory.registerFunction<FunctionsJsonArray>();
}
} // namespace DB
149 changes: 148 additions & 1 deletion dbms/src/Functions/FunctionsJson.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/GatherUtils/Sources.h>
#include <Functions/IFunction.h>
#include <TiDB/Decode/JsonBinary.h>
#include <TiDB/Decode/JsonPathExprRef.h>

#include <ext/range.h>

namespace DB
{
/** Json related functions:
Expand All @@ -34,6 +37,8 @@ namespace DB
* Throw exception if any path_string failed to parse.
* json_unquote(json_string)
* cast_json_as_string(json_object)
* json_length(json_object)
* json_array(json_object...)
*
*/

Expand All @@ -47,6 +52,8 @@ inline bool isNullJsonBinary(size_t size)
return size == 0;
}

using namespace GatherUtils;

class FunctionsJsonExtract : public IFunction
{
public:
Expand Down Expand Up @@ -324,6 +331,7 @@ class FunctionsJsonUnquote : public IFunction
}
};


class FunctionsCastJsonAsString : public IFunction
{
public:
Expand Down Expand Up @@ -392,4 +400,143 @@ class FunctionsCastJsonAsString : public IFunction
}
};

} // namespace DB

class FunctionJsonLength : public IFunction
{
public:
static constexpr auto name = "jsonLength";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionJsonLength>(); }

String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 1; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!arguments[0]->isString())
throw Exception(
fmt::format("Illegal type {} of argument of function {}", arguments[0]->getName(), getName()),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

return std::make_shared<DataTypeUInt64>();
}

bool useDefaultImplementationForConstants() const override { return true; }

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
{
const ColumnPtr column = block.getByPosition(arguments[0]).column;
if (const auto * col = checkAndGetColumn<ColumnString>(column.get()))
{
auto col_res = ColumnUInt64::create();
typename ColumnUInt64::Container & vec_col_res = col_res->getData();
{
const auto & data = col->getChars();
const auto & offsets = col->getOffsets();
const size_t size = offsets.size();
vec_col_res.resize(size);

ColumnString::Offset prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
std::string_view sv(
reinterpret_cast<const char *>(&data[prev_offset]),
offsets[i] - prev_offset - 1);
vec_col_res[i] = JsonBinary::getJsonLength(sv);
prev_offset = offsets[i];
}
}
block.getByPosition(result).column = std::move(col_res);
}
else
throw Exception(
fmt::format("Illegal column {} of argument of function {}", column->getName(), getName()),
ErrorCodes::ILLEGAL_COLUMN);
}
};


class FunctionsJsonArray : public IFunction
{
public:
static constexpr auto name = "json_array";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionsJsonArray>(); }

String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 0; }

bool isVariadic() const override { return true; }

bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (const auto arg_idx : ext::range(0, arguments.size()))
{
if (!arguments[arg_idx]->onlyNull())
{
const auto * arg = removeNullable(arguments[arg_idx]).get();
if (!arg->isStringOrFixedString())
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}",
arg->getName(),
arg_idx + 1,
getName());
}
}
return std::make_shared<DataTypeString>();
}

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
{
auto nested_block = createBlockWithNestedColumns(block, arguments, result);
StringSources sources;
for (auto column_number : arguments)
{
sources.push_back(
block.getByPosition(column_number).type->onlyNull()
? nullptr
: createDynamicStringSource(*nested_block.getByPosition(column_number).column));
}

auto rows = block.rows();
auto col_to = ColumnString::create();
auto & data_to = col_to->getChars();
WriteBufferFromVector<ColumnString::Chars_t> write_buffer(data_to);
auto & offsets_to = col_to->getOffsets();
offsets_to.resize(rows);

std::vector<JsonBinary> jsons;
jsons.reserve(sources.size());
for (size_t i = 0; i < rows; ++i)
{
for (size_t col = 0; col < sources.size(); ++col)
{
if (sources[col] && !block.getByPosition(arguments[col]).column->isNullAt(i))
{
const auto & data_from = sources[col]->getWhole();
jsons.emplace_back(data_from.data[0], StringRef(&data_from.data[1], data_from.size - 1));
}
else
{
jsons.emplace_back(JsonBinary::TYPE_CODE_LITERAL, StringRef(&JsonBinary::LITERAL_NIL, 1));
}
}
JsonBinary::buildBinaryJsonArrayInBuffer(jsons, write_buffer);
jsons.clear();
writeChar(0, write_buffer);
offsets_to[i] = write_buffer.count();
for (const auto & source : sources)
{
if (source)
source->next();
}
}
data_to.resize(write_buffer.count());

block.getByPosition(result).column = std::move(col_to);
}
};
} // namespace DB
57 changes: 0 additions & 57 deletions dbms/src/Functions/FunctionsString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1102,62 +1102,6 @@ class FunctionReverse : public IFunction
}
};

extern UInt64 GetJsonLength(const std::string_view & sv);

class FunctionJsonLength : public IFunction
{
public:
static constexpr auto name = "jsonLength";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionJsonLength>(); }

String getName() const override { return name; }

size_t getNumberOfArguments() const override { return 1; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!arguments[0]->isString())
throw Exception(
fmt::format("Illegal type {} of argument of function {}", arguments[0]->getName(), getName()),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

return std::make_shared<DataTypeUInt64>();
}

bool useDefaultImplementationForConstants() const override { return true; }

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
{
const ColumnPtr column = block.getByPosition(arguments[0]).column;
if (const auto * col = checkAndGetColumn<ColumnString>(column.get()))
{
auto col_res = ColumnUInt64::create();
typename ColumnUInt64::Container & vec_col_res = col_res->getData();
{
const auto & data = col->getChars();
const auto & offsets = col->getOffsets();
const size_t size = offsets.size();
vec_col_res.resize(size);

ColumnString::Offset prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
std::string_view sv(
reinterpret_cast<const char *>(&data[prev_offset]),
offsets[i] - prev_offset - 1);
vec_col_res[i] = GetJsonLength(sv);
prev_offset = offsets[i];
}
}
block.getByPosition(result).column = std::move(col_res);
}
else
throw Exception(
fmt::format("Illegal column {} of argument of function {}", column->getName(), getName()),
ErrorCodes::ILLEGAL_COLUMN);
}
};

template <typename Name, bool is_injective>
class ConcatImpl : public IFunction
{
Expand Down Expand Up @@ -6563,7 +6507,6 @@ void registerFunctionsString(FunctionFactory & factory)
factory.registerFunction<FunctionSubstring>();
factory.registerFunction<FunctionSubstringUTF8>();
factory.registerFunction<FunctionAppendTrailingCharIfAbsent>();
factory.registerFunction<FunctionJsonLength>();
factory.registerFunction<FunctionRightUTF8>();
factory.registerFunction<FunctionASCII>();
factory.registerFunction<FunctionPosition>();
Expand Down
Loading

0 comments on commit 1bdd6ad

Please sign in to comment.