Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SETTINGS clause for Kafka storage engine #2781

Merged
merged 1 commit into from
Aug 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/Kafka)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
Expand Down
44 changes: 44 additions & 0 deletions dbms/src/Storages/Kafka/KafkaSettings.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include <Common/config.h>
#if USE_RDKAFKA

#include <Storages/Kafka/KafkaSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/Exception.h>


namespace DB
{

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}

/** Fill this settings object from the SETTINGS clause of a storage definition.
  *
  * When the definition carries a SETTINGS clause, every change listed in it is
  * matched by name against the entries of APPLY_FOR_KAFKA_SETTINGS and applied
  * to the member of the same name; a name that matches no entry raises
  * Exception with ErrorCodes::BAD_ARGUMENTS.
  *
  * When the definition has no SETTINGS clause, nothing is read: an empty,
  * non-standalone ASTSetQuery is created and installed into the definition
  * through storage_def.set(...).
  */
void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
{
    if (!storage_def.settings)
    {
        /// No SETTINGS clause was given: attach an empty one to the storage definition.
        auto empty_settings = std::make_shared<ASTSetQuery>();
        empty_settings->is_standalone = false;
        storage_def.set(storage_def.settings, empty_settings);
        return;
    }

/// Expands to one `else if` branch per known setting, comparing by the member's own name.
#define APPLY_CHANGE_IF_NAMED(TYPE, NAME, DEFAULT, DESCRIPTION) \
    else if (change.name == #NAME) NAME.set(change.value);

    for (const ASTSetQuery::Change & change : storage_def.settings->changes)
    {
        /// The empty `if (false)` gives the generated `else if` chain something to hang on.
        if (false) {}
        APPLY_FOR_KAFKA_SETTINGS(APPLY_CHANGE_IF_NAMED)
        else
            throw Exception(
                "Unknown setting " + change.name + " for storage " + storage_def.engine->name,
                ErrorCodes::BAD_ARGUMENTS);
    }

#undef APPLY_CHANGE_IF_NAMED
}

}
#endif
43 changes: 43 additions & 0 deletions dbms/src/Storages/Kafka/KafkaSettings.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once
#include <Common/config.h>
#if USE_RDKAFKA

#include <Poco/Util/AbstractConfiguration.h>
#include <Core/Defines.h>
#include <Core/Types.h>
#include <Interpreters/SettingsCommon.h>


namespace DB
{

class ASTStorage;

/** Settings for the Kafka engine.
* Could be loaded from a CREATE TABLE query (SETTINGS clause).
*
* The set of settings is described once, in the APPLY_FOR_KAFKA_SETTINGS list below.
* Each entry is M(TYPE, NAME, DEFAULT, DESCRIPTION); the list is expanded here to declare
* one data member per entry, and is left defined after this struct so that other code
* (e.g. the implementation of loadFromQuery) can expand it again with a different macro.
*/
struct KafkaSettings
{

/// Master list of Kafka settings: (member type, member name, default value, human-readable description).
/// All string settings default to an empty string, the row delimiter to '\0', the number of consumers to 1.
#define APPLY_FOR_KAFKA_SETTINGS(M) \
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
M(SettingString, kafka_format, "", "Message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.")

/// Turns one list entry into a member declaration initialized with its default;
/// the description argument is not used by this expansion.
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};

/// Declares kafka_broker_list, kafka_topic_list, ... kafka_num_consumers as members.
APPLY_FOR_KAFKA_SETTINGS(DECLARE)

/// Only the helper is undefined; APPLY_FOR_KAFKA_SETTINGS intentionally stays available to includers.
#undef DECLARE

public:
/// Reads the SETTINGS clause of the given storage definition into the members above.
/// Takes a non-const reference to the storage definition AST.
void loadFromQuery(ASTStorage & storage_def);
};

}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageKafka.h> // Y_IGNORE
#include <Parsers/ASTCreateQuery.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/StorageKafka.h> // Y_IGNORE
#include <Storages/StorageFactory.h>
#include <IO/ReadBuffer.h>
#include <common/logger_useful.h>
Expand Down Expand Up @@ -566,93 +568,200 @@ void registerStorageKafka(StorageFactory & factory)
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
bool has_settings = args.storage_def->settings;

KafkaSettings kafka_settings;
if (has_settings)
{
kafka_settings.loadFromQuery(*args.storage_def);
}

/** Arguments of engine is following:
* - Kafka broker list
* - List of topics
* - Group ID (may be a constant expression with a string result)
* - Message format (string)
* - Row delimiter
* - Schema (optional, if the format supports it)
* - Number of consumers
*/

