Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support insertion into LowCardinality from old native clients. #3769

Merged
merged 1 commit into from
Dec 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions dbms/programs/server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Core/ExternalTable.h>
#include <DataTypes/DataTypeLowCardinality.h>

#include "TCPHandler.h"

Expand Down Expand Up @@ -361,6 +362,17 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)

/// Send block to the client - table structure.
Block block = state.io.out->getHeader();

/// Support insert from old clients without low cardinality type.
if (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
{
for (auto & col : block)
{
col.type = recursiveRemoveLowCardinality(col.type);
col.column = recursiveRemoveLowCardinality(col.column);
}
}

sendData(block);

readData(global_settings);
Expand Down Expand Up @@ -743,8 +755,13 @@ void TCPHandler::initBlockInput()
else
state.maybe_compressed_in = in;

Block header;
if (state.io.out)
header = state.io.out->getHeader();

state.block_in = std::make_shared<NativeBlockInputStream>(
*state.maybe_compressed_in,
header,
client_revision);
}
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnLowCardinality.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void ColumnLowCardinality::insertRangeFrom(const IColumn & src, size_t start, si
auto * low_cardinality_src = typeid_cast<const ColumnLowCardinality *>(&src);

if (!low_cardinality_src)
throw Exception("Expected ColumnLowCardinality, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Expected ColumnLowCardinality, got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN);

if (&low_cardinality_src->getDictionary() == &getDictionary())
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54408

#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405

/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ext/range.h>

#include <DataStreams/NativeBlockInputStream.h>
#include <DataTypes/DataTypeLowCardinality.h>


namespace DB
Expand Down Expand Up @@ -152,6 +153,9 @@ Block NativeBlockInputStream::readImpl()

column.column = std::move(read_column);

if (server_revision && server_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
column.column = recursiveLowCardinalityConversion(column.column, column.type, header.getByPosition(i).type);

res.insert(std::move(column));

if (use_index)
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/DataTypes/DataTypeLowCardinality.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,13 @@ class DataTypeLowCardinality : public IDataType
/// Returns dictionary type if type is DataTypeLowCardinality, type otherwise.
DataTypePtr removeLowCardinality(const DataTypePtr & type);

/// Remove LowCardinality recursively from all nested types.
DataTypePtr recursiveRemoveLowCardinality(const DataTypePtr & type);

/// Remove LowCardinality recursively from all nested columns.
ColumnPtr recursiveRemoveLowCardinality(const ColumnPtr & column);

/// Convert column of type from_type to type to_type by converting nested LowCardinality columns.
ColumnPtr recursiveLowCardinalityConversion(const ColumnPtr & column, const DataTypePtr & from_type, const DataTypePtr & to_type);

}
137 changes: 137 additions & 0 deletions dbms/src/DataTypes/DataTypeLowCardinalityHelpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>

#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>

namespace DB
{

namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int TYPE_MISMATCH;
}

DataTypePtr recursiveRemoveLowCardinality(const DataTypePtr & type)
{
if (!type)
return type;

if (const auto * array_type = typeid_cast<const DataTypeArray *>(type.get()))
return std::make_shared<DataTypeArray>(recursiveRemoveLowCardinality(array_type->getNestedType()));

if (const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type.get()))
{
DataTypes elements = tuple_type->getElements();
for (auto & element : elements)
element = recursiveRemoveLowCardinality(element);

if (tuple_type->haveExplicitNames())
return std::make_shared<DataTypeTuple>(elements, tuple_type->getElementNames());
else
return std::make_shared<DataTypeTuple>(elements);
}

if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
return low_cardinality_type->getDictionaryType();

return type;
}

ColumnPtr recursiveRemoveLowCardinality(const ColumnPtr & column)
{
if (!column)
return column;

if (const auto * column_array = typeid_cast<const ColumnArray *>(column.get()))
return ColumnArray::create(recursiveRemoveLowCardinality(column_array->getDataPtr()), column_array->getOffsetsPtr());

if (const auto * column_const = typeid_cast<const ColumnConst *>(column.get()))
return ColumnConst::create(recursiveRemoveLowCardinality(column_const->getDataColumnPtr()), column_const->size());

if (const auto * column_tuple = typeid_cast<const ColumnTuple *>(column.get()))
{
Columns columns = column_tuple->getColumns();
for (auto & element : columns)
element = recursiveRemoveLowCardinality(element);
return ColumnTuple::create(columns);
}

if (const auto * column_low_cardinality = typeid_cast<const ColumnLowCardinality *>(column.get()))
return column_low_cardinality->convertToFullColumn();

return column;
}

ColumnPtr recursiveLowCardinalityConversion(const ColumnPtr & column, const DataTypePtr & from_type, const DataTypePtr & to_type)
{
if (from_type->equals(*to_type))
return column;

if (const auto * column_const = typeid_cast<const ColumnConst *>(column.get()))
return ColumnConst::create(recursiveLowCardinalityConversion(column_const->getDataColumnPtr(), from_type, to_type),
column_const->size());

if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(from_type.get()))
{
if (to_type->equals(*low_cardinality_type->getDictionaryType()))
return column->convertToFullColumnIfLowCardinality();
}

if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(to_type.get()))
{
if (from_type->equals(*low_cardinality_type->getDictionaryType()))
{
auto col = low_cardinality_type->createColumn();
static_cast<ColumnLowCardinality &>(*col).insertRangeFromFullColumn(*column, 0, column->size());
return std::move(col);
}
}

