Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: support the HTTP API of sync table schema #9449

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ namespace DB
M(force_fail_to_create_etcd_session) \
M(force_remote_read_for_batch_cop_once) \
M(exception_new_dynamic_thread) \
M(force_wait_index_timeout)
M(force_wait_index_timeout) \
M(sync_schema_request_failure)

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
Expand Down
106 changes: 106 additions & 0 deletions dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <RaftStoreProxyFFI/ProxyFFI.h>
Expand All @@ -23,13 +25,21 @@
#include <Storages/KVStore/TMTContext.h>
#include <Storages/S3/S3GCManager.h>
#include <TiDB/OwnerManager.h>
#include <TiDB/Schema/SchemaSyncService.h>
#include <TiDB/Schema/TiDBSchemaManager.h>
#include <common/logger_useful.h>
#include <fmt/core.h>

#include <boost/algorithm/string.hpp>
#include <magic_enum.hpp>

namespace DB
{
namespace FailPoints
{
extern const char sync_schema_request_failure[];
} // namespace FailPoints

HttpRequestRes HandleHttpRequestSyncStatus(
EngineStoreServerWrap * server,
std::string_view path,
Expand Down Expand Up @@ -277,6 +287,101 @@ HttpRequestRes HandleHttpRequestRemoteGC(
};
}

// Acquiring load schema to sync schema from TiKV in this TiFlash node with given keyspace id.
HttpRequestRes HandleHttpRequestSyncSchema(
EngineStoreServerWrap * server,
std::string_view path,
const std::string & api_name,
std::string_view,
std::string_view)
{
pingcap::pd::KeyspaceID keyspace_id = NullspaceID;
TableID table_id = InvalidTableID;
HttpRequestStatus status = HttpRequestStatus::Ok;
auto log = Logger::get("HandleHttpRequestSyncSchema");

auto & global_context = server->tmt->getContext();
// For compute node, simply return OK
if (global_context.getSharedContextDisagg()->isDisaggregatedComputeMode())
{
return HttpRequestRes{
.status = status,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}},
};
}

{
LOG_TRACE(log, "handling sync schema request, path: {}, api_name: {}", path, api_name);

// schema: /keyspace/{keyspace_id}/table/{table_id}
auto query = path.substr(api_name.size());
std::vector<std::string> query_parts;
boost::split(query_parts, query, boost::is_any_of("/"));
if (query_parts.size() != 4 || query_parts[0] != "keyspace" || query_parts[2] != "table")
{
LOG_ERROR(log, "invalid SyncSchema request: {}", query);
status = HttpRequestStatus::ErrorParam;
return HttpRequestRes{
.status = HttpRequestStatus::ErrorParam,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}};
}

try
{
keyspace_id = std::stoll(query_parts[1]);
table_id = std::stoll(query_parts[3]);
}
catch (...)
{
status = HttpRequestStatus::ErrorParam;
}

if (status != HttpRequestStatus::Ok)
return HttpRequestRes{
.status = status,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}};
}

std::string err_msg;
try
{
auto & tmt_ctx = *server->tmt;
bool done = tmt_ctx.getSchemaSyncerManager()->syncTableSchema(global_context, keyspace_id, table_id);
if (!done)
{
err_msg = "sync schema failed";
}
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::sync_schema_request_failure);
}
catch (const DB::Exception & e)
{
err_msg = e.message();
}
catch (...)
{
err_msg = "sync schema failed, unknown exception";
}

if (!err_msg.empty())
{
Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
json->set("errMsg", err_msg);
std::stringstream ss;
json->stringify(ss);

auto * s = RawCppString::New(ss.str());
return HttpRequestRes{
.status = HttpRequestStatus::ErrorParam,
.res = CppStrWithView{
.inner = GenRawCppPtr(s, RawCppPtrTypeImpl::String),
.view = BaseBuffView{s->data(), s->size()}}};
}

return HttpRequestRes{
.status = status,
.res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}};
}

