Remove some useless settings and related codes #6677

Merged — 8 commits, merged Jan 29, 2023
Changes from 7 commits
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/FormatFactory.cpp
@@ -69,8 +69,8 @@ BlockInputStreamPtr FormatFactory::getInput(
std::forward<decltype(row_stream)>(row_stream),
sample,
max_block_size,
settings.input_format_allow_errors_num,
settings.input_format_allow_errors_ratio);
0,
0);
};

if (name == "Native")
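For context: the hunk above hard-codes the two error-tolerance arguments to zero, so the row-based input stream built here presumably fails on the first malformed row instead of skipping up to input_format_allow_errors_num / input_format_allow_errors_ratio rows. A generic, self-contained sketch of what such an error budget does (stand-in parsing logic, not the TiFlash API):

#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Stand-in for per-row parsing with an error budget; with a budget of 0 the
// first bad row aborts the read, which matches the behaviour after this change.
size_t parse_rows(const std::vector<std::string> & rows, size_t allow_errors_num)
{
    size_t parsed = 0;
    size_t errors = 0;
    for (const auto & row : rows)
    {
        bool ok = !row.empty(); // pretend empty rows are malformed
        if (ok)
            ++parsed;
        else if (++errors > allow_errors_num)
            throw std::runtime_error("Too many errors while parsing input");
    }
    return parsed;
}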
2 changes: 0 additions & 2 deletions dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
@@ -43,8 +43,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (!dependencies.empty())
{
views_context = std::make_unique<Context>(context);
// Do not deduplicate insertions into MV if the main insertion is Ok
views_context->getSettingsRef().insert_deduplicate = false;
}

for (const auto & database_table : dependencies)
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
@@ -98,8 +98,6 @@ Aggregator::Params buildParams(
before_agg_header,
keys,
aggregate_descriptions,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by,
45 changes: 2 additions & 43 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -168,37 +168,6 @@ bool hasRegionToRead(const DAGContext & dag_context, const TiDBTableScan & table
return has_region_to_read;
}

void setQuotaAndLimitsOnTableScan(Context & context, DAGPipeline & pipeline)
{
const Settings & settings = context.getSettingsRef();

IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
limits.max_execution_time = settings.max_execution_time;
limits.timeout_overflow_mode = settings.timeout_overflow_mode;

/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
* because the initiating server has a summary of the execution of the request on all servers.
*
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by initiator.
*/
limits.min_execution_speed = settings.min_execution_speed;
limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;

QuotaForIntervals & quota = context.getQuota();

pipeline.transform([&](auto & stream) {
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->setLimits(limits);
p_stream->setQuota(quota);
}
});
}

// add timezone cast for timestamp type, this is used to support session level timezone
// <has_cast, extra_cast, project_for_remote_read>
std::tuple<bool, ExpressionActionsPtr, ExpressionActionsPtr> addExtraCastsAfterTs(
@@ -361,8 +330,6 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
for (const auto & lock : drop_locks)
dagContext().addTableLock(lock);

/// Set the limits and quota for reading data, the speed and time of the query.
setQuotaAndLimitsOnTableScan(context, pipeline);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

@@ -407,7 +374,7 @@ void DAGStorageInterpreter::prepare()
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(context.getSettingsRef().max_columns_to_read);
std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan();

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
}
@@ -958,16 +925,8 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return storages_with_lock;
}

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan(Int64 max_columns_to_read)
std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
{
// todo handle alias column
if (max_columns_to_read && table_scan.getColumnSize() > max_columns_to_read)
{
throw TiFlashException(
fmt::format("Limit for number of columns to read exceeded. Requested: {}, maximum: {}", table_scan.getColumnSize(), max_columns_to_read),
Errors::BroadcastJoin::TooManyColumns);
}

Names required_columns_tmp;
NamesAndTypes source_columns_tmp;
std::vector<ExtraCastAfterTSMode> need_cast_column;
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -79,7 +79,7 @@ class DAGStorageInterpreter

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan(Int64 max_columns_to_read);
std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();

std::vector<RemoteRequest> buildRemoteRequests(const DM::ScanContextPtr & scan_context);

10 changes: 1 addition & 9 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -108,15 +108,7 @@ void orderStreams(
extra_info = enableFineGrainedShuffleExtraInfo;

pipeline.transform([&](auto & stream) {
auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, log->identifier(), limit);

/// Limits on sorting
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sorting_stream->setLimits(limits);

stream = sorting_stream;
stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, log->identifier(), limit);
stream->setExtraInfo(extra_info);
});

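For context: the rewritten lambda above still wraps every stream of the pipeline in a PartialSortingBlockInputStream, it just no longer attaches the max_rows_to_sort / max_bytes_to_sort limits. A minimal stand-alone sketch of that transform pattern (stand-in types, not TiFlash's DAGPipeline):

#include <functional>
#include <memory>
#include <vector>

struct Stream { virtual ~Stream() = default; };

// Stand-in for the sorting wrapper: here it only keeps a handle to its input.
struct PartialSortingStream : Stream
{
    explicit PartialSortingStream(std::shared_ptr<Stream> input_) : input(std::move(input_)) {}
    std::shared_ptr<Stream> input;
};

// Stand-in for pipeline.transform: apply the same rewrite to every stream.
void transform(std::vector<std::shared_ptr<Stream>> & streams,
               const std::function<void(std::shared_ptr<Stream> &)> & fn)
{
    for (auto & stream : streams)
        fn(stream);
}

// Usage, mirroring the simplified lambda above:
//   transform(pipeline, [](auto & s) { s = std::make_shared<PartialSortingStream>(s); });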
2 changes: 1 addition & 1 deletion dbms/src/Flash/executeQuery.cpp
@@ -92,7 +92,7 @@ BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)
/// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry;

prepareForInputStream(context, QueryProcessingStage::Complete, res.in);
prepareForInputStream(context, res.in);
if (likely(!internal))
logQueryPipeline(logger, res.in);

29 changes: 0 additions & 29 deletions dbms/src/Interpreters/Aggregator.cpp
@@ -836,10 +836,6 @@ bool Aggregator::executeOnBlock(
if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
result.convertToTwoLevel();

/// Checking the constraints.
if (!checkLimits(result_size))
return false;

/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation structure is used.
*/
@@ -972,25 +968,6 @@ void Aggregator::spillImpl(
}


bool Aggregator::checkLimits(size_t result_size) const
{
if (params.max_rows_to_group_by && result_size > params.max_rows_to_group_by)
{
switch (params.group_by_overflow_mode)
{
case OverflowMode::THROW:
throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size)
+ " rows, maximum: " + toString(params.max_rows_to_group_by),
ErrorCodes::TOO_MANY_ROWS);

case OverflowMode::BREAK:
return false;
}
}
return true;
}


void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)
{
if (is_cancelled())
@@ -1899,9 +1876,6 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
/// We merge all aggregation results to the first.
for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
{
if (!checkLimits(res->size()))
break;

AggregatedDataVariants & current = *non_empty_data[result_num];

mergeDataImpl<Method>(
@@ -2265,9 +2239,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
return;
}

if (!checkLimits(result.size()))
break;

if (result.type == AggregatedDataVariants::Type::without_key)
mergeWithoutKeyStreamsImpl(block, result);

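For context: the removed checkLimits() implemented the usual overflow-mode contract, which is why executeOnBlock(), mergeSingleLevelDataImpl() and mergeStream() checked a boolean and bailed out early. A stand-alone restatement of that contract with stand-in types (not the TiFlash declarations):

#include <cstddef>
#include <stdexcept>
#include <string>

enum class OverflowMode { THROW, BREAK };

// Returns true if processing may continue. In THROW mode an exceeded limit
// raises; in BREAK mode it returns false so the caller stops feeding data and
// keeps the partial aggregation result.
bool check_group_by_limit(size_t result_size, size_t max_rows_to_group_by, OverflowMode mode)
{
    if (max_rows_to_group_by == 0 || result_size <= max_rows_to_group_by)
        return true;
    if (mode == OverflowMode::THROW)
        throw std::runtime_error("Limit for rows to GROUP BY exceeded: has "
                                 + std::to_string(result_size)
                                 + " rows, maximum: " + std::to_string(max_rows_to_group_by));
    return false;
}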
22 changes: 3 additions & 19 deletions dbms/src/Interpreters/Aggregator.h
@@ -559,7 +559,7 @@ struct AggregatedDataVariants : private boost::noncopyable

void * aggregation_method_impl{};

/** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
/** Specialization for the case when there are no keys.
*/
AggregatedDataWithoutKey without_key = nullptr;

@@ -890,10 +890,6 @@ class Aggregator
size_t aggregates_size;
Int64 local_delta_memory = 0;

/// The settings of approximate calculation of GROUP BY.
const size_t max_rows_to_group_by;
const OverflowMode group_by_overflow_mode;

/// Two-level aggregation settings (used for a large number of keys).
/** With how many keys or the size of the aggregation state in bytes,
* two-level aggregation begins to be used. Enough to reach of at least one of the thresholds.
@@ -918,8 +914,6 @@
const Block & src_header_,
const ColumnNumbers & keys_,
const AggregateDescriptions & aggregates_,
size_t max_rows_to_group_by_,
OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_,
size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
Expand All @@ -932,8 +926,6 @@ class Aggregator
, aggregates(aggregates_)
, keys_size(keys.size())
, aggregates_size(aggregates.size())
, max_rows_to_group_by(max_rows_to_group_by_)
, group_by_overflow_mode(group_by_overflow_mode_)
, group_by_two_level_threshold(group_by_two_level_threshold_)
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
Expand All @@ -951,7 +943,7 @@ class Aggregator
const SpillConfig & spill_config,
UInt64 max_block_size_,
const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators)
: Params(Block(), keys_, aggregates_, 0, OverflowMode::THROW, 0, 0, 0, false, spill_config, max_block_size_, collators_)
: Params(Block(), keys_, aggregates_, 0, 0, 0, false, spill_config, max_block_size_, collators_)
{
intermediate_header = intermediate_header_;
}
@@ -983,7 +975,7 @@ class Aggregator
using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;

/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
/// Process one block. Return false if the processing should be aborted.
bool executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
@@ -1292,14 +1284,6 @@ class Aggregator

void destroyWithoutKey(
AggregatedDataVariants & result) const;


/** Checks constraints on the maximum number of keys for aggregation.
* If it is exceeded, then, depending on the group_by_overflow_mode, either
* - throws an exception;
* - returns false, which means that execution must be aborted;
*/
bool checkLimits(size_t result_size) const;
};

/** Get the aggregation variant by its type. */
2 changes: 0 additions & 2 deletions dbms/src/Interpreters/Context.cpp
@@ -1857,8 +1857,6 @@ size_t Context::getMaxStreams() const
max_streams = 1;
}
}
if (max_streams > 1)
max_streams *= settings.max_streams_to_max_threads_ratio;
if (max_streams == 0)
max_streams = 1;
if (unlikely(max_streams != 1 && is_cop_request))
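For context: the removed lines scaled the stream count by max_streams_to_max_threads_ratio, while the remaining logic only clamps the value to at least 1. A stand-in sketch of the arithmetic before and after (not the full Context::getMaxStreams):

#include <cstddef>

size_t max_streams_before(size_t max_streams, double max_streams_to_max_threads_ratio)
{
    if (max_streams > 1)
        max_streams = static_cast<size_t>(max_streams * max_streams_to_max_threads_ratio); // removed by this PR
    if (max_streams == 0)
        max_streams = 1;
    return max_streams;
}

size_t max_streams_after(size_t max_streams)
{
    if (max_streams == 0)
        max_streams = 1;
    return max_streams;
}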
3 changes: 0 additions & 3 deletions dbms/src/Interpreters/ExpressionAnalyzer.cpp
@@ -658,8 +658,6 @@ static std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.max_result_rows = 0;
subquery_settings.max_result_bytes = 0;
/// The calculation of `extremes` does not make sense and is not necessary (if you do it, then the `extremes` of the subquery can be taken instead of the whole query).
subquery_settings.extremes = false;
subquery_context.setSettings(subquery_settings);
@@ -1195,7 +1193,6 @@ void ExpressionAnalyzer::executeScalarSubqueriesImpl(ASTPtr & ast)
{
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context.setSettings(subquery_settings);

2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterCreateQuery.cpp
@@ -571,7 +571,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
return InterpreterInsertQuery(
insert,
create.is_temporary ? context.getSessionContext() : context,
context.getSettingsRef().insert_allow_materialized_columns)
0)
.execute();
}

3 changes: 1 addition & 2 deletions dbms/src/Interpreters/InterpreterFactory.cpp
@@ -82,8 +82,7 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
else if (typeid_cast<ASTInsertQuery *>(query.get()))
{
/// readonly is checked inside InterpreterInsertQuery
bool allow_materialized = static_cast<bool>(context.getSettingsRef().insert_allow_materialized_columns);
return std::make_unique<InterpreterInsertQuery>(query, context, allow_materialized);
return std::make_unique<InterpreterInsertQuery>(query, context, false);
}
else if (typeid_cast<ASTCreateQuery *>(query.get()))
{
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterInsertQuery.cpp
@@ -128,7 +128,7 @@ BlockIO InterpreterInsertQuery::execute()

/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->getName() == "Distributed"))
if (table->getName() != "Distributed")
{
out = std::make_shared<SquashingBlockOutputStream>(
out,