if (const auto * from_array_type = typeid_cast<const DataTypeArray *>(from_type.get()))
{
if (const auto * to_array_type = typeid_cast<const DataTypeArray *>(to_type.get()))
{
const auto * column_array = typeid_cast<const ColumnArray *>(column.get());
if (!column_array)
throw Exception("Unexpected column " + column->getName() + " for type " + from_type->getName(),
ErrorCodes::ILLEGAL_COLUMN);

auto & nested_from = from_array_type->getNestedType();
auto & nested_to = to_array_type->getNestedType();

return ColumnArray::create(
recursiveLowCardinalityConversion(column_array->getDataPtr(), nested_from, nested_to),
column_array->getOffsetsPtr());
}
}

if (const auto * from_tuple_type = typeid_cast<const DataTypeTuple *>(from_type.get()))
{
if (const auto * to_tuple_type = typeid_cast<const DataTypeTuple *>(to_type.get()))
{
const auto * column_tuple = typeid_cast<const ColumnTuple *>(column.get());
if (!column_tuple)
throw Exception("Unexpected column " + column->getName() + " for type " + from_type->getName(),
ErrorCodes::ILLEGAL_COLUMN);

Columns columns = column_tuple->getColumns();
auto & from_elements = from_tuple_type->getElements();
auto & to_elements = to_tuple_type->getElements();
for (size_t i = 0; i < columns.size(); ++i)
{
auto & element = columns[i];
element = recursiveLowCardinalityConversion(element, from_elements.at(i), to_elements.at(i));
}
return ColumnTuple::create(columns);
}
}

throw Exception("Cannot convert: " + from_type->getName() + " to " + to_type->getName(), ErrorCodes::TYPE_MISMATCH);
}

}
52 changes: 0 additions & 52 deletions dbms/src/Functions/IFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,58 +103,6 @@ void PreparedFunctionImpl::createLowCardinalityResultCache(size_t cache_size)
}


static DataTypePtr recursiveRemoveLowCardinality(const DataTypePtr & type)
{
if (!type)
return type;

if (const auto * array_type = typeid_cast<const DataTypeArray *>(type.get()))
return std::make_shared<DataTypeArray>(recursiveRemoveLowCardinality(array_type->getNestedType()));

if (const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type.get()))
{
DataTypes elements = tuple_type->getElements();
for (auto & element : elements)
element = recursiveRemoveLowCardinality(element);

if (tuple_type->haveExplicitNames())
return std::make_shared<DataTypeTuple>(elements, tuple_type->getElementNames());
else
return std::make_shared<DataTypeTuple>(elements);
}

if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
return low_cardinality_type->getDictionaryType();

return type;
}

static ColumnPtr recursiveRemoveLowCardinality(const ColumnPtr & column)
{
if (!column)
return column;

if (const auto * column_array = typeid_cast<const ColumnArray *>(column.get()))
return ColumnArray::create(recursiveRemoveLowCardinality(column_array->getDataPtr()), column_array->getOffsetsPtr());

if (const auto * column_const = typeid_cast<const ColumnConst *>(column.get()))
return ColumnConst::create(recursiveRemoveLowCardinality(column_const->getDataColumnPtr()), column_const->size());

if (const auto * column_tuple = typeid_cast<const ColumnTuple *>(column.get()))
{
Columns columns = column_tuple->getColumns();
for (auto & element : columns)
element = recursiveRemoveLowCardinality(element);
return ColumnTuple::create(columns);
}

if (const auto * column_low_cardinality = typeid_cast<const ColumnLowCardinality *>(column.get()))
return column_low_cardinality->convertToFullColumn();

return column;
}


ColumnPtr wrapInNullable(const ColumnPtr & src, const Block & block, const ColumnNumbers & args, size_t result, size_t input_rows_count)
{
ColumnPtr result_null_map_column;
Expand Down