
Merge branch 'master' into 20240714_fix_arrowflight2
xinyiZzz authored Jul 15, 2024
2 parents 3f87505 + 01f00aa commit 637b247
Showing 34 changed files with 559 additions and 391 deletions.
15 changes: 9 additions & 6 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1925,7 +1925,8 @@ Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
 }
 
 Status SegmentIterator::_init_current_block(
-        vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns) {
+        vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns,
+        uint32_t nrows_read_limit) {
     block->clear_column_data(_schema->num_column_ids());
 
     for (size_t i = 0; i < _schema->num_column_ids(); i++) {
@@ -1945,7 +1946,7 @@ Status SegmentIterator::_init_current_block(
                     column_desc->path() == nullptr ? "" : column_desc->path()->get_path());
             // TODO reuse
             current_columns[cid] = file_column_type->create_column();
-            current_columns[cid]->reserve(_opts.block_row_max);
+            current_columns[cid]->reserve(nrows_read_limit);
         } else {
             // the column in block must clear() here to insert new data
             if (_is_pred_column[cid] ||
@@ -1964,7 +1965,7 @@ Status SegmentIterator::_init_current_block(
             } else if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_DATETIME) {
                 current_columns[cid]->set_datetime_type();
             }
-            current_columns[cid]->reserve(_opts.block_row_max);
+            current_columns[cid]->reserve(nrows_read_limit);
         }
     }
 }
@@ -2378,14 +2379,16 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
             }
         }
     }
-    RETURN_IF_ERROR(_init_current_block(block, _current_return_columns));
-    _converted_column_ids.assign(_schema->columns().size(), 0);
-
-    _current_batch_rows_read = 0;
     uint32_t nrows_read_limit = _opts.block_row_max;
     if (_can_opt_topn_reads()) {
        nrows_read_limit = std::min(static_cast<uint32_t>(_opts.topn_limit), nrows_read_limit);
     }
 
+    RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit));
+    _converted_column_ids.assign(_schema->columns().size(), 0);
+
+    _current_batch_rows_read = 0;
     RETURN_IF_ERROR(_read_columns_by_index(
             nrows_read_limit, _current_batch_rows_read,
             _lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval));
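Taken together, these hunks compute the per-batch row limit before the output columns are created, so a top-n query reserves only as many rows as it can actually return instead of a full _opts.block_row_max. A minimal stand-alone sketch of the clamping step (the struct and flag below are illustrative stand-ins for _opts and _can_opt_topn_reads(), not the Doris API):

#include <algorithm>
#include <cstdint>

// Illustrative stand-in for the relevant fields of _opts.
struct ReadOptions {
    uint32_t block_row_max = 4096; // normal batch size
    int64_t topn_limit = 0;        // row budget when a top-n read can stop early
};

// Decide how many rows the next batch should reserve and read: the
// normal batch size, clamped by the top-n limit when the optimization
// applies.
uint32_t nrows_read_limit(const ReadOptions& opts, bool can_opt_topn_reads) {
    uint32_t limit = opts.block_row_max;
    if (can_opt_topn_reads) {
        limit = std::min(static_cast<uint32_t>(opts.topn_limit), limit);
    }
    return limit;
}

Passing the clamped value into _init_current_block is what makes the reserve() calls above cheaper for small top-n reads.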
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -221,7 +221,8 @@ class SegmentIterator : public RowwiseIterator {
                                 bool set_block_rowid);
     void _replace_version_col(size_t num_rows);
     Status _init_current_block(vectorized::Block* block,
-                               std::vector<vectorized::MutableColumnPtr>& non_pred_vector);
+                               std::vector<vectorized::MutableColumnPtr>& non_pred_vector,
+                               uint32_t nrows_read_limit);
     uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
     uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
     void _output_non_pred_columns(vectorized::Block* block);
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/analytic_source_operator.cpp
@@ -559,11 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) {
 
     std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
     _result_window_columns.swap(tmp_result_window_columns);
-    // Some kinds of source operators have a 1-1 relationship with a sink operator (such as AnalyticOperator).
-    // We must ensure the AnalyticSinkOperator will not be blocked if the AnalyticSourceOperator has already closed.
-    if (_shared_state && _shared_state->sink_deps.size() == 1) {
-        _shared_state->sink_deps.front()->set_always_ready();
-    }
     return PipelineXLocalState<AnalyticSharedState>::close(state);
 }
 
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -567,7 +567,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
         DCHECK(_shared_hash_table_context != nullptr);
-        CHECK(_shared_hash_table_context->signaled);
+        // An instance that does not build the hash table should wait for the hash-table-build-finished signal.
+        // If it is still running while signaled == false, the source may already have closed due to a short circuit, so return EOF instead of failing a CHECK.
+        if (!_shared_hash_table_context->signaled) {
+            return Status::Error<ErrorCode::END_OF_FILE>("source has closed");
+        }
 
         if (!_shared_hash_table_context->status.ok()) {
             return _shared_hash_table_context->status;
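The fix replaces a process-aborting CHECK with a recoverable status for a race that can legitimately happen. A reduced sketch of the pattern, with illustrative stand-ins for the Doris types:

#include <atomic>
#include <string>
#include <utility>

// Illustrative stand-in for _shared_hash_table_context.
struct SharedBuildContext {
    std::atomic<bool> signaled{false}; // set once the build side finishes
};

enum class Code { OK, END_OF_FILE };
struct Status {
    Code code = Code::OK;
    std::string msg;
    bool ok() const { return code == Code::OK; }
    static Status EndOfFile(std::string m) { return {Code::END_OF_FILE, std::move(m)}; }
};

// Before: CHECK(ctx.signaled) aborted the whole process when the source
// closed early. After: report EOF so the caller can unwind cleanly.
Status consume_shared_build(const SharedBuildContext& ctx) {
    if (!ctx.signaled.load()) {
        return Status::EndOfFile("source has closed");
    }
    return Status{};
}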
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/operator.cpp
@@ -511,6 +511,11 @@ Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     }
     _closed = true;
+    // Some kinds of source operators have a 1-1 relationship with a sink operator (such as AnalyticOperator).
+    // We must ensure the AnalyticSinkOperator will not be blocked if the AnalyticSourceOperator has already closed.
+    if (_shared_state && _shared_state->sink_deps.size() == 1) {
+        _shared_state->sink_deps.front()->set_always_ready();
+    }
     return Status::OK();
 }
 
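Hoisting the guard from AnalyticSourceOperator::close into the shared PipelineXLocalState::close applies the same protection to every source operator that has a 1-1 paired sink. The hand-off can be sketched with simplified stand-in types:

#include <memory>
#include <vector>

// Simplified stand-ins for the pipeline dependency machinery.
struct Dependency {
    bool always_ready = false;
    // A blocked sink polls its dependency; once always_ready is set it
    // can never block again.
    void set_always_ready() { always_ready = true; }
};

struct SharedState {
    std::vector<std::shared_ptr<Dependency>> sink_deps;
};

// On source close, wake the paired sink so it cannot wait forever on a
// source that will never consume again.
void on_source_close(SharedState* shared_state) {
    if (shared_state && shared_state->sink_deps.size() == 1) {
        shared_state->sink_deps.front()->set_always_ready();
    }
}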
8 changes: 5 additions & 3 deletions be/src/util/bitmap_value.h
@@ -1252,8 +1252,7 @@ class BitmapValue {
         std::vector<const detail::Roaring64Map*> bitmaps;
         std::vector<uint64_t> single_values;
         std::vector<const SetContainer<uint64_t>*> sets;
-        for (int i = 0; i < values.size(); ++i) {
-            auto* value = values[i];
+        for (const auto* value : values) {
             switch (value->_type) {
             case EMPTY:
                 break;
@@ -1280,7 +1279,9 @@ class BitmapValue {
                 _bitmap->add(_sv);
                 break;
             case BITMAP:
-                *_bitmap |= detail::Roaring64Map::fastunion(bitmaps.size(), bitmaps.data());
+                for (const auto* bitmap : bitmaps) {
+                    *_bitmap |= *bitmap;
+                }
                 break;
             case SET: {
                 *_bitmap = detail::Roaring64Map::fastunion(bitmaps.size(), bitmaps.data());
@@ -1315,6 +1316,7 @@ class BitmapValue {
                     _bitmap->add(v);
                 }
                 _type = BITMAP;
+                _set.clear();
                 break;
             case SET: {
                 break;
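Two behavioral changes sit in these hunks: the BITMAP case now folds each input into the accumulator with |= instead of merging a fastunion result, and the promotion to BITMAP clears the now-stale _set so no leftover values linger. A minimal sketch of the fold-style union, assuming the CRoaring C++ wrapper (Doris's detail::Roaring64Map is a modified copy of it; the header path may differ by install):

#include <vector>
#include <roaring/roaring64map.hh> // CRoaring's 64-bit C++ wrapper

using roaring::Roaring64Map;

// OR each input into an accumulator one at a time rather than
// materializing a separate fastunion result first.
Roaring64Map fold_union(const std::vector<const Roaring64Map*>& inputs) {
    Roaring64Map acc;
    for (const auto* in : inputs) {
        acc |= *in;
    }
    return acc;
}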
2 changes: 1 addition & 1 deletion be/src/vec/exprs/vexpr.cpp
@@ -627,7 +627,7 @@ std::string VExpr::gen_predicate_result_sign(Block& block, const ColumnNumbers&
     std::string column_name = block.get_by_position(arguments[0]).name;
     pred_result_sign +=
             BeConsts::BLOCK_TEMP_COLUMN_PREFIX + column_name + "_" + function_name + "_";
-    if (function_name == "in") {
+    if (function_name == "in" || function_name == "not_in") {
         // Generating 'result_sign' from 'inlist' requires sorting the values.
         std::set<std::string> values;
         for (size_t i = 1; i < arguments.size(); i++) {
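The comment in this hunk carries the key requirement: an in/not_in list must produce the same cache sign however its literals are ordered, which the std::set guarantees by iterating in sorted order. A stand-alone illustration of that property (the function and names are hypothetical, not the Doris API):

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Build an order-insensitive signature for an in/not_in predicate by
// inserting the literals into a std::set, which iterates sorted.
std::string predicate_sign(const std::string& column, const std::string& fn,
                           const std::vector<std::string>& literals) {
    std::set<std::string> sorted(literals.begin(), literals.end());
    std::string sign = column + "_" + fn + "_";
    for (const auto& v : sorted) {
        sign += v + ",";
    }
    return sign;
}

int main() {
    // "x IN (b, a)" and "x IN (a, b)" share one cached predicate result.
    std::cout << (predicate_sign("x", "in", {"b", "a"}) ==
                  predicate_sign("x", "in", {"a", "b"}))
              << std::endl; // prints 1
}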
2 changes: 1 addition & 1 deletion be/src/vec/functions/function.h
@@ -521,7 +521,7 @@ class DefaultFunction final : public IFunctionBase {
         auto function_name = function->get_name();
         return function_name == "eq" || function_name == "ne" || function_name == "lt" ||
                function_name == "gt" || function_name == "le" || function_name == "ge" ||
-               function_name == "in";
+               function_name == "in" || function_name == "not_in";
     }
 
     Status eval_inverted_index(FunctionContext* context,
@@ -714,6 +714,7 @@ private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, lo
         }
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
+        int totalRetryTime = 0;
         for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
             GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder();
             builder.setTableId(entry.getKey())
@@ -790,10 +791,15 @@ private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, lo
                     cumulativePoints.put(tabletId, respCumulativePoints.get(i));
                 }
             }
+            totalRetryTime += retryTime;
         }
         stopWatch.stop();
-        LOG.info("get delete bitmap lock successfully. txns: {}. time cost: {} ms.",
-                transactionId, stopWatch.getTime());
+        if (totalRetryTime > 0 || stopWatch.getTime() > 20) {
+            LOG.info(
+                    "get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. "
+                            + "partitionSize: {}. time cost: {} ms.",
+                    transactionId, totalRetryTime, tableToParttions.size(), stopWatch.getTime());
+        }
     }
 
     private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
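The logging change follows a common noise-reduction pattern: stay quiet on the fast path and log only when the lock acquisition retried or ran long. The same idea sketched generically in C++ (the 20 ms threshold mirrors the hunk; everything else is illustrative):

#include <chrono>
#include <iostream>

// Emit the success line only when something unusual happened, so the
// common fast path produces no log traffic at all.
void log_lock_acquired(int total_retries, std::chrono::milliseconds elapsed) {
    constexpr std::chrono::milliseconds kSlow{20}; // illustrative threshold
    if (total_retries > 0 || elapsed > kSlow) {
        std::cout << "lock acquired; retries=" << total_retries
                  << " elapsed_ms=" << elapsed.count() << '\n';
    }
}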
@@ -43,7 +43,6 @@
 import org.apache.doris.nereids.trees.expressions.IsNull;
 import org.apache.doris.nereids.trees.expressions.LessThan;
 import org.apache.doris.nereids.trees.expressions.LessThanEqual;
-import org.apache.doris.nereids.trees.expressions.Like;
 import org.apache.doris.nereids.trees.expressions.Not;
 import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
 import org.apache.doris.nereids.trees.expressions.Or;
@@ -403,11 +402,6 @@ public Expression visitOr(Or or, ExpressionRewriteContext context) {
         }
     }
 
-    @Override
-    public Expression visitLike(Like like, ExpressionRewriteContext context) {
-        return like;
-    }
-
     @Override
     public Expression visitCast(Cast cast, ExpressionRewriteContext context) {
         cast = rewriteChildren(cast, context);
43 changes: 43 additions & 0 deletions regression-test/data/inverted_index_p0/test_index_rqg_bug3.out
@@ -0,0 +1,43 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_bug_1 --
-10 2023-12-11
-10 2023-12-12
-10 2023-12-13
-10 2023-12-15
-10 2023-12-15
-10 2023-12-19
-10 2023-12-19
-10 2024-01-17
-10 2024-02-18
-10 2024-02-18
-10 2025-02-18
-10 2026-01-18
-10 2026-02-18
-4 2023-12-10
-4 2023-12-11
-4 2023-12-16
-4 2024-01-31
0 2024-01-19
1 2023-12-16
1 2024-01-09
2 2023-12-10
2 2023-12-11
2 2024-01-08
2 2024-01-31
3 2023-12-20
3 2024-01-19
3 2025-06-18
3 2026-02-18
3 2027-01-16
4 2023-12-12
4 2023-12-12
4 2024-01-08
5 2023-12-16
6 2024-02-18
7 2023-12-17
7 2023-12-20
7 2027-01-09
8 2025-02-18
9 2024-02-18
9 2024-02-18

@@ -29,6 +29,7 @@ suite("paimon_base_filesystem", "p0,external,doris,external_docker,external_dock
 
     String s3ak = getS3AK()
     String s3sk = getS3SK()
+    def s3Endpoint = getS3Endpoint()
 
     def cos = """select c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c18 from ${catalog_cos}.zd.all_table order by c18"""
     def oss = """select * from ${catalog_oss}.paimonossdb1.test_tableoss order by a"""
@@ -48,9 +49,9 @@ suite("paimon_base_filesystem", "p0,external,doris,external_docker,external_dock
         create catalog if not exists ${catalog_oss} properties (
             "type" = "paimon",
             "warehouse" = "oss://paimon-zd/paimonoss",
-            "oss.endpoint"="oss-cn-beijing.aliyuncs.com",
             "oss.access_key"="${ak}",
-            "oss.secret_key"="${sk}"
+            "oss.secret_key"="${sk}",
+            "oss.endpoint"="oss-cn-beijing.aliyuncs.com"
         );
     """
     logger.info("catalog " + catalog_cos + " created")
@@ -17,6 +17,7 @@
 
 suite("test_hive_write_insert_s3", "p2,external,hive,external_remote,external_remote_hive") {
     def format_compressions = ["parquet_snappy"]
+    def s3BucketName = getS3BucketName()
 
     def q01 = { String format_compression, String catalog_name ->
         logger.info("hive sql: " + """ truncate table all_types_${format_compression}_s3; """)
@@ -76,8 +77,8 @@ suite("test_hive_write_insert_s3", "p2,external,hive,external_remote,external_re
         hive_remote """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """
         logger.info("hive sql: " + """ CREATE TABLE IF NOT EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02 like all_types_par_${format_compression}_s3; """)
         hive_remote """ CREATE TABLE IF NOT EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02 like all_types_par_${format_compression}_s3; """
-        logger.info("hive sql: " + """ ALTER TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 'cosn://doris-build-1308700295/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02'; """)
-        hive_remote """ ALTER TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 'cosn://doris-build-1308700295/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02'; """
+        logger.info("hive sql: " + """ ALTER TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 'cosn://${s3BucketName}/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02'; """)
+        hive_remote """ ALTER TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 'cosn://${s3BucketName}/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02'; """
         sql """refresh catalog ${catalog_name};"""
 
         sql """
2 changes: 1 addition & 1 deletion regression-test/suites/github_events_p2/load.groovy
@@ -31,7 +31,7 @@ suite("load") {
         ak "${getS3AK()}"
         sk "${getS3SK()}"
         endpoint "http://${getS3Endpoint()}"
-        region "ap-beijing"
+        region "${getS3Region()}"
         repository "regression_test_github_events"
         snapshot "github_events"
         timestamp "2022-03-23-12-19-51"
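All three regression-test fixes above replace hardcoded environment details (an OSS endpoint, a COS bucket, a region) with values resolved through the suite helpers getS3Endpoint(), getS3BucketName(), and getS3Region(). The same idea in minimal C++ form (the variable name and fallback are illustrative, not part of the test framework):

#include <cstdlib>
#include <string>

// Resolve an environment-specific setting at runtime instead of baking
// it into the test, with an explicit fallback for local runs.
std::string s3_region() {
    const char* v = std::getenv("S3_REGION");
    return v != nullptr ? std::string(v) : std::string("ap-beijing");
}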
