Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ALTER MODIFY SETTING for Memory tables #62039

Merged
merged 8 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/engines/table-engines/special/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ Upper and lower bounds can be specified to limit Memory engine table size, effec
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 100, max_rows_to_keep = 1000;
```

**Modify settings**
```sql
ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100, max_rows_to_keep = 1000;
```

**Note:** Both `bytes` and `rows` capping parameters can be set at the same time, however, the lower bounds of `max` and `min` will be adhered to.

## Examples {#examples}
Expand Down Expand Up @@ -97,3 +102,4 @@ SELECT total_bytes, total_rows FROM system.tables WHERE name = 'memory' and data
│ 65536 │ 10000 │
└─────────────┴────────────┘
```

28 changes: 27 additions & 1 deletion src/Storages/MemorySettings.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <Storages/MemorySettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>

Expand All @@ -11,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int SETTING_CONSTRAINT_VIOLATION;
}

IMPLEMENT_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
Expand All @@ -32,5 +32,31 @@ void MemorySettings::loadFromQuery(ASTStorage & storage_def)
}
}

ASTPtr MemorySettings::getSettingsChangesQuery()
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
for (const auto & change : changes())
settings_ast->changes.push_back(change);

return settings_ast;
}

void MemorySettings::sanityCheck() const
{
if (min_bytes_to_keep > max_bytes_to_keep)
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION,
"Setting `min_bytes_to_keep` cannot be higher than the `max_bytes_to_keep`. `min_bytes_to_keep`: {}, `max_bytes_to_keep`: {}",
min_bytes_to_keep,
max_bytes_to_keep);


if (min_rows_to_keep > max_rows_to_keep)
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION,
"Setting `min_rows_to_keep` cannot be higher than the `max_rows_to_keep`. `min_rows_to_keep`: {}, `max_rows_to_keep`: {}",
min_rows_to_keep,
max_rows_to_keep);
}

}

3 changes: 3 additions & 0 deletions src/Storages/MemorySettings.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Core/BaseSettings.h>
#include <Parsers/ASTSetQuery.h>


namespace DB
Expand All @@ -24,6 +25,8 @@ DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
struct MemorySettings : public BaseSettings<memorySettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
ASTPtr getSettingsChangesQuery();
void sanityCheck() const;
};

}
Expand Down
85 changes: 69 additions & 16 deletions src/Storages/StorageMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int SETTING_CONSTRAINT_VIOLATION;
}

class MemorySink : public SinkToStorage
Expand Down Expand Up @@ -76,7 +75,7 @@ class MemorySink : public SinkToStorage
convertDynamicColumnsToTuples(block, storage_snapshot);
}

if (storage.compress)
if (storage.getMemorySettingsRef().compress)
{
Block compressed_block;
for (const auto & elem : block)
Expand Down Expand Up @@ -106,15 +105,16 @@ class MemorySink : public SinkToStorage
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
UInt64 new_total_rows = storage.total_size_rows.load(std::memory_order_relaxed) + inserted_rows;
UInt64 new_total_bytes = storage.total_size_bytes.load(std::memory_order_relaxed) + inserted_bytes;
const auto & memory_settings = storage.getMemorySettingsRef();
while (!new_data->empty()
&& ((storage.max_bytes_to_keep && new_total_bytes > storage.max_bytes_to_keep)
|| (storage.max_rows_to_keep && new_total_rows > storage.max_rows_to_keep)))
&& ((memory_settings.max_bytes_to_keep && new_total_bytes > memory_settings.max_bytes_to_keep)
|| (memory_settings.max_rows_to_keep && new_total_rows > memory_settings.max_rows_to_keep)))
{
Block oldest_block = new_data->front();
UInt64 rows_to_remove = oldest_block.rows();
UInt64 bytes_to_remove = oldest_block.allocatedBytes();
if (new_total_bytes - bytes_to_remove < storage.min_bytes_to_keep
|| new_total_rows - rows_to_remove < storage.min_rows_to_keep)
if (new_total_bytes - bytes_to_remove < memory_settings.min_bytes_to_keep
|| new_total_rows - rows_to_remove < memory_settings.min_rows_to_keep)
{
break; // stop - removing next block will put us under min_bytes / min_rows threshold
}
Expand Down Expand Up @@ -145,15 +145,16 @@ StorageMemory::StorageMemory(
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
const MemorySettings & settings)
: IStorage(table_id_), data(std::make_unique<const Blocks>()), compress(settings.compress),
min_rows_to_keep(settings.min_rows_to_keep), max_rows_to_keep(settings.max_rows_to_keep),
min_bytes_to_keep(settings.min_bytes_to_keep), max_bytes_to_keep(settings.max_bytes_to_keep)
const MemorySettings & memory_settings_)
: IStorage(table_id_)
, data(std::make_unique<const Blocks>())
, memory_settings(memory_settings_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
storage_metadata.setConstraints(std::move(constraints_));
storage_metadata.setComment(comment);
storage_metadata.setSettingsChanges(memory_settings.getSettingsChangesQuery());
setInMemoryMetadata(storage_metadata);
}

