StorageKafka: check dependencies recursively, add max_block setting #3396

Merged (2 commits) on Oct 22, 2018
3 changes: 2 additions & 1 deletion dbms/src/Storages/Kafka/KafkaSettings.h
@@ -26,7 +26,8 @@ struct KafkaSettings
M(SettingString, kafka_format, "", "Message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.")
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.")

#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
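
As a usage sketch (hypothetical database objects; kafka_broker_list, kafka_topic_list, kafka_group_name and kafka_format are the existing Kafka engine settings), the new kafka_max_block_size setting could be applied per table roughly as follows. With the default of 0, streamToViews() falls back to the server-wide max_block_size:

-- Hypothetical example: cap each streamed block at 65536 rows.
-- kafka_max_block_size = 0 (the default) keeps the previous behaviour of
-- using the global max_block_size setting.
CREATE TABLE queue
(
    timestamp UInt64,
    message String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'events_group',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 1,
         kafka_max_block_size = 65536;

Per the registerStorageKafka() change below, the same value can alternatively be passed as the eighth positional engine argument.
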
88 changes: 65 additions & 23 deletions dbms/src/Storages/Kafka/StorageKafka.cpp
@@ -25,6 +25,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageFactory.h>
#include <IO/ReadBuffer.h>
#include <common/logger_useful.h>
@@ -268,7 +269,8 @@ StorageKafka::StorageKafka(
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_)
const String & format_name_, char row_delimiter_, const String & schema_name_,
size_t num_consumers_, size_t max_block_size_)
: IStorage{columns_},
table_name(table_name_), database_name(database_name_), context(context_),
topics(context.getMacros()->expand(topics_)),
@@ -277,7 +279,7 @@
format_name(context.getMacros()->expand(format_name_)),
row_delimiter(row_delimiter_),
schema_name(context.getMacros()->expand(schema_name_)),
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
num_consumers(num_consumers_), max_block_size(max_block_size_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_), mutex(), consumers()
{
task = context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); });
@@ -295,10 +297,10 @@ BlockInputStreams StorageKafka::read(
{
check(column_names);

if (num_consumers == 0)
if (num_created_consumers == 0)
return BlockInputStreams();

const size_t stream_count = std::min(num_streams, num_consumers);
const size_t stream_count = std::min(num_streams, num_created_consumers);

BlockInputStreams streams;
streams.reserve(stream_count);
@@ -434,26 +436,44 @@ void StorageKafka::pushConsumer(StorageKafka::ConsumerPtr c)
semaphore.set();
}

bool StorageKafka::checkDependencies(const String & database_name, const String & table_name)
{
// Check if all dependencies are attached
auto dependencies = context.getDependencies(database_name, table_name);
if (dependencies.size() == 0)
return true;

// Check if the dependencies are ready
for (const auto & db_tab : dependencies)
{
auto table = context.tryGetTable(db_tab.first, db_tab.second);
if (!table)
return false;

// If it is a materialized view, check its target table
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
if (materialized_view && !materialized_view->tryGetTargetTable())
return false;

// Check all its dependencies
if (!checkDependencies(db_tab.first, db_tab.second))
return false;
}

return true;
}

void StorageKafka::streamThread()
{
try
{
// Check if at least one direct dependency is attached
auto dependencies = context.getDependencies(database_name, table_name);

// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled)
while (!stream_cancelled && num_created_consumers > 0 && dependencies.size() > 0)
{
// Check if all dependencies are attached
auto dependencies = context.getDependencies(database_name, table_name);
if (dependencies.size() == 0)
break;

// Check the dependencies are ready?
bool ready = true;
for (const auto & db_tab : dependencies)
{
if (!context.tryGetTable(db_tab.first, db_tab.second))
ready = false;
}
if (!ready)
if (!checkDependencies(database_name, table_name))
break;

LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views");
@@ -488,12 +508,14 @@ bool StorageKafka::streamToViews()

// Limit the number of batched messages to allow early cancellations
const Settings & settings = context.getSettingsRef();
const size_t block_size = settings.max_block_size.value;
size_t block_size = max_block_size;
if (block_size == 0)
block_size = settings.max_block_size.value;

// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_consumers);
for (size_t i = 0; i < num_consumers; ++i)
streams.reserve(num_created_consumers);
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream = std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, block_size);
streams.emplace_back(stream);
@@ -509,7 +531,7 @@
// Join multiple streams if necessary
BlockInputStreamPtr in;
if (streams.size() > 1)
in = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, num_consumers);
in = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, streams.size());
else
in = streams[0];

@@ -644,6 +666,7 @@ void registerStorageKafka(StorageFactory & factory)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size)
#undef CHECK_KAFKA_STORAGE_ARGUMENT

// Get and check broker list
@@ -790,9 +813,28 @@ void registerStorageKafka(StorageFactory & factory)
num_consumers = kafka_settings.kafka_num_consumers.value;
}

// Parse max block size (optional)
size_t max_block_size = 0;
if (args_count >= 8)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[7].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
{
max_block_size = static_cast<size_t>(safeGet<UInt64>(ast->value));
}
else
{
throw Exception("Maximum block size must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_max_block_size.changed)
{
max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size.value);
}

return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,
brokers, group, topics, format, row_delimiter, schema, num_consumers);
brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size);
});
}

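The recursive checkDependencies() walk matters once materialized views are chained behind the Kafka table. A hedged sketch with hypothetical names, reusing the queue table from the example above: streaming runs only while the view and its target table are attached, and pauses when either is detached.

-- Hypothetical chain: queue (Kafka) -> consumer (materialized view) -> daily (target table).
-- streamThread() now calls checkDependencies() before each batch, so detaching
-- either 'consumer' or 'daily' stops streaming until they are attached again.
CREATE TABLE daily
(
    day Date,
    total UInt64
) ENGINE = SummingMergeTree() ORDER BY day;

CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, count() AS total
FROM queue
GROUP BY day;
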
8 changes: 6 additions & 2 deletions dbms/src/Storages/Kafka/StorageKafka.h
@@ -81,8 +81,10 @@ friend class KafkaBlockOutputStream;
// in order to make various input stream parsers happy.
char row_delimiter;
const String schema_name;
/// Total number of consumers
size_t num_consumers;
/// Maximum block size for insertion into this table
size_t max_block_size;
/// Number of actually created consumers.
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
@@ -105,6 +107,7 @@ friend class KafkaBlockOutputStream;

void streamThread();
bool streamToViews();
bool checkDependencies(const String & database_name, const String & table_name);

protected:
StorageKafka(
@@ -113,7 +116,8 @@ friend class KafkaBlockOutputStream;
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_);
const String & format_name_, char row_delimiter_, const String & schema_name_,
size_t num_consumers_, size_t max_block_size_);
};

}
2 changes: 2 additions & 0 deletions dbms/src/Storages/StorageMaterializedView.h
@@ -60,6 +60,7 @@ class StorageMaterializedView : public ext::shared_ptr_helper<StorageMaterialize
unsigned num_streams) override;

String getDataPath() const override;
StoragePtr tryGetTargetTable() const;

private:
String select_database_name;
@@ -72,6 +73,7 @@ class StorageMaterializedView : public ext::shared_ptr_helper<StorageMaterialize
Context & global_context;
bool has_inner_table = false;

StoragePtr getTargetTable() const;
void checkStatementCanBeForwarded() const;

protected: