Skip to content

Commit

Permalink
[PLAT-15278]: Fix DB Scoped XCluster replication restart
Browse files Browse the repository at this point in the history
Summary:
We had 2 issues in restarting the xCluster DB scoped replication.

  # After restarting the whole xCluster, the table was no longer in replication.

**Issue**: The task was removing the tables from the replication but it skipped the step where it was supposed to add as it thought we already had the namespace in a replication setup.
**Resolution**: We would update the namespace config with status updating, allowing the addition of a namespace in replication. If the task fails, we will mark the namespace config with updating status as false and if all namespace config is in failed status then, we will mark the xCluster config in the failed state too.

  # Support restart of the partial database in DB Scoped replication

**Issue**: Task only handled the restart of the whole DB Scoped replication.
**Resolution:** Added the changes where we add selective database and add them back in the replication.

Also, Enabled testDbScopedRestart local provider test.

Test Plan: Tested manually by creating a DB scoped replication and performing whole and partial database restart. Verified that task succeeded and master UI displays all tables/namespace in replication and compare streamId.

Reviewers: hzare, cwang, sanketh

Reviewed By: hzare, cwang

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D38208
  • Loading branch information
vipul-yb committed Sep 20, 2024
1 parent b53ed3a commit f0eab8f
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,10 @@ protected void addSubtasksForTablesNeedBootstrap(
if (xClusterConfig.getType() == ConfigType.Db) {
if (!xClusterConfig.getDbIds().contains(namespaceId)) {
xClusterConfig.addNamespaces(Set.of(namespaceId));
xClusterConfig.updateStatusForNamespace(
namespaceId, XClusterNamespaceConfig.Status.Updating);
}
xClusterConfig.updateStatusForNamespace(
namespaceId, XClusterNamespaceConfig.Status.Updating);

if (!isReplicationConfigCreated) {
createCreateOutboundReplicationGroupTask(
xClusterConfig, Collections.singleton(namespaceId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public void run() {
addSubtasksToAddDatabasesToXClusterConfig(xClusterConfig, databaseIdsToAdd);
}
if (!databaseIdsToRemove.isEmpty()) {
addSubtasksToRemoveDatabasesFromXClusterConfig(xClusterConfig, databaseIdsToRemove);
addSubtasksToRemoveDatabasesFromXClusterConfig(
xClusterConfig, databaseIdsToRemove, false /* keepEntry */);
}

} else {
Expand Down Expand Up @@ -461,11 +462,12 @@ protected void addSubtasksToAddDatabasesToXClusterConfig(
}

protected void addSubtasksToRemoveDatabasesFromXClusterConfig(
XClusterConfig xClusterConfig, Set<String> databases) {
XClusterConfig xClusterConfig, Set<String> databases, boolean keepEntry) {

for (String dbId : databases) {
createXClusterRemoveNamespaceFromTargetUniverseTask(xClusterConfig, dbId);
createXClusterRemoveNamespaceFromOutboundReplicationGroupTask(xClusterConfig, dbId);
createXClusterRemoveNamespaceFromOutboundReplicationGroupTask(
xClusterConfig, dbId, keepEntry);
}

if (xClusterConfig.isUsedForDr()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import com.yugabyte.yw.models.XClusterConfig;
import com.yugabyte.yw.models.XClusterConfig.ConfigType;
import com.yugabyte.yw.models.XClusterConfig.XClusterConfigStatusType;
import com.yugabyte.yw.models.XClusterNamespaceConfig;
import com.yugabyte.yw.models.XClusterTableConfig;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.yb.master.MasterDdlOuterClass;

@Slf4j
Expand Down Expand Up @@ -61,12 +64,14 @@ public void run() {
false /* deletePitrConfigs */));
}

boolean isDBScopedReplication = xClusterConfig.getType() == ConfigType.Db;

List<MasterDdlOuterClass.ListTablesResponsePB.TableInfo> tableInfoList =
taskParams().getTableInfoList();
Set<String> tableIds = getTableIds(tableInfoList);

boolean isRestartWholeConfig;
if (xClusterConfig.getType() != ConfigType.Db) {
if (!isDBScopedReplication) {
// Set table type for old xCluster configs.
xClusterConfig.updateTableType(tableInfoList);

Expand All @@ -90,9 +95,14 @@ public void run() {
taskParams().getDbs().size() == xClusterConfig.getNamespaces().size();
}

if (!CollectionUtils.isEmpty(taskParams().getDbs())) {
xClusterConfig.updateStatusForNamespaces(
taskParams().getDbs(), XClusterNamespaceConfig.Status.Updating);
}

log.info("isRestartWholeConfig is {}", isRestartWholeConfig);
if (isRestartWholeConfig) {
if (xClusterConfig.getType() != ConfigType.Db) {
if (!isDBScopedReplication) {
createXClusterConfigSetStatusForTablesTask(
xClusterConfig, getTableIds(tableInfoList), XClusterTableConfig.Status.Updating);
}
Expand All @@ -117,7 +127,7 @@ public void run() {
createXClusterConfigSetStatusTask(
xClusterConfig, XClusterConfig.XClusterConfigStatusType.Updating);

if (xClusterConfig.getType() == ConfigType.Db) {
if (isDBScopedReplication) {
addSubtasksToCreateXClusterConfig(
xClusterConfig,
taskParams().getDbs(),
Expand Down Expand Up @@ -149,17 +159,27 @@ public void run() {
.setSubTaskGroupType(UserTaskDetails.SubTaskGroupType.ConfigureUniverse);
}
} else {
createXClusterConfigSetStatusForTablesTask(
xClusterConfig, tableIds, XClusterTableConfig.Status.Updating);
if (!isDBScopedReplication) {
createXClusterConfigSetStatusForTablesTask(
xClusterConfig, tableIds, XClusterTableConfig.Status.Updating);

createRemoveTableFromXClusterConfigSubtasks(
xClusterConfig, tableIds, true /* keepEntry */);

createXClusterConfigSetStatusForTablesTask(
xClusterConfig, tableIds, XClusterTableConfig.Status.Updating);

createRemoveTableFromXClusterConfigSubtasks(
xClusterConfig, tableIds, true /* keepEntry */);
addSubtasksToAddTablesToXClusterConfig(
xClusterConfig, tableInfoList, taskParams().getMainTableIndexTablesMap(), tableIds);
} else {
createXClusterConfigSetStatusTask(
xClusterConfig, XClusterConfig.XClusterConfigStatusType.Updating);

createXClusterConfigSetStatusForTablesTask(
xClusterConfig, tableIds, XClusterTableConfig.Status.Updating);
addSubtasksToRemoveDatabasesFromXClusterConfig(
xClusterConfig, taskParams().getDbs(), true /* keepEntry */);

addSubtasksToAddTablesToXClusterConfig(
xClusterConfig, tableInfoList, taskParams().getMainTableIndexTablesMap(), tableIds);
addSubtasksToAddDatabasesToXClusterConfig(xClusterConfig, taskParams().getDbs());
}
}

createXClusterConfigSetStatusTask(
Expand All @@ -180,22 +200,42 @@ public void run() {
} catch (Exception e) {
log.error("{} hit error : {}", getName(), e.getMessage());

// Set XClusterConfig status to Running if at least one table is running.
Set<String> tablesInRunningStatus =
xClusterConfig.getTableIdsInStatus(
xClusterConfig.getTableIds(), XClusterTableConfig.Status.Running);
if (tablesInRunningStatus.isEmpty()) {
xClusterConfig.updateStatus(XClusterConfigStatusType.Failed);
if (xClusterConfig.getType().equals(XClusterConfig.ConfigType.Db)) {
// Set XClusterConfig status to Running if at least one namespace is running.
Set<String> namespacesInRunningStatus =
xClusterConfig.getNamespaceIdsInStatus(
xClusterConfig.getDbIds(),
Collections.singleton(XClusterNamespaceConfig.Status.Running));
if (namespacesInRunningStatus.isEmpty()) {
xClusterConfig.updateStatus(XClusterConfigStatusType.Failed);
} else {
xClusterConfig.updateStatus(XClusterConfigStatusType.Running);
}
// Set namespaces in updating status to failed.
Set<String> namespacesInUpdatingStatus =
xClusterConfig.getNamespaceIdsInStatus(
taskParams().getDbs(), X_CLUSTER_NAMESPACE_CONFIG_PENDING_STATUS_LIST);
xClusterConfig.updateStatusForNamespaces(
namespacesInUpdatingStatus, XClusterNamespaceConfig.Status.Failed);
} else {
xClusterConfig.updateStatus(XClusterConfigStatusType.Running);

// Set XClusterConfig status to Running if at least one table is running.
Set<String> tablesInRunningStatus =
xClusterConfig.getTableIdsInStatus(
xClusterConfig.getTableIds(), XClusterTableConfig.Status.Running);
if (tablesInRunningStatus.isEmpty()) {
xClusterConfig.updateStatus(XClusterConfigStatusType.Failed);
} else {
xClusterConfig.updateStatus(XClusterConfigStatusType.Running);
}
// Set tables in updating status to failed.
Set<String> tablesInUpdatingStatus =
xClusterConfig.getTableIdsInStatus(
getTableIds(taskParams().getTableInfoList()),
X_CLUSTER_TABLE_CONFIG_PENDING_STATUS_LIST);
xClusterConfig.updateStatusForTables(
tablesInUpdatingStatus, XClusterTableConfig.Status.Failed);
}
// Set tables in updating status to failed.
Set<String> tablesInUpdatingStatus =
xClusterConfig.getTableIdsInStatus(
getTableIds(taskParams().getTableInfoList()),
X_CLUSTER_TABLE_CONFIG_PENDING_STATUS_LIST);
xClusterConfig.updateStatusForTables(
tablesInUpdatingStatus, XClusterTableConfig.Status.Failed);
// Set backup and restore status to failed and alter load balanced.
boolean isLoadBalancerAltered = false;
for (Restore restore : restoreList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3045,16 +3045,18 @@ protected SubTaskGroup createXClusterAddNamespaceToOutboundReplicationGroupTask(
*
* @param xClusterConfig config used
* @param dbId db id on the source universe that are being added to checkpoint.
* @param keepEntry whether to keep the entry in the YBA DB.
* @return The created subtask group
*/
protected SubTaskGroup createXClusterRemoveNamespaceFromOutboundReplicationGroupTask(
XClusterConfig xClusterConfig, String dbId) {
XClusterConfig xClusterConfig, String dbId, boolean keepEntry) {
SubTaskGroup subTaskGroup =
createSubTaskGroup("XClusterRemoveNamespaceFromOutboundReplication");
XClusterRemoveNamespaceFromOutboundReplicationGroup.Params taskParams =
new XClusterRemoveNamespaceFromOutboundReplicationGroup.Params();
taskParams.xClusterConfig = xClusterConfig;
taskParams.dbToRemove = dbId;
taskParams.keepEntry = keepEntry;
XClusterRemoveNamespaceFromOutboundReplicationGroup task =
createTask(XClusterRemoveNamespaceFromOutboundReplicationGroup.class);
task.initialize(taskParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ public static class Params extends XClusterConfigTaskParams {
// The parent xCluster config must be stored in xClusterConfig field.
// The db to be removed from the xcluster replication must be stored in the dbToRemove field.
public String dbToRemove;

public boolean keepEntry;
}

@Override
public String getName() {
return String.format(
"%s(xClusterConfig=%s,dbToRemove=%s,keepEntry=%s)",
this.getClass().getSimpleName(),
taskParams().getXClusterConfig().getUuid(),
taskParams().dbToRemove,
taskParams().keepEntry);
}

@Override
Expand Down Expand Up @@ -71,7 +83,12 @@ public void run() {
"XClusterRemoveNamespaceFromOutboundReplicationGroup rpc failed with error: %s",
createResponse.errorMessage()));
}
xClusterConfig.removeNamespaces(Set.of(dbId));
if (!taskParams().keepEntry) {
log.info(
"Removing db id: {} from xClusterConfig Object {}", dbId, xClusterConfig.getUuid());
xClusterConfig.removeNamespaces(Set.of(dbId));
}

log.debug(
"Removing source db id: {} from xClusterConfig {} completed",
taskParams().getDbToRemove(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ public void testDbScopedSwitchover() throws InterruptedException {
deleteDrConfig(drConfigUUID, newSourceUniverse, newTargetUniverse);
}

// @Test
@Test
public void testDbScopedRestart() throws InterruptedException {
CreateDRMetadata createData = defaultDbDRCreate();
UUID drConfigUUID = createData.drConfigUUID;
Expand Down

0 comments on commit f0eab8f

Please sign in to comment.