Improve LogWithPrefix of mpp (#4209)
close #4249
SeaRise authored Mar 21, 2022
1 parent 91653bc commit 2bd5626
Showing 11 changed files with 63 additions and 53 deletions.
21 changes: 13 additions & 8 deletions dbms/src/Common/Exception.cpp
@@ -66,22 +66,27 @@ void tryLogCurrentException(const char * log_name, const std::string & start_of_
tryLogCurrentException(&Poco::Logger::get(log_name), start_of_message);
}

#define TRY_LOG_CURRENT_EXCEPTION(logger, start_of_message) \
try \
{ \
LOG_FMT_ERROR((logger), "{}{}{}", (start_of_message), ((start_of_message).empty() ? "" : ": "), getCurrentExceptionMessage(true)); \
} \
catch (...) \
{ \
}

void tryLogCurrentException(const LogWithPrefixPtr & logger, const std::string & start_of_message)
{
tryLogCurrentException(logger->getLog(), start_of_message);
TRY_LOG_CURRENT_EXCEPTION(logger, start_of_message);
}

void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_message)
{
try
{
LOG_FMT_ERROR(logger, "{}{}{}", start_of_message, (start_of_message.empty() ? "" : ": "), getCurrentExceptionMessage(true));
}
catch (...)
{
}
TRY_LOG_CURRENT_EXCEPTION(logger, start_of_message);
}

#undef TRY_LOG_CURRENT_EXCEPTION

std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded_stacktrace)
{
std::stringstream stream;
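The net effect of the Exception.cpp change is that the LogWithPrefixPtr overload no longer unwraps the prefixed logger via getLog(), so exception messages keep the MPP task/query prefix; the local macro only keeps the two overloads' formatting identical and is #undef'd right after. Below is a minimal, self-contained sketch of that idea; PrefixLogger, tryLogCurrentError, and the prefix string are illustrative stand-ins, not TiFlash's actual LogWithPrefix API.

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Simplified stand-in for a "log with prefix": it prepends a fixed identifier
// (e.g. an MPP task id) to every message it writes.
class PrefixLogger
{
public:
    explicit PrefixLogger(std::string prefix_)
        : prefix(std::move(prefix_))
    {}

    void error(const std::string & msg) const
    {
        std::cerr << prefix << " " << msg << std::endl;
    }

private:
    std::string prefix;
};

using PrefixLoggerPtr = std::shared_ptr<PrefixLogger>;

// Analogous to the patched overload: the message goes through the prefixed
// logger itself rather than the raw logger it wraps, so the task/query
// prefix shows up in the error line.
void tryLogCurrentError(const PrefixLoggerPtr & log, const std::string & where)
{
    try
    {
        log->error(where + ": something went wrong");
    }
    catch (...)
    {
    }
}

int main()
{
    auto log = std::make_shared<PrefixLogger>("[task 1, query 42]");
    tryLogCurrentError(log, __PRETTY_FUNCTION__);
    return 0;
}
```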
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
@@ -45,7 +45,7 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
, keys_size(params.keys_size)
, aggregates_size(params.aggregates_size)
, handler(*this)
, processor(inputs, additional_input_at_end, max_threads, handler)
, processor(inputs, additional_input_at_end, max_threads, handler, log)
{
children = inputs;
if (additional_input_at_end)
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/ParallelInputsProcessor.h
@@ -15,6 +15,7 @@
#pragma once

#include <Common/CurrentMetrics.h>
#include <Common/LogWithPrefix.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadFactory.h>
#include <Common/ThreadManager.h>
@@ -95,11 +96,12 @@ class ParallelInputsProcessor
* - where you must first make JOIN in parallel, while noting which keys are not found,
* and only after the completion of this work, create blocks of keys that are not found.
*/
ParallelInputsProcessor(const BlockInputStreams & inputs_, const BlockInputStreamPtr & additional_input_at_end_, size_t max_threads_, Handler & handler_)
ParallelInputsProcessor(const BlockInputStreams & inputs_, const BlockInputStreamPtr & additional_input_at_end_, size_t max_threads_, Handler & handler_, const LogWithPrefixPtr & log_)
: inputs(inputs_)
, additional_input_at_end(additional_input_at_end_)
, max_threads(std::min(inputs_.size(), max_threads_))
, handler(handler_)
, log(log_)
{
for (size_t i = 0; i < inputs_.size(); ++i)
unprepared_inputs.emplace(inputs_[i], i);
@@ -113,7 +115,7 @@
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}

@@ -366,7 +368,7 @@ class ParallelInputsProcessor
/// Wait for the completion of all threads.
std::atomic<bool> joined_threads{false};

Poco::Logger * log = &Poco::Logger::get("ParallelInputsProcessor");
const LogWithPrefixPtr log;
};


2 changes: 1 addition & 1 deletion dbms/src/DataStreams/SharedQueryBlockInputStream.h
@@ -50,7 +50,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}

10 changes: 5 additions & 5 deletions dbms/src/DataStreams/UnionBlockInputStream.h
@@ -100,10 +100,10 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
const LogWithPrefixPtr & log_,
ExceptionCallback exception_callback_ = ExceptionCallback())
: output_queue(std::min(inputs.size(), max_threads))
, log(getMPPTaskLog(log_, NAME))
, handler(*this)
, processor(inputs, additional_input_at_end, max_threads, handler)
, processor(inputs, additional_input_at_end, max_threads, handler, log)
, exception_callback(exception_callback_)
, log(getMPPTaskLog(log_, NAME))
{
children = inputs;
if (additional_input_at_end)
@@ -131,7 +131,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}

@@ -337,6 +337,8 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
Self & parent;
};

LogWithPrefixPtr log;

Handler handler;
ParallelInputsProcessor<Handler, mode> processor;

@@ -346,8 +348,6 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream

bool started = false;
bool all_read = false;

LogWithPrefixPtr log;
};

} // namespace DB
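In UnionBlockInputStream above, `log` is not just initialized earlier in the member-initializer list; its declaration is moved above `handler` and `processor`. C++ initializes non-static data members in declaration order, not in member-initializer-list order, and `processor` is now constructed from `log`. A minimal sketch of that rule, with simplified stand-in types:

```cpp
#include <string>
#include <utility>

// `Processor` stands in for ParallelInputsProcessor: it copies whatever the
// passed-in log holds at construction time.
struct Processor
{
    explicit Processor(const std::string & log_)
        : captured_log(log_)
    {}
    std::string captured_log;
};

// `Stream` stands in for UnionBlockInputStream.
struct Stream
{
    explicit Stream(std::string log_)
        : log(std::move(log_)) // runs first: `log` is declared first
        , processor(log)       // runs second: sees the initialized `log`
    {}

    std::string log;     // must be declared before `processor`
    Processor processor; // depends on `log`
};

int main()
{
    Stream s("[task 1, query 42] Union");
    return s.processor.captured_log.empty() ? 1 : 0;
}
```

If `processor` were declared before `log`, it would be constructed from a not-yet-initialized `log`; compilers typically flag the mismatch between declaration order and initializer-list order with -Wreorder, which is why the declaration itself had to move.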
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -189,6 +189,8 @@ void DAGContext::initExchangeReceiverIfMPP(Context & context, size_t max_streams
{
assert(executor.has_executor_id());
const auto & executor_id = executor.executor_id();
// In order to distinguish different exchange receivers.
auto executor_id_prefix_log = getMPPTaskLog(log, executor_id);
auto exchange_receiver = std::make_shared<ExchangeReceiver>(
std::make_shared<GRPCReceiverContext>(
executor.exchange_receiver(),
@@ -199,7 +201,7 @@
context.getSettingsRef().enable_async_grpc_client),
executor.exchange_receiver().encoded_task_meta_size(),
max_streams,
log);
executor_id_prefix_log);
mpp_exchange_receiver_map[executor_id] = exchange_receiver;
new_thread_count_of_exchange_receiver += exchange_receiver->computeNewThreadCount();
}
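In initExchangeReceiverIfMPP above, each ExchangeReceiver now gets a child logger built from the task logger plus its executor_id, so interleaved log lines from several receivers in one MPP task can be told apart. A rough, self-contained sketch of that kind of prefix composition; SimpleLog, withPrefix, and the executor ids are illustrative stand-ins, not the actual getMPPTaskLog API.

```cpp
#include <iostream>
#include <memory>
#include <string>

// Minimal prefixed logger: the prefix is fixed at construction time.
struct SimpleLog
{
    std::string prefix;
    void info(const std::string & msg) const { std::cout << prefix << " " << msg << "\n"; }
};
using SimpleLogPtr = std::shared_ptr<SimpleLog>;

// Build a child logger whose prefix extends the parent's prefix with `name`.
SimpleLogPtr withPrefix(const SimpleLogPtr & parent, const std::string & name)
{
    auto child = std::make_shared<SimpleLog>();
    child->prefix = (parent ? parent->prefix + " " : "") + "[" + name + "]";
    return child;
}

int main()
{
    auto task_log = withPrefix(nullptr, "task 1, query 42");
    // One child logger per exchange receiver, keyed by a (hypothetical) executor_id.
    auto recv_a = withPrefix(task_log, "ExchangeReceiver_15");
    auto recv_b = withPrefix(task_log, "ExchangeReceiver_21");
    recv_a->info("start receiving");
    recv_b->info("start receiving");
    return 0;
}
```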
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -960,9 +960,11 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
throw Exception("Can not find exchange receiver for " + query_block.source_name, ErrorCodes::LOGICAL_ERROR);
// todo choose a more reasonable stream number
auto & exchange_receiver_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[query_block.source_name];
// In order to distinguish different exchange receivers.
auto executor_id_prefix_log = getMPPTaskLog(taskLogger(), query_block.source_name);
for (size_t i = 0; i < max_streams; ++i)
{
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, taskLogger());
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, executor_id_prefix_log);
exchange_receiver_io_input_streams.push_back(stream);
stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, taskLogger());
pipeline.streams.push_back(stream);
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -300,7 +300,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
, msg_channel(max_buffer_size)
, live_connections(source_num)
, state(ExchangeReceiverState::NORMAL)
, exc_log(getMPPTaskLog(log_, "ExchangeReceiver"))
, exc_log(log_)
, collected(false)
{
rpc_context->fillSchema(schema);
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
@@ -379,7 +379,7 @@ void MPPTask::writeErrToAllTunnels(const String & e)
catch (...)
{
it.second->close("Failed to write error msg to tunnel");
tryLogCurrentException(log->getLog(), "Failed to write error " + e + " to tunnel: " + it.second->id());
tryLogCurrentException(log, "Failed to write error " + e + " to tunnel: " + it.second->id());
}
}
}
57 changes: 28 additions & 29 deletions dbms/src/Interpreters/Aggregator.cpp
@@ -15,14 +15,11 @@
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <Common/ClickHouseRevision.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadManager.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Common/wrapInvocable.h>
#include <DataStreams/IProfilingBlockInputStream.h>
@@ -75,7 +72,7 @@ AggregatedDataVariants::~AggregatedDataVariants()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(aggregator->log, __PRETTY_FUNCTION__);
}
}
}
@@ -88,11 +85,11 @@ void AggregatedDataVariants::convertToTwoLevel()

switch (type)
{
#define M(NAME) \
case Type::NAME: \
NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*NAME); \
NAME.reset(); \
type = Type::NAME##_two_level; \
#define M(NAME) \
case Type::NAME: \
NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*(NAME)); \
(NAME).reset(); \
type = Type::NAME##_two_level; \
break;

APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
@@ -467,7 +464,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
Arena * arena)
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
@@ -593,9 +590,9 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, result.collators, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
@@ -656,7 +653,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out);

if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -1113,9 +1110,9 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -1130,9 +1127,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & dat
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) return prepareBlocksAndFillTwoLevelImpl(data_variants, *data_variants.NAME, final, thread_pool);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -1536,7 +1533,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
#define M(NAME) \
else if (first->type == AggregatedDataVariants::Type::NAME) \
aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
@@ -1636,7 +1633,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
size_t thread_number = static_cast<size_t>(bucket_num) % threads;
Arena * arena = merged_data.aggregates_pools.at(thread_number).get();

if (false) {}
if (false) {} // NOLINT
#define M(NAME) \
else if (method == AggregatedDataVariants::Type::NAME) \
{ \
@@ -1996,7 +1993,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);

if (false)
if (false) // NOLINT
{
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -2020,7 +2017,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
result.aggregates_pools.push_back(std::make_shared<Arena>());
Arena * aggregates_pool = result.aggregates_pools.back().get();

auto task = std::bind(merge_bucket, bucket, aggregates_pool);
auto task = [&merge_bucket, bucket, aggregates_pool] {
return merge_bucket(bucket, aggregates_pool);
};

if (thread_pool)
thread_pool->schedule(wrapInvocable(true, task));
@@ -2241,9 +2240,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
type \
= AggregatedDataVariants::Type::NAME##_two_level;

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2257,9 +2256,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
num_buckets \
= data.NAME->data.NUM_BUCKETS;

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
Expand All @@ -2270,9 +2269,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
else if (data.type == AggregatedDataVariants::Type::NAME) \
convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, key_columns, block, splitted_blocks);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2329,9 +2328,9 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
else if (result.type == AggregatedDataVariants::Type::NAME) \
destroyImpl<decltype(result.NAME)::element_type>(result.NAME->data);

if (false)
if (false) // NOLINT
{
} // NOLINT
}
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else if (result.type != AggregatedDataVariants::Type::without_key) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
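Besides passing the aggregator's own logger to tryLogCurrentException and adding // NOLINT markers to the macro-generated `if (false)` chains, Aggregator.cpp replaces a std::bind with a lambda when scheduling the per-bucket merge tasks. A compilable sketch of that capture pattern, with simplified stand-in types (the real code schedules onto a thread pool via wrapInvocable):

```cpp
#include <functional>
#include <iostream>
#include <vector>

int main()
{
    std::vector<std::function<void()>> tasks;

    // Stand-in for the merge_bucket callable in Aggregator::mergeStream.
    auto merge_bucket = [](int bucket, int * pool) {
        std::cout << "merge bucket " << bucket << " into pool " << *pool << "\n";
    };

    int pools[2] = {10, 20};

    for (int bucket = 0; bucket < 2; ++bucket)
    {
        int * aggregates_pool = &pools[bucket];
        // Equivalent of the patched code: the lambda states explicitly what is
        // captured by reference (the shared callable) and what by value (the
        // per-task bucket index and pool pointer).
        auto task = [&merge_bucket, bucket, aggregates_pool] {
            return merge_bucket(bucket, aggregates_pool);
        };
        tasks.push_back(task);
    }

    for (auto & t : tasks)
        t();
    return 0;
}
```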
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Aggregator.h
@@ -953,11 +953,11 @@ class Aggregator
AggregateDataPtr overflow_row) const;

/// For case when there are no keys (all aggregate into one row).
void executeWithoutKeyImpl(
static void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
Arena * arena);

template <typename Method>
void writeToTemporaryFileImpl(
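executeWithoutKeyImpl only operates on its arguments, so Aggregator.h turns it from a const member function into a static one. A tiny sketch of the same refactor on stand-in types (Instr and Agg are not the real TiFlash classes):

```cpp
#include <cstddef>

// Stand-in for AggregateFunctionInstruction.
struct Instr
{
    int value = 0;
};

class Agg
{
public:
    // before: void executeWithoutKeyImpl(...) const;
    // after:  static void executeWithoutKeyImpl(...);
    static void executeWithoutKeyImpl(long & res, std::size_t rows, const Instr * instructions)
    {
        for (std::size_t i = 0; i < rows; ++i)
            res += instructions->value; // uses only its arguments, never `this`
    }
};

int main()
{
    long res = 0;
    Instr inst{2};
    Agg::executeWithoutKeyImpl(res, 3, &inst);
    return res == 6 ? 0 : 1;
}
```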