using HANDLE_HTTP_URI_METHOD = HttpRequestRes (*)(
EngineStoreServerWrap *,
std::string_view,
Expand All @@ -286,6 +391,7 @@ using HANDLE_HTTP_URI_METHOD = HttpRequestRes (*)(

static const std::map<std::string, HANDLE_HTTP_URI_METHOD> AVAILABLE_HTTP_URI = {
{"/tiflash/sync-status/", HandleHttpRequestSyncStatus},
{"/tiflash/sync-schema/", HandleHttpRequestSyncSchema},
{"/tiflash/store-status", HandleHttpRequestStoreStatus},
{"/tiflash/remote/owner/info", HandleHttpRequestRemoteOwnerInfo},
{"/tiflash/remote/owner/resign", HandleHttpRequestRemoteOwnerResign},
Expand Down
177 changes: 177 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_sync_schema.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Databases/DatabaseTiFlash.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/IManageableStorage.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/FFI/ProxyFFICommon.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/TMTStorages.h>
#include <Storages/KVStore/Types.h>
#include <Storages/KVStore/tests/region_helper.h>
#include <Storages/KVStore/tests/region_kvstore_test.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/registerStorages.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/TiDBSchemaManager.h>

namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char sync_schema_request_failure[];
} // namespace FailPoints

namespace tests
{
class SyncSchemaTest : public ::testing::Test
{
public:
SyncSchemaTest() = default;
static void SetUpTestCase()
{
try
{
registerStorages();
}
catch (DB::Exception &)
{
// Maybe another test has already registed, ignore exception here.
}
}
void SetUp() override { recreateMetadataPath(); }

void TearDown() override
{
// Clean all database from context.
auto ctx = TiFlashTestEnv::getContext();
for (const auto & [name, db] : ctx->getDatabases())
{
ctx->detachDatabase(name);
db->shutdown();
}
}
static void recreateMetadataPath()
{
String path = TiFlashTestEnv::getContext()->getPath();
auto p = path + "/metadata/";
TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true);
p = path + "/data/";
TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true);
}
};

TEST_F(SyncSchemaTest, TestNormal)
try
{
auto ctx = TiFlashTestEnv::getContext();
auto pd_client = ctx->getGlobalContext().getTMTContext().getPDClient();

MockTiDB::instance().newDataBase("db_1");
auto cols = ColumnsDescription({
{"col1", typeFromString("Int64")},
});
auto table_id = MockTiDB::instance().newTable("db_1", "t_1", cols, pd_client->getTS(), "");
auto schema_syncer = ctx->getTMTContext().getSchemaSyncerManager();
KeyspaceID keyspace_id = NullspaceID;
schema_syncer->syncSchemas(ctx->getGlobalContext(), keyspace_id);

EngineStoreServerWrap store_server_wrap{};
store_server_wrap.tmt = &ctx->getTMTContext();
auto helper = GetEngineStoreServerHelper(&store_server_wrap);
String path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id);
auto res = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res.status, HttpRequestStatus::Ok);
{
// normal errmsg is nil.
EXPECT_EQ(res.res.view.len, 0);
}
delete (static_cast<RawCppString *>(res.res.inner.ptr));

// do sync table schema twice
{
path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id);
auto res = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res.status, HttpRequestStatus::Ok);
{
// normal errmsg is nil.
EXPECT_EQ(res.res.view.len, 0);
}
delete (static_cast<RawCppString *>(res.res.inner.ptr));
}

// test wrong table ID
{
TableID wrong_table_id = table_id + 1;
path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, wrong_table_id);
auto res_err = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res_err.status, HttpRequestStatus::ErrorParam);
StringRef sr(res_err.res.view.data, res_err.res.view.len);
{
EXPECT_EQ(sr.toString(), "{\"errMsg\":\"sync schema failed\"}");
}
delete (static_cast<RawCppString *>(res_err.res.inner.ptr));
}

// test sync schema failed
{
path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id);
FailPointHelper::enableFailPoint(FailPoints::sync_schema_request_failure);
auto res_err1 = helper.fn_handle_http_request(
&store_server_wrap,
BaseBuffView{path.data(), path.length()},
BaseBuffView{path.data(), path.length()},
BaseBuffView{"", 0});
EXPECT_EQ(res_err1.status, HttpRequestStatus::ErrorParam);
StringRef sr(res_err1.res.view.data, res_err1.res.view.len);
{
EXPECT_EQ(
sr.toString(),
"{\"errMsg\":\"Fail point FailPoints::sync_schema_request_failure is triggered.\"}");
}
delete (static_cast<RawCppString *>(res_err1.res.inner.ptr));
}

dropDataBase("db_1");
}
CATCH

} // namespace tests
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/KVStore/tests/region_kvstore_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ inline void validateSSTGeneration(
ASSERT_EQ(counter, key_count);
}

ASTPtr parseCreateStatement(const String & statement);
TableID createDBAndTable(String db_name, String table_name);
void dropDataBase(String db_name);

} // namespace DB::tests