Skip to content

Commit

Permalink
[pick](SchemaCache) remove redundant Schema cache (#40257)
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon authored Sep 2, 2024
1 parent 8ff068e commit 4a91b2d
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 69 deletions.
16 changes: 1 addition & 15 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}
}
VLOG_NOTICE << "read columns size: " << read_columns.size();
std::string schema_key = SchemaCache::get_schema_key(
_read_options.tablet_id, _read_context->tablet_schema, read_columns,
_read_context->tablet_schema->schema_version(), SchemaCache::Type::SCHEMA);
// It is necessary to ensure that there is a schema version when using a cache
// because the absence of a schema version can result in reading a stale version
// of the schema after a schema change.
// For table contains variants, it's schema is unstable and variable so we could not use schema cache here
if (_read_context->tablet_schema->schema_version() < 0 ||
_read_context->tablet_schema->num_variant_columns() > 0 ||
(_input_schema = SchemaCache::instance()->get_schema<SchemaSPtr>(schema_key)) == nullptr) {
_input_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
SchemaCache::instance()->insert_schema(schema_key, _input_schema);
}

_input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
if (_read_context->predicates != nullptr) {
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
_read_context->predicates->begin(),
Expand Down
21 changes: 2 additions & 19 deletions be/src/olap/schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,9 @@ SchemaCache* SchemaCache::instance() {
return ExecEnv::GetInstance()->schema_cache();
}

// format: tabletId-unique_id1-uniqueid2...-version-type
std::string SchemaCache::get_schema_key(int64_t tablet_id, const TabletSchemaSPtr& schema,
const std::vector<uint32_t>& column_ids, int32_t version,
Type type) {
if (column_ids.empty() || schema->column(column_ids[0]).unique_id() < 0) {
return "";
}
std::string key = fmt::format("{}-", tablet_id);
std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) {
uint32_t col_unique_id = schema->column(cid).unique_id();
key.append(fmt::format("{}", col_unique_id));
key.append("-");
});
key.append(fmt::format("{}-{}", version, type));
return key;
}

// format: tabletId-unique_id1-uniqueid2...-version-type
std::string SchemaCache::get_schema_key(int64_t tablet_id, const std::vector<TColumn>& columns,
int32_t version, Type type) {
int32_t version) {
if (columns.empty() || columns[0].col_unique_id < 0) {
return "";
}
Expand All @@ -67,7 +50,7 @@ std::string SchemaCache::get_schema_key(int64_t tablet_id, const std::vector<TCo
key.append(fmt::format("{}", col.col_unique_id));
key.append("-");
});
key.append(fmt::format("{}-{}", version, type));
key.append(fmt::format("{}", version));
return key;
}

Expand Down
37 changes: 6 additions & 31 deletions be/src/olap/schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,15 @@ using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>;
// with high concurrency, where queries are executed simultaneously.
class SchemaCache : public LRUCachePolicyTrackingManual {
public:
enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };

static SchemaCache* instance();

static void create_global_instance(size_t capacity);

// get cache schema key, delimiter with SCHEMA_DELIMITER
static std::string get_schema_key(int64_t tablet_id, const TabletSchemaSPtr& schema,
const std::vector<uint32_t>& column_ids, int32_t version,
Type type);
static std::string get_schema_key(int64_t tablet_id, const std::vector<TColumn>& columns,
int32_t version, Type type);
int32_t version);

// Get a shared cached schema from cache, schema_key is a subset of column unique ids
template <typename SchemaType>
SchemaType get_schema(const std::string& schema_key) {
TabletSchemaSPtr get_schema(const std::string& schema_key) {
if (!instance() || schema_key.empty()) {
return {};
}
Expand All @@ -70,44 +63,26 @@ class SchemaCache : public LRUCachePolicyTrackingManual {
Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
VLOG_DEBUG << "use cache schema";
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
return value->tablet_schema;
}
if constexpr (std::is_same_v<SchemaType, SchemaSPtr>) {
return value->schema;
}
return value->tablet_schema;
}
return {};
}

// Insert a shared Schema into cache, schema_key is full column unique ids
template <typename SchemaType>
void insert_schema(const std::string& key, SchemaType schema) {
void insert_schema(const std::string& key, TabletSchemaSPtr schema) {
if (!instance() || key.empty()) {
return;
}
auto* value = new CacheValue;
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
value->type = Type::TABLET_SCHEMA;
value->tablet_schema = schema;
} else if constexpr (std::is_same_v<SchemaType, SchemaSPtr>) {
value->type = Type::SCHEMA;
value->schema = schema;
}
value->tablet_schema = schema;

auto lru_handle = insert(key, value, 1, schema->mem_size(), CachePriority::NORMAL);
auto* lru_handle = insert(key, value, 1, schema->mem_size(), CachePriority::NORMAL);
release(lru_handle);
}

// Try to prune the cache if expired.
Status prune();

class CacheValue : public LRUCacheValueBase {
public:
Type type;
// either tablet_schema or schema
TabletSchemaSPtr tablet_schema = nullptr;
SchemaSPtr schema = nullptr;
};

SchemaCache(size_t capacity)
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ Status NewOlapScanner::init() {
!olap_scan_node.columns_desc.empty() &&
olap_scan_node.columns_desc[0].col_unique_id >= 0 &&
tablet->tablet_schema()->num_variant_columns() == 0) {
schema_key = SchemaCache::get_schema_key(
tablet->tablet_id(), olap_scan_node.columns_desc, olap_scan_node.schema_version,
SchemaCache::Type::TABLET_SCHEMA);
cached_schema = SchemaCache::instance()->get_schema<TabletSchemaSPtr>(schema_key);
schema_key =
SchemaCache::get_schema_key(tablet->tablet_id(), olap_scan_node.columns_desc,
olap_scan_node.schema_version);
cached_schema = SchemaCache::instance()->get_schema(schema_key);
}
if (cached_schema) {
tablet_schema = cached_schema;
Expand Down

0 comments on commit 4a91b2d

Please sign in to comment.