Skip to content

Commit

Permalink
Merge branch 'master' into switch_rf_local
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 7, 2023
2 parents fac66cb + f992ced commit d7a1a38
Show file tree
Hide file tree
Showing 28 changed files with 364 additions and 129 deletions.
13 changes: 13 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <Server/RaftConfigParser.h>
#include <Server/ServerInfo.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool.h>
Expand Down Expand Up @@ -206,6 +207,8 @@ struct ContextShared

Context::ConfigReloadCallback config_reload_callback;

std::shared_ptr<DB::DM::SharedBlockSchemas> shared_block_schemas;

explicit ContextShared(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
: runtime_components_factory(std::move(runtime_components_factory_))
, storage_run_mode(PageStorageRunMode::ONLY_V3)
Expand Down Expand Up @@ -1843,6 +1846,16 @@ SharedQueriesPtr Context::getSharedQueries()
return shared->shared_queries;
}

const std::shared_ptr<DB::DM::SharedBlockSchemas> & Context::getSharedBlockSchemas() const
{
return shared->shared_block_schemas;
}

void Context::initializeSharedBlockSchemas()
{
shared->shared_block_schemas = std::make_shared<DB::DM::SharedBlockSchemas>(*this);
}

size_t Context::getMaxStreams() const
{
size_t max_streams = settings.max_threads;
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ namespace DM
class MinMaxIndexCache;
class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
using GlobalStoragePoolPtr = std::shared_ptr<GlobalStoragePool>;
} // namespace DM

Expand Down Expand Up @@ -178,7 +179,6 @@ class Context
DAGContext * dag_context = nullptr;
using DatabasePtr = std::shared_ptr<IDatabase>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;

/// Use copy constructor or createGlobal() instead
Context();

Expand Down Expand Up @@ -511,6 +511,9 @@ class Context
return disaggregated_mode == DisaggregatedMode::Storage;
}

const std::shared_ptr<DB::DM::SharedBlockSchemas> & getSharedBlockSchemas() const;
void initializeSharedBlockSchemas();

// todo: remove after AutoScaler is stable.
void setUseAutoScaler(bool use)
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
#include <Server/StorageConfigParser.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/UserConfigParser.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/ReadThread/ColumnSharingCache.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
Expand Down Expand Up @@ -1200,6 +1201,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
DM::SegmentReaderPoolManager::instance().init(server_info);
DM::SegmentReadTaskScheduler::instance();

global_context->initializeSharedBlockSchemas();

