Skip to content

Commit

Permalink
storage: support the HTTP API of sync table schema (#9449)
Browse files Browse the repository at this point in the history
ref #9032

storage: support the HTTP API of sync table schema

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: Lynn <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 20, 2024
1 parent db17e31 commit 355ab8c
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 1 deletion.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ namespace DB
M(force_fail_to_create_etcd_session) \
M(force_remote_read_for_batch_cop_once) \
M(exception_new_dynamic_thread) \
M(force_wait_index_timeout)
M(force_wait_index_timeout) \
M(sync_schema_request_failure)

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
Expand Down
106 changes: 106 additions & 0 deletions dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <RaftStoreProxyFFI/ProxyFFI.h>
Expand All @@ -23,13 +25,21 @@
#include <Storages/KVStore/TMTContext.h>
#include <Storages/S3/S3GCManager.h>
#include <TiDB/OwnerManager.h>
#include <TiDB/Schema/SchemaSyncService.h>
#include <TiDB/Schema/TiDBSchemaManager.h>
#include <common/logger_useful.h>
#include <fmt/core.h>

#include <boost/algorithm/string.hpp>
#include <magic_enum.hpp>

namespace DB
{
namespace FailPoints
{
extern const char sync_schema_request_failure[];
} // namespace FailPoints

HttpRequestRes HandleHttpRequestSyncStatus(
EngineStoreServerWrap * server,
std::string_view path,
Expand Down Expand Up @@ -277,6 +287,101 @@ HttpRequestRes HandleHttpRequestRemoteGC(
};
}

// Acquiring load schema to sync schema from TiKV in this TiFlash node with given keyspace id.
HttpRequestRes HandleHttpRequestSyncSchema(
EngineStoreServerWrap * server,
std::string_view path,
const std::string & api_name,
std::string_view,
std::string_view)
{
pingcap::pd::KeyspaceID keyspace_id = NullspaceID;
TableID table_id = InvalidTableID;
HttpRequestStatus status = HttpRequestStatus::Ok;
auto log = Logger::get("HandleHttpRequestSyncSchema");

auto & global_context = server->tmt->getContext();
// For compute node, simply return OK
if (global_context.getSharedContextDisagg()->isDisaggregatedComputeMode())
{
return HttpRequestRes{
.status = status,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}},
};
}

{
LOG_TRACE(log, "handling sync schema request, path: {}, api_name: {}", path, api_name);

// schema: /keyspace/{keyspace_id}/table/{table_id}
auto query = path.substr(api_name.size());
std::vector<std::string> query_parts;
boost::split(query_parts, query, boost::is_any_of("/"));
if (query_parts.size() != 4 || query_parts[0] != "keyspace" || query_parts[2] != "table")
{
LOG_ERROR(log, "invalid SyncSchema request: {}", query);
status = HttpRequestStatus::ErrorParam;
return HttpRequestRes{
.status = HttpRequestStatus::ErrorParam,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}};
}

try
{
keyspace_id = std::stoll(query_parts[1]);
table_id = std::stoll(query_parts[3]);
}
catch (...)
{
status = HttpRequestStatus::ErrorParam;
}

if (status != HttpRequestStatus::Ok)
return HttpRequestRes{
.status = status,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}};
}

std::string err_msg;
try
{
auto & tmt_ctx = *server->tmt;
bool done = tmt_ctx.getSchemaSyncerManager()->syncTableSchema(global_context, keyspace_id, table_id);
if (!done)
{
err_msg = "sync schema failed";
}
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::sync_schema_request_failure);
}
catch (const DB::Exception & e)
{
err_msg = e.message();
}
catch (...)
{
err_msg = "sync schema failed, unknown exception";
}

if (!err_msg.empty())
{
Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
json->set("errMsg", err_msg);
std::stringstream ss;
json->stringify(ss);

auto * s = RawCppString::New(ss.str());
return HttpRequestRes{
.status = HttpRequestStatus::ErrorParam,
.res = CppStrWithView{
.inner = GenRawCppPtr(s, RawCppPtrTypeImpl::String),
.view = BaseBuffView{s->data(), s->size()}}};
}

return HttpRequestRes{
.status = status,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}};
}

