Skip to content

Commit

Permalink
Merge pull request #3879 from yandex/setting-low_cardinality_allow_in…
Browse files Browse the repository at this point in the history
…_native_format

Added low_cardinality_allow_in_native_format setting.
  • Loading branch information
alexey-milovidov authored Dec 20, 2018
2 parents f85857d + 18b5305 commit d9e8cd4
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 36 deletions.
33 changes: 7 additions & 26 deletions dbms/programs/server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,19 +370,7 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
}

/// Send block to the client - table structure.
Block block = state.io.out->getHeader();

/// Support insert from old clients without low cardinality type.
if (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
{
for (auto & col : block)
{
col.type = recursiveRemoveLowCardinality(col.type);
col.column = recursiveRemoveLowCardinality(col.column);
}
}

sendData(block);
sendData(state.io.out->getHeader());

readData(global_settings);
state.io.out->writeSuffix();
Expand All @@ -399,16 +387,6 @@ void TCPHandler::processOrdinaryQuery()
{
Block header = state.io.in->getHeader();

/// Send data to old clients without low cardinality type.
if (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
{
for (auto & column : header)
{
column.column = recursiveRemoveLowCardinality(column.column);
column.type = recursiveRemoveLowCardinality(column.type);
}
}

if (header)
sendData(header);
}
Expand Down Expand Up @@ -782,7 +760,8 @@ void TCPHandler::initBlockInput()
state.block_in = std::make_shared<NativeBlockInputStream>(
*state.maybe_compressed_in,
header,
client_revision);
client_revision,
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
}
}

Expand All @@ -803,7 +782,8 @@ void TCPHandler::initBlockOutput(const Block & block)
state.block_out = std::make_shared<NativeBlockOutputStream>(
*state.maybe_compressed_out,
client_revision,
block.cloneEmpty());
block.cloneEmpty(),
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
}
}

Expand All @@ -815,7 +795,8 @@ void TCPHandler::initLogsBlockOutput(const Block & block)
state.logs_block_out = std::make_shared<NativeBlockOutputStream>(
*out,
client_revision,
block.cloneEmpty());
block.cloneEmpty(),
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
}
}

Expand Down
7 changes: 4 additions & 3 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
{
}

NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool convert_types_to_low_cardinality_)
: istr(istr_), header(header_), server_revision(server_revision_), convert_types_to_low_cardinality(convert_types_to_low_cardinality_)
{
}

Expand Down Expand Up @@ -154,7 +154,8 @@ Block NativeBlockInputStream::readImpl()
column.column = std::move(read_column);

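/// Support insert from old clients without low cardinality type, and from any client when conversion to the header's types is requested (convert_types_to_low_cardinality).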
if (header && server_revision && server_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
bool revision_without_low_cardinality = server_revision && server_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE;
if (header && (convert_types_to_low_cardinality || revision_without_low_cardinality))
{
column.column = recursiveLowCardinalityConversion(column.column, column.type, header.getByPosition(i).type);
column.type = header.getByPosition(i).type;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/NativeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class NativeBlockInputStream : public IProfilingBlockInputStream

/// For cases when data structure (header) is known in advance.
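/// NOTE The header is used to convert read columns to the required LowCardinality types (for old revisions or when convert_types_to_low_cardinality_ is set). General data validation and other type conversions are not implemented.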
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool convert_types_to_low_cardinality_ = false);

/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
Expand All @@ -91,6 +91,8 @@ class NativeBlockInputStream : public IProfilingBlockInputStream
IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;

bool convert_types_to_low_cardinality = false;

/// If an index is specified, then `istr` must be CompressedReadBufferFromFile. Unused otherwise.
CompressedReadBufferFromFile * istr_concrete = nullptr;

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/NativeBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ namespace ErrorCodes


NativeBlockOutputStream::NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_,
WriteBuffer * index_ostr_, size_t initial_size_of_file_)
: ostr(ostr_), client_revision(client_revision_), header(header_),
index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_)
index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_), remove_low_cardinality(remove_low_cardinality_)
{
if (index_ostr)
{
Expand Down Expand Up @@ -104,7 +104,7 @@ void NativeBlockOutputStream::write(const Block & block)
ColumnWithTypeAndName column = block.safeGetByPosition(i);

/// Send data without low cardinality type to old clients, or to any client when remove_low_cardinality is set.
if (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
if (remove_low_cardinality || (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE))
{
column.column = recursiveRemoveLowCardinality(column.column);
column.type = recursiveRemoveLowCardinality(column.type);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/NativeBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class NativeBlockOutputStream : public IBlockOutputStream
/** If non-zero client_revision is specified, additional block information can be written.
*/
NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_ = false,
WriteBuffer * index_ostr_ = nullptr, size_t initial_size_of_file_ = 0);

Block getHeader() const override { return header; }
Expand All @@ -42,6 +42,8 @@ class NativeBlockOutputStream : public IBlockOutputStream
size_t initial_size_of_file; /// The initial size of the data file, if `append` done. Used for the index.
/// If you need to write index, then `ostr` must be a CompressedWriteBuffer.
CompressedWriteBuffer * ostr_concrete = nullptr;

bool remove_low_cardinality;
};

}
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ struct Settings
M(SettingBool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.") \
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.") \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.") \

M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \

#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageStripeLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class StripeLogBlockOutputStream final : public IBlockOutputStream
data_out(data_out_compressed, CompressionSettings(CompressionMethod::LZ4), storage.max_compress_block_size),
index_out_compressed(storage.full_path() + "index.mrk", INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
index_out(index_out_compressed),
block_out(data_out, 0, storage.getSampleBlock(), &index_out, Poco::File(storage.full_path() + "data.bin").getSize())
block_out(data_out, 0, storage.getSampleBlock(), false, &index_out, Poco::File(storage.full_path() + "data.bin").getSize())
{
}

Expand Down

0 comments on commit d9e8cd4

Please sign in to comment.