Skip to content

Commit

Permalink
Merge branch 'release-5.0' into cherry-pick-5056-to-release-5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu authored Jun 21, 2022
2 parents aa54dfb + ec82932 commit 92c818f
Show file tree
Hide file tree
Showing 27 changed files with 1,530 additions and 1,240 deletions.
3 changes: 1 addition & 2 deletions cluster_manage/flash_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def __init__(self, file_path):
self.http_port = self.conf_toml['http_port']
else:
self.http_port = self.conf_toml['https_port']
tmp_path = self.conf_toml['tmp_path']

p = self.conf_toml['flash']
service_addr = p['service_addr']
Expand All @@ -32,7 +31,7 @@ def __init__(self, file_path):
self.cluster_refresh_interval = min(
int(flash_cluster.get('refresh_interval', 20)), self.cluster_master_ttl)
self.update_rule_interval = int(flash_cluster.get('update_rule_interval', 10))
self.log_path = flash_cluster.get('log', '{}/flash_cluster_manager.log'.format(tmp_path))
self.log_path = flash_cluster.get('log', '{}/flash_cluster_manager.log'.format(self.conf_toml.get('tmp_path', '/tmp')))
self.max_time_out = self.cluster_master_ttl

self.enable_tls = False
Expand Down
2 changes: 1 addition & 1 deletion contrib/client-c
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(segment_merge_after_ingest_packs) \
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
124 changes: 57 additions & 67 deletions dbms/src/Common/MyTime.cpp

Large diffs are not rendered by default.

89 changes: 89 additions & 0 deletions dbms/src/Common/tests/gtest_mytime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,96 @@ try
// {"Tuesday 52 2001", "%W %V %Y", std::nullopt}, //
// {"Tuesday 52 2001", "%W %V %x", std::nullopt}, //
// {"Tuesday 52 2001", "%W %u %x", std::nullopt}, //

// Test cases for %b
{"10/JAN/2010", "%d/%b/%Y", MyDateTime{2010, 1, 10, 0, 0, 0, 0}}, // Right spill, case-insensitive
{"10/FeB/2010", "%d/%b/%Y", MyDateTime{2010, 2, 10, 0, 0, 0, 0}},
{"10/MAr/2010", "%d/%b/%Y", MyDateTime{2010, 3, 10, 0, 0, 0, 0}},
{"10/ApR/2010", "%d/%b/%Y", MyDateTime{2010, 4, 10, 0, 0, 0, 0}},
{"10/mAY/2010", "%d/%b/%Y", MyDateTime{2010, 5, 10, 0, 0, 0, 0}},
{"10/JuN/2010", "%d/%b/%Y", MyDateTime{2010, 6, 10, 0, 0, 0, 0}},
{"10/JUL/2010", "%d/%b/%Y", MyDateTime{2010, 7, 10, 0, 0, 0, 0}},
{"10/Aug/2010", "%d/%b/%Y", MyDateTime{2010, 8, 10, 0, 0, 0, 0}},
{"10/seP/2010", "%d/%b/%Y", MyDateTime{2010, 9, 10, 0, 0, 0, 0}},
{"10/Oct/2010", "%d/%b/%Y", MyDateTime{2010, 10, 10, 0, 0, 0, 0}},
{"10/NOV/2010", "%d/%b/%Y", MyDateTime{2010, 11, 10, 0, 0, 0, 0}},
{"10/DEC/2010", "%d/%b/%Y", MyDateTime{2010, 12, 10, 0, 0, 0, 0}},
{"10/January/2010", "%d/%b/%Y", std::nullopt}, // Test full spilling

// Test cases for %M
{"10/January/2010", "%d/%M/%Y", MyDateTime{2010, 1, 10, 0, 0, 0, 0}}, // Test full spilling
{"10/February/2010", "%d/%M/%Y", MyDateTime{2010, 2, 10, 0, 0, 0, 0}},
{"10/March/2010", "%d/%M/%Y", MyDateTime{2010, 3, 10, 0, 0, 0, 0}},
{"10/April/2010", "%d/%M/%Y", MyDateTime{2010, 4, 10, 0, 0, 0, 0}},
{"10/May/2010", "%d/%M/%Y", MyDateTime{2010, 5, 10, 0, 0, 0, 0}},
{"10/June/2010", "%d/%M/%Y", MyDateTime{2010, 6, 10, 0, 0, 0, 0}},
{"10/July/2010", "%d/%M/%Y", MyDateTime{2010, 7, 10, 0, 0, 0, 0}},
{"10/August/2010", "%d/%M/%Y", MyDateTime{2010, 8, 10, 0, 0, 0, 0}},
{"10/September/2010", "%d/%M/%Y", MyDateTime{2010, 9, 10, 0, 0, 0, 0}},
{"10/October/2010", "%d/%M/%Y", MyDateTime{2010, 10, 10, 0, 0, 0, 0}},
{"10/November/2010", "%d/%M/%Y", MyDateTime{2010, 11, 10, 0, 0, 0, 0}},
{"10/December/2010", "%d/%M/%Y", MyDateTime{2010, 12, 10, 0, 0, 0, 0}},

