Remove some useless settings and related codes (#6677)
ref #5900
SeaRise authored Jan 29, 2023
1 parent b88b2aa commit cf36c1c
Showing 23 changed files with 36 additions and 1,113 deletions.
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/FormatFactory.cpp
@@ -69,8 +69,8 @@ BlockInputStreamPtr FormatFactory::getInput(
std::forward<decltype(row_stream)>(row_stream),
sample,
max_block_size,
- settings.input_format_allow_errors_num,
- settings.input_format_allow_errors_ratio);
+ 0,
+ 0);
};

if (name == "Native")
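The two zero literals take the place of the removed error-tolerance settings. As a hedged reconstruction of the surrounding call (the wrapper class and the parameter names are assumptions carried over from upstream ClickHouse, not shown in this hunk), the change pins both tolerances to zero, so a row that fails to parse now aborts the read instead of being skipped:

    // Sketch only; BlockInputStreamFromRowInputStream and the parameter names are assumptions.
    return std::make_shared<BlockInputStreamFromRowInputStream>(
        std::forward<decltype(row_stream)>(row_stream),
        sample,
        max_block_size,
        /*allow_errors_num=*/0,    // was settings.input_format_allow_errors_num
        /*allow_errors_ratio=*/0); // was settings.input_format_allow_errors_ratio
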
2 changes: 0 additions & 2 deletions dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
@@ -43,8 +43,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (!dependencies.empty())
{
views_context = std::make_unique<Context>(context);
- // Do not deduplicate insertions into MV if the main insertion is Ok
- views_context->getSettingsRef().insert_deduplicate = false;
}

for (const auto & database_table : dependencies)
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
@@ -98,8 +98,6 @@ Aggregator::Params buildParams(
before_agg_header,
keys,
aggregate_descriptions,
- settings.max_rows_to_group_by,
- settings.group_by_overflow_mode,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by,
45 changes: 2 additions & 43 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -168,37 +168,6 @@ bool hasRegionToRead(const DAGContext & dag_context, const TiDBTableScan & table
return has_region_to_read;
}

- void setQuotaAndLimitsOnTableScan(Context & context, DAGPipeline & pipeline)
- {
- const Settings & settings = context.getSettingsRef();
-
- IProfilingBlockInputStream::LocalLimits limits;
- limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
- limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
- limits.max_execution_time = settings.max_execution_time;
- limits.timeout_overflow_mode = settings.timeout_overflow_mode;
-
- /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
- * because the initiating server has a summary of the execution of the request on all servers.
- *
- * But limits on data size to read and maximum execution time are reasonable to check both on initiator and
- * additionally on each remote server, because these limits are checked per block of data processed,
- * and remote servers may process way more blocks of data than are received by initiator.
- */
- limits.min_execution_speed = settings.min_execution_speed;
- limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
-
- QuotaForIntervals & quota = context.getQuota();
-
- pipeline.transform([&](auto & stream) {
- if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
- {
- p_stream->setLimits(limits);
- p_stream->setQuota(quota);
- }
- });
- }
-
// add timezone cast for timestamp type, this is used to support session level timezone
// <has_cast, extra_cast, project_for_remote_read>
std::tuple<bool, ExpressionActionsPtr, ExpressionActionsPtr> addExtraCastsAfterTs(
@@ -361,8 +330,6 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
for (const auto & lock : drop_locks)
dagContext().addTableLock(lock);

- /// Set the limits and quota for reading data, the speed and time of the query.
- setQuotaAndLimitsOnTableScan(context, pipeline);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

@@ -407,7 +374,7 @@ void DAGStorageInterpreter::prepare()
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

- std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(context.getSettingsRef().max_columns_to_read);
+ std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan();

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}
@@ -958,16 +925,8 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return storages_with_lock;
}

- std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan(Int64 max_columns_to_read)
+ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
{
- // todo handle alias column
- if (max_columns_to_read && table_scan.getColumnSize() > max_columns_to_read)
- {
- throw TiFlashException(
- fmt::format("Limit for number of columns to read exceeded. Requested: {}, maximum: {}", table_scan.getColumnSize(), max_columns_to_read),
- Errors::BroadcastJoin::TooManyColumns);
- }
-
Names required_columns_tmp;
NamesAndTypes source_columns_tmp;
std::vector<ExtraCastAfterTSMode> need_cast_column;
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -79,7 +79,7 @@ class DAGStorageInterpreter

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

- std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan(Int64 max_columns_to_read);
+ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();

std::vector<RemoteRequest> buildRemoteRequests(const DM::ScanContextPtr & scan_context);

10 changes: 1 addition & 9 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -108,15 +108,7 @@ void orderStreams(
extra_info = enableFineGrainedShuffleExtraInfo;

pipeline.transform([&](auto & stream) {
- auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, log->identifier(), limit);
-
- /// Limits on sorting
- IProfilingBlockInputStream::LocalLimits limits;
- limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
- limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
- sorting_stream->setLimits(limits);
-
- stream = sorting_stream;
+ stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, log->identifier(), limit);
stream->setExtraInfo(extra_info);
});

2 changes: 1 addition & 1 deletion dbms/src/Flash/executeQuery.cpp
@@ -92,7 +92,7 @@ BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)
/// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry;

- prepareForInputStream(context, QueryProcessingStage::Complete, res.in);
+ prepareForInputStream(context, res.in);
if (likely(!internal))
logQueryPipeline(logger, res.in);

29 changes: 0 additions & 29 deletions dbms/src/Interpreters/Aggregator.cpp
@@ -836,10 +836,6 @@ bool Aggregator::executeOnBlock(
if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
result.convertToTwoLevel();

- /// Checking the constraints.
- if (!checkLimits(result_size))
- return false;
-
/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation structure is used.
*/
@@ -972,25 +968,6 @@ void Aggregator::spillImpl(
}

-
- bool Aggregator::checkLimits(size_t result_size) const
- {
- if (params.max_rows_to_group_by && result_size > params.max_rows_to_group_by)
- {
- switch (params.group_by_overflow_mode)
- {
- case OverflowMode::THROW:
- throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size)
- + " rows, maximum: " + toString(params.max_rows_to_group_by),
- ErrorCodes::TOO_MANY_ROWS);
-
- case OverflowMode::BREAK:
- return false;
- }
- }
- return true;
- }
-

void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)
{
if (is_cancelled())
@@ -1899,9 +1876,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
/// We merge all aggregation results to the first.
for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
{
- if (!checkLimits(res->size()))
- break;
-
AggregatedDataVariants & current = *non_empty_data[result_num];

mergeDataImpl<Method>(
@@ -2265,9 +2239,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
return;
}

- if (!checkLimits(result.size()))
- break;
-
if (result.type == AggregatedDataVariants::Type::without_key)
mergeWithoutKeyStreamsImpl(block, result);

22 changes: 3 additions & 19 deletions dbms/src/Interpreters/Aggregator.h
@@ -559,7 +559,7 @@ struct AggregatedDataVariants : private boost::noncopyable

void * aggregation_method_impl{};

- /** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
+ /** Specialization for the case when there are no keys.
*/
AggregatedDataWithoutKey without_key = nullptr;

@@ -890,10 +886,6 @@ class Aggregator
size_t aggregates_size;
Int64 local_delta_memory = 0;

- /// The settings of approximate calculation of GROUP BY.
- const size_t max_rows_to_group_by;
- const OverflowMode group_by_overflow_mode;
-
/// Two-level aggregation settings (used for a large number of keys).
/** With how many keys or the size of the aggregation state in bytes,
* two-level aggregation begins to be used. Enough to reach of at least one of the thresholds.
@@ -918,8 +914,6 @@
const Block & src_header_,
const ColumnNumbers & keys_,
const AggregateDescriptions & aggregates_,
- size_t max_rows_to_group_by_,
- OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_,
size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
@@ -932,8 +926,6 @@
, aggregates(aggregates_)
, keys_size(keys.size())
, aggregates_size(aggregates.size())
- , max_rows_to_group_by(max_rows_to_group_by_)
- , group_by_overflow_mode(group_by_overflow_mode_)
, group_by_two_level_threshold(group_by_two_level_threshold_)
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
@@ -951,7 +943,7 @@
const SpillConfig & spill_config,
UInt64 max_block_size_,
const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators)
- : Params(Block(), keys_, aggregates_, 0, OverflowMode::THROW, 0, 0, 0, false, spill_config, max_block_size_, collators_)
+ : Params(Block(), keys_, aggregates_, 0, 0, 0, false, spill_config, max_block_size_, collators_)
{
intermediate_header = intermediate_header_;
}
@@ -983,7 +975,7 @@

using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;

- /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
+ /// Process one block. Return false if the processing should be aborted.
bool executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
@@ -1292,14 +1284,6 @@

void destroyWithoutKey(
AggregatedDataVariants & result) const;
-
-
- /** Checks constraints on the maximum number of keys for aggregation.
- * If it is exceeded, then, depending on the group_by_overflow_mode, either
- * - throws an exception;
- * - returns false, which means that execution must be aborted;
- */
- bool checkLimits(size_t result_size) const;
};

/** Get the aggregation variant by its type. */
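For call sites, the net effect is that the two GROUP BY limit arguments simply disappear from the primary Params constructor. A sketch of a post-change construction, with the argument order taken from the delegating constructor shown above; the variable names are illustrative placeholders, and the remaining arguments keep whatever meaning they already had:

    // Illustrative call site after this patch (names are placeholders, not from the repository):
    Aggregator::Params params(
        before_agg_header,                           // src_header_
        keys,                                        // keys_
        aggregate_descriptions,                      // aggregates_
        settings.group_by_two_level_threshold,       // group_by_two_level_threshold_
        settings.group_by_two_level_threshold_bytes, // group_by_two_level_threshold_bytes_
        settings.max_bytes_before_external_group_by, // max_bytes_before_external_group_by_
        false,                                       // unchanged boolean flag from the old signature
        spill_config,
        max_block_size,
        collators);
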
2 changes: 0 additions & 2 deletions dbms/src/Interpreters/Context.cpp
@@ -1857,8 +1857,6 @@ size_t Context::getMaxStreams() const
max_streams = 1;
}
}
- if (max_streams > 1)
- max_streams *= settings.max_streams_to_max_threads_ratio;
if (max_streams == 0)
max_streams = 1;
if (unlikely(max_streams != 1 && is_cop_request))
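With that multiplication removed, getMaxStreams() no longer scales the stream count past the thread limit, so max_streams_to_max_threads_ratio becomes a dead setting on this path. A before/after with hypothetical numbers, for illustration only (not taken from the patch):

    // Hypothetical values:
    //   max_threads = 8, max_streams_to_max_threads_ratio = 2
    // before this patch: max_streams = 8 * 2 = 16
    // after this patch:  max_streams = 8
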
3 changes: 0 additions & 3 deletions dbms/src/Interpreters/ExpressionAnalyzer.cpp
@@ -658,8 +658,6 @@ static std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
- subquery_settings.max_result_rows = 0;
- subquery_settings.max_result_bytes = 0;
/// The calculation of `extremes` does not make sense and is not necessary (if you do it, then the `extremes` of the subquery can be taken instead of the whole query).
subquery_settings.extremes = false;
subquery_context.setSettings(subquery_settings);
@@ -1195,7 +1193,6 @@ void ExpressionAnalyzer::executeScalarSubqueriesImpl(ASTPtr & ast)
{
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
- subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context.setSettings(subquery_settings);

18 changes: 9 additions & 9 deletions dbms/src/Interpreters/InterpreterCreateQuery.cpp
@@ -217,7 +217,7 @@ static ColumnsAndDefaults parseColumns(const ASTExpressionList & column_list_ast
{
const auto & final_column_name = col_decl.name;
const auto tmp_column_name = final_column_name + "_tmp";
- const auto data_type_ptr = columns.back().type.get();
+ const auto * const data_type_ptr = columns.back().type.get();

default_expr_list->children.emplace_back(setAlias(makeASTFunction("CAST", std::make_shared<ASTIdentifier>(tmp_column_name), std::make_shared<ASTLiteral>(Field(data_type_ptr->getName()))),
final_column_name));
@@ -236,8 +236,8 @@

for (auto & column : defaulted_columns)
{
- const auto name_and_type_ptr = column.first;
- const auto col_decl_ptr = column.second;
+ auto * const name_and_type_ptr = column.first;
+ auto * const col_decl_ptr = column.second;

const auto & column_name = col_decl_ptr->name;
const auto has_explicit_type = nullptr != col_decl_ptr->type;
@@ -307,8 +307,8 @@ ASTPtr InterpreterCreateQuery::formatColumns(const NamesAndTypesList & columns)
column_declaration->name = column.name;

StringPtr type_name = std::make_shared<String>(column.type->getName());
- auto pos = type_name->data();
- const auto end = pos + type_name->size();
+ auto * pos = type_name->data();
+ auto * const end = pos + type_name->size();

ParserIdentifierWithOptionalParameters storage_p;
column_declaration->type = parseQuery(storage_p, pos, end, "data type", 0);
@@ -331,8 +331,8 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns)
column_declaration->name = column.name;

StringPtr type_name = std::make_shared<String>(column.type->getName());
- auto pos = type_name->data();
- const auto end = pos + type_name->size();
+ auto * pos = type_name->data();
+ auto * const end = pos + type_name->size();

ParserIdentifierWithOptionalParameters storage_p;
column_declaration->type = parseQuery(storage_p, pos, end, "data type", 0);
@@ -472,7 +472,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
// Table SQL definition is available even if the table is detached
auto query = context.getCreateTableQuery(database_name, table_name);
- auto & as_create = typeid_cast<const ASTCreateQuery &>(*query);
+ const auto & as_create = typeid_cast<const ASTCreateQuery &>(*query);
create = as_create; // Copy the saved create query, but use ATTACH instead of CREATE
create.attach = true;
}
@@ -571,7 +571,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
return InterpreterInsertQuery(
insert,
create.is_temporary ? context.getSessionContext() : context,
- context.getSettingsRef().insert_allow_materialized_columns)
+ false)
.execute();
}

3 changes: 1 addition & 2 deletions dbms/src/Interpreters/InterpreterFactory.cpp
@@ -82,8 +82,7 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
else if (typeid_cast<ASTInsertQuery *>(query.get()))
{
/// readonly is checked inside InterpreterInsertQuery
- bool allow_materialized = static_cast<bool>(context.getSettingsRef().insert_allow_materialized_columns);
- return std::make_unique<InterpreterInsertQuery>(query, context, allow_materialized);
+ return std::make_unique<InterpreterInsertQuery>(query, context, false);
}
else if (typeid_cast<ASTCreateQuery *>(query.get()))
{
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterInsertQuery.cpp
@@ -128,7 +128,7 @@ BlockIO InterpreterInsertQuery::execute()

/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
- if (!(context.getSettingsRef().insert_distributed_sync && table->getName() == "Distributed"))
+ if (table->getName() != "Distributed")
{
out = std::make_shared<SquashingBlockOutputStream>(
out,
(Remaining changed files not loaded in this view.)
