Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed May 16, 2023
1 parent 61754ed commit 939c6a1
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 182 deletions.
143 changes: 0 additions & 143 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,106 +274,6 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
executeImpl(pipeline);
}

<<<<<<< HEAD
=======
SourceOps DAGStorageInterpreter::execute(PipelineExecutorStatus & exec_status)
{
prepare(); // learner read

return executeImpl(exec_status);
}

SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_status)
{
auto & dag_context = dagContext();

auto scan_context = std::make_shared<DM::ScanContext>();
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

SourceOps source_ops;
if (!mvcc_query_info->regions_query_info.empty())
{
source_ops = buildLocalSourceOps(exec_status, context.getSettingsRef().max_block_size);
}

// Should build `remote_requests` and `nullSourceOp` under protect of `table_structure_lock`.
if (source_ops.empty())
{
source_ops.emplace_back(std::make_unique<NullSourceOp>(
exec_status,
storage_for_logical_table->getSampleBlockForColumns(required_columns),
log->identifier()));
}

// Note that `buildRemoteRequests` must be called after `buildLocalSourceOps` because
// `buildLocalSourceOps` will setup `region_retry_from_local_region` and we must
// retry those regions or there will be data lost.
auto remote_requests = buildRemoteRequests(scan_context);

if (dag_context.is_disaggregated_task && !remote_requests.empty())
{
// This means RN is sending requests with stale region info, we simply reject the request
// and ask RN to send requests again with correct region info. When RN updates region info,
// RN may be sending requests to other WN.

RegionException::UnavailableRegions region_ids;
for (const auto & info : context.getDAGContext()->retry_regions)
region_ids.insert(info.region_id);

throw RegionException(
std::move(region_ids),
RegionException::RegionReadStatus::EPOCH_NOT_MATCH);
}

FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);

// Release alter locks
// The DeltaTree engine ensures that once sourceOps are created, the caller can get a consistent result
// from those sourceOps even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
const TableLockHolders drop_locks = releaseAlterLocks();

remote_read_sources_start_index = source_ops.size();

if (!remote_requests.empty())
buildRemoteSourceOps(source_ops, exec_status, remote_requests);

for (const auto & lock : drop_locks)
dagContext().addTableLock(lock);

FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

return source_ops;
}

void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
{
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log);
NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
const auto table_scan_output_header = group_builder.getCurrentHeader();
for (const auto & col : table_scan_output_header)
source_columns.emplace_back(col.name, col.type);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
/// If there is no local source, there is no need to execute cast and push down filter, return directly.
/// But we should make sure that the analyzer is initialized before return.
if (remote_read_sources_start_index == 0)
return;
/// handle timezone/duration cast for local table scan.
executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index);

/// handle filter conditions for local and remote table scan.
if (filter_conditions.hasValue())
{
::DB::executePushedDownFilter(exec_status, group_builder, remote_read_sources_start_index, filter_conditions, *analyzer, log);
/// TODO: record profile
}
}

>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
{
auto & dag_context = dagContext();
Expand Down Expand Up @@ -440,26 +340,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);
<<<<<<< HEAD

if (!table_scan.getPushedDownFilters().empty())
{
/// If there are duration type pushed down filters, type of columns may be changed.
const auto table_scan_output_header = pipeline.firstStream()->getHeader();
for (const auto & col : table_scan_output_header)
{
const auto & name = col.name;
auto it = std::find_if(source_columns.begin(), source_columns.end(), [&name](const auto & c) { return c.name == name; });
if (it != source_columns.end() && it->type != col.type)
it->type = col.type;
}
}
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
=======
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
NamesAndTypes source_columns;
Expand Down Expand Up @@ -521,33 +402,9 @@ void DAGStorageInterpreter::prepare()
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

<<<<<<< HEAD
std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan();
=======
std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan();
}

void DAGStorageInterpreter::executeCastAfterTableScan(
PipelineExecutorStatus & exec_status,
PipelineExecGroupBuilder & group_builder,
size_t remote_read_sources_start_index)
{
// execute timezone cast or duration cast if needed for local table scan
auto [has_cast, extra_cast] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan);
if (has_cast)
{
RUNTIME_CHECK(remote_read_sources_start_index <= group_builder.group.size());
size_t i = 0;
// local sources
while (i < remote_read_sources_start_index)
{
auto & group = group_builder.group[i++];
group.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), extra_cast));
}
}
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
}

void DAGStorageInterpreter::executeCastAfterTableScan(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline)
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,11 +1086,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
{
stream = std::make_shared<UnorderedInputStream>(
read_task_pool,
<<<<<<< HEAD
filter && filter->before_where ? filter->columns_after_cast : columns_to_read,
=======
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
extra_table_id_index,
physical_table_id,
log_tracing_id);
Expand Down
19 changes: 0 additions & 19 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,17 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
const ColumnDefines & filter_columns_,
const String filter_column_name_,
const ExpressionActionsPtr & extra_cast_,
<<<<<<< HEAD
const ColumnDefines & columns_after_cast_)
=======
const ColumnDefinesPtr & columns_after_cast_)
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
: rs_operator(rs_operator_)
, before_where(beofre_where_)
, filter_column_name(std::move(filter_column_name_))
, filter_columns(std::move(filter_columns_))
, extra_cast(extra_cast_)
<<<<<<< HEAD
, columns_after_cast(std::move(columns_after_cast_))
=======
, columns_after_cast(columns_after_cast_)
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
{}

explicit PushDownFilter(const RSOperatorPtr & rs_operator_)
: rs_operator(rs_operator_)
<<<<<<< HEAD
, before_where(nullptr)
, filter_columns({})
, extra_cast(nullptr)
, columns_after_cast({})
=======
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
{}

// Rough set operator
Expand All @@ -71,11 +56,7 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
// The expression actions used to cast the timestamp/datetime column
ExpressionActionsPtr extra_cast;
// If the extra_cast is not null, the types of the columns may be changed
<<<<<<< HEAD
ColumnDefines columns_after_cast;
=======
ColumnDefinesPtr columns_after_cast;
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
};

} // namespace DB::DM
16 changes: 0 additions & 16 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,21 +839,6 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer);
LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());

<<<<<<< HEAD
ColumnDefines columns_after_cast;
columns_after_cast.reserve(columns_to_read.size());
const auto & source_columns = analyzer->getCurrentInputColumns();
for (size_t i = 0; i < table_scan_column_info.size(); ++i)
{
if (table_scan_column_info[i].hasGeneratedColumnFlag())
continue;
auto it = columns_to_read_map.at(table_scan_column_info[i].id);
RUNTIME_CHECK(it.name == source_columns[i].name);
columns_after_cast.push_back(std::move(it));
columns_after_cast.back().type = source_columns[i].type;
}

=======
auto columns_after_cast = std::make_shared<ColumnDefines>();
if (extra_cast != nullptr)
{
Expand All @@ -869,7 +854,6 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
columns_after_cast->back().type = current_names_and_types[i].type;
}
}
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
return std::make_shared<PushDownFilter>(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast);
}
LOG_DEBUG(tracing_logger, "Push down filter is empty");
Expand Down

0 comments on commit 939c6a1

Please sign in to comment.