Skip to content

Commit

Permalink
reduce column mem
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Oct 29, 2024
1 parent b3b04ab commit 8e31fa5
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 22 deletions.
55 changes: 55 additions & 0 deletions be/src/olap/tablet_column_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/tablet_column_cache.h"

#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/olap_file.pb.h>

#include "olap/tablet_schema.h"

namespace doris {

TabletColumnPtr TabletColumnCache::insert(const std::string& key) {
auto* lru_handle = lookup(key);
TabletColumnPtr tablet_column_ptr;
if (lru_handle) {
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
tablet_column_ptr = value->tablet_column;
VLOG_DEBUG << "reuse column ";
} else {
auto* value = new CacheValue;
tablet_column_ptr = std::make_shared<TabletColumn>();
ColumnPB pb;
pb.ParseFromString(key);
tablet_column_ptr->init_from_pb(pb);
VLOG_DEBUG << "create column ";
value->tablet_column = tablet_column_ptr;
lru_handle =
LRUCachePolicy::insert(key, value, sizeof(TabletColumn), 0, CachePriority::NORMAL);
}
DCHECK(lru_handle != nullptr);
return tablet_column_ptr;
}

void TabletColumnCache::release(Cache::Handle* lru_handle) {
LRUCachePolicy::release(lru_handle);
}

TabletColumnCache::CacheValue::~CacheValue() = default;

} // namespace doris
60 changes: 60 additions & 0 deletions be/src/olap/tablet_column_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "olap/tablet_fwd.h"
#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"

namespace doris {

// TabletColumnCache is a cache for TabletColumn objects. It is used to reduce memory consumption
// when there are a large number of identical TabletColumns in the cluster, which usually occurs
// when VARIANT type columns are modified and added, each Rowset has an individual TabletSchema.
// Excessive TabletSchemas can lead to significant memory overhead. Reusing memory for identical
// TabletColumns would greatly reduce this memory consumption.

class TabletColumnCache : public LRUCachePolicy {
public:
TabletColumnCache(size_t capacity)
: LRUCachePolicy(CachePolicy::CacheType::TABLET_COLUMN_CACHE, capacity,
LRUCacheType::NUMBER, config::tablet_schema_cache_recycle_interval) {}

static TabletColumnCache* create_global_column_cache(size_t capacity) {
auto* res = new TabletColumnCache(capacity);
return res;
}

static TabletColumnCache* instance() {
return ExecEnv::GetInstance()->get_tablet_column_cache();
}

TabletColumnPtr insert(const std::string& key);

void release(Cache::Handle*);

private:
class CacheValue : public LRUCacheValueBase {
public:
~CacheValue() override;

TabletColumnPtr tablet_column;
};
};

} // namespace doris
33 changes: 16 additions & 17 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "exec/tablet_info.h"
#include "olap/inverted_index_parser.h"
#include "olap/olap_define.h"
#include "olap/tablet_column_cache.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/thread_context.h"
Expand Down Expand Up @@ -942,7 +943,8 @@ void TabletSchema::clear_columns() {
_cols.clear();
}

