Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into add_learner_read_…
Browse files Browse the repository at this point in the history
…comment
  • Loading branch information
JaySon-Huang committed May 7, 2022
2 parents 5b1ac2c + 04da47f commit 1eae738
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 317 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class MockTiDB : public ext::Singleton<MockTiDB>

Int64 getVersion() { return version; }

TableID newTableID() { return table_id_allocator++; }

private:
TablePtr dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions);
TablePtr getTableByNameInternal(const String & database_name, const String & table_name);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Encryption/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ using WriteLimiterPtr = std::shared_ptr<WriteLimiter>;
// `get_io_stat_period_us` is the interval between calling getIOStatistic_.
//
// Other parameters are the same as WriteLimiter.
class ReadLimiter final : public WriteLimiter
class ReadLimiter : public WriteLimiter
{
public:
ReadLimiter(
Expand Down
140 changes: 0 additions & 140 deletions dbms/src/Flash/BatchCommandsHandler.cpp

This file was deleted.

72 changes: 0 additions & 72 deletions dbms/src/Flash/BatchCommandsHandler.h

This file was deleted.

10 changes: 5 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzerHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ String DAGExpressionAnalyzerHelper::buildIfNullFunction(
const ExpressionActionsPtr & actions)
{
// rewrite IFNULL function with multiIf
// ifNull(arg1, arg2) -> multiIf(isNull(arg1), arg2, arg1)
// ifNull(arg1, arg2) -> multiIf(isNull(arg1), arg2, assumeNotNull(arg1))
// todo if arg1 is not nullable, then just return arg1 is ok
const String & func_name = "multiIf";
Names argument_names;
Expand All @@ -112,13 +112,13 @@ String DAGExpressionAnalyzerHelper::buildIfNullFunction(
}

String condition_arg_name = analyzer->getActions(expr.children(0), actions, false);
String tmp_else_arg_name = analyzer->getActions(expr.children(1), actions, false);
String else_arg_name = analyzer->getActions(expr.children(1), actions, false);
String is_null_result = analyzer->applyFunction("isNull", {condition_arg_name}, actions, getCollatorFromExpr(expr));
String not_null_else_arg_name = analyzer->applyFunction("assumeNotNull", {tmp_else_arg_name}, actions, nullptr);
String not_null_condition_arg_name = analyzer->applyFunction("assumeNotNull", {condition_arg_name}, actions, nullptr);

argument_names.push_back(std::move(is_null_result));
argument_names.push_back(std::move(not_null_else_arg_name));
argument_names.push_back(std::move(condition_arg_name));
argument_names.push_back(std::move(else_arg_name));
argument_names.push_back(std::move(not_null_condition_arg_name));

return analyzer->applyFunction(func_name, argument_names, actions, getCollatorFromExpr(expr));
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCo
// As part of the initial CREATE state, we *request* that the system
// start processing requests. In this request, "this" acts are
// the tag uniquely identifying the request.
service->RequestEstablishMPPConnection(&ctx, &request, &responder, cq, notify_cq, this);
service->requestEstablishMPPConnection(&ctx, &request, &responder, cq, notify_cq, this);
}

EstablishCallData::~EstablishCallData()
Expand Down Expand Up @@ -71,7 +71,7 @@ void EstablishCallData::initRpc()
std::exception_ptr eptr = nullptr;
try
{
service->EstablishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this);
service->establishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this);
}
catch (...)
{
Expand Down
64 changes: 1 addition & 63 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Core/Types.h>
#include <Flash/BatchCommandsHandler.h>
#include <Flash/BatchCoprocessorHandler.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand Down Expand Up @@ -213,7 +212,7 @@ ::grpc::Status returnStatus(EstablishCallData * calldata, const grpc::Status & s
return status;
}

::grpc::Status FlashService::EstablishMPPConnectionSyncOrAsync(::grpc::ServerContext * grpc_context,
::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerContext * grpc_context,
const ::mpp::EstablishMPPConnectionRequest * request,
::grpc::ServerWriter<::mpp::MPPDataPacket> * sync_writer,
EstablishCallData * calldata)
Expand Down Expand Up @@ -345,67 +344,6 @@ ::grpc::Status FlashService::CancelMPPTask(
return grpc::Status::OK;
}

// This function is deprecated.
grpc::Status FlashService::BatchCommands(
grpc::ServerContext * grpc_context,
grpc::ServerReaderWriter<::tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}

auto [context, status] = createDBContext(grpc_context);
if (!status.ok())
{
return status;
}