using HANDLE_HTTP_URI_METHOD = HttpRequestRes (*)(
EngineStoreServerWrap *,
std::string_view,
Expand All @@ -286,6 +391,7 @@ using HANDLE_HTTP_URI_METHOD = HttpRequestRes (*)(

static const std::map<std::string, HANDLE_HTTP_URI_METHOD> AVAILABLE_HTTP_URI = {
{"/tiflash/sync-status/", HandleHttpRequestSyncStatus},
{"/tiflash/sync-schema/", HandleHttpRequestSyncSchema},
{"/tiflash/store-status", HandleHttpRequestStoreStatus},
{"/tiflash/remote/owner/info", HandleHttpRequestRemoteOwnerInfo},
{"/tiflash/remote/owner/resign", HandleHttpRequestRemoteOwnerResign},
Expand Down
177 changes: 177 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_sync_schema.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Databases/DatabaseTiFlash.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/IManageableStorage.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/FFI/ProxyFFICommon.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/TMTStorages.h>
#include <Storages/KVStore/Types.h>
#include <Storages/KVStore/tests/region_helper.h>
#include <Storages/KVStore/tests/region_kvstore_test.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/registerStorages.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/TiDBSchemaManager.h>

namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char sync_schema_request_failure[];
} // namespace FailPoints

namespace tests
{
class SyncSchemaTest : public ::testing::Test
{
public:
SyncSchemaTest() = default;
static void SetUpTestCase()
{
try
{
registerStorages();
}
catch (DB::Exception &)
{
// Maybe another test has already registed, ignore exception here.
}
}
void SetUp() override { recreateMetadataPath(); }

void TearDown() override
{
// Clean all database from context.
auto ctx = TiFlashTestEnv::getContext();
for (const auto & [name, db] : ctx->getDatabases())
{
ctx->detachDatabase(name);
db->shutdown();
}
}
static void recreateMetadataPath()
{
String path = TiFlashTestEnv::getContext()->getPath();
auto p = path + "/metadata/";
TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true);
p = path + "/data/";
TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true);
}
};

TEST_F(SyncSchemaTest, TestNormal)
try
{
auto ctx = TiFlashTestEnv::getContext();
auto pd_client = ctx->getGlobalContext().getTMTContext().getPDClient();

MockTiDB::instance().newDataBase("db_1");
auto cols = ColumnsDescription({
{"col1", typeFromString("Int64")},
});
auto table_id = MockTiDB::instance().newTable("db_1", "t_1", cols, pd_client->getTS(), "");
auto schema_syncer = ctx->getTMTContext().getSchemaSyncerManager();
KeyspaceID keyspace_id = NullspaceID;
schema_syncer->syncSchemas(ctx->getGlobalContext(), keyspace_id);

EngineStoreServerWrap store_server_wrap{};
store_server_wrap.tmt = &ctx->getTMTContext();
auto helper = GetEngineStoreServerHelper(&store_server_wrap);
String path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id);
auto res = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res.status, HttpRequestStatus::Ok);
{
// normal errmsg is nil.
EXPECT_EQ(res.res.view.len, 0);
}
delete (static_cast<RawCppString *>(res.res.inner.ptr));

// do sync table schema twice
{
path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id);
auto res = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res.status, HttpRequestStatus::Ok);
{
// normal errmsg is nil.
EXPECT_EQ(res.res.view.len, 0);
}
delete (static_cast<RawCppString *>(res.res.inner.ptr));
}

// test wrong table ID
{
TableID wrong_table_id = table_id + 1;
path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, wrong_table_id);
auto res_err = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res_err.status, HttpRequestStatus::ErrorParam);
StringRef sr(res_err.res.view.data, res_err.res.view.len);
{
EXPECT_EQ(sr.toString(), "{\"errMsg\":\"sync schema failed\"}");
}
delete (static_cast<RawCppString *>(res_err.res.inner.ptr));
}

// test sync schema failed
{
path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id);
FailPointHelper::enableFailPoint(FailPoints::sync_schema_request_failure);
auto res_err1 = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res_err1.status, HttpRequestStatus::ErrorParam);
StringRef sr(res_err1.res.view.data, res_err1.res.view.len);
{
EXPECT_EQ(
sr.toString(),
"{\"errMsg\":\"Fail point FailPoints::sync_schema_request_failure is triggered.\"}");
}
delete (static_cast<RawCppString *>(res_err1.res.inner.ptr));
}

dropDataBase("db_1");
}
CATCH

} // namespace tests
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/KVStore/tests/region_kvstore_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ inline void validateSSTGeneration(
ASSERT_EQ(counter, key_count);
}

ASTPtr parseCreateStatement(const String & statement);
TableID createDBAndTable(String db_name, String table_name);
void dropDataBase(String db_name);

} // namespace DB::tests

0 comments on commit 355ab8c

Please sign in to comment.