diff --git a/dbms/src/DataStreams/FormatFactory.cpp b/dbms/src/DataStreams/FormatFactory.cpp index aea5176548b..26e42891d28 100644 --- a/dbms/src/DataStreams/FormatFactory.cpp +++ b/dbms/src/DataStreams/FormatFactory.cpp @@ -69,8 +69,8 @@ BlockInputStreamPtr FormatFactory::getInput( std::forward(row_stream), sample, max_block_size, - settings.input_format_allow_errors_num, - settings.input_format_allow_errors_ratio); + 0, + 0); }; if (name == "Native") diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 28f1aa34edf..ded38317f19 100644 --- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -43,8 +43,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( if (!dependencies.empty()) { views_context = std::make_unique(context); - // Do not deduplicate insertions into MV if the main insertion is Ok - views_context->getSettingsRef().insert_deduplicate = false; } for (const auto & database_table : dependencies) diff --git a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp index 1e02b5d1844..8ee50a111fe 100644 --- a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp +++ b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp @@ -98,8 +98,6 @@ Aggregator::Params buildParams( before_agg_header, keys, aggregate_descriptions, - settings.max_rows_to_group_by, - settings.group_by_overflow_mode, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 050d360c8ba..95584f02fe8 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -168,37 +168,6 @@ bool hasRegionToRead(const DAGContext & dag_context, const TiDBTableScan & table return has_region_to_read; } -void setQuotaAndLimitsOnTableScan(Context & context, DAGPipeline & pipeline) -{ - const Settings & settings = context.getSettingsRef(); - - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode); - limits.max_execution_time = settings.max_execution_time; - limits.timeout_overflow_mode = settings.timeout_overflow_mode; - - /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers, - * because the initiating server has a summary of the execution of the request on all servers. - * - * But limits on data size to read and maximum execution time are reasonable to check both on initiator and - * additionally on each remote server, because these limits are checked per block of data processed, - * and remote servers may process way more blocks of data than are received by initiator. 
- */ - limits.min_execution_speed = settings.min_execution_speed; - limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed; - - QuotaForIntervals & quota = context.getQuota(); - - pipeline.transform([&](auto & stream) { - if (auto * p_stream = dynamic_cast(stream.get())) - { - p_stream->setLimits(limits); - p_stream->setQuota(quota); - } - }); -} - // add timezone cast for timestamp type, this is used to support session level timezone // std::tuple addExtraCastsAfterTs( @@ -361,8 +330,6 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) for (const auto & lock : drop_locks) dagContext().addTableLock(lock); - /// Set the limits and quota for reading data, the speed and time of the query. - setQuotaAndLimitsOnTableScan(context, pipeline); FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired); FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once); @@ -407,7 +374,7 @@ void DAGStorageInterpreter::prepare() assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end()); storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage; - std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(context.getSettingsRef().max_columns_to_read); + std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(); analyzer = std::make_unique(std::move(source_columns), context); } @@ -958,16 +925,8 @@ std::unordered_map DAG return storages_with_lock; } -std::tuple> DAGStorageInterpreter::getColumnsForTableScan(Int64 max_columns_to_read) +std::tuple> DAGStorageInterpreter::getColumnsForTableScan() { - // todo handle alias column - if (max_columns_to_read && table_scan.getColumnSize() > max_columns_to_read) - { - throw TiFlashException( - fmt::format("Limit for number of columns to read exceeded. 
Requested: {}, maximum: {}", table_scan.getColumnSize(), max_columns_to_read), - Errors::BroadcastJoin::TooManyColumns); - } - Names required_columns_tmp; NamesAndTypes source_columns_tmp; std::vector need_cast_column; diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index 52576706a7c..edb0665026f 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -79,7 +79,7 @@ class DAGStorageInterpreter std::unordered_map getAndLockStorages(Int64 query_schema_version); - std::tuple> getColumnsForTableScan(Int64 max_columns_to_read); + std::tuple> getColumnsForTableScan(); std::vector buildRemoteRequests(const DM::ScanContextPtr & scan_context); diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 22e2b675705..8fed1a5db1d 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -108,15 +108,7 @@ void orderStreams( extra_info = enableFineGrainedShuffleExtraInfo; pipeline.transform([&](auto & stream) { - auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); - - /// Limits on sorting - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); - sorting_stream->setLimits(limits); - - stream = sorting_stream; + stream = std::make_shared(stream, order_descr, log->identifier(), limit); stream->setExtraInfo(extra_info); }); diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp index 6eeafc63d4f..9a42ead72a4 100644 --- a/dbms/src/Flash/executeQuery.cpp +++ b/dbms/src/Flash/executeQuery.cpp @@ -92,7 +92,7 @@ BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal) /// Hold element of process list till end of query execution. res.process_list_entry = process_list_entry; - prepareForInputStream(context, QueryProcessingStage::Complete, res.in); + prepareForInputStream(context, res.in); if (likely(!internal)) logQueryPipeline(logger, res.in); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 73e1ee2fb15..1c73d3e28e9 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -836,10 +836,6 @@ bool Aggregator::executeOnBlock( if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level) result.convertToTwoLevel(); - /// Checking the constraints. - if (!checkLimits(result_size)) - return false; - /** Flush data to disk if too much RAM is consumed. * Data can only be flushed to disk if a two-level aggregation structure is used. 
*/ @@ -972,25 +968,6 @@ void Aggregator::spillImpl( } -bool Aggregator::checkLimits(size_t result_size) const -{ - if (params.max_rows_to_group_by && result_size > params.max_rows_to_group_by) - { - switch (params.group_by_overflow_mode) - { - case OverflowMode::THROW: - throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size) - + " rows, maximum: " + toString(params.max_rows_to_group_by), - ErrorCodes::TOO_MANY_ROWS); - - case OverflowMode::BREAK: - return false; - } - } - return true; -} - - void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result) { if (is_cancelled()) @@ -1899,9 +1876,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl( /// We merge all aggregation results to the first. for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num) { - if (!checkLimits(res->size())) - break; - AggregatedDataVariants & current = *non_empty_data[result_num]; mergeDataImpl( @@ -2265,9 +2239,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV return; } - if (!checkLimits(result.size())) - break; - if (result.type == AggregatedDataVariants::Type::without_key) mergeWithoutKeyStreamsImpl(block, result); diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 8c77d459823..e8dbf0d4d69 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -559,7 +559,7 @@ struct AggregatedDataVariants : private boost::noncopyable void * aggregation_method_impl{}; - /** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by. + /** Specialization for the case when there are no keys. */ AggregatedDataWithoutKey without_key = nullptr; @@ -890,10 +890,6 @@ class Aggregator size_t aggregates_size; Int64 local_delta_memory = 0; - /// The settings of approximate calculation of GROUP BY. - const size_t max_rows_to_group_by; - const OverflowMode group_by_overflow_mode; - /// Two-level aggregation settings (used for a large number of keys). /** With how many keys or the size of the aggregation state in bytes, * two-level aggregation begins to be used. Enough to reach of at least one of the thresholds. 
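For reference, a minimal sketch of how a caller builds Aggregator::Params once max_rows_to_group_by and group_by_overflow_mode are dropped from the constructor. The argument order follows the updated call sites in this diff (AggregationInterpreterHelper::buildParams and InterpreterSelectQuery::executeAggregation); header, keys, aggregates, allow_two_level, settings and spill_config are placeholder names for values those call sites already hold, and the meaning of the boolean argument is presumed from the delegating constructor below:

    // Hypothetical call site; all identifiers except the Params type and the
    // settings fields stand in for values the real callers already have in scope.
    Aggregator::Params params(
        header,
        keys,
        aggregates,
        allow_two_level ? settings.group_by_two_level_threshold : SettingUInt64(0),
        allow_two_level ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
        settings.max_bytes_before_external_group_by,
        /*empty_result_for_aggregation_by_empty_set=*/false, // presumed meaning of this bool
        spill_config,
        settings.max_block_size);

The removed pair simply disappears from the argument list, which is why the delegating Params constructor below changes from (..., 0, OverflowMode::THROW, 0, 0, 0, false, ...) to (..., 0, 0, 0, false, ...).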
@@ -918,8 +914,6 @@ class Aggregator const Block & src_header_, const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, - size_t max_rows_to_group_by_, - OverflowMode group_by_overflow_mode_, size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_, size_t max_bytes_before_external_group_by_, @@ -932,8 +926,6 @@ class Aggregator , aggregates(aggregates_) , keys_size(keys.size()) , aggregates_size(aggregates.size()) - , max_rows_to_group_by(max_rows_to_group_by_) - , group_by_overflow_mode(group_by_overflow_mode_) , group_by_two_level_threshold(group_by_two_level_threshold_) , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_) , max_bytes_before_external_group_by(max_bytes_before_external_group_by_) @@ -951,7 +943,7 @@ class Aggregator const SpillConfig & spill_config, UInt64 max_block_size_, const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators) - : Params(Block(), keys_, aggregates_, 0, OverflowMode::THROW, 0, 0, 0, false, spill_config, max_block_size_, collators_) + : Params(Block(), keys_, aggregates_, 0, 0, 0, false, spill_config, max_block_size_, collators_) { intermediate_header = intermediate_header_; } @@ -983,7 +975,7 @@ class Aggregator using AggregateColumnsConstData = std::vector; using AggregateFunctionsPlainPtrs = std::vector; - /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break'). + /// Process one block. Return false if the processing should be aborted. bool executeOnBlock( const Block & block, AggregatedDataVariants & result, @@ -1292,14 +1284,6 @@ class Aggregator void destroyWithoutKey( AggregatedDataVariants & result) const; - - - /** Checks constraints on the maximum number of keys for aggregation. - * If it is exceeded, then, depending on the group_by_overflow_mode, either - * - throws an exception; - * - returns false, which means that execution must be aborted; - */ - bool checkLimits(size_t result_size) const; }; /** Get the aggregation variant by its type. */ diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 16477d5f587..d3a5ccbfd4d 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1857,8 +1857,6 @@ size_t Context::getMaxStreams() const max_streams = 1; } } - if (max_streams > 1) - max_streams *= settings.max_streams_to_max_threads_ratio; if (max_streams == 0) max_streams = 1; if (unlikely(max_streams != 1 && is_cop_request)) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 9064f5ee68a..5dffe7344d2 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -658,8 +658,6 @@ static std::shared_ptr interpretSubquery( */ Context subquery_context = context; Settings subquery_settings = context.getSettings(); - subquery_settings.max_result_rows = 0; - subquery_settings.max_result_bytes = 0; /// The calculation of `extremes` does not make sense and is not necessary (if you do it, then the `extremes` of the subquery can be taken instead of the whole query). 
subquery_settings.extremes = false; subquery_context.setSettings(subquery_settings); @@ -1195,7 +1193,6 @@ void ExpressionAnalyzer::executeScalarSubqueriesImpl(ASTPtr & ast) { Context subquery_context = context; Settings subquery_settings = context.getSettings(); - subquery_settings.max_result_rows = 1; subquery_settings.extremes = false; subquery_context.setSettings(subquery_settings); diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index aa2e58850f2..b2f6e70d3c1 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -217,7 +217,7 @@ static ColumnsAndDefaults parseColumns(const ASTExpressionList & column_list_ast { const auto & final_column_name = col_decl.name; const auto tmp_column_name = final_column_name + "_tmp"; - const auto data_type_ptr = columns.back().type.get(); + const auto * const data_type_ptr = columns.back().type.get(); default_expr_list->children.emplace_back(setAlias(makeASTFunction("CAST", std::make_shared(tmp_column_name), std::make_shared(Field(data_type_ptr->getName()))), final_column_name)); @@ -236,8 +236,8 @@ static ColumnsAndDefaults parseColumns(const ASTExpressionList & column_list_ast for (auto & column : defaulted_columns) { - const auto name_and_type_ptr = column.first; - const auto col_decl_ptr = column.second; + auto * const name_and_type_ptr = column.first; + auto * const col_decl_ptr = column.second; const auto & column_name = col_decl_ptr->name; const auto has_explicit_type = nullptr != col_decl_ptr->type; @@ -307,8 +307,8 @@ ASTPtr InterpreterCreateQuery::formatColumns(const NamesAndTypesList & columns) column_declaration->name = column.name; StringPtr type_name = std::make_shared(column.type->getName()); - auto pos = type_name->data(); - const auto end = pos + type_name->size(); + auto * pos = type_name->data(); + auto * const end = pos + type_name->size(); ParserIdentifierWithOptionalParameters storage_p; column_declaration->type = parseQuery(storage_p, pos, end, "data type", 0); @@ -331,8 +331,8 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns) column_declaration->name = column.name; StringPtr type_name = std::make_shared(column.type->getName()); - auto pos = type_name->data(); - const auto end = pos + type_name->size(); + auto * pos = type_name->data(); + auto * const end = pos + type_name->size(); ParserIdentifierWithOptionalParameters storage_p; column_declaration->type = parseQuery(storage_p, pos, end, "data type", 0); @@ -472,7 +472,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) { // Table SQL definition is available even if the table is detached auto query = context.getCreateTableQuery(database_name, table_name); - auto & as_create = typeid_cast(*query); + const auto & as_create = typeid_cast(*query); create = as_create; // Copy the saved create query, but use ATTACH instead of CREATE create.attach = true; } @@ -571,7 +571,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) return InterpreterInsertQuery( insert, create.is_temporary ? 
context.getSessionContext() : context, - context.getSettingsRef().insert_allow_materialized_columns) + false) .execute(); } diff --git a/dbms/src/Interpreters/InterpreterFactory.cpp b/dbms/src/Interpreters/InterpreterFactory.cpp index 8240926f1a3..269eaed8330 100644 --- a/dbms/src/Interpreters/InterpreterFactory.cpp +++ b/dbms/src/Interpreters/InterpreterFactory.cpp @@ -82,8 +82,7 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, Context & else if (typeid_cast(query.get())) { /// readonly is checked inside InterpreterInsertQuery - bool allow_materialized = static_cast(context.getSettingsRef().insert_allow_materialized_columns); - return std::make_unique(query, context, allow_materialized); + return std::make_unique(query, context, false); } else if (typeid_cast(query.get())) { diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 782a254925a..dac1648c0ba 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -128,7 +128,7 @@ BlockIO InterpreterInsertQuery::execute() /// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side. /// Client-side bufferization might cause excessive timeouts (especially in case of big blocks). - if (!(context.getSettingsRef().insert_distributed_sync && table->getName() == "Distributed")) + if (table->getName() != "Distributed") { out = std::make_shared( out, diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 4fd474f3d10..35d71695914 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -154,10 +154,6 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names) initSettings(); const Settings & settings = context.getSettingsRef(); - if (settings.max_subquery_depth && subquery_depth > settings.max_subquery_depth) - throw Exception("Too deep subqueries. Maximum: " + settings.max_subquery_depth.toString(), - ErrorCodes::TOO_DEEP_SUBQUERIES); - max_streams = settings.max_threads; const auto & table_expression = query.table(); @@ -697,8 +693,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline */ Context subquery_context = context; Settings subquery_settings = context.getSettings(); - subquery_settings.max_result_rows = 0; - subquery_settings.max_result_bytes = 0; /// The calculation of extremes does not make sense and is not necessary (if you do it, then the extremes of the subquery can be taken for whole query). subquery_settings.extremes = false; subquery_context.setSettings(subquery_settings); @@ -717,15 +711,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline const Settings & settings = context.getSettingsRef(); - /// Limitation on the number of columns to read. - /// It's not applied in 'dry_run' mode, because the query could be analyzed without removal of unnecessary columns. - if (!dry_run && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) - throw Exception("Limit for number of columns to read exceeded. 
" - "Requested: " - + toString(required_columns.size()) - + ", maximum: " + settings.max_columns_to_read.toString(), - ErrorCodes::TOO_MANY_COLUMNS); - size_t limit_length = 0; size_t limit_offset = 0; getLimitLengthAndOffset(query, limit_length, limit_offset); @@ -737,10 +722,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline * To simultaneously query more remote servers, * instead of max_threads, max_distributed_connections is used. */ - bool is_remote = false; if (storage && storage->isRemote()) { - is_remote = true; max_streams = settings.max_distributed_connections; } @@ -788,10 +771,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline if (max_streams == 0) throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR); - /// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads. - if (max_streams > 1 && !is_remote) - max_streams *= settings.max_streams_to_max_threads_ratio; - query_analyzer->makeSetsForIndex(); SelectQueryInfo query_info; @@ -889,40 +868,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); }); - - /// Set the limits and quota for reading data, the speed and time of the query. - { - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode); - limits.max_execution_time = settings.max_execution_time; - limits.timeout_overflow_mode = settings.timeout_overflow_mode; - - /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers, - * because the initiating server has a summary of the execution of the request on all servers. - * - * But limits on data size to read and maximum execution time are reasonable to check both on initiator and - * additionally on each remote server, because these limits are checked per block of data processed, - * and remote servers may process way more blocks of data than are received by initiator. - */ - if (to_stage == QueryProcessingStage::Complete) - { - limits.min_execution_speed = settings.min_execution_speed; - limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed; - } - - QuotaForIntervals & quota = context.getQuota(); - - pipeline.transform([&](auto & stream) { - if (auto * p_stream = dynamic_cast(stream.get())) - { - p_stream->setLimits(limits); - - if (to_stage == QueryProcessingStage::Complete) - p_stream->setQuota(quota); - } - }); - } } else throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR); @@ -975,7 +920,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0; SpillConfig spill_config(context.getTemporaryPath(), "aggregation", settings.max_spilled_size_per_spill, context.getFileProvider()); - Aggregator::Params params(header, keys, aggregates, settings.max_rows_to_group_by, settings.group_by_overflow_mode, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? 
settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, spill_config, settings.max_block_size); + Aggregator::Params params(header, keys, aggregates, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, false, spill_config, settings.max_block_size); /// If there are several sources, then we perform parallel aggregation if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1) @@ -1047,26 +992,15 @@ void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool fi Aggregator::Params params(header, keys, aggregates, SpillConfig(context.getTemporaryPath(), "aggregation", settings.max_spilled_size_per_spill, context.getFileProvider()), settings.max_block_size); - if (!settings.distributed_aggregation_memory_efficient) - { - /// We union several sources into one, parallelizing the work. - executeUnion(pipeline); - - /// Now merge the aggregated blocks - pipeline.firstStream() = std::make_shared(pipeline.firstStream(), params, final, settings.max_threads); - } - else - { - pipeline.firstStream() = std::make_shared( - pipeline.streams, - params, - final, - max_streams, - settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads) : static_cast(settings.max_threads), - /*req_id=*/""); + pipeline.firstStream() = std::make_shared( + pipeline.streams, + params, + final, + max_streams, + settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads) : static_cast(settings.max_threads), + /*req_id=*/""); - pipeline.streams.resize(1); - } + pipeline.streams.resize(1); } @@ -1143,15 +1077,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline) const Settings & settings = context.getSettingsRef(); pipeline.transform([&](auto & stream) { - auto sorting_stream = std::make_shared(stream, order_descr, /*req_id=*/"", limit); - - /// Limits on sorting - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); - sorting_stream->setLimits(limits); - - stream = sorting_stream; + stream = std::make_shared(stream, order_descr, /*req_id=*/"", limit); }); /// If there are several streams, we merge them into one diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 25ee8b9cfde..44d6c3e887c 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -55,7 +55,6 @@ struct Settings M(SettingUInt64, mpp_task_running_timeout, DEFAULT_MPP_TASK_RUNNING_TIMEOUT, "mpp task max time that running without any progress.") \ M(SettingUInt64, mpp_task_waiting_timeout, DEFAULT_MPP_TASK_WAITING_TIMEOUT, "mpp task max time that waiting first data block from source input stream.") \ M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.") \ - M(SettingUInt64, batch_commands_threads, 0, "Number of threads to use for handling batch commands concurrently. 
0 means - same as 'max_threads'.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \ "and no less than the volume of data for one mark.") \ M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \ @@ -89,16 +88,11 @@ struct Settings \ M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.") \ \ - M(SettingTotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE, "How to calculate TOTALS when HAVING is present, as well as when max_rows_to_group_by and group_by_overflow_mode = ‘any’ are " \ - "present.") \ - M(SettingFloat, totals_auto_threshold, 0.5, "The threshold for totals_mode = 'auto'.") \ + M(SettingTotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE, "How to calculate TOTALS when HAVING is present.") \ \ - M(SettingBool, compile, false, "Whether query compilation is enabled.") \ - M(SettingUInt64, min_count_to_compile, 3, "The number of structurally identical queries before they are compiled.") \ M(SettingUInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.") \ M(SettingUInt64, group_by_two_level_threshold_bytes, 100000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. " \ "Two-level aggregation is used when at least one of the thresholds is triggered.") \ - M(SettingBool, distributed_aggregation_memory_efficient, false, "Is the memory-saving mode of distributed aggregation enabled.") \ M(SettingUInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is " \ "consumed. 0 means - same as 'max_threads'.") \ \ @@ -114,9 +108,6 @@ struct Settings M(SettingUInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for input/output operations is bypassing the page cache. 0 - disabled.") \ M(SettingUInt64, mark_cache_min_lifetime, 0, "Deprecated setting. Do not affect the mark cache") \ \ - M(SettingFloat, max_streams_to_max_threads_ratio, 1, "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that " \ - "this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of " \ - "threads, but for each source to dynamically select available work for itself.") \ \ M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4, "Allows you to select the method of data compression when writing.") \ \ @@ -129,21 +120,7 @@ struct Settings "printed query in ordinary text log.") \ \ M(SettingUInt64, max_concurrent_queries_for_user, 0, "The maximum number of concurrent requests per user.") \ - M(SettingBool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be preformed") \ - \ - M(SettingUInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the " \ - "data. 
0 - disabled.") \ - M(SettingMilliseconds, insert_quorum_timeout, 600000, "") \ - M(SettingUInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; " \ - "do not read the parts that have not yet been written with the quorum.") \ - M(SettingUInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.") \ - M(SettingMilliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.") \ - M(SettingUInt64, read_backoff_max_throughput, 1048576, "Settings to reduce the number of threads in case of slow reads. Count events when the read bandwidth is less than that many " \ - "bytes per second.") \ - M(SettingMilliseconds, read_backoff_min_interval_between_events_ms, 1000, "Settings to reduce the number of threads in case of slow reads. Do not pay attention to the event, if the previous one has " \ - "passed less than a certain amount of time.") \ - M(SettingUInt64, read_backoff_min_events, 2, "Settings to reduce the number of threads in case of slow reads. The number of events after which the number of threads will be " \ - "reduced.") \ + \ \ M(SettingFloat, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.") \ \ @@ -180,67 +157,24 @@ struct Settings M(SettingBool, fsync_metadata, 1, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server " \ "with high load of DDL queries and high load of disk subsystem.") \ \ - M(SettingUInt64, input_format_allow_errors_num, 0, "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative " \ - "values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next " \ - "line and continue.") \ - M(SettingFloat, input_format_allow_errors_ratio, 0, "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative " \ - "values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next " \ - "line and continue.") \ - \ - M(SettingUInt64, preferred_block_size_bytes, 1000000, "") \ - \ M(SettingUInt64, max_replica_delay_for_distributed_queries, 300, "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified " \ "value (not inclusive). Zero means do not take delay into account.") \ M(SettingBool, fallback_to_stale_replicas_for_distributed_queries, 1, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is " \ "enabled, the query will be performed anyway, otherwise the error will be reported.") \ - M(SettingUInt64, preferred_max_column_in_block_size_bytes, 0, "Limit on max column size in block while reading. Helps to decrease cache misses count. 
Should be close to L2 cache size.") \ \ - M(SettingBool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.") \ - M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \ M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \ M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \ M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \ M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \ \ - M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \ - \ - /** Limits during query execution are part of the settings. \ - * Used to provide a more safe execution of queries from the user interface. \ - * Basically, limits are checked for each block (not every row). That is, the limits can be slightly violated. \ - * Almost all limits apply only to SELECTs. \ - * Almost all limits apply to each stream individually. \ - */ \ - \ - M(SettingUInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it " \ - "is only checked on a remote server.") \ - M(SettingUInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading " \ - "from a remote server, it is only checked on a remote server.") \ - M(SettingOverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingUInt64, max_rows_to_group_by, 0, "") \ - M(SettingOverflowMode, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ M(SettingUInt64, max_bytes_before_external_group_by, 0, "") \ \ - M(SettingUInt64, max_rows_to_sort, 0, "") \ - M(SettingUInt64, max_bytes_to_sort, 0, "") \ - M(SettingOverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ M(SettingUInt64, max_bytes_before_external_sort, 0, "") \ \ - M(SettingUInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.") \ - M(SettingUInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.") \ - M(SettingOverflowMode, result_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ \ /* TODO: Check also when merging and finalizing aggregate functions. */ \ - M(SettingSeconds, max_execution_time, 0, "") \ - M(SettingOverflowMode, timeout_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingUInt64, min_execution_speed, 0, "In rows per second.") \ - M(SettingSeconds, timeout_before_checking_execution_speed, 0, "Check that the speed is not too low after the specified time has elapsed.") \ \ - M(SettingUInt64, max_columns_to_read, 0, "") \ \ - M(SettingUInt64, max_subquery_depth, 100, "") \ - M(SettingUInt64, max_pipeline_depth, 1000, "") \ M(SettingUInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. 
Checked after parsing.") \ M(SettingUInt64, max_ast_elements, 50000, "Maximum size of query syntax tree in number of nodes. Checked after parsing.") \ M(SettingUInt64, max_expanded_ast_elements, 500000, "Maximum size of query syntax tree in number of nodes after expansion of aliases and the asterisk.") \ @@ -248,7 +182,6 @@ struct Settings M(SettingUInt64, readonly, 0, "0 - everything is allowed. 1 - only read requests. 2 - only read requests, as well as changing settings, except for the " \ "'readonly' setting.") \ \ - M(SettingUInt64, shared_query_clients, 0, "How many clients will share the same query_id. If > 0, enable shared query mode.") \ M(SettingString, query_id, "", "The query_id, only for testing.") \ M(SettingUInt64, mutable_deduper, 5, "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: " \ "ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.") \ diff --git a/dbms/src/Interpreters/SettingsCommon.h b/dbms/src/Interpreters/SettingsCommon.h index ce973118378..8c90586500f 100644 --- a/dbms/src/Interpreters/SettingsCommon.h +++ b/dbms/src/Interpreters/SettingsCommon.h @@ -590,13 +590,10 @@ struct SettingLoadBalancing enum class TotalsMode { /// Count HAVING for all read rows; - /// including those not in max_rows_to_group_by - /// and have not passed HAVING after grouping. BEFORE_HAVING = 0, /// Count on all rows except those that have not passed HAVING; - /// that is, to include in TOTALS all the rows that did not pass max_rows_to_group_by. AFTER_HAVING_INCLUSIVE = 1, - /// Include only the rows that passed and max_rows_to_group_by, and HAVING. + /// Include only the rows that passed and HAVING. AFTER_HAVING_EXCLUSIVE = 2, /// Automatically select between INCLUSIVE and EXCLUSIVE, AFTER_HAVING_AUTO = 3, diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index fb6aa7c1a3a..19103149a1a 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -221,7 +221,7 @@ std::tuple executeQueryImpl( if (res.in) { - prepareForInputStream(context, stage, res.in); + prepareForInputStream(context, res.in); } if (res.out) @@ -388,7 +388,6 @@ void logQuery(const String & query, const Context & context, const LoggerPtr & l void prepareForInputStream( Context & context, - QueryProcessingStage::Enum stage, const BlockInputStreamPtr & in) { assert(in); @@ -396,19 +395,6 @@ void prepareForInputStream( { stream->setProgressCallback(context.getProgressCallback()); stream->setProcessListElement(context.getProcessListElement()); - - /// Limits on the result, the quota on the result, and also callback for progress. - /// Limits apply only to the final result. 
- if (stage == QueryProcessingStage::Complete) - { - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT; - const auto & settings = context.getSettingsRef(); - limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); - - stream->setLimits(limits); - stream->setQuota(context.getQuota()); - } } } diff --git a/dbms/src/Interpreters/executeQuery.h b/dbms/src/Interpreters/executeQuery.h index b326f9602ba..f33aed02ea3 100644 --- a/dbms/src/Interpreters/executeQuery.h +++ b/dbms/src/Interpreters/executeQuery.h @@ -63,7 +63,6 @@ void logQuery(const String & query, const Context & context, const LoggerPtr & l void prepareForInputStream( Context & context, - QueryProcessingStage::Enum stage, const BlockInputStreamPtr & in); } // namespace DB diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index ab6749a302c..1c4ec2fd22e 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -177,29 +177,7 @@ void TCPHandler::runImpl() state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker. /// Processing Query - const Settings & settings = query_context.getSettingsRef(); - if (settings.shared_query_clients && !state.query_id.empty()) - { - LOG_DEBUG(log, "shared query"); - - state.io = query_context.getSharedQueries()->getOrCreateBlockIO( - state.query_id, - settings.shared_query_clients, - [&]() { - return executeQuery(state.query, query_context, false, state.stage); - }); - - /// As getOrCreateBlockIO could produce exception, this line must be put after. - shared_query_id = state.query_id; - - if (state.io.out) - throw Exception("Insert query is not supported in shared query mode"); - } - else - { - state.io = executeQuery(state.query, query_context, false, state.stage); - } - + state.io = executeQuery(state.query, query_context, false, state.stage); if (state.io.out) state.need_receive_data_for_insert = true; diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp deleted file mode 100644 index ae152b09b32..00000000000 --- a/dbms/src/Storages/StorageBuffer.cpp +++ /dev/null @@ -1,641 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace DB -{ -namespace ErrorCodes -{ -extern const int INFINITE_LOOP; -extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; -} // namespace ErrorCodes - - -StorageBuffer::StorageBuffer(const std::string & name_, const ColumnsDescription & columns_, Context & context_, size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const String & destination_database_, const String & destination_table_, bool allow_materialized_) - : IStorage{columns_} - , name(name_) - , context(context_) - , num_shards(num_shards_) - , buffers(num_shards_) - , min_thresholds(min_thresholds_) - , max_thresholds(max_thresholds_) - , destination_database(destination_database_) - , destination_table(destination_table_) - , no_destination(destination_database.empty() && destination_table.empty()) - , allow_materialized(allow_materialized_) - , log(&Poco::Logger::get("StorageBuffer (" + name + ")")) -{ -} - - -/// Reads from one buffer (from one block) under its mutex. -class BufferBlockInputStream : public IProfilingBlockInputStream -{ -public: - BufferBlockInputStream(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage_) - : column_names(column_names_.begin(), column_names_.end()) - , buffer(buffer_) - , storage(storage_) - {} - - String getName() const override { return "Buffer"; } - - Block getHeader() const override { return storage.getSampleBlockForColumns(column_names); }; - -protected: - Block readImpl() override - { - Block res; - - if (has_been_read) - return res; - has_been_read = true; - - std::lock_guard lock(buffer.mutex); - - if (!buffer.data.rows()) - return res; - - for (const auto & name : column_names) - res.insert(buffer.data.getByName(name)); - - return res; - } - -private: - Names column_names; - StorageBuffer::Buffer & buffer; - const StorageBuffer & storage; - bool has_been_read = false; -}; - - -BlockInputStreams StorageBuffer::read( - const Names & column_names, - const SelectQueryInfo & query_info, - const Context & context, - QueryProcessingStage::Enum & processed_stage, - size_t max_block_size, - unsigned num_streams) -{ - processed_stage = QueryProcessingStage::FetchColumns; - - BlockInputStreams streams_from_dst; - - if (!no_destination) - { - auto destination = context.getTable(destination_database, destination_table); - - if (destination.get() == this) - throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP); - - streams_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams); - } - - BlockInputStreams streams_from_buffers; - streams_from_buffers.reserve(num_shards); - for (auto & buf : buffers) - streams_from_buffers.push_back(std::make_shared(column_names, buf, *this)); - - /** If the sources from the table were processed before some non-initial stage of query execution, - * then sources from the buffers must also be wrapped in the processing pipeline before the same stage. 
- */ - if (processed_stage > QueryProcessingStage::FetchColumns) - for (auto & stream : streams_from_buffers) - stream = InterpreterSelectQuery(query_info.query, context, {}, processed_stage, 0, stream).execute().in; - - streams_from_dst.insert(streams_from_dst.end(), streams_from_buffers.begin(), streams_from_buffers.end()); - return streams_from_dst; -} - - -static void appendBlock(const Block & from, Block & to) -{ - if (!to) - throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR); - - assertBlocksHaveEqualStructure(from, to, "Buffer"); - - from.checkNumberOfRows(); - to.checkNumberOfRows(); - - size_t rows = from.rows(); - - size_t old_rows = to.rows(); - - try - { - for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no) - { - const IColumn & col_from = *from.getByPosition(column_no).column.get(); - MutableColumnPtr col_to = (*std::move(to.getByPosition(column_no).column)).mutate(); - - col_to->insertRangeFrom(col_from, 0, rows); - - to.getByPosition(column_no).column = std::move(col_to); - } - } - catch (...) - { - /// Rollback changes. - try - { - /// Avoid "memory limit exceeded" exceptions during rollback. - TemporarilyDisableMemoryTracker temporarily_disable_memory_tracker; - - for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no) - { - ColumnPtr & col_to = to.getByPosition(column_no).column; - if (col_to->size() != old_rows) - col_to = (*std::move(col_to)).mutate()->cut(0, old_rows); - } - } - catch (...) - { - /// In case when we cannot rollback, do not leave incorrect state in memory. - std::terminate(); - } - - throw; - } -} - - -class BufferBlockOutputStream : public IBlockOutputStream -{ -public: - explicit BufferBlockOutputStream(StorageBuffer & storage_) - : storage(storage_) - {} - - Block getHeader() const override { return storage.getSampleBlock(); } - - void write(const Block & block) override - { - if (!block) - return; - - size_t rows = block.rows(); - if (!rows) - return; - - StoragePtr destination; - if (!storage.no_destination) - { - destination = storage.context.tryGetTable(storage.destination_database, storage.destination_table); - - if (destination) - { - if (destination.get() == &storage) - throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP); - - /// Check table structure. - try - { - destination->check(block, true); - } - catch (Exception & e) - { - e.addMessage("(when looking at destination table " + storage.destination_database + "." + storage.destination_table + ")"); - throw; - } - } - } - - size_t bytes = block.bytes(); - - /// If the block already exceeds the maximum limit, then we skip the buffer. - if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes) - { - if (!storage.no_destination) - { - LOG_TRACE(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes); - storage.writeBlockToDestination(block, destination); - } - return; - } - - /// We distribute the load on the shards by the stream number. - const auto start_shard_num = Poco::ThreadNumber::get() % storage.num_shards; - - /// We loop through the buffers, trying to lock mutex. No more than one lap. 
- auto shard_num = start_shard_num; - - StorageBuffer::Buffer * least_busy_buffer = nullptr; - std::unique_lock least_busy_lock; - size_t least_busy_shard_rows = 0; - - for (size_t try_no = 0; try_no < storage.num_shards; ++try_no) - { - std::unique_lock lock(storage.buffers[shard_num].mutex, std::try_to_lock_t()); - - if (lock.owns_lock()) - { - size_t num_rows = storage.buffers[shard_num].data.rows(); - if (!least_busy_buffer || num_rows < least_busy_shard_rows) - { - least_busy_buffer = &storage.buffers[shard_num]; - least_busy_lock = std::move(lock); - least_busy_shard_rows = num_rows; - } - } - - shard_num = (shard_num + 1) % storage.num_shards; - } - - /// If you still can not lock anything at once, then we'll wait on mutex. - if (!least_busy_buffer) - insertIntoBuffer(block, storage.buffers[start_shard_num], std::unique_lock(storage.buffers[start_shard_num].mutex)); - else - insertIntoBuffer(block, *least_busy_buffer, std::move(least_busy_lock)); - } - -private: - StorageBuffer & storage; - - void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock && lock) - { - time_t current_time = time(nullptr); - - /// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later. - Block sorted_block = block.sortColumns(); - - if (!buffer.data) - { - buffer.data = sorted_block.cloneEmpty(); - } - else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes())) - { - /** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer. - * This also protects against unlimited consumption of RAM, since if it is impossible to write to the table, - * an exception will be thrown, and new data will not be added to the buffer. - */ - - lock.unlock(); - storage.flushBuffer(buffer, true); - lock.lock(); - } - - if (!buffer.first_write_time) - buffer.first_write_time = current_time; - - appendBlock(sorted_block, buffer.data); - } -}; - - -BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const Settings & /*settings*/) -{ - return std::make_shared(*this); -} - - -bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const -{ - if (no_destination) - return false; - - auto destination = context.getTable(destination_database, destination_table); - - if (destination.get() == this) - throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP); - - return destination->mayBenefitFromIndexForIn(left_in_operand); -} - - -void StorageBuffer::startup() -{ - if (context.getSettingsRef().readonly) - { - LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set apropriate system_profile to fix this.", getName()); - } - - flush_thread = std::thread(&StorageBuffer::flushThread, this); -} - - -void StorageBuffer::shutdown() -{ - shutdown_event.set(); - - if (flush_thread.joinable()) - flush_thread.join(); - - try - { - optimize(nullptr /*query*/, {} /*partition*/, false /*final*/, false /*deduplicate*/, context); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } -} - - -/** NOTE If you do OPTIMIZE after insertion, - * it does not guarantee, that all data will be in destination table at the time of next SELECT just after OPTIMIZE. 
- * - * Because in case if there was already running flushBuffer method, - * then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly, - * but at the same time, the already running flushBuffer method possibly is not finished, - * so next SELECT will observe missing data. - * - * This kind of race condition make very hard to implement proper tests. - */ -bool StorageBuffer::optimize(const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & /*context*/) -{ - if (partition) - throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED); - - if (final) - throw Exception("FINAL cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED); - - if (deduplicate) - throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED); - - flushAllBuffers(false); - return true; -} - - -bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const -{ - time_t time_passed = 0; - if (buffer.first_write_time) - time_passed = current_time - buffer.first_write_time; - - size_t rows = buffer.data.rows() + additional_rows; - size_t bytes = buffer.data.bytes() + additional_bytes; - - return checkThresholdsImpl(rows, bytes, time_passed); -} - - -bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const -{ - if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes) - { - return true; - } - - if (time_passed > max_thresholds.time) - { - return true; - } - - if (rows > max_thresholds.rows) - { - return true; - } - - if (bytes > max_thresholds.bytes) - { - return true; - } - - return false; -} - - -void StorageBuffer::flushAllBuffers(const bool check_thresholds) -{ - for (auto & buf : buffers) - flushBuffer(buf, check_thresholds); -} - - -void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds) -{ - Block block_to_write; - time_t current_time = time(nullptr); - - size_t rows = 0; - size_t bytes = 0; - time_t time_passed = 0; - - std::lock_guard lock(buffer.mutex); - - block_to_write = buffer.data.cloneEmpty(); - - rows = buffer.data.rows(); - bytes = buffer.data.bytes(); - if (buffer.first_write_time) - time_passed = current_time - buffer.first_write_time; - - if (check_thresholds) - { - if (!checkThresholdsImpl(rows, bytes, time_passed)) - return; - } - else - { - if (rows == 0) - return; - } - - buffer.data.swap(block_to_write); - buffer.first_write_time = 0; - - LOG_TRACE(log, "Flushing buffer with {} rows, {} bytes, age {} seconds.", rows, bytes, time_passed); - - if (no_destination) - return; - - /** For simplicity, buffer is locked during write. - * We could unlock buffer temporary, but it would lead to too many difficulties: - * - data, that is written, will not be visible for SELECTs; - * - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written; - * - this could lead to infinite memory growth. - */ - try - { - writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table)); - } - catch (...) - { - /// Return the block to its place in the buffer. - buffer.data.swap(block_to_write); - - if (!buffer.first_write_time) - buffer.first_write_time = current_time; - - /// After a while, the next write attempt will happen. 
- throw; - } -} - - -void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table) -{ - if (no_destination || !block) - return; - - if (!table) - { - LOG_ERROR(log, "Destination table {}.{} doesn't exist. Block of data is discarded.", destination_database, destination_table); - return; - } - - auto insert = std::make_shared(); - - insert->database = destination_database; - insert->table = destination_table; - - /** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table. - * This will support some of the cases (but not all) when the table structure does not match. - */ - Block structure_of_destination_table = allow_materialized ? table->getSampleBlock() : table->getSampleBlockNonMaterialized(); - Names columns_intersection; - columns_intersection.reserve(block.columns()); - for (size_t i : ext::range(0, structure_of_destination_table.columns())) - { - auto dst_col = structure_of_destination_table.getByPosition(i); - if (block.has(dst_col.name)) - { - if (!block.getByName(dst_col.name).type->equals(*dst_col.type)) - { - LOG_ERROR(log, "Destination table {}.{} have different type of column {} ({} != {}). Block of data is discarded.", destination_database, destination_table, dst_col.name, block.getByName(dst_col.name).type->getName(), dst_col.type->getName()); - return; - } - - columns_intersection.push_back(dst_col.name); - } - } - - if (columns_intersection.empty()) - { - LOG_ERROR(log, "Destination table {}.{} have no common columns with block in buffer. Block of data is discarded.", destination_database, destination_table); - return; - } - - if (columns_intersection.size() != block.columns()) - LOG_WARNING(log, "Not all columns from block in buffer exist in destination table {}.{}. Some columns are discarded.", destination_database, destination_table); - - auto list_of_columns = std::make_shared(); - insert->columns = list_of_columns; - list_of_columns->children.reserve(columns_intersection.size()); - for (const String & column : columns_intersection) - list_of_columns->children.push_back(std::make_shared(column, ASTIdentifier::Column)); - - InterpreterInsertQuery interpreter{insert, context, allow_materialized}; - - auto block_io = interpreter.execute(); - block_io.out->writePrefix(); - block_io.out->write(block); - block_io.out->writeSuffix(); -} - - -void StorageBuffer::flushThread() -{ - setThreadName("BufferFlush"); - - do - { - try - { - flushAllBuffers(true); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } while (!shutdown_event.tryWait(1000)); -} - - -void StorageBuffer::alter(const TableLockHolder &, const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) -{ - for (const auto & param : params) - if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) - throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED); - - /// So that no blocks of the old structure remain. 
- optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context); - - ColumnsDescription new_columns = getColumns(); - params.apply(new_columns); - context.getDatabase(database_name)->alterTable(context, table_name, new_columns, {}); - setColumns(std::move(new_columns)); -} - - -void registerStorageBuffer(StorageFactory & factory) -{ - /** Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) - * - * db, table - in which table to put data from buffer. - * num_buckets - level of parallelism. - * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer. - */ - - factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args) { - ASTs & engine_args = args.engine_args; - - if (engine_args.size() != 9) - throw Exception("Storage Buffer requires 9 parameters: " - " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); - engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context); - - String destination_database = static_cast(*engine_args[0]).value.safeGet(); - String destination_table = static_cast(*engine_args[1]).value.safeGet(); - - UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*engine_args[2]).value); - - Int64 min_time = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*engine_args[3]).value); - Int64 max_time = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*engine_args[4]).value); - UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*engine_args[5]).value); - UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*engine_args[6]).value); - UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*engine_args[7]).value); - UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*engine_args[8]).value); - - return StorageBuffer::create( - args.table_name, - args.columns, - args.context, - num_buckets, - StorageBuffer::Thresholds{min_time, min_rows, min_bytes}, - StorageBuffer::Thresholds{max_time, max_rows, max_bytes}, - destination_database, - destination_table, - static_cast(args.local_context.getSettingsRef().insert_allow_materialized_columns)); - }); -} - -} // namespace DB diff --git a/dbms/src/Storages/StorageBuffer.h b/dbms/src/Storages/StorageBuffer.h deleted file mode 100644 index 51688df0ad1..00000000000 --- a/dbms/src/Storages/StorageBuffer.h +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
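The removed registerStorageBuffer above documents the engine signature Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) and then splits those nine positional arguments into a destination pair, a shard count, and two Thresholds triples. As a reference for that grouping only, here is a minimal standalone sketch; BufferConfig and groupBufferArgs are hypothetical names introduced for illustration and are not part of the StorageFactory API, while the Thresholds shape is taken from the header deleted just below.

    #include <cstddef>
    #include <ctime>
    #include <string>
    #include <utility>

    // Shape of the removed StorageBuffer::Thresholds: seconds since the first
    // write, buffered row count, buffered (uncompressed) byte count.
    struct Thresholds
    {
        time_t time;
        size_t rows;
        size_t bytes;
    };

    // Hypothetical aggregate, used here only to show how the nine positional
    // Buffer(...) arguments were grouped by the removed factory lambda.
    struct BufferConfig
    {
        std::string destination_database; // arg 0
        std::string destination_table;    // arg 1
        size_t num_shards;                // arg 2 (num_buckets): independent buffers
        Thresholds min;                   // args 3, 5, 7: min_time, min_rows, min_bytes
        Thresholds max;                   // args 4, 6, 8: max_time, max_rows, max_bytes
    };

    BufferConfig groupBufferArgs(std::string db, std::string table, size_t buckets,
                                 time_t min_time, time_t max_time,
                                 size_t min_rows, size_t max_rows,
                                 size_t min_bytes, size_t max_bytes)
    {
        return BufferConfig{std::move(db), std::move(table), buckets,
                            Thresholds{min_time, min_rows, min_bytes},
                            Thresholds{max_time, max_rows, max_bytes}};
    }

Note that the min_* values travel together into one triple and the max_* values into the other, which is why the arguments interleave in the removed StorageBuffer::create call rather than being passed in declaration order.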
- -#pragma once - -#include -#include -#include -#include - -#include -#include -#include - - -namespace Poco -{ -class Logger; -} - - -namespace DB -{ -class Context; - - -/** During insertion, buffers the data in the RAM until certain thresholds are exceeded. - * When thresholds are exceeded, flushes the data to another table. - * When reading, it reads both from its buffers and from the subordinate table. - * - * The buffer is a set of num_shards blocks. - * When writing, select the block number by the remainder of the `ThreadNumber` division by `num_shards` (or one of the others), - * and add rows to the corresponding block. - * When using a block, it is locked by some mutex. If during write the corresponding block is already occupied - * - try to lock the next block in a round-robin fashion, and so no more than `num_shards` times (then wait for lock). - * Thresholds are checked on insertion, and, periodically, in the background thread (to implement time thresholds). - * Thresholds act independently for each shard. Each shard can be flushed independently of the others. - * If a block is inserted into the table, which itself exceeds the max-thresholds, it is written directly to the subordinate table without buffering. - * Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows, - * and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table. - * - * When you destroy a Buffer table, all remaining data is flushed to the subordinate table. - * The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost. - */ -class StorageBuffer : public ext::SharedPtrHelper - , public IStorage -{ - friend class BufferBlockInputStream; - friend class BufferBlockOutputStream; - -public: - /// Thresholds. - struct Thresholds - { - time_t time; /// The number of seconds from the insertion of the first row into the block. - size_t rows; /// The number of rows in the block. - size_t bytes; /// The number of (uncompressed) bytes in the block. - }; - - std::string getName() const override { return "Buffer"; } - std::string getTableName() const override { return name; } - - BlockInputStreams read( - const Names & column_names, - const SelectQueryInfo & query_info, - const Context & context, - QueryProcessingStage::Enum & processed_stage, - size_t max_block_size, - unsigned num_streams) override; - - BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; - - void startup() override; - /// Flush all buffers into the subordinate table and stop background thread. - void shutdown() override; - bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override; - - void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & new_table_name) override { name = new_table_name; } - - bool supportsSampling() const override { return true; } - bool supportsPrewhere() const override { return false; } - bool supportsFinal() const override { return true; } - bool supportsIndexForIn() const override { return true; } - - bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const override; - - /// The structure of the subordinate table is not checked and does not change. 
- void alter(const TableLockHolder &, const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) override; - -private: - String name; - - Context & context; - - struct Buffer - { - time_t first_write_time = 0; - Block data; - std::mutex mutex; - }; - - /// There are `num_shards` of independent buffers. - const size_t num_shards; - std::vector buffers; - - const Thresholds min_thresholds; - const Thresholds max_thresholds; - - const String destination_database; - const String destination_table; - bool no_destination; /// If set, do not write data from the buffer, but simply empty the buffer. - bool allow_materialized; - - Poco::Logger * log; - - Poco::Event shutdown_event; - /// Resets data by timeout. - std::thread flush_thread; - - void flushAllBuffers(bool check_thresholds = true); - /// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded. - void flushBuffer(Buffer & buffer, bool check_thresholds); - bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const; - bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const; - - /// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`. - void writeBlockToDestination(const Block & block, StoragePtr table); - - void flushThread(); - -protected: - /** num_shards - the level of internal parallelism (the number of independent buffers) - * The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded. - */ - StorageBuffer(const std::string & name_, const ColumnsDescription & columns_, Context & context_, size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const String & destination_database_, const String & destination_table_, bool allow_materialized_); -}; - -} // namespace DB diff --git a/dbms/src/Storages/registerStorages.cpp b/dbms/src/Storages/registerStorages.cpp index 0baf65d966f..a93d7be3a29 100644 --- a/dbms/src/Storages/registerStorages.cpp +++ b/dbms/src/Storages/registerStorages.cpp @@ -25,7 +25,6 @@ void registerStorageDeltaMerge(StorageFactory & factory); void registerStorageStripeLog(StorageFactory & factory); void registerStorageNull(StorageFactory & factory); void registerStorageMerge(StorageFactory & factory); -void registerStorageBuffer(StorageFactory & factory); void registerStorageMemory(StorageFactory & factory); void registerStorageSet(StorageFactory & factory); void registerStorageView(StorageFactory & factory); @@ -42,7 +41,6 @@ void registerStorages() registerStorageStripeLog(factory); registerStorageNull(factory); registerStorageMerge(factory); - registerStorageBuffer(factory); registerStorageMemory(factory); registerStorageSet(factory); registerStorageView(factory);
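With registerStorageBuffer dropped from registerStorages and the Buffer sources deleted, the min/max flush rule described in the removed class comment no longer exists in the tree. For anyone who needs to reproduce that behavior elsewhere, a minimal standalone sketch of the rule follows, mirroring the removed checkThresholdsImpl: flush once all minimum thresholds are exceeded, or as soon as any single maximum threshold is exceeded. The Thresholds struct and the shouldFlush name are illustrative only, not TiFlash APIs.

    #include <cstddef>
    #include <ctime>

    // Shape of the removed StorageBuffer::Thresholds: seconds since the first
    // write into the buffer, buffered row count, buffered (uncompressed) bytes.
    struct Thresholds
    {
        time_t time;
        size_t rows;
        size_t bytes;
    };

    // Mirrors the removed checkThresholdsImpl: flush when *all* minimum
    // thresholds are exceeded, or when *any* maximum threshold is exceeded.
    bool shouldFlush(const Thresholds & min_thresholds, const Thresholds & max_thresholds,
                     size_t rows, size_t bytes, time_t time_passed)
    {
        if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
            return true;

        return time_passed > max_thresholds.time
            || rows > max_thresholds.rows
            || bytes > max_thresholds.bytes;
    }

As the removed header comment notes, these checks applied independently to each of the num_shards buffers, so one shard could flush while the others kept accumulating rows.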