// Test cases for %c
// {"10/0/2010", "%d/%c/%Y", MyDateTime{2010, 0, 10, 0, 0, 0, 0}}, // TODO: Need Check NO_ZERO_DATE
{"10/1/2010", "%d/%c/%Y", MyDateTime{2010, 1, 10, 0, 0, 0, 0}},
{"10/01/2010", "%d/%c/%Y", MyDateTime{2010, 1, 10, 0, 0, 0, 0}},
{"10/001/2010", "%d/%c/%Y", std::nullopt},
{"10/13/2010", "%d/%c/%Y", std::nullopt},
{"10/12/2010", "%d/%c/%Y", MyDateTime{2010, 12, 10, 0, 0, 0, 0}},

// Test cases for %d, %e
// {"0/12/2010", "%d/%c/%Y", MyDateTime{2010, 12, 0, 0, 0, 0, 0}}, // TODO: Need Check NO_ZERO_DATE
{"1/12/2010", "%d/%c/%Y", MyDateTime{2010, 12, 1, 0, 0, 0, 0}},
{"05/12/2010", "%d/%c/%Y", MyDateTime{2010, 12, 5, 0, 0, 0, 0}},
{"05/12/2010", "%e/%c/%Y", MyDateTime{2010, 12, 5, 0, 0, 0, 0}},
{"31/12/2010", "%d/%c/%Y", MyDateTime{2010, 12, 31, 0, 0, 0, 0}},
{"32/12/2010", "%d/%c/%Y", std::nullopt},
{"30/11/2010", "%d/%c/%Y", MyDateTime{2010, 11, 30, 0, 0, 0, 0}},
{"31/11/2010", "%e/%c/%Y", MyDateTime{2010, 11, 31, 0, 0, 0, 0}},
{"28/2/2010", "%e/%c/%Y", MyDateTime{2010, 2, 28, 0, 0, 0, 0}},
{"29/2/2010", "%e/%c/%Y", MyDateTime{2010, 2, 29, 0, 0, 0, 0}},
{"29/2/2020", "%e/%c/%Y", MyDateTime{2020, 2, 29, 0, 0, 0, 0}},

// Test cases for %Y
// {"1/12/0000", "%d/%c/%Y", MyDateTime{0000, 12, 1, 0, 0, 0, 0}}, // TODO: Need Check NO_ZERO_DATE
{"1/12/01", "%d/%c/%Y", MyDateTime{2001, 12, 1, 0, 0, 0, 0}},
{"1/12/0001", "%d/%c/%Y", MyDateTime{0001, 12, 1, 0, 0, 0, 0}},
{"1/12/2020", "%d/%c/%Y", MyDateTime{2020, 12, 1, 0, 0, 0, 0}},
{"1/12/9999", "%d/%c/%Y", MyDateTime{9999, 12, 1, 0, 0, 0, 0}},

// Test cases for %f
{"01,5,2013 999999", "%d,%c,%Y %f", MyDateTime{2013, 5, 1, 0, 0, 0, 999999}},
{"01,5,2013 0", "%d,%c,%Y %f", MyDateTime{2013, 5, 1, 0, 0, 0, 0}},
{"01,5,2013 9999990000000", "%d,%c,%Y %f", MyDateTime{2013, 5, 1, 0, 0, 0, 999999}},
{"01,5,2013 1", "%d,%c,%Y %f", MyDateTime{2013, 5, 1, 0, 0, 0, 100000}},
{"01,5,2013 1230", "%d,%c,%Y %f", MyDateTime{2013, 5, 1, 0, 0, 0, 123000}},
{"01,5,2013 01", "%d,%c,%Y %f", MyDateTime{2013, 5, 1, 0, 0, 0, 10000}}, // issue 3556

// Test cases for %h, %I, %l
{"00:11:12 ", "%h:%i:%S ", std::nullopt}, // 0 is not a valid number of %h
{"01:11:12 ", "%I:%i:%S ", MyDateTime{0, 0, 0, 01, 11, 12, 0}},
{"12:11:12 ", "%l:%i:%S ", MyDateTime{0, 0, 0, 0, 11, 12, 0}},

