Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
zclllyybb committed Mar 13, 2024
1 parent 9c579f3 commit 3184fd1
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 18 deletions.
9 changes: 6 additions & 3 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,22 @@ class BeExecVersionManager {
* c. change the string hash method in runtime filter
* d. elt function return type change to nullable(string)
* e. add repeat_max_num in repeat function
* 3: start from doris 2.1
* 3: start from doris 2.0 (by some mistakes)
* a. aggregation function do not serialize bitmap to string.
* 4: start from doris 2.1
* a. support window funnel mode from 2.0
* b. array contains/position/countequal function return nullable in less situations.
* c. cleared old version of Version 2.
* d. unix_timestamp function support timestamp with float for datetimev2, and change nullable mode.
* e. change shuffle serialize/deserialize way
* f. shrink some function's nullable mode.
* g. do local merge of remote runtime filter
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 3;
constexpr inline int BeExecVersionManager::max_be_exec_version = 4;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;

/// functional
constexpr inline int USE_NEW_SERDE = 3; // release on DORIS version 2.1
constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1

} // namespace doris
3 changes: 2 additions & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <mutex>
#include <ostream>

#include "agent/be_exec_version_manager.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
Expand Down Expand Up @@ -993,7 +994,7 @@ Status IRuntimeFilter::publish(bool publish_local) {
} else if (_has_local_target) {
RETURN_IF_ERROR(send_to_local(_wrapper));
} else if (!publish_local) {
if (_is_broadcast_join || _state->be_exec_version < 3) {
if (_is_broadcast_join || _state->be_exec_version < USE_NEW_SERDE) {
RETURN_IF_ERROR(send_to_remote(this));
} else {
RETURN_IF_ERROR(do_local_merge());
Expand Down
19 changes: 10 additions & 9 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <string>
#include <vector>

#include "agent/be_exec_version_manager.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "util/bitmap_value.h"
#include "vec/aggregate_functions/aggregate_function.h"
Expand Down Expand Up @@ -159,7 +160,7 @@ class AggregateFunctionBitmapSerializationHelper

void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
const size_t num_rows, Arena* arena) const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<ColumnBitmap&>(*dst);
char place[sizeof(Data)];
col.resize(num_rows);
Expand All @@ -177,7 +178,7 @@ class AggregateFunctionBitmapSerializationHelper

void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
MutableColumnPtr& dst, const size_t num_rows) const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<ColumnBitmap&>(*dst);
col.resize(num_rows);
auto* data = col.get_data().data();
Expand All @@ -191,7 +192,7 @@ class AggregateFunctionBitmapSerializationHelper

void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
Arena* arena) const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<const ColumnBitmap&>(column);
const size_t num_rows = column.size();
auto* data = col.get_data().data();
Expand All @@ -209,7 +210,7 @@ class AggregateFunctionBitmapSerializationHelper
Arena* arena) const override {
DCHECK(end <= column.size() && begin <= end)
<< ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
if (version >= 3) {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<const ColumnBitmap&>(column);
auto* data = col.get_data().data();
for (size_t i = begin; i <= end; ++i) {
Expand All @@ -223,7 +224,7 @@ class AggregateFunctionBitmapSerializationHelper
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
const size_t num_rows) const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
Expand All @@ -237,7 +238,7 @@ class AggregateFunctionBitmapSerializationHelper
void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const ColumnString* column,
Arena* arena, const size_t num_rows) const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
Expand All @@ -253,7 +254,7 @@ class AggregateFunctionBitmapSerializationHelper

void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
IColumn& to) const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
auto& col = assert_cast<ColumnBitmap&>(to);
size_t old_size = col.size();
col.resize(old_size + 1);
Expand All @@ -264,15 +265,15 @@ class AggregateFunctionBitmapSerializationHelper
}

[[nodiscard]] MutableColumnPtr create_serialize_column() const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
return ColumnBitmap::create();
} else {
return ColumnString::create();
}
}

[[nodiscard]] DataTypePtr get_serialized_type() const override {
if (version >= 3) {
if (version >= BITMAP_SERDE) {
return std::make_shared<DataTypeBitMap>();
} else {
return IAggregateFunction::get_serialized_type();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <utility>
#include <vector>

#include "agent/be_exec_version_manager.h"
#include "common/compiler_util.h"
#include "util/binary_cast.hpp"
#include "vec/aggregate_functions/aggregate_function.h"
Expand Down Expand Up @@ -269,8 +270,8 @@ class AggregateFunctionWindowFunnel

void create(AggregateDataPtr __restrict place) const override {
auto data = new (place) WindowFunnelState<DateValueType, NativeType>();
/// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version`
data->enable_mode = version >= 3;
/// support window funnel mode from 2.1. See `BeExecVersionManager::max_be_exec_version`
data->enable_mode = version >= USE_NEW_SERDE;
}

String get_name() const override { return "window_funnel"; }
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/functions/simple_function_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ class SimpleFunctionFactory {
using Creator = std::function<FunctionBuilderPtr()>;
using FunctionCreators = phmap::flat_hash_map<std::string, Creator>;
using FunctionIsVariadic = phmap::flat_hash_set<std::string>;
/// @TEMPORARY: for be_exec_version=3
constexpr static int NEWEST_VERSION_FUNCTION_SUBSTITUTE = 3;
/// @TEMPORARY: for be_exec_version=4
constexpr static int NEWEST_VERSION_FUNCTION_SUBSTITUTE = 4;

public:
void register_function(const std::string& name, const Creator& ptr) {
Expand Down
Binary file modified be/test/exec/test_data/wal_scanner/wal
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -1754,7 +1754,7 @@ public class Config extends ConfigBase {
* Max data version of backends serialize block.
*/
@ConfField(mutable = false)
public static int max_be_exec_version = 3;
public static int max_be_exec_version = 4;

/**
* Min data version of backends serialize block.
Expand Down

0 comments on commit 3184fd1

Please sign in to comment.