Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into str_to_date_ut
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 committed Dec 14, 2021
2 parents c5d83d9 + b2d3ba1 commit fbbf522
Show file tree
Hide file tree
Showing 50 changed files with 1,277 additions and 716 deletions.
53 changes: 0 additions & 53 deletions dbms/src/DataStreams/PKColumnIterator.hpp

This file was deleted.

81 changes: 0 additions & 81 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.cpp

This file was deleted.

41 changes: 0 additions & 41 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.h

This file was deleted.

8 changes: 5 additions & 3 deletions dbms/src/DataTypes/DataTypeDecimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
}

// Implements Decimal(P, S), where P is precision, S is scale.
// Implements Decimal(P, S), where P is precision (significant digits), and S is scale (digits following the decimal point).
// For example, Decimal(5, 2) can represent numbers from -999.99 to 999.99
// Maximum precisions for underlying types are:
// Int32 9
// Int64 18
Expand All @@ -42,9 +43,10 @@ class DataTypeDecimal final : public IDataType

static constexpr size_t maxPrecision() { return maxDecimalPrecision<T>(); }

// If scale is omitted, the default is 0. If precision is omitted, the default is 10.
// default values
DataTypeDecimal()
: DataTypeDecimal(10, 0)
: precision(10)
, scale(0)
{}

DataTypeDecimal(size_t precision_, size_t scale_)
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2411,10 +2411,15 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
tipb::SelectResponse dag_response;
RegionInfoMap regions;
RegionInfoList retry_regions;

regions.emplace(region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));
DAGDriver driver(context, dag_request, regions, retry_regions, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);

DAGContext dag_context(dag_request);
dag_context.regions_for_local_read = regions;
dag_context.log = std::make_shared<LogWithPrefix>(log, "");
context.setDAGContext(&dag_context);

DAGDriver driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
return dag_response;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ struct MockSSTReader
struct Data : std::vector<std::pair<std::string, std::string>>
{
Data(const Data &) = delete;
Data & operator=(const Data &) = delete;
Data(Data &&) = default;
Data & operator=(Data &&) = default;
Data() = default;
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/BatchCommandsHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ThreadPool::Job BatchCommandsHandler::handleCommandJob(
return;
}

CoprocessorContext cop_context(context, cop_req.context(), batch_commands_context.grpc_server_context);
CoprocessorContext cop_context(*context, cop_req.context(), batch_commands_context.grpc_server_context);
CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp);

ret = cop_handler.execute();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/BatchCommandsHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct BatchCommandsContext

/// Context creation function for each individual command - they should be handled isolated,
/// given that context is being used to pass arguments regarding queries.
using DBContextCreationFunc = std::function<std::tuple<Context, grpc::Status>(const grpc::ServerContext *)>;
using DBContextCreationFunc = std::function<std::tuple<ContextPtr, grpc::Status>(const grpc::ServerContext *)>;
DBContextCreationFunc db_context_creation_func;

const grpc::ServerContext & grpc_server_context;
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ grpc::Status BatchCoprocessorHandler::execute()
LOG_DEBUG(log,
__PRETTY_FUNCTION__ << ": Handling " << regions.size() << " regions in DAG request: " << dag_request.DebugString());

DAGDriver<true> driver(cop_context.db_context, dag_request, regions, retry_regions, cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), cop_request->schema_ver(), writer);
DAGContext dag_context(dag_request);
dag_context.is_batch_cop = true;
dag_context.regions_for_local_read = std::move(regions);
dag_context.regions_for_remote_read = std::move(retry_regions);
dag_context.log = std::make_shared<LogWithPrefix>(log, "");
cop_context.db_context.setDAGContext(&dag_context);

DAGDriver<true> driver(cop_context.db_context, cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), cop_request->schema_ver(), writer);
// batch execution;
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
Expand Down
50 changes: 33 additions & 17 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,43 @@ UInt64 inline getMaxErrorCount(const tipb::DAGRequest &)
class DAGContext
{
public:
explicit DAGContext(const tipb::DAGRequest & dag_request)
: collect_execution_summaries(dag_request.has_collect_execution_summaries() && dag_request.collect_execution_summaries())
explicit DAGContext(const tipb::DAGRequest & dag_request_)
: dag_request(&dag_request_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, tunnel_set(nullptr)
, flags(dag_request.flags())
, sql_mode(dag_request.sql_mode())
, max_recorded_error_count(getMaxErrorCount(dag_request))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request.has_root_executor() || dag_request.executors_size() > 0);
return_executor_id = dag_request.root_executor().has_executor_id() || dag_request.executors(0).has_executor_id();
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();
}

DAGContext(const tipb::DAGRequest & dag_request, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: collect_execution_summaries(dag_request.has_collect_execution_summaries() && dag_request.collect_execution_summaries())
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, return_executor_id(true)
, is_mpp_task(true)
, is_root_mpp_task(is_root_mpp_task_)
, tunnel_set(nullptr)
, flags(dag_request.flags())
, sql_mode(dag_request.sql_mode())
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id())
, max_recorded_error_count(getMaxErrorCount(dag_request))
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request.has_root_executor() && dag_request.root_executor().has_executor_id());
assert(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
}

explicit DAGContext(UInt64 max_error_count_)
: collect_execution_summaries(false)
: dag_request(nullptr)
, collect_execution_summaries(false)
, is_mpp_task(false)
, is_root_mpp_task(false)
, tunnel_set(nullptr)
Expand Down Expand Up @@ -114,6 +117,7 @@ class DAGContext
void clearWarnings() { warnings.clear(); }
UInt64 getWarningCount() { return warning_count; }
const mpp::TaskMeta & getMPPTaskMeta() const { return mpp_task_meta; }
bool isBatchCop() const { return is_batch_cop; }
bool isMPPTask() const { return is_mpp_task; }
/// root mpp task means mpp task that send data back to TiDB
bool isRootMPPTask() const { return is_root_mpp_task; }
Expand All @@ -126,17 +130,29 @@ class DAGContext

std::pair<bool, double> getTableScanThroughput();

const RegionInfoMap & getRegionsForLocalRead() const { return regions_for_local_read; }
const RegionInfoList & getRegionsForRemoteRead() const { return regions_for_remote_read; }

const tipb::DAGRequest * dag_request;
size_t final_concurrency = 1;
Int64 compile_time_ns;
String table_scan_executor_id = "";
bool collect_execution_summaries;
bool return_executor_id;
bool is_mpp_task;
bool is_root_mpp_task;
bool is_mpp_task = false;
bool is_root_mpp_task = false;
bool is_batch_cop = false;
MPPTunnelSetPtr tunnel_set;
RegionInfoMap regions_for_local_read;
RegionInfoList regions_for_remote_read;
// part of regions_for_local_read + regions_for_remote_read, only used for batch-cop
RegionInfoList retry_regions;

LogWithPrefixPtr mpp_task_log;
LogWithPrefixPtr log;

bool keep_session_timezone_info = false;
std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type = tipb::EncodeType::TypeDefault;

private:
/// profile_streams_map is a map that maps from executor_id to ProfileStreamsInfo
Expand Down
Loading

0 comments on commit fbbf522

Please sign in to comment.