// Test cases for %k, %H
{"00:11:12 ", "%H:%i:%S ", MyDateTime{0, 0, 0, 00, 11, 12, 0}},
{"01:11:12 ", "%k:%i:%S ", MyDateTime{0, 0, 0, 01, 11, 12, 0}},
{"12:11:12 ", "%H:%i:%S ", MyDateTime{0, 0, 0, 12, 11, 12, 0}},
{"24:11:12 ", "%k:%i:%S ", std::nullopt},

// Test cases for %i
{"00:00:12 ", "%H:%i:%S ", MyDateTime{0, 0, 0, 00, 00, 12, 0}},
{"00:01:12 ", "%H:%i:%S ", MyDateTime{0, 0, 0, 00, 01, 12, 0}},
{"00:59:12 ", "%H:%i:%S ", MyDateTime{0, 0, 0, 00, 59, 12, 0}},
{"00:60:12 ", "%H:%i:%S ", std::nullopt},

// Test cases for %s, %S
{"00:00:00 ", "%H:%i:%s ", MyDateTime{0, 0, 0, 00, 00, 00, 0}},
{"00:01:01 ", "%H:%i:%s ", MyDateTime{0, 0, 0, 00, 01, 01, 0}},
{"00:59:59 ", "%H:%i:%S ", MyDateTime{0, 0, 0, 00, 59, 59, 0}},
{"00:59:60 ", "%H:%i:%S ", std::nullopt},
};

auto result_formatter = MyDateTimeFormatter("%Y/%m/%d %T.%f");
size_t idx = 0;
for (const auto & [input, fmt, expected] : cases)
Expand Down
31 changes: 23 additions & 8 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
Expand Down Expand Up @@ -93,9 +94,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto &elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
for (auto & elem : subqueries_for_sets)
Expand All @@ -115,29 +117,34 @@ void CreatingSetsBlockInputStream::createAll()
}

if (!exception_from_workers.empty())
{
LOG_ERROR(log,
"Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds())
<< " sec with exception and rethrow the first of total " << exception_from_workers.size()
<< " exceptions.");
std::rethrow_exception(exception_from_workers.front());

}
LOG_DEBUG(
log, "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) << "sec. ");
created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTracker * memory_tracker)
{
Stopwatch watch;
try
{

current_memory_tracker = memory_tracker;
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< std::to_string(mpp_task_id));
Stopwatch watch;

BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand Down Expand Up @@ -194,7 +201,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -235,10 +245,15 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery, MemoryTr
LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << ".");
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_ERROR(log,
"task" << std::to_string(mpp_task_id) << " throw exception: " << getCurrentExceptionMessage(false, true) << " In "
<< std::to_string(watch.elapsedSeconds()) << " sec. ");
}
}

Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
req->set_allocated_sender_meta(sender_task);
LOG_DEBUG(log, "begin start and read : " << req->DebugString());
::grpc::Status status = ::grpc::Status::OK;
for (int i = 0; i < 10; i++)
static const Int32 MAX_RETRY_TIMES = 10;
for (int i = 0; i < MAX_RETRY_TIMES; i++)
{
pingcap::kv::RpcCall<mpp::EstablishMPPConnectionRequest> call(req);
grpc::ClientContext client_context;
Expand Down Expand Up @@ -93,8 +94,11 @@ void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
}
else
{
bool retriable = !has_data && i + 1 < MAX_RETRY_TIMES;
LOG_WARNING(log,
"EstablishMPPConnectionRequest meets rpc fail. Err msg is: " << status.error_message() << " req info " << req_info);
"EstablishMPPConnectionRequest meets rpc fail for req "
<< req_info << ". Err code = " << status.error_code() + ", err msg = " << status.error_message()
<< ", retriable = " << retriable);
// if we have received some data, we should not retry.
if (has_data)
break;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ void MPPTask::runImpl()
LOG_ERROR(log, "task running meets error " << e.displayText() << " Stack Trace : " << e.getStackTrace().toString());
writeErrToAllTunnel(e.displayText());
}
catch (pingcap::Exception & e)
{
LOG_ERROR(log, "task running meets error " << e.message());
writeErrToAllTunnel(e.message());
}
catch (std::exception & e)
{
LOG_ERROR(log, "task running meets error " << e.what());
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ struct MPPTunnel
{
throw Exception("has connected");
}
if (finished)
throw Exception("has finished");
LOG_DEBUG(log, "ready to connect");
connected = true;
writer = writer_;
Expand Down
Loading

0 comments on commit 92c818f

Please sign in to comment.