Commit
replace LOG_XXX with LOG_FMT_XXX in dbms/src/DataStreams, `dbms/src/Debug` and `dbms/src/IO` (#4378)

ref #4346
Lloyd-Pottiger authored Mar 24, 2022
1 parent 303950f commit 6ab3623
Showing 11 changed files with 71 additions and 63 deletions.
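The migration pattern is mechanical throughout: the old stream-style macros assemble the message with `operator<<`, while the `LOG_FMT_*` family takes a fmt-style format string with `{}` placeholders. A minimal sketch of the idea, assuming a fmt-based wrapper roughly like the one in `common/logger_useful.h` (the real macro also handles log levels and more; this is illustrative only):

```cpp
#include <fmt/core.h>

// Hypothetical sketch of the macro family, NOT the actual TiFlash definition.
#define LOG_FMT_WARNING(logger, fmt_str, ...) \
    (logger)->warning(fmt::format(fmt_str, ##__VA_ARGS__))

// Before: message assembled with stream inserters.
//   LOG_WARNING(log, "Invalid sign: " << static_cast<int>(sign));
// After: format string with placeholder arguments, formatted by fmt.
//   LOG_FMT_WARNING(log, "Invalid sign: {}", static_cast<int>(sign));
```

The fmt form avoids the temporary `std::ostringstream` machinery and keeps the full message visible as a single format string, as the hunks below show.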
20 changes: 9 additions & 11 deletions dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp
@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/CollapsingFinalBlockInputStream.h>
#include <Common/typeid_cast.h>
#include <DataStreams/CollapsingFinalBlockInputStream.h>

/// Maximum number of messages about incorrect data in the log.
#define MAX_ERROR_MESSAGES 10
@@ -31,25 +31,23 @@ CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream()
while (!queue.empty())
{
Cursor c = queue.top();
queue.pop();
queue.pop(); // NOLINT
c.block.cancel();
}

for (size_t i = 0; i < output_blocks.size(); ++i)
delete output_blocks[i];
for (auto & output_block : output_blocks)
delete output_block;
}

void CollapsingFinalBlockInputStream::reportBadCounts()
{
/// With inconsistent data, this is an unavoidable error that can not be easily fixed by admins. Therefore Warning.
LOG_WARNING(log, "Incorrect data: number of rows with sign = 1 (" << count_positive
<< ") differs with number of rows with sign = -1 (" << count_negative
<< ") by more than one");
LOG_FMT_WARNING(log, "Incorrect data: number of rows with sign = 1 ({}) differs with number of rows with sign = -1 ({}) by more than one", count_positive, count_negative);
}

void CollapsingFinalBlockInputStream::reportBadSign(Int8 sign)
{
LOG_ERROR(log, "Invalid sign: " << static_cast<int>(sign));
LOG_FMT_ERROR(log, "Invalid sign: {}", static_cast<int>(sign));
}

void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index)
@@ -113,7 +111,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
{
if (!current.equal(previous))
{
commitCurrent();
commitCurrent(); // NOLINT
previous = current;
}

@@ -159,7 +157,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
if (output_blocks.empty())
{
if (blocks_fetched != blocks_output)
LOG_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output " << blocks_output << " blocks instead of " << blocks_fetched);
LOG_FMT_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output {} blocks instead of {}", blocks_output, blocks_fetched);

return Block();
}
@@ -180,4 +178,4 @@ Block CollapsingFinalBlockInputStream::readImpl()
}
}

}
} // namespace DB
56 changes: 36 additions & 20 deletions dbms/src/DataStreams/CollapsingFinalBlockInputStream.h
@@ -13,11 +13,12 @@
// limitations under the License.

#pragma once
#include <common/logger_useful.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Core/SortDescription.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Core/SortDescription.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <common/logger_useful.h>

#include <queue>

namespace DB
@@ -33,7 +34,8 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
const BlockInputStreams & inputs,
const SortDescription & description_,
const String & sign_column_name_)
: description(description_), sign_column_name(sign_column_name_)
: description(description_)
, sign_column_name(sign_column_name_)
{
children.insert(children.end(), inputs.begin(), inputs.end());
}
@@ -52,7 +54,7 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream

private:
struct MergingBlock;
using BlockPlainPtrs = std::vector<MergingBlock*>;
using BlockPlainPtrs = std::vector<MergingBlock *>;

struct MergingBlock : boost::noncopyable
{
@@ -61,7 +63,9 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
const SortDescription & desc,
const String & sign_column_name,
BlockPlainPtrs * output_blocks)
: block(block_), stream_index(stream_index_), output_blocks(output_blocks)
: block(block_)
, stream_index(stream_index_)
, output_blocks(output_blocks)
{
sort_columns.resize(desc.size());
for (size_t i = 0; i < desc.size(); ++i)
@@ -109,26 +113,33 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
class MergingBlockPtr
{
public:
MergingBlockPtr() : ptr() {}
MergingBlockPtr()
: ptr()
{}

explicit MergingBlockPtr(MergingBlock * ptr_) : ptr(ptr_)
explicit MergingBlockPtr(MergingBlock * ptr_)
: ptr(ptr_)
{
if (ptr)
++ptr->refcount;
}

MergingBlockPtr(const MergingBlockPtr & rhs) : ptr(rhs.ptr)
MergingBlockPtr(const MergingBlockPtr & rhs)
: ptr(rhs.ptr)
{
if (ptr)
++ptr->refcount;
}

MergingBlockPtr & operator=(const MergingBlockPtr & rhs)
{
if (this == std::addressof(rhs))
return *this;

destroy();
ptr = rhs.ptr;
if (ptr)
++ptr->refcount;
++ptr->refcount; // NOLINT
return *this;
}

@@ -179,10 +190,15 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
MergingBlockPtr block;
size_t pos;

Cursor() {}
explicit Cursor(const MergingBlockPtr & block_, size_t pos_ = 0) : block(block_), pos(pos_) {}
Cursor()
: pos(0)
{}
explicit Cursor(const MergingBlockPtr & block_, size_t pos_ = 0)
: block(block_)
, pos(pos_)
{}

bool operator< (const Cursor & rhs) const
bool operator<(const Cursor & rhs) const
{
for (size_t i = 0; i < block->sort_columns.size(); ++i)
{
@@ -247,14 +263,14 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream

Queue queue;

Cursor previous; /// The current primary key.
Cursor last_positive; /// The last positive row for the current primary key.
Cursor previous; /// The current primary key.
Cursor last_positive; /// The last positive row for the current primary key.

size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.

size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.

/// Count the number of blocks fetched and outputted.
size_t blocks_fetched = 0;
Expand All @@ -267,4 +283,4 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
void reportBadSign(Int8 sign);
};

}
} // namespace DB
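Note that the `MergingBlockPtr::operator=` hunk above is more than a style fix: it adds a self-assignment guard before releasing the held block. A minimal sketch of that idiom on a hypothetical intrusive refcounted handle (`RefPtr`/`Node` are illustrative, not the real classes):

```cpp
#include <memory> // std::addressof

struct Node
{
    int refcount = 0;
};

struct RefPtr
{
    RefPtr() = default;

    explicit RefPtr(Node * p)
        : ptr(p)
    {
        if (ptr)
            ++ptr->refcount;
    }

    RefPtr(const RefPtr & rhs)
        : ptr(rhs.ptr)
    {
        if (ptr)
            ++ptr->refcount;
    }

    RefPtr & operator=(const RefPtr & rhs)
    {
        // Without this check, `p = p;` would release the node first and then
        // re-acquire a pointer that may already have been deleted.
        if (this == std::addressof(rhs))
            return *this;

        release();
        ptr = rhs.ptr;
        if (ptr)
            ++ptr->refcount;
        return *this;
    }

    ~RefPtr() { release(); }

    void release()
    {
        if (ptr && --ptr->refcount == 0)
            delete ptr;
        ptr = nullptr;
    }

    Node * ptr = nullptr;
};
```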
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/DedupSortedBlockInputStream.cpp
@@ -49,7 +49,7 @@ DedupSortedBlockInputStream::DedupSortedBlockInputStream(BlockInputStreams & inp
children.insert(children.end(), inputs_.begin(), inputs_.end());

for (size_t i = 0; i < inputs_.size(); ++i)
readers.schedule(std::bind(&DedupSortedBlockInputStream::asynFetch, this, i));
readers.schedule([this, i] { asynFetch(i); });

LOG_DEBUG(log, "Start deduping in single thread, using priority-queue");
dedup_thread = std::make_unique<std::thread>(ThreadFactory::newThread(true, "AsyncDedup", [this] { asyncDedupByQueue(); }));
@@ -126,7 +126,7 @@ void DedupSortedBlockInputStream::asyncDedupByQueue()
BoundQueue bounds;
DedupCursors cursors(source_blocks.size());
readFromSource(cursors, bounds);
LOG_DEBUG(log, "P Init Bounds " << bounds.str() << " Cursors " << cursors.size());
LOG_FMT_DEBUG(log, "P Init Bounds {} Cursors {}", bounds.str(), cursors.size());

CursorQueue queue;
DedupCursor max;
@@ -249,7 +249,7 @@ void DedupSortedBlockInputStream::asyncDedupByQueue()
}
}

LOG_DEBUG(log, "P All Done. Bounds " << bounds.str() << " Queue " << queue.str() << "Streams finished " << finished_streams << "/" << cursors.size());
LOG_FMT_DEBUG(log, "P All Done. Bounds {} Queue {} Streams finished {}/{}", bounds.str(), queue.str(), finished_streams, cursors.size());
}


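Besides logging, the constructor hunk above also swaps `std::bind` for a capturing lambda when scheduling fetches. A minimal sketch of the two equivalent forms (hypothetical `Reader` type and task list, not the real stream class or thread pool):

```cpp
#include <cstddef>
#include <functional>
#include <vector>

struct Reader
{
    void asynFetch(size_t i) { (void)i; /* fetch block i */ }

    void scheduleAll(std::vector<std::function<void()>> & tasks, size_t n)
    {
        for (size_t i = 0; i < n; ++i)
        {
            // Old form: argument binding is implicit in std::bind.
            tasks.push_back(std::bind(&Reader::asynFetch, this, i)); // NOLINT
            // New form: the captured state ([this, i]) is spelled out.
            tasks.push_back([this, i] { asynFetch(i); });
        }
    }
};
```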
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/DedupSortedBlockInputStream.h
@@ -62,13 +62,13 @@ class DedupSortedBlockInputStream : public IProfilingBlockInputStream

private:
void asyncDedupByQueue();
void asynFetch(size_t pisition);
void asynFetch(size_t position);

void fetchBlock(size_t pisition);

void readFromSource(DedupCursors & output, BoundQueue & bounds);

void pushBlockBounds(const DedupingBlockPtr & block, BoundQueue & queue);
void pushBlockBounds(const DedupingBlockPtr & block, BoundQueue & bounds);

bool outputAndUpdateCursor(DedupCursors & cursors, BoundQueue & bounds, DedupCursor & cursor);

dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp
@@ -168,7 +168,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill)
* (example: connection reset during distributed query execution)
* - then don't care.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName());
}
}
}
16 changes: 5 additions & 11 deletions dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
@@ -107,9 +107,9 @@ void MergingSortedBlockInputStream::initQueue()
template <typename TSortCursor>
void MergingSortedBlockInputStream::initQueue(std::priority_queue<TSortCursor> & queue)
{
for (size_t i = 0; i < cursors.size(); ++i)
if (!cursors[i].empty())
queue.push(TSortCursor(&cursors[i]));
for (auto & cursor : cursors)
if (!cursor.empty())
queue.push(TSortCursor(&cursor));
}


@@ -177,12 +177,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
}

++merged_rows;
if (merged_rows == max_block_size)
{
return true;
}

return false;
return merged_rows == expected_block_size;
};

/// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size`
@@ -296,8 +291,7 @@ void MergingSortedBlockInputStream::readSuffixImpl()

const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2) << "Merge sorted " << profile_info.blocks << " blocks, " << profile_info.rows << " rows"
<< " in " << seconds << " sec., " << profile_info.rows / seconds << " rows/sec., " << profile_info.bytes / 1000000.0 / seconds << " MB/sec.");
LOG_FMT_DEBUG(log, "Merge sorted {} blocks, {} rows, {} bytes, {:.2f} rows/sec, {:.2f} MB/sec", profile_info.blocks, profile_info.rows, profile_info.bytes, profile_info.rows / seconds, profile_info.bytes / 1000000.0 / seconds);
}

} // namespace DB
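The rewritten timing log above also trades iostream manipulators for fmt format specs: `{:.2f}` pins two decimal places where the old code used `std::fixed << std::setprecision(2)`. A small standalone illustration, assuming the fmt library is available:

```cpp
#include <fmt/core.h>

int main()
{
    double rows_per_sec = 12345.6789;
    // {} prints with default formatting; {:.2f} fixes two decimal places.
    fmt::print("Merge sorted {} blocks, {:.2f} rows/sec\n", 42, rows_per_sec);
    // Output: Merge sorted 42 blocks, 12345.68 rows/sec
}
```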
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/ParallelInputsProcessor.h
@@ -153,7 +153,7 @@ class ParallelInputsProcessor
* (for example, the connection is broken for distributed query processing)
* - then do not care.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName());
}
}
}
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
@@ -91,7 +91,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
auto & executor_id = execution_summary.executor_id();
if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end()))
{
LOG_WARNING(log, "execution " + executor_id + " not found in execution_summaries, this should not happen");
LOG_FMT_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id);
continue;
}
auto & current_execution_summary = execution_summaries_map[executor_id];
@@ -123,14 +123,14 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
auto result = remote_reader->nextResult(block_queue, sample_block);
if (result.meet_error)
{
LOG_WARNING(log, "remote reader meets error: " << result.error_msg);
LOG_FMT_WARNING(log, "remote reader meets error: {}", result.error_msg);
throw Exception(result.error_msg);
}
if (result.eof)
return false;
if (result.resp != nullptr && result.resp->has_error())
{
LOG_WARNING(log, "remote reader meets error: " << result.resp->error().DebugString());
LOG_FMT_WARNING(log, "remote reader meets error: {}", result.resp->error().DebugString());
throw Exception(result.resp->error().DebugString());
}
/// only the last response contains execution summaries
6 changes: 3 additions & 3 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
@@ -461,7 +461,7 @@ void dbgFuncTiDBQueryFromNaturalDag(Context & context, const ASTs & args, DBGInv
DAGProperties properties = getDAGProperties("");
std::vector<std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr>> key_ranges = CoprocessorHandler::GenCopKeyRange(req.ranges());
static auto log = Logger::get("MockDAG");
LOG_INFO(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
LOG_FMT_INFO(log, "Handling DAG request: {}", dag_request.DebugString());
tipb::SelectResponse dag_response;
TablesRegionsInfo tables_regions_info(true);
auto & table_regions_info = tables_regions_info.getSingleTableRegions();
@@ -2532,7 +2532,7 @@ std::tuple<QueryTasks, MakeResOutputStream> compileQuery(
tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, Timestamp start_ts, std::vector<std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr>> & key_ranges)
{
static auto log = Logger::get("MockDAG");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
LOG_FMT_DEBUG(log, "Handling DAG request: {}", dag_request.DebugString());
tipb::SelectResponse dag_response;
TablesRegionsInfo tables_regions_info(true);
auto & table_regions_info = tables_regions_info.getSingleTableRegions();
@@ -2546,7 +2546,7 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest

DAGDriver driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
LOG_FMT_DEBUG(log, "Handle DAG request done");
return dag_response;
}

8 changes: 4 additions & 4 deletions dbms/src/Debug/dbgNaturalDag.cpp
@@ -84,13 +84,13 @@ void NaturalDag::init()
auto json_str = String((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
Poco::JSON::Parser parser;
Poco::Dynamic::Var result = parser.parse(json_str);
LOG_INFO(log, __PRETTY_FUNCTION__ << ": Succeed parsing json file: " << json_dag_path);
LOG_FMT_INFO(log, "Succeed parsing json file: {}", json_dag_path);

const auto & obj = result.extract<JSONObjectPtr>();
loadTables(obj);
LOG_INFO(log, __PRETTY_FUNCTION__ << ": Succeed loading table data!");
LOG_FMT_INFO(log, "Succeed loading table data!");
loadReqAndRsp(obj);
LOG_INFO(log, __PRETTY_FUNCTION__ << ": Succeed loading req and rsp data!");
LOG_FMT_INFO(log, "Succeed loading req and rsp data!");
}

void NaturalDag::loadReqAndRsp(const NaturalDag::JSONObjectPtr & obj)
@@ -156,7 +156,7 @@ NaturalDag::LoadedRegionInfo NaturalDag::loadRegion(const Poco::Dynamic::Var & r
String region_end;
decodeBase64(region_obj->getValue<String>(REGION_END), region_end);
region.end = RecordKVFormat::encodeAsTiKVKey(region_end);
LOG_INFO(log, __PRETTY_FUNCTION__ << ": RegionID: " << region.id << ", RegionStart: " << printAsBytes(region.start) << ", RegionEnd: " << printAsBytes(region.end));
LOG_FMT_INFO(log, "RegionID: {}, RegionStart: {}, RegionEnd: {}", region.id, printAsBytes(region.start), printAsBytes(region.end));

auto pairs_json = region_obj->getArray(REGION_KEYVALUE_DATA);
for (const auto & pair_json : *pairs_json)