Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization schema memory utilization of ColumnFile in instance level #6589

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
24e94df
first commit of columnfile memory optimization
hongyunyan Dec 21, 2022
c15b76b
merge conflict
hongyunyan Dec 21, 2022
27a580d
format
hongyunyan Dec 21, 2022
0400dfc
format typo
hongyunyan Dec 21, 2022
e21392a
test format
hongyunyan Dec 21, 2022
eb323fe
add metrics
hongyunyan Dec 28, 2022
e79ab6a
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Dec 29, 2022
1ea2949
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Dec 29, 2022
151077c
fix restore
hongyunyan Dec 29, 2022
4d31fde
add comments
hongyunyan Dec 30, 2022
7c624cc
add comments
hongyunyan Dec 30, 2022
c016e41
for test
hongyunyan Dec 30, 2022
f82fc9a
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jan 4, 2023
d91c14a
basic implementation of instance level
hongyunyan Jan 5, 2023
e1ce45d
fix typo
hongyunyan Jan 5, 2023
b78c821
fix typo
hongyunyan Jan 5, 2023
109fe8c
for compile
hongyunyan Jan 5, 2023
f117362
for test
hongyunyan Jan 5, 2023
4e8ccb9
for typo
hongyunyan Jan 6, 2023
a24fc30
fix test
hongyunyan Jan 6, 2023
46d371c
fix
hongyunyan Jan 6, 2023
043c29d
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jan 6, 2023
f82f99d
fix
hongyunyan Jan 9, 2023
86cc581
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jan 9, 2023
24a2d2b
add background task
hongyunyan Jan 9, 2023
6fae2c9
add check digest
hongyunyan Jan 13, 2023
6d80ed0
Merge branch 'master' into hongyunyan_column_file_tiny_instance_level
hongyunyan Jan 13, 2023
bb4d5f8
add metrics
hongyunyan Jan 13, 2023
920bf2d
Merge branch 'hongyunyan_column_file_tiny_instance_level' of https://…
hongyunyan Jan 13, 2023
2487a97
for code review
hongyunyan Jan 18, 2023
f25a3c3
fix typo
hongyunyan Jan 18, 2023
fb53b2c
fix for comments
hongyunyan Jan 29, 2023
733c7b8
fix
hongyunyan Jan 29, 2023
0abe993
for comments in PR
hongyunyan Jan 30, 2023
d62e156
fix
hongyunyan Jan 30, 2023
b3e2e7f
fix hash bug
hongyunyan Jan 30, 2023
8e16f0e
for comments
hongyunyan Feb 2, 2023
736a91f
merge conflict
hongyunyan Feb 2, 2023
5663d0c
for format
hongyunyan Feb 2, 2023
b0148ab
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Feb 3, 2023
39ca91b
update code
hongyunyan Feb 6, 2023
8433058
update code
hongyunyan Feb 6, 2023
827beb6
update code
hongyunyan Feb 6, 2023
f0f2a39
add code
hongyunyan Feb 6, 2023
2e0b023
update
hongyunyan Feb 6, 2023
766e29a
Merge branch 'master' into hongyunyan_column_file_tiny_instance_level
hongyunyan Feb 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <Server/RaftConfigParser.h>
#include <Server/ServerInfo.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool.h>
Expand Down Expand Up @@ -206,6 +207,8 @@ struct ContextShared

Context::ConfigReloadCallback config_reload_callback;

std::shared_ptr<DB::DM::SharedBlockSchemas> shared_block_schemas;

explicit ContextShared(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
: runtime_components_factory(std::move(runtime_components_factory_))
, storage_run_mode(PageStorageRunMode::ONLY_V3)
Expand Down Expand Up @@ -1843,6 +1846,16 @@ SharedQueriesPtr Context::getSharedQueries()
return shared->shared_queries;
}

const std::shared_ptr<DB::DM::SharedBlockSchemas> & Context::getSharedBlockSchemas() const
{
return shared->shared_block_schemas;
}

void Context::initializeSharedBlockSchemas()
{
shared->shared_block_schemas = std::make_shared<DB::DM::SharedBlockSchemas>(*this);
}

size_t Context::getMaxStreams() const
{
size_t max_streams = settings.max_threads;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ namespace DM
class MinMaxIndexCache;
class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
using GlobalStoragePoolPtr = std::shared_ptr<GlobalStoragePool>;
} // namespace DM

Expand Down Expand Up @@ -178,7 +179,6 @@ class Context
DAGContext * dag_context = nullptr;
using DatabasePtr = std::shared_ptr<IDatabase>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;

/// Use copy constructor or createGlobal() instead
Context();