{
// Note that this must do before initialize schema sync service.
do
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ColumnFile
/// been persisted in the disk and their data will be immutable.
virtual bool isAppendable() const { return false; }
virtual void disableAppend() {}
virtual bool append(DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/)
virtual bool append(const DMContext & /*dm_context*/, const Block & /*data*/, size_t /*offset*/, size_t /*limit*/, size_t /*data_bytes*/)
{
throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) c
writeIntBinary(valid_bytes, buf);
}

ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, //
ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(const DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ColumnFileBig : public ColumnFilePersisted

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;

static ColumnFilePersistedPtr deserializeMetadata(DMContext & context, //
static ColumnFilePersistedPtr deserializeMetadata(const DMContext & context, //
const RowKeyRange & segment_range,
ReadBuffer & buf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
Columns read_cols;

std::scoped_lock lock(cache->mutex);
const auto & colid_to_offset = schema->getColIdToOffset();
for (size_t i = col_start; i < col_end; ++i)
{
const auto & cd = col_defs[i];
Expand Down Expand Up @@ -61,7 +62,7 @@ ColumnFileInMemory::getReader(const DMContext & /*context*/, const StorageSnapsh
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
}

bool ColumnFileInMemory::append(DMContext & context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
bool ColumnFileInMemory::append(const DMContext & context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
{
if (disable_append)
return false;
Expand Down
27 changes: 9 additions & 18 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFile.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>

namespace DB
{
Expand All @@ -29,7 +30,7 @@ class ColumnFileInMemory : public ColumnFile
friend class ColumnFileInMemoryReader;

private:
BlockPtr schema;
ColumnFileSchemaPtr schema;

UInt64 rows = 0;
UInt64 bytes = 0;
Expand All @@ -39,28 +40,20 @@ class ColumnFileInMemory : public ColumnFile

// The cache data in memory.
CachePtr cache;
// Used to map column id to column instance in a Block.
ColIdToOffset colid_to_offset;

private:
void fillColumns(const ColumnDefines & col_defs, size_t col_count, Columns & result) const;

const DataTypePtr & getDataType(ColId column_id) const
{
// Note that column_id must exist
auto index = colid_to_offset.at(column_id);
return schema->getByPosition(index).type;
return schema->getDataType(column_id);
}

public:
explicit ColumnFileInMemory(const BlockPtr & schema_, const CachePtr & cache_ = nullptr)
explicit ColumnFileInMemory(const ColumnFileSchemaPtr & schema_, const CachePtr & cache_ = nullptr)
: schema(schema_)
, cache(cache_ ? cache_ : std::make_shared<Cache>(*schema_))
{
colid_to_offset.clear();
for (size_t i = 0; i < schema->columns(); ++i)
colid_to_offset.emplace(schema->getByPosition(i).column_id, i);
}
, cache(cache_ ? cache_ : std::make_shared<Cache>(schema_->getSchema()))
{}

Type getType() const override { return Type::INMEMORY_FILE; }

Expand All @@ -70,9 +63,7 @@ class ColumnFileInMemory : public ColumnFile
CachePtr getCache() { return cache; }

/// The schema of this pack.
BlockPtr getSchema() const { return schema; }
/// Replace the schema with a new schema, and the new schema instance should be exactly the same as the previous one.
void resetIdenticalSchema(BlockPtr schema_) { schema = schema_; }
ColumnFileSchemaPtr getSchema() const { return schema; }

ColumnInMemoryFilePtr clone()
{
Expand All @@ -90,7 +81,7 @@ class ColumnFileInMemory : public ColumnFile
{
disable_append = true;
}
bool append(DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes) override;
bool append(const DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes) override;

Block readDataForFlush() const;

Expand All @@ -101,7 +92,7 @@ class ColumnFileInMemory : public ColumnFile
String s = "{in_memory_file,rows:" + DB::toString(rows) //
+ ",bytes:" + DB::toString(bytes) //
+ ",disable_append:" + DB::toString(disable_append) //
+ ",schema:" + (schema ? schema->dumpStructure() : "none") //
+ ",schema:" + (schema ? schema->toString() : "none") //
+ ",cache_block:" + (cache ? cache->block.dumpStructure() : "none") + "}";
return s;
}
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ namespace DB
{
namespace DM
{
void serializeSchema(WriteBuffer & buf, const BlockPtr & schema)
void serializeSchema(WriteBuffer & buf, const Block & schema)
{
if (schema)
{
writeIntBinary(static_cast<UInt32>(schema->columns()), buf);
for (auto & col : *schema)
writeIntBinary(static_cast<UInt32>(schema.columns()), buf);
for (const auto & col : schema)
{
writeIntBinary(col.column_id, buf);
writeStringBinary(col.name, buf);
Expand Down Expand Up @@ -105,7 +105,7 @@ void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & c
}
}

ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf)
ColumnFilePersisteds deserializeSavedColumnFiles(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf)
{
// Check binary version
DeltaFormat::Version version;
Expand All @@ -117,7 +117,7 @@ ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowK
// V1 and V2 share the same deserializer.
case DeltaFormat::V1:
case DeltaFormat::V2:
column_files = deserializeSavedColumnFilesInV2Format(buf, version);
column_files = deserializeSavedColumnFilesInV2Format(context, buf, version);
break;
case DeltaFormat::V3:
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ColumnFilePersisted : public ColumnFile
virtual void serializeMetadata(WriteBuffer & buf, bool save_schema) const = 0;
};

void serializeSchema(WriteBuffer & buf, const BlockPtr & schema);
void serializeSchema(WriteBuffer & buf, const Block & schema);
BlockPtr deserializeSchema(ReadBuffer & buf);

void serializeColumn(MemoryWriteBuffer & buf, const IColumn & column, const DataTypePtr & type, size_t offset, size_t limit, CompressionMethod compression_method, Int64 compression_level);
Expand All @@ -44,13 +44,13 @@ void deserializeColumn(IColumn & column, const DataTypePtr & type, const ByteBuf
/// Serialize those column files' metadata into buf.
void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
/// Recreate column file instances from buf.
ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);
ColumnFilePersisteds deserializeSavedColumnFiles(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);

void serializeSavedColumnFilesInV2Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UInt64 version);
ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(const DMContext & context, ReadBuffer & buf, UInt64 version);

void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(const DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);

} // namespace DM
} // namespace DB
101 changes: 101 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>

namespace DB
{
namespace DM
{

ColumnFileSchema::ColumnFileSchema(const Block & block)
: schema(block.cloneEmpty())
{
for (size_t i = 0; i < schema.columns(); ++i)
colid_to_offset.emplace(schema.getByPosition(i).column_id, i);
}

const DataTypePtr & ColumnFileSchema::getDataType(ColId column_id) const
{
/// Returns the data type of a column.
/// The specified column id must exist, otherwise something unexpected will happen.
auto index = colid_to_offset.at(column_id);
return schema.getByPosition(index).type;
}

String ColumnFileSchema::toString() const
{
return "{schema:" + (schema ? schema.dumpJsonStructure() : "none") + "}";
}

SharedBlockSchemas::SharedBlockSchemas(DB::Context & context)
: background_pool(context.getBackgroundPool())
{
handle = background_pool.addTask([&, this] {
std::lock_guard<std::mutex> lock(mutex);
for (auto iter = column_file_schemas.begin(); iter != column_file_schemas.end();)
{
if (iter->second.expired())
{
iter = column_file_schemas.erase(iter);
}
else
{
++iter;
}
}
return true;
},
/*multi*/ false,
/*interval_ms*/ 60000);
}

SharedBlockSchemas::~SharedBlockSchemas()
{
if (handle)
{
background_pool.removeTask(handle);
}
}

ColumnFileSchemaPtr SharedBlockSchemas::find(const Digest & digest)
{
std::lock_guard<std::mutex> lock(mutex);
auto it = column_file_schemas.find(digest);
if (it == column_file_schemas.end())
return nullptr;
return it->second.lock();
}

ColumnFileSchemaPtr SharedBlockSchemas::getOrCreate(const Block & block)
{
Digest digest = hashSchema(block);
std::lock_guard<std::mutex> lock(mutex);
auto it = column_file_schemas.find(digest);
if (it == column_file_schemas.end() || it->second.expired())
{
auto schema = std::make_shared<ColumnFileSchema>(block);
column_file_schemas.emplace(digest, schema);
return schema;
}
else
return it->second.lock();
}

std::shared_ptr<DB::DM::SharedBlockSchemas> getSharedBlockSchemas(const DMContext & context)
{
return context.db_context.getSharedBlockSchemas();
}
} // namespace DM
} // namespace DB
Loading

0 comments on commit d7a1a38

Please sign in to comment.