Expand Down Expand Up @@ -239,7 +240,7 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
Block block;
while (executor.pull(block))
{
if (compress)
if (memory_settings.compress)
for (auto & elem : block)
elem.column = elem.column->compress();

Expand Down Expand Up @@ -294,6 +295,59 @@ void StorageMemory::truncate(
total_size_rows.store(0, std::memory_order_relaxed);
}

void StorageMemory::alter(const DB::AlterCommands & params, DB::ContextPtr context, DB::IStorage::AlterLockHolder & /*alter_lock_holder*/)
{
auto table_id = getStorageID();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
params.apply(new_metadata, context);

if (params.isSettingsAlter())
{
auto & settings_changes = new_metadata.settings_changes->as<ASTSetQuery &>();
auto changed_settings = memory_settings;
changed_settings.applyChanges(settings_changes.changes);
changed_settings.sanityCheck();

/// When modifying the values of max_bytes_to_keep and max_rows_to_keep to be smaller than the old values,
/// the old data needs to be removed.
if (!memory_settings.max_bytes_to_keep || memory_settings.max_bytes_to_keep > changed_settings.max_bytes_to_keep
|| !memory_settings.max_rows_to_keep || memory_settings.max_rows_to_keep > changed_settings.max_rows_to_keep)
{
std::lock_guard lock(mutex);

auto new_data = std::make_unique<Blocks>(*(data.get()));
UInt64 new_total_rows = total_size_rows.load(std::memory_order_relaxed);
UInt64 new_total_bytes = total_size_bytes.load(std::memory_order_relaxed);
while (!new_data->empty()
&& ((changed_settings.max_bytes_to_keep && new_total_bytes > changed_settings.max_bytes_to_keep)
|| (changed_settings.max_rows_to_keep && new_total_rows > changed_settings.max_rows_to_keep)))
{
Block oldest_block = new_data->front();
UInt64 rows_to_remove = oldest_block.rows();
UInt64 bytes_to_remove = oldest_block.allocatedBytes();
if (new_total_bytes - bytes_to_remove < changed_settings.min_bytes_to_keep
|| new_total_rows - rows_to_remove < changed_settings.min_rows_to_keep)
{
break; // stop - removing next block will put us under min_bytes / min_rows threshold
}

// delete old block from current storage table
new_total_rows -= rows_to_remove;
new_total_bytes -= bytes_to_remove;
new_data->erase(new_data->begin());
}

data.set(std::move(new_data));
total_size_rows.store(new_total_rows, std::memory_order_relaxed);
total_size_bytes.store(new_total_bytes, std::memory_order_relaxed);
}
memory_settings = std::move(changed_settings);
}

DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);
}


namespace
{
Expand Down Expand Up @@ -499,7 +553,7 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat

while (auto block = block_in.read())
{
if (compress)
if (memory_settings.compress)
{
Block compressed_block;
for (const auto & elem : block)
Expand Down Expand Up @@ -534,7 +588,8 @@ void StorageMemory::checkAlterIsPossible(const AlterCommands & commands, Context
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN
&& command.type != AlterCommand::Type::COMMENT_TABLE && command.type != AlterCommand::Type::RENAME_COLUMN)
&& command.type != AlterCommand::Type::COMMENT_TABLE && command.type != AlterCommand::Type::RENAME_COLUMN
&& command.type != AlterCommand::Type::MODIFY_SETTING)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}",
command.type, getName());
}
Expand Down Expand Up @@ -566,9 +621,7 @@ void registerStorageMemory(StorageFactory & factory)
if (has_settings)
settings.loadFromQuery(*args.storage_def);

if (settings.min_bytes_to_keep > settings.max_bytes_to_keep
|| settings.min_rows_to_keep > settings.max_rows_to_keep)
throw Exception(ErrorCodes::SETTING_CONSTRAINT_VIOLATION, "Min. bytes / rows must be set with a max.");
settings.sanityCheck();

return std::make_shared<StorageMemory>(args.table_id, args.columns, args.constraints, args.comment, settings);
},
Expand Down
12 changes: 5 additions & 7 deletions src/Storages/StorageMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ friend class MemorySink;
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
const String & comment,
const MemorySettings & settings = MemorySettings());
const MemorySettings & memory_settings_ = MemorySettings());

