Skip to content

Commit

Permalink
[Opt](TabletSchema) reuse TabletColumn info to reduce mem (apache#42448)
Browse files Browse the repository at this point in the history
1. When there are a large number of identical TabletColumns in the
cluster, which usually occurs when VARIANT type columns are modified and
added, each Rowset has an individual TabletSchema. Excessive
TabletSchemas can lead to significant memory overhead. Reusing memory
for identical TabletColumns would greatly reduce this memory
consumption.
2. Serialized TabletSchema as LRU cache key could also increase memusage
when large sets of schemas are in LRU cache, so inorder to reduce the
memory footprint we just record the key signature caculated by
generating an UUID by hash algorithm, and lookup the key signature in
LRU cache, and check the key in case of hash collision
  • Loading branch information
eldenmoon committed Nov 6, 2024
1 parent bd327c4 commit fc8cbc5
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 43 deletions.
17 changes: 17 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ const int32_t MAX_LEAF_COUNT = 1024;
const float MAXMBSortInHeap = 512.0 * 8;
const int DIMS = 1;

bool InvertedIndexColumnWriter::check_support_inverted_index(const TabletColumn& column) {
// bellow types are not supported in inverted index for extracted columns
static std::set<FieldType> invalid_types = {
FieldType::OLAP_FIELD_TYPE_DOUBLE,
FieldType::OLAP_FIELD_TYPE_JSONB,
FieldType::OLAP_FIELD_TYPE_ARRAY,
FieldType::OLAP_FIELD_TYPE_FLOAT,
};
if (column.is_extracted_column() && (invalid_types.contains(column.type()))) {
return false;
}
if (column.is_variant_type()) {
return false;
}
return true;
}

template <FieldType field_type>
class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
public:
Expand Down
19 changes: 2 additions & 17 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
#include "io/fs/local_file_system.h"
#include "olap/olap_common.h"
#include "olap/options.h"
#include "olap/tablet_schema.h"

namespace doris {
class CollectionValue;

class Field;

class TabletIndex;
class TabletColumn;

namespace segment_v2 {
class InvertedIndexFileWriter;
Expand Down Expand Up @@ -74,22 +74,7 @@ class InvertedIndexColumnWriter {

// check if the column is valid for inverted index, some columns
// are generated from variant, but not all of them are supported
static bool check_support_inverted_index(const TabletColumn& column) {
// bellow types are not supported in inverted index for extracted columns
static std::set<FieldType> invalid_types = {
FieldType::OLAP_FIELD_TYPE_DOUBLE,
FieldType::OLAP_FIELD_TYPE_JSONB,
FieldType::OLAP_FIELD_TYPE_ARRAY,
FieldType::OLAP_FIELD_TYPE_FLOAT,
};
if (column.is_extracted_column() && (invalid_types.contains(column.type()))) {
return false;
}
if (column.is_variant_type()) {
return false;
}
return true;
}
static bool check_support_inverted_index(const TabletColumn& column);

private:
DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
Expand Down
57 changes: 57 additions & 0 deletions be/src/olap/tablet_column_object_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/tablet_column_object_pool.h"

#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/olap_file.pb.h>

#include "olap/tablet_schema.h"

namespace doris {

bvar::Adder<int64_t> g_tablet_column_cache_count("tablet_column_cache_count");
bvar::Adder<int64_t> g_tablet_column_cache_hit_count("tablet_column_cache_hit_count");

std::pair<Cache::Handle*, TabletColumnPtr> TabletColumnObjectPool::insert(const std::string& key) {
auto* lru_handle = lookup(key);
TabletColumnPtr tablet_column_ptr;
if (lru_handle) {
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
tablet_column_ptr = value->tablet_column;
VLOG_DEBUG << "reuse column ";
g_tablet_column_cache_hit_count << 1;
} else {
auto* value = new CacheValue;
tablet_column_ptr = std::make_shared<TabletColumn>();
ColumnPB pb;
pb.ParseFromString(key);
tablet_column_ptr->init_from_pb(pb);
VLOG_DEBUG << "create column ";
value->tablet_column = tablet_column_ptr;
lru_handle = LRUCachePolicy::insert(key, value, 1, 0, CachePriority::NORMAL);
g_tablet_column_cache_count << 1;
}
DCHECK(lru_handle != nullptr);
return {lru_handle, tablet_column_ptr};
}

TabletColumnObjectPool::CacheValue::~CacheValue() {
g_tablet_column_cache_count << -1;
}

} // namespace doris
58 changes: 58 additions & 0 deletions be/src/olap/tablet_column_object_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"

namespace doris {

// TabletColumnObjectPool is a cache for TabletColumn objects. It is used to reduce memory consumption
// when there are a large number of identical TabletColumns in the cluster, which usually occurs
// when VARIANT type columns are modified and added, each Rowset has an individual TabletSchema.
// Excessive TabletSchemas can lead to significant memory overhead. Reusing memory for identical
// TabletColumns would greatly reduce this memory consumption.

class TabletColumnObjectPool : public LRUCachePolicy {
public:
TabletColumnObjectPool(size_t capacity)
: LRUCachePolicy(CachePolicy::CacheType::TABLET_COLUMN_OBJECT_POOL, capacity,
LRUCacheType::NUMBER, config::tablet_schema_cache_recycle_interval) {}

static TabletColumnObjectPool* create_global_column_cache(size_t capacity) {
auto* res = new TabletColumnObjectPool(capacity);
return res;
}

static TabletColumnObjectPool* instance() {
return ExecEnv::GetInstance()->get_tablet_column_object_pool();
}

std::pair<Cache::Handle*, TabletColumnPtr> insert(const std::string& key);

private:
class CacheValue : public LRUCacheValueBase {
public:
~CacheValue() override;
TabletColumnPtr tablet_column;
};
};

} // namespace doris
47 changes: 30 additions & 17 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
#include "exec/tablet_info.h"
#include "olap/inverted_index_parser.h"
#include "olap/olap_define.h"
#include "olap/tablet_column_object_pool.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/memory/lru_cache_policy.h"
#include "runtime/thread_context.h"
#include "tablet_meta.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
Expand Down Expand Up @@ -859,6 +861,7 @@ TabletSchema::TabletSchema() {

TabletSchema::~TabletSchema() {
g_total_tablet_schema_num << -1;
clear_column_cache_handlers();
}

void TabletSchema::append_column(TabletColumn column, ColumnType col_type) {
Expand Down Expand Up @@ -948,9 +951,18 @@ void TabletSchema::clear_columns() {
_num_null_columns = 0;
_num_key_columns = 0;
_cols.clear();
clear_column_cache_handlers();
}

void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns) {
void TabletSchema::clear_column_cache_handlers() {
for (auto* cache_handle : _column_cache_handlers) {
TabletColumnObjectPool::instance()->release(cache_handle);
}
_column_cache_handlers.clear();
}

void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns,
bool reuse_cache_column) {
_keys_type = schema.keys_type();
_num_columns = 0;
_num_variant_columns = 0;
Expand All @@ -961,25 +973,34 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
_field_name_to_index.clear();
_field_id_to_index.clear();
_cluster_key_idxes.clear();
clear_column_cache_handlers();
for (const auto& i : schema.cluster_key_idxes()) {
_cluster_key_idxes.push_back(i);
}
for (auto& column_pb : schema.column()) {
TabletColumn column;
column.init_from_pb(column_pb);
if (ignore_extracted_columns && column.is_extracted_column()) {
TabletColumnPtr column;
if (reuse_cache_column) {
auto pair = TabletColumnObjectPool::instance()->insert(
deterministic_string_serialize(column_pb));
column = pair.second;
_column_cache_handlers.push_back(pair.first);
} else {
column = std::make_shared<TabletColumn>();
column->init_from_pb(column_pb);
}
if (ignore_extracted_columns && column->is_extracted_column()) {
continue;
}
if (column.is_key()) {
if (column->is_key()) {
_num_key_columns++;
}
if (column.is_nullable()) {
if (column->is_nullable()) {
_num_null_columns++;
}
if (column.is_variant_type()) {
if (column->is_variant_type()) {
++_num_variant_columns;
}
_cols.emplace_back(std::make_shared<TabletColumn>(std::move(column)));
_cols.emplace_back(std::move(column));
_field_name_to_index.emplace(StringRef(_cols.back()->name()), _num_columns);
_field_id_to_index[_cols.back()->unique_id()] = _num_columns;
_num_columns++;
Expand Down Expand Up @@ -1091,6 +1112,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_sequence_col_idx = -1;
_version_col_idx = -1;
_cluster_key_idxes.clear();
clear_column_cache_handlers();
for (const auto& i : ori_tablet_schema._cluster_key_idxes) {
_cluster_key_idxes.push_back(i);
}
Expand Down Expand Up @@ -1563,13 +1585,4 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b) {
return !(a == b);
}

std::string TabletSchema::deterministic_string_serialize(const TabletSchemaPB& schema_pb) {
std::string output;
google::protobuf::io::StringOutputStream string_output_stream(&output);
google::protobuf::io::CodedOutputStream output_stream(&string_output_stream);
output_stream.SetSerializationDeterministic(true);
schema_pb.SerializeToCodedStream(&output_stream);
return output;
}

} // namespace doris
20 changes: 18 additions & 2 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/rowset/segment_v2/options.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/memory/lru_cache_policy.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/string_ref.h"
Expand Down Expand Up @@ -296,10 +297,22 @@ class TabletSchema {
TabletSchema();
virtual ~TabletSchema();

void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false);
// Init from pb
// ignore_extracted_columns: ignore the extracted columns from variant column
// reuse_cached_column: reuse the cached column in the schema if they are the same, to reduce memory usage
void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false,
bool reuse_cached_column = false);
// Notice: Use deterministic way to serialize protobuf,
// since serialize Map in protobuf may could lead to un-deterministic by default
static std::string deterministic_string_serialize(const TabletSchemaPB& schema_pb);
template <class PbType>
static std::string deterministic_string_serialize(const PbType& pb) {
std::string output;
google::protobuf::io::StringOutputStream string_output_stream(&output);
google::protobuf::io::CodedOutputStream output_stream(&string_output_stream);
output_stream.SetSerializationDeterministic(true);
pb.SerializeToCodedStream(&output_stream);
return output;
}
void to_schema_pb(TabletSchemaPB* tablet_meta_pb) const;
void append_column(TabletColumn column, ColumnType col_type = ColumnType::NORMAL);
void append_index(TabletIndex index);
Expand Down Expand Up @@ -501,10 +514,13 @@ class TabletSchema {
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);

void clear_column_cache_handlers();

KeysType _keys_type = DUP_KEYS;
SortType _sort_type = SortType::LEXICAL;
size_t _sort_col_num = 0;
std::vector<TabletColumnPtr> _cols;
std::vector<Cache::Handle*> _column_cache_handlers;

std::vector<TabletIndex> _indexes;
std::unordered_map<StringRef, int32_t, StringRefHash> _field_name_to_index;
Expand Down
23 changes: 19 additions & 4 deletions be/src/olap/tablet_schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,45 @@
#include "olap/tablet_schema_cache.h"

#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <json2pb/pb_to_json.h>

#include "bvar/bvar.h"
#include "olap/tablet_schema.h"
#include "util/sha.h"

bvar::Adder<int64_t> g_tablet_schema_cache_count("tablet_schema_cache_count");
bvar::Adder<int64_t> g_tablet_schema_cache_columns_count("tablet_schema_cache_columns_count");
bvar::Adder<int64_t> g_tablet_schema_cache_hit_count("tablet_schema_cache_hit_count");

namespace doris {

// to reduce the memory consumption of the serialized TabletSchema as key.
// use sha256 to prevent from hash collision
static std::string get_key_signature(const std::string& origin) {
SHA256Digest digest;
digest.reset(origin.data(), origin.length());
return std::string {digest.digest().data(), digest.digest().length()};
}

std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std::string& key) {
auto* lru_handle = lookup(key);
std::string key_signature = get_key_signature(key);
auto* lru_handle = lookup(key_signature);
TabletSchemaSPtr tablet_schema_ptr;
if (lru_handle) {
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
tablet_schema_ptr = value->tablet_schema;
g_tablet_schema_cache_hit_count << 1;
} else {
auto* value = new CacheValue;
tablet_schema_ptr = std::make_shared<TabletSchema>();
TabletSchemaPB pb;
pb.ParseFromString(key);
tablet_schema_ptr->init_from_pb(pb);
// We should reuse the memory of the same TabletColumn object, set reuse_cached_column to true
tablet_schema_ptr->init_from_pb(pb, false, true);
value->tablet_schema = tablet_schema_ptr;
lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(), 0,
CachePriority::NORMAL);
lru_handle = LRUCachePolicy::insert(key_signature, value, tablet_schema_ptr->num_columns(),
0, CachePriority::NORMAL);
g_tablet_schema_cache_count << 1;
g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/query_cache/query_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "runtime/memory/mem_tracker.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/core/block.h"

namespace doris {

Expand Down
Loading

0 comments on commit fc8cbc5

Please sign in to comment.