From b3cd5abb22db8395915479c709cea8ef6edef5de Mon Sep 17 00:00:00 2001 From: Pavel Balaev Date: Tue, 14 Dec 2021 00:40:21 +0300 Subject: [PATCH] Add caching of sharding function The ddl.bucket_id() function needs to know a sharding function. It is costly to obtain the function declaration / definition stored in the _ddl_sharding_func space. This cache adds sharding function cache divided into two parts: raw and processed. Raw part is used for get_schema() method. Raw cache stored as is. Processed part is used for bucket_id(). Processed sharding_func cache entry may be: * table with parsed dot notation (like {'foo', 'bar'}) * function ready to call, this offloads using of loadstring() * string with an error Cache will be rebuilded if: * _ddl_sharding_func space changed: cache sets _ddl_sharding_func:on_replace trigger * schema changed: cache checks box.internal.schema_version changes This patch does not serve hot reload techniques. This entails an on_replace trigger duplication if hot reload occurs. Hot reload support will be done in separate task: https://github.com/tarantool/ddl/issues/87 Closes #82 --- ddl/cache.lua | 136 +++++++++++++++++++++++++++++++++++++++ ddl/get.lua | 34 +++++++--- ddl/utils.lua | 12 +++- test/cache_test.lua | 151 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 322 insertions(+), 11 deletions(-) create mode 100644 ddl/cache.lua create mode 100644 test/cache_test.lua diff --git a/ddl/cache.lua b/ddl/cache.lua new file mode 100644 index 0000000..06ee50f --- /dev/null +++ b/ddl/cache.lua @@ -0,0 +1,136 @@ +local fiber = require('fiber') + +local cache = nil + +local CACHE_LOCK_TIMEOUT = 3 +local SPACE_NAME_IDX = 1 +local SHARD_FUNC_NAME_IDX = 2 +local SHARD_FUNC_BODY_IDX = 3 + +-- Function decorator that is used to prevent cache_build() from being +-- called concurrently by different fibers. +local function locked(f) + return function(...) + local ok = cache.lock:put(true, CACHE_LOCK_TIMEOUT) + + if not ok then + error("cache lock timeout is exceeded") + end + + local status, err = pcall(f, ...) + cache.lock:get() + + if not status or err ~= nil then + return err + end + end +end + +-- Build cache. +-- +-- Cache structure format: +-- cache = { +-- spaces = { +-- 'space_name' = { +-- raw = {}, -- raw sharding metadata, used for ddl.get() +-- processed = {}, -- table with parsed dot notation (like {'foo', 'bar'}) +-- -- or a function ready to call (or a string with an error) +-- } +-- }, +-- lock, -- locking based on fiber.channel() +-- schema_version, -- current schema version +-- } +-- +-- function returns nothing +local cache_build = locked(function() + -- clear cache + cache.spaces = {} + + if box.space._ddl_sharding_func == nil then + return + end + + for _, tuple in box.space._ddl_sharding_func:pairs() do + local space_name = tuple[SPACE_NAME_IDX] + local func_name = tuple[SHARD_FUNC_NAME_IDX] + local func_body = tuple[SHARD_FUNC_BODY_IDX] + + cache.spaces[space_name] = { + raw = tuple + } + + if func_body ~= nil then + local sharding_func, err = loadstring('return ' .. func_body) + if sharding_func == nil then + cache.spaces[space_name].processed = + string.format("Body is incorrect in sharding_func for space (%s): %s", + space_name, err) + else + cache.spaces[space_name].processed = + sharding_func() + end + elseif func_name ~= nil then + local chunks = string.split(func_name, '.') + cache.spaces[space_name].processed = chunks + end + end + + cache.schema_version = box.internal.schema_version() + +end) + +-- Rebuild cache if _ddl_sharding_func space changed. +local function cache_set_trigger() + if box.space._ddl_sharding_func == nil then + return + end + + local trigger_found = false + + for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do + if func == cache_build then + trigger_found = true + break + end + end + + if not trigger_found then + box.space._ddl_sharding_func:on_replace(cache_build) + end +end + +-- Get data from cache. +-- Returns all cached data for "space_name" or nil. +local function cache_get(space_name) + if space_name == nil then + return nil + end + + -- using tarantool internal API. + -- this is not reliable, but it is the only way to track + -- schema_version changes. Fix it if a public method appears: + -- https://github.com/tarantool/tarantool/issues/6544 + local schema_version = box.internal.schema_version() + + if not cache then + cache = { + lock = fiber.channel(1) + } + cache_build() + cache_set_trigger() + end + + -- rebuild cache if database schema changed + if schema_version ~= cache.schema_version then + cache_build() + cache_set_trigger() + end + + return cache.spaces[space_name] +end + +return { + internal = { + get = cache_get, + } +} diff --git a/ddl/get.lua b/ddl/get.lua index f151ef4..e7bc901 100644 --- a/ddl/get.lua +++ b/ddl/get.lua @@ -1,4 +1,5 @@ local utils = require('ddl.utils') +local cache = require('ddl.cache') local ddl_check = require('ddl.check') local function _get_index_field_path(space, index_part) @@ -66,11 +67,24 @@ local function get_metadata(space_name, metadata_name) end local function get_sharding_func(space_name) - local record = get_metadata(space_name, "sharding_func") + local record = cache.internal.get(space_name) + if not record then return nil end + return record.processed +end + +local function get_sharding_func_raw(space_name) + local record = cache.internal.get(space_name) + + if not record or not record.raw then + return nil + end + + record = record.raw + if record.sharding_func_body ~= nil then return {body = record.sharding_func_body} end @@ -97,7 +111,7 @@ local function get_space_schema(space_name) space_ddl.engine = box_space.engine space_ddl.format = box_space:format() space_ddl.sharding_key = get_sharding_key(space_name) - space_ddl.sharding_func = get_sharding_func(space_name) + space_ddl.sharding_func = get_sharding_func_raw(space_name) for _, field in ipairs(space_ddl.format) do if field.is_nullable == nil then field.is_nullable = false @@ -115,7 +129,7 @@ local function get_space_schema(space_name) end local function prepare_sharding_func_for_call(space_name, sharding_func_def) - if type(sharding_func_def) == 'string' then + if type(sharding_func_def) == 'table' then local sharding_func = utils.get_G_function(sharding_func_def) if sharding_func ~= nil and ddl_check.internal.is_callable(sharding_func) == true then @@ -123,13 +137,13 @@ local function prepare_sharding_func_for_call(space_name, sharding_func_def) end end - if type(sharding_func_def) == 'table' then - local sharding_func, err = loadstring('return ' .. sharding_func_def.body) - if sharding_func == nil then - return nil, string.format( - "Body is incorrect in sharding_func for space (%s): %s", space_name, err) - end - return sharding_func() + if type(sharding_func_def) == 'function' then + return sharding_func_def + end + + -- error from cache + if type(sharding_func_def) == 'string' then + return nil, sharding_func_def end return nil, string.format( diff --git a/ddl/utils.lua b/ddl/utils.lua index bb5a433..0f6b4f3 100644 --- a/ddl/utils.lua +++ b/ddl/utils.lua @@ -189,9 +189,19 @@ end -- split sharding func name in dot notation by dot -- foo.bar.baz -> chunks: foo bar baz -- foo -> chunks: foo +-- +-- func_name parameter may be a string in dot notation or table +-- if func_name type is of type table it is assumed that it is already split local function get_G_function(func_name) - local chunks = string.split(func_name, '.') local sharding_func = _G + local chunks + + if type(func_name) == 'string' then + chunks = string.split(func_name, '.') + else + chunks = func_name + end + -- check is the each chunk an identifier for _, chunk in pairs(chunks) do if not check_name_isident(chunk) or sharding_func == nil then diff --git a/test/cache_test.lua b/test/cache_test.lua new file mode 100644 index 0000000..9922f78 --- /dev/null +++ b/test/cache_test.lua @@ -0,0 +1,151 @@ +#!/usr/bin/env tarantool + +local t = require('luatest') +local db = require('test.db') +local ddl = require('ddl') +local cache = require('ddl.cache') +local helper = require('test.helper') + +local SPACE_NAME_IDX = 1 +local SHARD_FUNC_NAME_IDX = 2 +local SHARD_FUNC_BODY_IDX = 3 + +local primary_index = { + type = 'HASH', + unique = true, + parts = { + {path = 'string_nonnull', is_nullable = false, type = 'string'}, + {path = 'unsigned_nonnull', is_nullable = false, type = 'unsigned'}, + }, + name = 'primary' +} + +local bucket_id_idx = { + type = 'TREE', + unique = false, + parts = {{path = 'bucket_id', type = 'unsigned', is_nullable = false}}, + name = 'bucket_id' +} + +local func_body_first = 'function() return 42 end' +local func_body_second = 'function() return 24 end' + +local function space_init(g) + db.drop_all() + + g.space = { + engine = 'memtx', + is_local = true, + temporary = false, + format = table.deepcopy(helper.test_space_format()) + } + table.insert(g.space.format, 1, { + name = 'bucket_id', type = 'unsigned', is_nullable = false + }) + + g.space.indexes = { + table.deepcopy(primary_index), + table.deepcopy(bucket_id_idx) + } + g.space.sharding_key = {'unsigned_nonnull', 'integer_nonnull'} + g.schema = { + spaces = { + space = g.space, + } + } +end + +local g = t.group() +g.before_all(db.init) +g.before_each(space_init) + +function g.test_cache_processed_func_body() + g.schema.spaces.space.sharding_func = { + body = func_body_first + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.processed) + t.assert(type(res.processed) == 'function') + t.assert_equals(res.processed(), 42) +end + +function g.test_cache_processed_func_name() + local sharding_func_name = 'sharding_func' + rawset(_G, sharding_func_name, function(key) return key end) + g.schema.spaces.space.sharding_func = sharding_func_name + + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.processed) + t.assert(type(res.processed) == 'table') + t.assert_equals(res.processed[1], 'sharding_func') + + rawset(_G, sharding_func_name, nil) +end + +function g.test_cache_schema_changed() + g.schema.spaces.space.sharding_func = { + body = func_body_first + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.raw) + t.assert_equals(res.raw[SPACE_NAME_IDX], 'space') + t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_first) + + space_init(g) + + g.schema.spaces.space.sharding_func = { + body = func_body_second + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.raw) + t.assert_equals(res.raw[SPACE_NAME_IDX], 'space') + t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_second) +end + +function g.test_cache_space_updated() + g.schema.spaces.space.sharding_func = { + body = func_body_first + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.raw) + t.assert_equals(res.raw[SPACE_NAME_IDX], 'space') + t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_first) + + box.space._ddl_sharding_func + :update({'space'}, {{'=', SHARD_FUNC_BODY_IDX, func_body_second}}) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.raw) + t.assert_equals(res.raw[SPACE_NAME_IDX], 'space') + t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_second) +end