Expand Down Expand Up @@ -511,6 +511,9 @@ class Context
return disaggregated_mode == DisaggregatedMode::Storage;
}

const std::shared_ptr<DB::DM::SharedBlockSchemas> & getSharedBlockSchemas() const;
void initializeSharedBlockSchemas();

// todo: remove after AutoScaler is stable.
void setUseAutoScaler(bool use)
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#include <Server/StorageConfigParser.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/UserConfigParser.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/ReadThread/ColumnSharingCache.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
Expand Down Expand Up @@ -1200,6 +1201,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
DM::SegmentReaderPoolManager::instance().init(server_info);
DM::SegmentReadTaskScheduler::instance();

global_context->initializeSharedBlockSchemas();

{
// Note that this must do before initialize schema sync service.
do
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ColumnFile
/// been persisted in the disk and their data will be immutable.
virtual bool isAppendable() const { return false; }
virtual void disableAppend() {}
virtual bool append(DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/)
virtual bool append(const DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/)
{
throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) c
writeIntBinary(valid_bytes, buf);
}

ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, //
ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(const DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ColumnFileBig : public ColumnFilePersisted

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;

static ColumnFilePersistedPtr deserializeMetadata(DMContext & context, //
static ColumnFilePersistedPtr deserializeMetadata(const DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
Columns read_cols;

std::scoped_lock lock(cache->mutex);
const auto & colid_to_offset = schema->getColIdToOffset();
for (size_t i = col_start; i < col_end; ++i)
{
const auto & cd = col_defs[i];
Expand Down Expand Up @@ -61,7 +62,7 @@ ColumnFileInMemory::getReader(const DMContext & /*context*/, const StorageSnapsh
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
}

bool ColumnFileInMemory::append(DMContext & context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
bool ColumnFileInMemory::append(const DMContext & context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
{
if (disable_append)
return false;
Expand Down
27 changes: 9 additions & 18 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFile.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>

namespace DB
{
Expand All @@ -29,7 +30,7 @@ class ColumnFileInMemory : public ColumnFile
friend class ColumnFileInMemoryReader;

private:
BlockPtr schema;
ColumnFileSchemaPtr schema;

UInt64 rows = 0;
UInt64 bytes = 0;
Expand All @@ -39,28 +40,20 @@ class ColumnFileInMemory : public ColumnFile

// The cache data in memory.
CachePtr cache;
// Used to map column id to column instance in a Block.
ColIdToOffset colid_to_offset;

private:
void fillColumns(const ColumnDefines & col_defs, size_t col_count, Columns & result) const;

const DataTypePtr & getDataType(ColId column_id) const
{
// Note that column_id must exist
auto index = colid_to_offset.at(column_id);
return schema->getByPosition(index).type;
return schema->getDataType(column_id);
}

public:
explicit ColumnFileInMemory(const BlockPtr & schema_, const CachePtr & cache_ = nullptr)
explicit ColumnFileInMemory(const ColumnFileSchemaPtr & schema_, const CachePtr & cache_ = nullptr)
: schema(schema_)
, cache(cache_ ? cache_ : std::make_shared<Cache>(*schema_))
{
colid_to_offset.clear();
for (size_t i = 0; i < schema->columns(); ++i)
colid_to_offset.emplace(schema->getByPosition(i).column_id, i);
}
, cache(cache_ ? cache_ : std::make_shared<Cache>(schema_->getSchema()))
{}

Type getType() const override { return Type::INMEMORY_FILE; }

Expand All @@ -70,9 +63,7 @@ class ColumnFileInMemory : public ColumnFile
CachePtr getCache() { return cache; }

/// The schema of this pack.
BlockPtr getSchema() const { return schema; }
/// Replace the schema with a new schema, and the new schema instance should be exactly the same as the previous one.
void resetIdenticalSchema(BlockPtr schema_) { schema = schema_; }
ColumnFileSchemaPtr getSchema() const { return schema; }

ColumnInMemoryFilePtr clone()
{
Expand All @@ -90,7 +81,7 @@ class ColumnFileInMemory : public ColumnFile
{
disable_append = true;
}
bool append(DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes) override;
bool append(const DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes) override;

Block readDataForFlush() const;

Expand All @@ -101,7 +92,7 @@ class ColumnFileInMemory : public ColumnFile
String s = "{in_memory_file,rows:" + DB::toString(rows) //
+ ",bytes:" + DB::toString(bytes) //
+ ",disable_append:" + DB::toString(disable_append) //
+ ",schema:" + (schema ? schema->dumpStructure() : "none") //
+ ",schema:" + (schema ? schema->toString() : "none") //
+ ",cache_block:" + (cache ? cache->block.dumpStructure() : "none") + "}";
return s;
}
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ namespace DB
{
namespace DM
{
void serializeSchema(WriteBuffer & buf, const BlockPtr & schema)
void serializeSchema(WriteBuffer & buf, const Block & schema)
{
if (schema)
{
writeIntBinary(static_cast<UInt32>(schema->columns()), buf);
for (auto & col : *schema)
writeIntBinary(static_cast<UInt32>(schema.columns()), buf);
for (const auto & col : schema)
{
writeIntBinary(col.column_id, buf);
writeStringBinary(col.name, buf);
Expand Down Expand Up @@ -105,7 +105,7 @@ void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & c
}
}

ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf)
ColumnFilePersisteds deserializeSavedColumnFiles(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf)
{
// Check binary version
DeltaFormat::Version version;
Expand All @@ -117,7 +117,7 @@ ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowK
// V1 and V2 share the same deserializer.
case DeltaFormat::V1:
case DeltaFormat::V2:
column_files = deserializeSavedColumnFilesInV2Format(buf, version);
column_files = deserializeSavedColumnFilesInV2Format(context, buf, version);
break;
case DeltaFormat::V3:
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ColumnFilePersisted : public ColumnFile
virtual void serializeMetadata(WriteBuffer & buf, bool save_schema) const = 0;
};

void serializeSchema(WriteBuffer & buf, const BlockPtr & schema);
void serializeSchema(WriteBuffer & buf, const Block & schema);
BlockPtr deserializeSchema(ReadBuffer & buf);

void serializeColumn(MemoryWriteBuffer & buf, const IColumn & column, const DataTypePtr & type, size_t offset, size_t limit, CompressionMethod compression_method, Int64 compression_level);
Expand All @@ -44,13 +44,13 @@ void deserializeColumn(IColumn & column, const DataTypePtr & type, const ByteBuf
/// Serialize those column files' metadata into buf.
void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
/// Recreate column file instances from buf.
ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);
ColumnFilePersisteds deserializeSavedColumnFiles(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);

void serializeSavedColumnFilesInV2Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UInt64 version);
ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(const DMContext & context, ReadBuffer & buf, UInt64 version);

void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);

} // namespace DM
} // namespace DB
101 changes: 101 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>

namespace DB
{
namespace DM
{

ColumnFileSchema::ColumnFileSchema(const Block & block)
: schema(block.cloneEmpty())
{
for (size_t i = 0; i < schema.columns(); ++i)
colid_to_offset.emplace(schema.getByPosition(i).column_id, i);
}

const DataTypePtr & ColumnFileSchema::getDataType(ColId column_id) const
{
/// Returns the data type of a column.
/// The specified column id must exist, otherwise something unexpected will happen.
auto index = colid_to_offset.at(column_id);
return schema.getByPosition(index).type;
}

String ColumnFileSchema::toString() const
{
return "{schema:" + (schema ? schema.dumpJsonStructure() : "none") + "}";
}

SharedBlockSchemas::SharedBlockSchemas(DB::Context & context)
: background_pool(context.getBackgroundPool())
{
handle = background_pool.addTask([&, this] {
std::lock_guard<std::mutex> lock(mutex);
for (auto iter = column_file_schemas.begin(); iter != column_file_schemas.end();)
{
if (iter->second.expired())
{
iter = column_file_schemas.erase(iter);
}
else
{
++iter;
}
}
return true;
},
/*multi*/ false,
/*interval_ms*/ 60000);
}

SharedBlockSchemas::~SharedBlockSchemas()
{
if (handle)
{
background_pool.removeTask(handle);
}
}

ColumnFileSchemaPtr SharedBlockSchemas::find(const Digest & digest)
{
std::lock_guard<std::mutex> lock(mutex);
auto it = column_file_schemas.find(digest);
if (it == column_file_schemas.end())
return nullptr;
return it->second.lock();
}

ColumnFileSchemaPtr SharedBlockSchemas::getOrCreate(const Block & block)
{
Digest digest = hashSchema(block);
std::lock_guard<std::mutex> lock(mutex);
auto it = column_file_schemas.find(digest);
if (it == column_file_schemas.end() || it->second.expired())
{
auto schema = std::make_shared<ColumnFileSchema>(block);
column_file_schemas.emplace(digest, schema);
return schema;
}
else
return it->second.lock();
}

std::shared_ptr<DB::DM::SharedBlockSchemas> getSharedBlockSchemas(const DMContext & context)
{
return context.db_context.getSharedBlockSchemas();
}
} // namespace DM
} // namespace DB
Loading