tikvpb::BatchCommandsRequest request;
while (stream->Read(&request))
{
tikvpb::BatchCommandsResponse response;
GET_METRIC(tiflash_coprocessor_request_count, type_batch).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_batch).Increment();
SCOPE_EXIT({ GET_METRIC(tiflash_coprocessor_handling_request_count, type_batch).Decrement(); });
auto start_time = std::chrono::system_clock::now();
SCOPE_EXIT({
std::chrono::duration<double> duration_sec = std::chrono::system_clock::now() - start_time;
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_batch).Observe(duration_sec.count());
GET_METRIC(tiflash_coprocessor_response_bytes).Increment(response.ByteSizeLong());
});

LOG_FMT_DEBUG(log, "Handling batch commands: {}", request.DebugString());

BatchCommandsContext batch_commands_context(
*context,
[this](const grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); },
*grpc_context);
BatchCommandsHandler batch_commands_handler(batch_commands_context, request, response);
auto ret = batch_commands_handler.execute();
if (!ret.ok())
{
LOG_FMT_DEBUG(
log,
"Handle batch commands request done: {}, {}",
ret.error_code(),
ret.error_message());
return ret;
}

if (!stream->Write(response))
{
LOG_FMT_DEBUG(log, "Write response failed for unknown reason.");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Write response failed for unknown reason.");
}

LOG_FMT_DEBUG(log, "Handle batch commands request done: {}, {}", ret.error_code(), ret.error_message());
}

return grpc::Status::OK;
}

String getClientMetaVarWithDefault(const grpc::ServerContext * grpc_context, const String & name, const String & default_val)
{
if (auto it = grpc_context->client_metadata().find(name); it != grpc_context->client_metadata().end())
Expand Down
10 changes: 3 additions & 7 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ class FlashService : public tikvpb::Tikv::Service
const coprocessor::Request * request,
coprocessor::Response * response) override;

grpc::Status BatchCommands(grpc::ServerContext * grpc_context,
grpc::ServerReaderWriter<tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream) override;

::grpc::Status BatchCoprocessor(::grpc::ServerContext * context,
const ::coprocessor::BatchRequest * request,
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer) override;
Expand All @@ -66,11 +63,11 @@ class FlashService : public tikvpb::Tikv::Service
const ::mpp::IsAliveRequest * request,
::mpp::IsAliveResponse * response) override;

::grpc::Status EstablishMPPConnectionSyncOrAsync(::grpc::ServerContext * context, const ::mpp::EstablishMPPConnectionRequest * request, ::grpc::ServerWriter<::mpp::MPPDataPacket> * sync_writer, EstablishCallData * calldata);
::grpc::Status establishMPPConnectionSyncOrAsync(::grpc::ServerContext * context, const ::mpp::EstablishMPPConnectionRequest * request, ::grpc::ServerWriter<::mpp::MPPDataPacket> * sync_writer, EstablishCallData * calldata);

::grpc::Status EstablishMPPConnection(::grpc::ServerContext * context, const ::mpp::EstablishMPPConnectionRequest * request, ::grpc::ServerWriter<::mpp::MPPDataPacket> * sync_writer) override
{
return EstablishMPPConnectionSyncOrAsync(context, request, sync_writer, nullptr);
return establishMPPConnectionSyncOrAsync(context, request, sync_writer, nullptr);
}

::grpc::Status CancelMPPTask(::grpc::ServerContext * context, const ::mpp::CancelTaskRequest * request, ::mpp::CancelTaskResponse * response) override;
Expand Down Expand Up @@ -110,8 +107,7 @@ class AsyncFlashService final : public FlashService
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}

void RequestEstablishMPPConnection(::grpc::ServerContext * context, ::mpp::EstablishMPPConnectionRequest * request, ::grpc::ServerAsyncWriter<::mpp::MPPDataPacket> * writer, ::grpc::CompletionQueue * new_call_cq, ::grpc::ServerCompletionQueue * notification_cq, void * tag)
void requestEstablishMPPConnection(::grpc::ServerContext * context, ::mpp::EstablishMPPConnectionRequest * request, ::grpc::ServerAsyncWriter<::mpp::MPPDataPacket> * writer, ::grpc::CompletionQueue * new_call_cq, ::grpc::ServerCompletionQueue * notification_cq, void * tag)
{
::grpc::Service::RequestAsyncServerStreaming(EstablishMPPConnectionApiID, context, request, writer, new_call_cq, notification_cq, tag);
}
Expand Down
Loading

0 comments on commit 1eae738

Please sign in to comment.