diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index c78ff804821..ad03cf37f24 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -274,106 +274,6 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline) executeImpl(pipeline); } -<<<<<<< HEAD -======= -SourceOps DAGStorageInterpreter::execute(PipelineExecutorStatus & exec_status) -{ - prepare(); // learner read - - return executeImpl(exec_status); -} - -SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_status) -{ - auto & dag_context = dagContext(); - - auto scan_context = std::make_shared(); - dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context; - mvcc_query_info->scan_context = scan_context; - - SourceOps source_ops; - if (!mvcc_query_info->regions_query_info.empty()) - { - source_ops = buildLocalSourceOps(exec_status, context.getSettingsRef().max_block_size); - } - - // Should build `remote_requests` and `nullSourceOp` under protect of `table_structure_lock`. - if (source_ops.empty()) - { - source_ops.emplace_back(std::make_unique( - exec_status, - storage_for_logical_table->getSampleBlockForColumns(required_columns), - log->identifier())); - } - - // Note that `buildRemoteRequests` must be called after `buildLocalSourceOps` because - // `buildLocalSourceOps` will setup `region_retry_from_local_region` and we must - // retry those regions or there will be data lost. - auto remote_requests = buildRemoteRequests(scan_context); - - if (dag_context.is_disaggregated_task && !remote_requests.empty()) - { - // This means RN is sending requests with stale region info, we simply reject the request - // and ask RN to send requests again with correct region info. When RN updates region info, - // RN may be sending requests to other WN. - - RegionException::UnavailableRegions region_ids; - for (const auto & info : context.getDAGContext()->retry_regions) - region_ids.insert(info.region_id); - - throw RegionException( - std::move(region_ids), - RegionException::RegionReadStatus::EPOCH_NOT_MATCH); - } - - FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired); - - // Release alter locks - // The DeltaTree engine ensures that once sourceOps are created, the caller can get a consistent result - // from those sourceOps even if DDL operations are applied. Release the alter lock so that reading does not - // block DDL operations, keep the drop lock so that the storage not to be dropped during reading. - const TableLockHolders drop_locks = releaseAlterLocks(); - - remote_read_sources_start_index = source_ops.size(); - - if (!remote_requests.empty()) - buildRemoteSourceOps(source_ops, exec_status, remote_requests); - - for (const auto & lock : drop_locks) - dagContext().addTableLock(lock); - - FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired); - FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once); - - return source_ops; -} - -void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder) -{ - /// handle generated column if necessary. - executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log); - NamesAndTypes source_columns; - source_columns.reserve(table_scan.getColumnSize()); - const auto table_scan_output_header = group_builder.getCurrentHeader(); - for (const auto & col : table_scan_output_header) - source_columns.emplace_back(col.name, col.type); - analyzer = std::make_unique(std::move(source_columns), context); - /// If there is no local source, there is no need to execute cast and push down filter, return directly. - /// But we should make sure that the analyzer is initialized before return. - if (remote_read_sources_start_index == 0) - return; - /// handle timezone/duration cast for local table scan. - executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index); - - /// handle filter conditions for local and remote table scan. - if (filter_conditions.hasValue()) - { - ::DB::executePushedDownFilter(exec_status, group_builder, remote_read_sources_start_index, filter_conditions, *analyzer, log); - /// TODO: record profile - } -} - ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) { auto & dag_context = dagContext(); @@ -440,26 +340,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired); FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once); -<<<<<<< HEAD - if (!table_scan.getPushedDownFilters().empty()) - { - /// If there are duration type pushed down filters, type of columns may be changed. - const auto table_scan_output_header = pipeline.firstStream()->getHeader(); - for (const auto & col : table_scan_output_header) - { - const auto & name = col.name; - auto it = std::find_if(source_columns.begin(), source_columns.end(), [&name](const auto & c) { return c.name == name; }); - if (it != source_columns.end() && it->type != col.type) - it->type = col.type; - } - } - analyzer = std::make_unique(std::move(source_columns), context); - - /// handle timezone/duration cast for local and remote table scan. - executeCastAfterTableScan(remote_read_streams_start_index, pipeline); -======= ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) /// handle generated column if necessary. executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline); NamesAndTypes source_columns; @@ -521,33 +402,9 @@ void DAGStorageInterpreter::prepare() assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end()); storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage; -<<<<<<< HEAD - std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(); -======= std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan(); } -void DAGStorageInterpreter::executeCastAfterTableScan( - PipelineExecutorStatus & exec_status, - PipelineExecGroupBuilder & group_builder, - size_t remote_read_sources_start_index) -{ - // execute timezone cast or duration cast if needed for local table scan - auto [has_cast, extra_cast] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan); - if (has_cast) - { - RUNTIME_CHECK(remote_read_sources_start_index <= group_builder.group.size()); - size_t i = 0; - // local sources - while (i < remote_read_sources_start_index) - { - auto & group = group_builder.group[i++]; - group.appendTransformOp(std::make_unique(exec_status, log->identifier(), extra_cast)); - } - } ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) -} - void DAGStorageInterpreter::executeCastAfterTableScan( size_t remote_read_streams_start_index, DAGPipeline & pipeline) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0036a47b578..5e8b821f4f8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1086,11 +1086,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, { stream = std::make_shared( read_task_pool, -<<<<<<< HEAD - filter && filter->before_where ? filter->columns_after_cast : columns_to_read, -======= filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read, ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) extra_table_id_index, physical_table_id, log_tracing_id); diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h index 39f7b35c1cf..fd910a9a6ba 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h @@ -32,32 +32,17 @@ class PushDownFilter : public std::enable_shared_from_this const ColumnDefines & filter_columns_, const String filter_column_name_, const ExpressionActionsPtr & extra_cast_, -<<<<<<< HEAD - const ColumnDefines & columns_after_cast_) -======= const ColumnDefinesPtr & columns_after_cast_) ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) : rs_operator(rs_operator_) , before_where(beofre_where_) , filter_column_name(std::move(filter_column_name_)) , filter_columns(std::move(filter_columns_)) , extra_cast(extra_cast_) -<<<<<<< HEAD - , columns_after_cast(std::move(columns_after_cast_)) -======= , columns_after_cast(columns_after_cast_) ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) {} explicit PushDownFilter(const RSOperatorPtr & rs_operator_) : rs_operator(rs_operator_) -<<<<<<< HEAD - , before_where(nullptr) - , filter_columns({}) - , extra_cast(nullptr) - , columns_after_cast({}) -======= ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) {} // Rough set operator @@ -71,11 +56,7 @@ class PushDownFilter : public std::enable_shared_from_this // The expression actions used to cast the timestamp/datetime column ExpressionActionsPtr extra_cast; // If the extra_cast is not null, the types of the columns may be changed -<<<<<<< HEAD - ColumnDefines columns_after_cast; -======= ColumnDefinesPtr columns_after_cast; ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) }; } // namespace DB::DM diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index ea7590d7a74..d05e7e314d5 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -839,21 +839,6 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer); LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions()); -<<<<<<< HEAD - ColumnDefines columns_after_cast; - columns_after_cast.reserve(columns_to_read.size()); - const auto & source_columns = analyzer->getCurrentInputColumns(); - for (size_t i = 0; i < table_scan_column_info.size(); ++i) - { - if (table_scan_column_info[i].hasGeneratedColumnFlag()) - continue; - auto it = columns_to_read_map.at(table_scan_column_info[i].id); - RUNTIME_CHECK(it.name == source_columns[i].name); - columns_after_cast.push_back(std::move(it)); - columns_after_cast.back().type = source_columns[i].type; - } - -======= auto columns_after_cast = std::make_shared(); if (extra_cast != nullptr) { @@ -869,7 +854,6 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr columns_after_cast->back().type = current_names_and_types[i].type; } } ->>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469)) return std::make_shared(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast); } LOG_DEBUG(tracing_logger, "Push down filter is empty");