if (engine_args.size() < 3 || engine_args.size() > 7)
throw Exception(
"Storage Kafka requires 3-7 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format, row delimiter, schema, number of consumers",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
// Check arguments and settings
#define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME) \
/* One of the four required arguments is not specified */ \
if (args_count < ARG_NUM && ARG_NUM <= 4 && \
!kafka_settings.PAR_NAME.changed) \
{ \
throw Exception( \
"Required parameter '" #PAR_NAME "' " \
"for storage Kafka not specified", \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
} \
/* The same argument is given in two places */ \
if (has_settings && \
kafka_settings.PAR_NAME.changed && \
args_count >= ARG_NUM) \
{ \
throw Exception( \
"The argument №" #ARG_NUM " of storage Kafka " \
"and the parameter '" #PAR_NAME "' " \
"in SETTINGS cannot be specified at the same time", \
ErrorCodes::BAD_ARGUMENTS); \
}

CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list)
CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list)
CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name)
CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
#undef CHECK_KAFKA_STORAGE_ARGUMENT

// Get and check broker list
String brokers;
auto ast = typeid_cast<const ASTLiteral *>(engine_args[0].get());
if (ast && ast->value.getType() == Field::Types::String)
brokers = safeGet<String>(ast->value);
else
throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
if (args_count >= 1)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[0].get());
if (ast && ast->value.getType() == Field::Types::String)
{
brokers = safeGet<String>(ast->value);
}
else
{
throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_broker_list.changed)
{
brokers = kafka_settings.kafka_broker_list.value;
}

// Get and check topic list
String topic_list;
if (args_count >= 2)
{
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
topic_list = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
}
else if (kafka_settings.kafka_topic_list.changed)
{
topic_list = kafka_settings.kafka_topic_list.value;
}
Names topics;
boost::split(topics, topic_list , [](char c){ return c == ','; });
for (String & topic : topics)
{
boost::trim(topic);
}

// Get and check group name
String group;
if (args_count >= 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();
}
else if (kafka_settings.kafka_group_name.changed)
{
group = kafka_settings.kafka_group_name.value;
}

engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
// Get and check message format name
String format;
if (args_count >= 4)
{
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);

auto ast = typeid_cast<const ASTLiteral *>(engine_args[3].get());
if (ast && ast->value.getType() == Field::Types::String)
{
format = safeGet<String>(ast->value);
}
else
{
throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_format.changed)
{
format = kafka_settings.kafka_format.value;
}

// Parse row delimiter (optional)
char row_delimiter = '\0';
if (engine_args.size() >= 5)
if (args_count >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

auto ast = typeid_cast<const ASTLiteral *>(engine_args[4].get());
String arg;
if (ast && ast->value.getType() == Field::Types::String)
{
arg = safeGet<String>(ast->value);
}
else
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
if (arg.size() > 1)
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
else if (arg.size() == 0)
{
row_delimiter = '\0';
}
else
{
row_delimiter = arg[0];
}
}
else if (kafka_settings.kafka_row_delimiter.changed)
{
row_delimiter = kafka_settings.kafka_row_delimiter.value;
}

// Parse format schema if supported (optional)
String schema;
if (engine_args.size() >= 6)
if (args_count >= 6)
{
engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
if (ast && ast->value.getType() == Field::Types::String)
{
schema = safeGet<String>(ast->value);
}
else
{
throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_schema.changed)
{
schema = kafka_settings.kafka_schema.value;
}

// Parse number of consumers (optional)
UInt64 num_consumers = 1;
if (engine_args.size() >= 7)
if (args_count >= 7)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[6].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
{
num_consumers = safeGet<UInt64>(ast->value);
}
else
{
throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_num_consumers.changed)
{
num_consumers = kafka_settings.kafka_num_consumers.value;
}

// Parse topic list
Names topics;
String topic_arg = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
boost::split(topics, topic_arg , [](char c){ return c == ','; });
for(String & topic : topics)
boost::trim(topic);

// Parse consumer group
String group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();

// Parse format from string
String format;
ast = typeid_cast<const ASTLiteral *>(engine_args[3].get());
if (ast && ast->value.getType() == Field::Types::String)
format = safeGet<String>(ast->value);
else
throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);

return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/StorageFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,19 @@ StoragePtr StorageFactory::get(

name = engine_def.name;

if ((storage_def->partition_by || storage_def->order_by || storage_def->sample_by || storage_def->settings)
if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka")
{
throw Exception(
"Engine " + name + " doesn't support SETTINGS clause. "
"Currently only the MergeTree family of engines and Kafka engine supports it",
ErrorCodes::BAD_ARGUMENTS);
}

if ((storage_def->partition_by || storage_def->order_by || storage_def->sample_by)
&& !endsWith(name, "MergeTree"))
{
throw Exception(
"Engine " + name + " doesn't support PARTITION BY, ORDER BY, SAMPLE BY or SETTINGS clauses. "
"Engine " + name + " doesn't support PARTITION BY, ORDER BY or SAMPLE BY clauses. "
"Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS);
}

Expand Down
Loading