void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns) {
void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns,
bool reuse_cache_column) {
_keys_type = schema.keys_type();
_num_columns = 0;
_num_variant_columns = 0;
Expand All @@ -957,21 +959,27 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
_cluster_key_idxes.push_back(i);
}
for (auto& column_pb : schema.column()) {
TabletColumn column;
column.init_from_pb(column_pb);
if (ignore_extracted_columns && column.is_extracted_column()) {
TabletColumnPtr column;
if (reuse_cache_column) {
column = TabletColumnCache::instance()->insert(
deterministic_string_serialize(column_pb));
} else {
column = std::make_shared<TabletColumn>();
column->init_from_pb(column_pb);
}
if (ignore_extracted_columns && column->is_extracted_column()) {
continue;
}
if (column.is_key()) {
if (column->is_key()) {
_num_key_columns++;
}
if (column.is_nullable()) {
if (column->is_nullable()) {
_num_null_columns++;
}
if (column.is_variant_type()) {
if (column->is_variant_type()) {
++_num_variant_columns;
}
_cols.emplace_back(std::make_shared<TabletColumn>(std::move(column)));
_cols.emplace_back(std::move(column));
_vl_field_mem_size +=
sizeof(StringRef) + sizeof(char) * _cols.back()->name().size() + sizeof(size_t);
_field_name_to_index.emplace(StringRef(_cols.back()->name()), _num_columns);
Expand Down Expand Up @@ -1565,13 +1573,4 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b) {
return !(a == b);
}

std::string TabletSchema::deterministic_string_serialize(const TabletSchemaPB& schema_pb) {
std::string output;
google::protobuf::io::StringOutputStream string_output_stream(&output);
google::protobuf::io::CodedOutputStream output_stream(&string_output_stream);
output_stream.SetSerializationDeterministic(true);
schema_pb.SerializeToCodedStream(&output_stream);
return output;
}

} // namespace doris
16 changes: 14 additions & 2 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,22 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
TabletSchema();
virtual ~TabletSchema();

void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false);
// Init from pb
// ignore_extracted_columns: ignore the extracted columns from variant column
// reuse_cached_column: reuse the cached column in the schema if they are the same, to reduce memory usage
void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false,
bool reuse_cached_column = false);
// Notice: Use deterministic way to serialize protobuf,
// since serialize Map in protobuf may could lead to un-deterministic by default
static std::string deterministic_string_serialize(const TabletSchemaPB& schema_pb);
template <class PbType>
static std::string deterministic_string_serialize(const PbType& pb) {
std::string output;
google::protobuf::io::StringOutputStream string_output_stream(&output);
google::protobuf::io::CodedOutputStream output_stream(&string_output_stream);
output_stream.SetSerializationDeterministic(true);
pb.SerializeToCodedStream(&output_stream);
return output;
}
void to_schema_pb(TabletSchemaPB* tablet_meta_pb) const;
void append_column(TabletColumn column, ColumnType col_type = ColumnType::NORMAL);
void append_index(TabletIndex index);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std:
tablet_schema_ptr = std::make_shared<TabletSchema>();
TabletSchemaPB pb;
pb.ParseFromString(key);
tablet_schema_ptr->init_from_pb(pb);
tablet_schema_ptr->init_from_pb(pb, false, true);
value->tablet_schema = tablet_schema_ptr;
lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(), 0,
CachePriority::NORMAL);
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class FrontendServiceClient;
class FileMetaCache;
class GroupCommitMgr;
class TabletSchemaCache;
class TabletColumnCache;
class UserFunctionCache;
class SchemaCache;
class StoragePageCache;
Expand Down Expand Up @@ -300,6 +301,7 @@ class ExecEnv {
std::map<TNetworkAddress, FrontendInfo> get_running_frontends();

TabletSchemaCache* get_tablet_schema_cache() { return _tablet_schema_cache; }
TabletColumnCache* get_tablet_column_cache() { return _tablet_column_cache; }
SchemaCache* schema_cache() { return _schema_cache; }
StoragePageCache* get_storage_page_cache() { return _storage_page_cache; }
SegmentLoader* segment_loader() { return _segment_loader; }
Expand Down Expand Up @@ -439,6 +441,7 @@ class ExecEnv {
// these redundancy header could introduce potential bug, at least, more header means slow compile.
// So we choose to use raw pointer, please remember to delete these pointer in deconstructor.
TabletSchemaCache* _tablet_schema_cache = nullptr;
TabletColumnCache* _tablet_column_cache = nullptr;
std::unique_ptr<BaseStorageEngine> _storage_engine;
SchemaCache* _schema_cache = nullptr;
StoragePageCache* _storage_page_cache = nullptr;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "olap/schema_cache.h"
#include "olap/segment_loader.h"
#include "olap/storage_engine.h"
#include "olap/tablet_column_cache.h"
#include "olap/tablet_schema_cache.h"
#include "olap/wal/wal_manager.h"
#include "pipeline/pipeline_tracing.h"
Expand Down Expand Up @@ -339,6 +340,9 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_tablet_schema_cache =
TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);

_tablet_column_cache =
TabletColumnCache::create_global_column_cache(config::tablet_schema_cache_capacity);

// Storage engine
doris::EngineOptions options;
options.store_paths = store_paths;
Expand Down
8 changes: 6 additions & 2 deletions be/src/runtime/memory/cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class CachePolicy {
CLOUD_TXN_DELETE_BITMAP_CACHE = 17,
NONE = 18, // not be used
FOR_UT_CACHE_NUMBER = 19,
QUERY_CACHE = 20
QUERY_CACHE = 20,
TABLET_COLUMN_CACHE = 21,
};

static std::string type_string(CacheType type) {
Expand Down Expand Up @@ -93,6 +94,8 @@ class CachePolicy {
return "ForUTCacheNumber";
case CacheType::QUERY_CACHE:
return "QueryCache";
case CacheType::TABLET_COLUMN_CACHE:
return "TabletColumnCache";
default:
LOG(FATAL) << "not match type of cache policy :" << static_cast<int>(type);
}
Expand All @@ -119,7 +122,8 @@ class CachePolicy {
{"CreateTabletRRIdxCache", CacheType::CREATE_TABLET_RR_IDX_CACHE},
{"CloudTabletCache", CacheType::CLOUD_TABLET_CACHE},
{"CloudTxnDeleteBitmapCache", CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE},
{"ForUTCacheNumber", CacheType::FOR_UT_CACHE_NUMBER}};
{"ForUTCacheNumber", CacheType::FOR_UT_CACHE_NUMBER},
{"TabletColumnCache", CacheType::TABLET_COLUMN_CACHE}};

static CacheType string_to_type(std::string type) {
if (StringToType.contains(type)) {
Expand Down

0 comments on commit 8e31fa5

Please sign in to comment.