Skip to content

Commit

Permalink
[yugabyte#7798] DocDB: Only the YB-Master Leader should refresh the t…
Browse files Browse the repository at this point in the history
…ablespace info in memory

Summary:
Today, the Catalog Manager spins up a background task that periodically reads the PG sys
catalog tables and builds the table->tablespace and tablespace->replication_info maps in
memory. This information is used during load balancing and when placing the replicas of a
newly created table. Thus, only the master leader needs to run this background task.

This patch causes the tablespace refresh task to be started by
CatalogManagerBgTasks, the background task that invokes the load balancer.
This guarantees that the refresh task runs only on the master leader.

Additionally, TestTablespaceProperties was timing out on the clang_asan,
gcc-release, and gcc-debug build types. Fixed the test by increasing its
timeouts for some build types, and added log messages to the test to make
failures easier to debug.

Test Plan: ybd --scb --sj --java-test org.yb.pgsql.TestTablespaceProperties

Reviewers: rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: zyu, kannan, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D11133
  • Loading branch information
deeps1991 authored and YintongMa committed May 26, 2021
1 parent df4c7a3 commit 53a124e
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class TestTablespaceProperties extends BasePgSQLTest {

private static final int MASTER_REFRESH_TABLESPACE_INFO_SECS = 2;

private static final int MASTER_LOAD_BALANCER_WAIT_TIME_MS = 60 * 1000;

private List<Map<String, String>> perTserverZonePlacementFlags = Arrays.asList(
ImmutableMap.of(
"placement_cloud", "cloud1",
Expand All @@ -63,7 +65,7 @@ public class TestTablespaceProperties extends BasePgSQLTest {
"placement_zone", "zone3"));
@Override
public int getTestMethodTimeoutSec() {
return getPerfMaxRuntime(800, 1000, 1500, 1500, 1500);
return getPerfMaxRuntime(1000, 1200, 1500, 1500, 1500);
}

@Override
Expand Down Expand Up @@ -99,6 +101,7 @@ private void createTestData (String prefixName) throws Exception {
// Create tables in default and custom tablespaces.
setupStatement.execute(
"CREATE TABLE " + customTable + "(a int) TABLESPACE testTablespace");

setupStatement.execute(
"CREATE TABLE " + defaultTable + "(a int)");

Expand All @@ -122,13 +125,13 @@ private void createTestData (String prefixName) throws Exception {
}

private void addTserversAndWaitForLB() throws Exception {
int expectedTServers = miniCluster.getTabletServers().size() + 2;
int expectedTServers = miniCluster.getTabletServers().size() + 1;
miniCluster.startTServer(perTserverZonePlacementFlags.get(1));
miniCluster.startTServer(perTserverZonePlacementFlags.get(2));
miniCluster.waitForTabletServers(expectedTServers);

// Wait for loadbalancer to run.
assertTrue(miniCluster.getClient().waitForLoadBalancerActive(30000 /* timeoutMs */));
assertTrue(miniCluster.getClient().waitForLoadBalancerActive(
MASTER_LOAD_BALANCER_WAIT_TIME_MS));

// Wait for load balancer to become idle.
assertTrue(miniCluster.getClient().waitForLoadBalance(Long.MAX_VALUE, expectedTServers));
Expand All @@ -137,13 +140,16 @@ private void addTserversAndWaitForLB() throws Exception {
/**
 * Single JUnit entry point for the tablespace tests. Runs three phases one after
 * another within the same test method, logging the start of each phase:
 * the sanity tests, the tests with tablespaces disabled, and the load balancer
 * placement tests.
 *
 * NOTE(review): testLBTablespacePlacement() changes master flags via setFlag, so the
 * order of the phases is presumably significant -- confirm before reordering them.
 *
 * @throws Exception if any of the phases fails.
 */
@Test
public void testTablespaces() throws Exception {
// Run sanity tests for tablespaces.
LOG.info("Running tablespace sanity tests");
sanityTest();

// Test with tablespaces disabled.
LOG.info("Run tests with tablespaces disabled");
testDisabledTablespaces();

// Test load balancer functions as expected when
// tables are placed incorrectly at creation time.
LOG.info("Run load balancer tablespace placement tests");
testLBTablespacePlacement();
}

Expand All @@ -162,6 +168,7 @@ public void sanityTest() throws Exception {

// Verify that the table was created and its tablets were placed
// according to the tablespace replication info.
LOG.info("Verify whether tablet replicas were placed correctly at creation time");
verifyPlacement();

// Wait for tablespace info to be refreshed in load balancer.
Expand All @@ -171,14 +178,17 @@ public void sanityTest() throws Exception {

// Verify that the loadbalancer also placed the tablets of the table based on the
// tablespace replication info.
LOG.info("Verify whether the load balancer maintained the placement of tablet replicas" +
" after TServers were added");
verifyPlacement();

// Trigger a master leader change.
LeaderStepDownResponse resp = client.masterLeaderStepDown();
assertFalse(resp.hasError());

Thread.sleep(5 * MiniYBCluster.TSERVER_HEARTBEAT_INTERVAL_MS);
Thread.sleep(10 * MiniYBCluster.TSERVER_HEARTBEAT_INTERVAL_MS);

LOG.info("Verify that tablets have been placed correctly even after master leader changed");
verifyPlacement();
}

Expand All @@ -191,42 +201,52 @@ public void testDisabledTablespaces() throws Exception {

// At this point, since tablespaces are disabled, the LB will detect that the older
// tables have not been correctly placed. Wait until the load balancer is active.
assertTrue(miniCluster.getClient().waitForLoadBalancerActive(30000 /* timeoutMs */));
assertTrue(miniCluster.getClient().waitForLoadBalancerActive(
MASTER_LOAD_BALANCER_WAIT_TIME_MS));

// Wait for LB to finish its run.
assertTrue(miniCluster.getClient().waitForLoadBalancerIdle(200000 /* timeoutMs */));
assertTrue(miniCluster.getClient().waitForLoadBalancerIdle(
MASTER_LOAD_BALANCER_WAIT_TIME_MS));

createTestData("disabled_tablespace_test");

// Verify that the loadbalancer also placed the tablets of the table based on the
// tablespace replication info.
LOG.info("Verify placement of tablets after tablespaces have been disabled");
verifyDefaultPlacementForAll();
}

public void testLBTablespacePlacement() throws Exception {
// This test disables setting the tablespace id at creation time. Thus, the
// tablets of the table will be incorrectly placed based on cluster config
// at creation time, and we will rely on the LB to correctly place the table
// based on its tablespace.
// This test disables using tablespaces at creation time. Thus, the tablets of the table will be
// incorrectly placed based on cluster config at creation time, and we will rely on the LB to
// correctly place the table based on its tablespace.
// Set master flags.
YBClient client = miniCluster.getClient();
for (HostAndPort hp : miniCluster.getMasters().keySet()) {
assertTrue(client.setFlag(hp, "enable_ysql_tablespaces_for_placement", "false"));
}

createTestData("test_lb_placement");

for (HostAndPort hp : miniCluster.getMasters().keySet()) {
assertTrue(client.setFlag(hp, "enable_ysql_tablespaces_for_placement", "true"));
assertTrue(client.setFlag(hp, "TEST_disable_setting_tablespace_id_at_creation", "true"));
assertTrue(client.setFlag(hp, "v", "3"));
}
createTestData("test_lb_placement");

// Since the tablespace-id was not checked during creation, the tablet replicas
// would have been placed wrongly. This condition will be detected by the load
// balancer. Wait until it starts running to fix these wrongly placed tablets.
assertTrue(miniCluster.getClient().waitForLoadBalancerActive(30000 /* timeoutMs */));
assertTrue(miniCluster.getClient().waitForLoadBalancerActive(
MASTER_LOAD_BALANCER_WAIT_TIME_MS));

// Wait for LB to finish its run.
assertTrue(miniCluster.getClient().waitForLoadBalancerIdle(200000 /* timeoutMs */));
assertTrue(miniCluster.getClient().waitForLoadBalancerIdle(
MASTER_LOAD_BALANCER_WAIT_TIME_MS));

// Verify that the loadbalancer placed the tablets of the table based on the
// tablespace replication info.
LOG.info("Verify whether tablet replicas placed incorrectly at creation time are moved to " +
"their appropriate placement by the load balancer");
verifyPlacement();
}

Expand Down
105 changes: 68 additions & 37 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,6 @@ DEFINE_int32(ysql_tablespace_info_refresh_secs, 30,
"from pg catalog tables. A value of -1 disables the refresh task.");
TAG_FLAG(ysql_tablespace_info_refresh_secs, runtime);

DEFINE_test_flag(bool, disable_setting_tablespace_id_at_creation, false,
"When set, placement of the tablets of a newly created table will not honor "
"its tablespace placement policy until the loadbalancer runs.");
TAG_FLAG(TEST_disable_setting_tablespace_id_at_creation, runtime);

DEFINE_test_flag(bool, crash_server_on_sys_catalog_leader_affinity_move, false,
"When set, crash the master process if it performs a sys catalog leader affinity "
"move.");
Expand Down Expand Up @@ -572,8 +567,6 @@ bool IsIndexBackfillEnabled(TableType table_type, bool is_transactional) {

constexpr auto kDefaultYQLPartitionsRefreshBgTaskSleep = 10s;

constexpr auto kDefaultYSQLTablespaceRefreshBgTaskSleepSecs = 10s;

} // anonymous namespace

////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -611,7 +604,7 @@ CatalogManager::CatalogManager(Master* master)
ysql_transaction_(this, master_),
tablespace_placement_map_(std::make_shared<TablespaceIdToReplicationInfoMap>()),
table_to_tablespace_map_(std::make_shared<TableToTablespaceIdMap>()),
tablespace_info_task_running_(false) {
tablespace_bg_task_running_(false) {
yb::InitCommonFlags();
CHECK_OK(ThreadPoolBuilder("leader-initialization")
.set_max_threads(1)
Expand Down Expand Up @@ -1784,47 +1777,75 @@ CatalogManager::GetAndUpdateYsqlTablespaceInfo() {
return tablespace_map;
}

void CatalogManager::StartRefreshYSQLTablePlacementInfo() {
bool is_task_running = tablespace_info_task_running_.exchange(true);
if (!is_task_running) {
// The task is not already running. Start it.
RefreshYSQLTablePlacementInfo();
void CatalogManager::StartTablespaceBgTaskIfStopped() {
if (GetAtomicFlag(&FLAGS_ysql_tablespace_info_refresh_secs) <= 0 ||
!GetAtomicFlag(&FLAGS_enable_ysql_tablespaces_for_placement)) {
// The tablespace bg task is disabled. Nothing to do.
return;
}
}

void CatalogManager::RefreshYSQLTablePlacementInfo() {
auto wait_time = GetAtomicFlag(&FLAGS_ysql_tablespace_info_refresh_secs) * 1s;
// If FLAGS_enable_ysql_tablespaces_for_placement is not set, refresh is disabled.
if (GetAtomicFlag(&FLAGS_enable_ysql_tablespaces_for_placement) && wait_time > 0s) {
DoRefreshYSQLTablePlacementInfo();
const bool is_task_running = tablespace_bg_task_running_.exchange(true);
if (is_task_running) {
// Task already running, nothing to do.
return;
}

if (wait_time <= 0s) {
wait_time = kDefaultYSQLTablespaceRefreshBgTaskSleepSecs;
ScheduleRefreshTablespaceInfoTask(true /* schedule_now */);
}

void CatalogManager::ScheduleRefreshTablespaceInfoTask(const bool schedule_now) {
int wait_time = 0;

if (!schedule_now) {
wait_time = GetAtomicFlag(&FLAGS_ysql_tablespace_info_refresh_secs);
if (wait_time <= 0) {
// The tablespace refresh task has been disabled.
tablespace_bg_task_running_ = false;
return;
}
}

refresh_ysql_tablespace_info_task_.Schedule([this](const Status& status) {
Status s = background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::RefreshYSQLTablePlacementInfo, this));
std::bind(&CatalogManager::RefreshTablespaceInfoPeriodically, this));
if (!s.IsOk()) {
LOG(WARNING) << "Failed to schedule: RefreshYSQLTablePlacementInfo";
tablespace_info_task_running_ = false;
// Failed to submit task to the thread pool. Mark that the task is now
// no longer running.
LOG(WARNING) << "Failed to schedule: RefreshTablespaceInfoPeriodically";
tablespace_bg_task_running_ = false;
}
}, wait_time);
}, wait_time * 1s);
}

void CatalogManager::DoRefreshYSQLTablePlacementInfo() {
// First refresh the tablespace info in memory.
auto table_info = GetTableInfo(kPgTablespaceTableId);
if (table_info == nullptr) {
LOG(WARNING) << "Table info not found for pg_tablespace catalog table";
void CatalogManager::RefreshTablespaceInfoPeriodically() {
if (!GetAtomicFlag(&FLAGS_enable_ysql_tablespaces_for_placement)) {
tablespace_bg_task_running_ = false;
return;
}

// Update tablespace_placement_map_.
auto&& s = GetAndUpdateYsqlTablespaceInfo();
if (!CheckIsLeaderAndReady().IsOk()) {
LOG(INFO) << "No longer the leader, so cancelling tablespace info task";
tablespace_bg_task_running_ = false;
return;
}

// Refresh the tablespace info in memory.
DoRefreshTablespaceInfo();

// Schedule the next iteration of the task.
ScheduleRefreshTablespaceInfoTask();
}

void CatalogManager::DoRefreshTablespaceInfo() {
VLOG(2) << "Running RefreshTablespaceInfoPeriodically task";

// First refresh the tablespace info in memory.
auto s = GetAndUpdateYsqlTablespaceInfo();
if (!s.ok()) {
// Refresh of tablespaces failed.
LOG(WARNING) << "Updating tablespace information failed with error "
<< StatusToString(s);
return;
}

// Now the table->tablespace information has to be updated in memory. To do this, first,
Expand All @@ -1839,6 +1860,12 @@ void CatalogManager::DoRefreshYSQLTablePlacementInfo() {
if (ns.second->database_type() != YQL_DATABASE_PGSQL) {
continue;
}

if (ns.second->colocated()) {
// Skip processing tables in colocated databases.
continue;
}

// TODO (Deepthi): Investigate if safe to skip template0 and template1 as well.
namespace_id_vec.emplace_back(ns.first);
}
Expand All @@ -1858,11 +1885,14 @@ void CatalogManager::DoRefreshYSQLTablePlacementInfo() {
VLOG(5) << "Successfully refreshed placement information for namespace "
<< nsid;
}
// Update table_to_tablespace_map_ and update the last refreshed time.
// Update table_to_tablespace_map_.
{
std::lock_guard<LockType> l(tablespace_lock_);
table_to_tablespace_map_ = table_to_tablespace_map;
}

VLOG(3) << "Refreshed tablespace information in memory";
return;
}

Status CatalogManager::AddIndexInfoToTable(const scoped_refptr<TableInfo>& indexed_table,
Expand Down Expand Up @@ -7133,6 +7163,11 @@ Status CatalogManager::UpdateMastersListInMemoryAndDisk() {

Status CatalogManager::EnableBgTasks() {
std::lock_guard<LockType> l(lock_);
// Initialize refresh_ysql_tablespace_info_task_. This will be used to
// manage the background task that refreshes tablespace info. This task
// will be started by the CatalogManagerBgTasks below.
refresh_ysql_tablespace_info_task_.Bind(&master_->messenger()->scheduler());

background_tasks_.reset(new CatalogManagerBgTasks(this));
RETURN_NOT_OK_PREPEND(background_tasks_->Init(),
"Failed to initialize catalog manager background tasks");
Expand All @@ -7143,10 +7178,6 @@ Status CatalogManager::EnableBgTasks() {
RETURN_NOT_OK(background_tasks_thread_pool_->SubmitFunc(
[this]() { RebuildYQLSystemPartitions(); }));

// Add bg thread to refresh tablespace information for ysql tables.
refresh_ysql_tablespace_info_task_.Bind(&master_->messenger()->scheduler());
RETURN_NOT_OK(background_tasks_thread_pool_->SubmitFunc(
[this]() { StartRefreshYSQLTablePlacementInfo(); }));
return Status::OK();
}

Expand Down
15 changes: 10 additions & 5 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1160,14 +1160,19 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
Result<std::shared_ptr<TablespaceIdToReplicationInfoMap>> GetAndUpdateYsqlTablespaceInfo();

// Starts the periodic job to update tablespace info if it is not running.
void StartRefreshYSQLTablePlacementInfo();
void StartTablespaceBgTaskIfStopped();

// Background task that refreshes the in-memory state for YSQL tables with their associated
// tablespace info.
void RefreshYSQLTablePlacementInfo();
// Note: This function should only ever be called by StartTablespaceBgTaskIfStopped()
// above.
void RefreshTablespaceInfoPeriodically();

// Helper function to refresh tablespace_placement_map_ and table_to_tablespace_map_.
void DoRefreshYSQLTablePlacementInfo();
// Helper function to schedule the next iteration of the tablespace info task.
void ScheduleRefreshTablespaceInfoTask(const bool schedule_now = false);

// Helper function to refresh the tablespace info.
void DoRefreshTablespaceInfo();

// Report metrics.
void ReportMetrics();
Expand Down Expand Up @@ -1428,7 +1433,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
std::shared_ptr<TableToTablespaceIdMap> table_to_tablespace_map_ GUARDED_BY(tablespace_lock_);

// Whether the periodic job to update tablespace info is running.
std::atomic<bool> tablespace_info_task_running_;
std::atomic<bool> tablespace_bg_task_running_;

private:
virtual bool CDCStreamExistsUnlocked(const CDCStreamId& id) REQUIRES_SHARED(lock_);
Expand Down
3 changes: 3 additions & 0 deletions src/yb/master/catalog_manager_bg_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ void CatalogManagerBgTasks::Run() {
YB_LOG_EVERY_N(INFO, 10) << s.message().ToBuffer();
}
}

// Start the tablespace background task.
catalog_manager_->StartTablespaceBgTaskIfStopped();
}
WARN_NOT_OK(catalog_manager_->encryption_manager_->
GetUniverseKeyRegistry(&catalog_manager_->master_->proxy_cache()),
Expand Down
3 changes: 0 additions & 3 deletions src/yb/master/cluster_balance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1393,9 +1393,6 @@ const PlacementInfoPB& ClusterLoadBalancer::GetClusterPlacementInfo() const {
}

void ClusterLoadBalancer::InitTablespaceInfo() {
// Start the tablespace refresh task if its not running.
catalog_manager_->StartRefreshYSQLTablePlacementInfo();

// Get the tablespace information.
catalog_manager_->GetTablespaceInfo(&tablespace_placement_map_,
&table_to_tablespace_map_);
Expand Down

0 comments on commit 53a124e

Please sign in to comment.