String getName() const override { return "Memory"; }

Expand All @@ -46,6 +46,8 @@ friend class MemorySink;

StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;

const MemorySettings & getMemorySettingsRef() const { return memory_settings; }

void read(
QueryPlan & query_plan,
const Names & column_names,
Expand Down Expand Up @@ -78,6 +80,7 @@ friend class MemorySink;
void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

void checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const override;
void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder) override;

std::optional<UInt64> totalRows(const Settings &) const override;
std::optional<UInt64> totalBytes(const Settings &) const override;
Expand Down Expand Up @@ -134,12 +137,7 @@ friend class MemorySink;
std::atomic<size_t> total_size_bytes = 0;
std::atomic<size_t> total_size_rows = 0;

bool compress;
UInt64 min_rows_to_keep;
UInt64 max_rows_to_keep;
UInt64 min_bytes_to_keep;
UInt64 max_bytes_to_keep;

MemorySettings memory_settings;

friend class ReadFromMemoryStorageStep;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
TESTING MODIFY SMALLER BYTES
17408
16384
65536
TESTING MODIFY SMALLER ROWS
1100
1000
500
TESTING ADD SETTINGS
50
1000
1070
1020
1100
TESTING ADD SETTINGS
50
1000
1020
1100
TESTING INVALID SETTINGS
76 changes: 76 additions & 0 deletions tests/queries/0_stateless/03032_storage_memory_modify_settings.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
SET max_block_size = 65409; -- Default value

SELECT 'TESTING MODIFY SMALLER BYTES';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_bytes_to_keep = 8192, max_bytes_to_keep = 32768;

INSERT INTO memory SELECT * FROM numbers(0, 100); -- 1024 bytes
INSERT INTO memory SELECT * FROM numbers(0, 3000); -- 16384 bytes
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 17408 in total

ALTER TABLE memory MODIFY SETTING min_bytes_to_keep = 4096, max_bytes_to_keep = 16384;
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 16384 in total after deleting

INSERT INTO memory SELECT * FROM numbers(3000, 10000); -- 65536 bytes
SELECT total_bytes FROM system.tables WHERE name = 'memory' and database = currentDatabase();

SELECT 'TESTING MODIFY SMALLER ROWS';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory SETTINGS min_rows_to_keep = 200, max_rows_to_keep = 2000;

INSERT INTO memory SELECT * FROM numbers(0, 100); -- 100 rows
INSERT INTO memory SELECT * FROM numbers(100, 1000); -- 1000 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1100 in total

ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100, max_rows_to_keep = 1000;
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1000 in total after deleting

INSERT INTO memory SELECT * FROM numbers(1000, 500); -- 500 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 500 in total after deleting

SELECT 'TESTING ADD SETTINGS';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;

INSERT INTO memory SELECT * FROM numbers(0, 50); -- 50 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 50 in total

INSERT INTO memory SELECT * FROM numbers(50, 950); -- 950 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1000 in total

INSERT INTO memory SELECT * FROM numbers(2000, 70); -- 70 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1070 in total

ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100, max_rows_to_keep = 1000;
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1020 in total after deleting

INSERT INTO memory SELECT * FROM numbers(3000, 1100); -- 1100 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1100 in total after deleting

SELECT 'TESTING ADD SETTINGS';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100, max_rows_to_keep = 1000;

INSERT INTO memory SELECT * FROM numbers(0, 50); -- 50 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 50 in total

INSERT INTO memory SELECT * FROM numbers(50, 950); -- 950 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1000 in total

INSERT INTO memory SELECT * FROM numbers(2000, 70); -- 70 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1020 in total after deleting

INSERT INTO memory SELECT * FROM numbers(3000, 1100); -- 1100 rows
SELECT total_rows FROM system.tables WHERE name = 'memory' and database = currentDatabase(); -- 1100 in total after deleting

SELECT 'TESTING INVALID SETTINGS';
DROP TABLE IF EXISTS memory;
CREATE TABLE memory (i UInt32) ENGINE = Memory;
ALTER TABLE memory MODIFY SETTING min_rows_to_keep = 100; -- { serverError 452 }
ALTER TABLE memory MODIFY SETTING min_bytes_to_keep = 100; -- { serverError 452 }
ALTER TABLE memory MODIFY SETTING max_rows_to_keep = 1000;
ALTER TABLE memory MODIFY SETTING max_bytes_to_keep = 1000;

DROP TABLE memory;

Loading