[BACKPORT 2.23.0.3690][PLAT-14846] Add pause/resume dr config along with associated universes workflow

Summary:
Original commit:
f410277 / D37598

Add two APIs that allow pausing/resuming the DR config together with pausing/resuming the underlying VMs.

To pause DR replication + pause both source and target universes:
```
curl --location --request POST 'http://localhost:9000/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/pause_universes' \
--header 'X-AUTH-YW-API-TOKEN: fef7705f-950f-42ae-b8e2-10173157fe8b'
```

To resume DR replication + resume both source and target universes:
```
curl --location --request POST 'http://localhost:9000/api/v1/customers/f33e3c9b-75ab-4c30-80ad-cba85646ea39/dr_configs/1a4990cb-3ea3-4c8b-ae0d-2d637cfd5c75/resume_universes' \
--header 'X-AUTH-YW-API-TOKEN: fef7705f-950f-42ae-b8e2-10173157fe8b'
```

During `pause_universes` we set the tserver gflag `log_min_seconds_to_retain` to a very high value, both in memory and in the conf file, before pausing replication and pausing the universes.
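For illustration only, a minimal sketch of the in-memory half of that override using the Java yb-client; the `setFlag` call, the concrete retention value, and the helper class below are assumptions for this example, not the actual subtask added in this diff (which also persists the value to the tserver conf file):
```java
// Hedged sketch: bump log_min_seconds_to_retain in memory on each live tserver before
// replication and the universes are paused. Assumes the org.yb.client YBClient.setFlag
// API; the real task also rewrites the tserver conf file, which is not shown here.
import com.google.common.net.HostAndPort;
import java.util.Set;
import org.yb.client.YBClient;

public class WalRetentionSketch {

  // One week of WAL retention, as an example of a "very high" value.
  private static final String HIGH_RETENTION_SECS = "604800";

  public static void bumpWalRetention(YBClient client, Set<HostAndPort> tserverAddresses)
      throws Exception {
    for (HostAndPort tserver : tserverAddresses) {
      // force = true because the flag may not be tagged as runtime-settable on all
      // versions (an assumption for this sketch).
      boolean ok =
          client.setFlag(
              tserver, "log_min_seconds_to_retain", HIGH_RETENTION_SECS, true /* force */);
      if (!ok) {
        throw new RuntimeException("Failed to raise WAL retention on " + tserver);
      }
    }
  }
}
```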

During `resume_universes` we resume/start the universes, wait for replication to drain, and then revert the tserver `log_min_seconds_to_retain` gflag to its original value, both in memory and in the tserver conf file.
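As a usage note, a hedged sketch of driving the new endpoint from Java; it assumes `resume_universes` follows the platform's usual async pattern of returning a `taskUUID` that can be polled through the customer tasks endpoint, and every host, token, and UUID below is a placeholder:
```java
// Hedged sketch: kick off resume_universes and poll the resulting task until it is no
// longer running. Uses only the JDK HTTP client; the endpoint shapes are assumptions.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ResumeDrSketch {
  public static void main(String[] args) throws Exception {
    String base = "http://localhost:9000/api/v1/customers/<customer_uuid>";
    String token = "<yw_api_token>";
    HttpClient http = HttpClient.newHttpClient();

    // Start the resume workflow for the DR config.
    HttpRequest resume =
        HttpRequest.newBuilder(
                URI.create(base + "/dr_configs/<dr_config_uuid>/resume_universes"))
            .header("X-AUTH-YW-API-TOKEN", token)
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    String response = http.send(resume, HttpResponse.BodyHandlers.ofString()).body();
    System.out.println("resume_universes response: " + response);

    // Poll the task (UUID taken from the response above) with a crude string check.
    String taskUuid = "<taskUUID from response>";
    while (true) {
      HttpRequest poll =
          HttpRequest.newBuilder(URI.create(base + "/tasks/" + taskUuid))
              .header("X-AUTH-YW-API-TOKEN", token)
              .GET()
              .build();
      String task = http.send(poll, HttpResponse.BodyHandlers.ofString()).body();
      if (!task.contains("\"Running\"")) {
        System.out.println("Task finished: " + task);
        break;
      }
      Thread.sleep(5000);
    }
  }
}
```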

Test Plan:
Added unit tests and a local provider test.

Manually tested the following scenario:
1. Set up DB-scoped DR replication. Set `log_min_seconds_to_retain` to an arbitrary override value.
2. Run the `pause_universes` API endpoint. While the task runs, check that the tserver conf file has `log_min_seconds_to_retain` set to the high value and that replication is disabled.
3. Run `resume_universes`. Make sure replication is enabled again, the replication drain subtask is generated, and the tserver conf file has `log_min_seconds_to_retain` set back to the original override. Also check the gflags page in the tserver UI and confirm it shows the original override.
4. Run some inserts into tables on the source universe and make sure they are replicated on the target universe (a sketch of this check follows the list).
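
A hedged sketch of that last check, assuming YSQL on the default port 5433, the stock PostgreSQL JDBC driver on the classpath, and a `dr_test` table that already exists on both universes (hosts, credentials, and names are placeholders):
```java
// Hedged sketch: insert a row on the source universe, then confirm it shows up on the
// target once DR replication has resumed.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DrReplicationCheck {
  public static void main(String[] args) throws Exception {
    try (Connection src =
            DriverManager.getConnection(
                "jdbc:postgresql://<source-tserver>:5433/yugabyte", "yugabyte", "yugabyte");
        Statement stmt = src.createStatement()) {
      stmt.executeUpdate("INSERT INTO dr_test (id, val) VALUES (42, 'after-resume')");
    }

    // Give replication a moment to catch up before checking the target.
    Thread.sleep(5000);

    try (Connection tgt =
            DriverManager.getConnection(
                "jdbc:postgresql://<target-tserver>:5433/yugabyte", "yugabyte", "yugabyte");
        Statement stmt = tgt.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT val FROM dr_test WHERE id = 42")) {
      System.out.println(rs.next() ? "Replicated: " + rs.getString(1) : "Row not replicated yet");
    }
  }
}
```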

Reviewers: hzare, sanketh, jmak, spothuraju, amindrov

Reviewed By: sanketh

Subscribers: svc_phabricator, yugaware, anijhawan

Differential Revision: https://phorge.dev.yugabyte.com/D38840
charleswang234 committed Oct 11, 2024
1 parent 9b82e22 commit 78485cc
Showing 17 changed files with 1,043 additions and 244 deletions.
@@ -10,21 +10,11 @@

package com.yugabyte.yw.commissioner.tasks;

import com.google.common.collect.Sets;
import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
import com.yugabyte.yw.commissioner.tasks.params.NodeTaskParams;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import com.yugabyte.yw.forms.UniverseTaskParams;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.helpers.NodeDetails;
import com.yugabyte.yw.models.helpers.NodeDetails.NodeState;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;

@@ -57,71 +47,13 @@ public void run() {
throw new RuntimeException(msg);
}

preTaskActions();
createPauseUniverseTasks(universe, params().customerUUID);

Map<UUID, UniverseDefinitionTaskParams.Cluster> clusterMap =
universe.getUniverseDetails().clusters.stream()
.collect(Collectors.toMap(c -> c.uuid, c -> c));

Set<NodeDetails> tserverNodes =
universe.getTServers().stream()
.filter(tserverNode -> tserverNode.state == NodeDetails.NodeState.Live)
.collect(Collectors.toSet());
Set<NodeDetails> masterNodes =
universe.getMasters().stream()
.filter(masterNode -> masterNode.state == NodeDetails.NodeState.Live)
.collect(Collectors.toSet());

for (NodeDetails node : Sets.union(masterNodes, tserverNodes)) {
if (!node.disksAreMountedByUUID) {
UniverseDefinitionTaskParams.Cluster cluster = clusterMap.get(node.placementUuid);
createUpdateMountedDisksTask(
node, node.getInstanceType(), cluster.userIntent.getDeviceInfoForNode(node));
}
}

// Stop yb-controller processes on nodes
if (universe.isYbcEnabled()) {
createStopYbControllerTasks(tserverNodes)
.setSubTaskGroupType(SubTaskGroupType.StoppingNodeProcesses);
}

createSetNodeStateTasks(tserverNodes, NodeState.Stopping)
.setSubTaskGroupType(SubTaskGroupType.StoppingNodeProcesses);
createStopServerTasks(tserverNodes, ServerType.TSERVER, false)
.setSubTaskGroupType(SubTaskGroupType.StoppingNodeProcesses);
createSetNodeStateTasks(masterNodes, NodeState.Stopping);
createStopMasterTasks(masterNodes)
.setSubTaskGroupType(SubTaskGroupType.StoppingNodeProcesses);

if (!universe.getUniverseDetails().isImportedUniverse()) {
// Create tasks to pause the existing nodes.
Collection<NodeDetails> activeUniverseNodes = getActiveUniverseNodes(universe.getNodes());
createPauseServerTasks(universe, activeUniverseNodes) // Pass in filtered nodes
.setSubTaskGroupType(SubTaskGroupType.PauseUniverse);
}
createSwamperTargetUpdateTask(false);
// Remove alert definition files.
createUnivManageAlertDefinitionsTask(false)
.setSubTaskGroupType(SubTaskGroupType.PauseUniverse);
// Mark universe task state to success.
createMarkUniverseUpdateSuccessTasks().setSubTaskGroupType(SubTaskGroupType.PauseUniverse);
// Run all the tasks.
getRunnableTask().runSubTasks();

saveUniverseDetails(
u -> {
UniverseDefinitionTaskParams universeDetails = u.getUniverseDetails();
universeDetails.universePaused = true;
for (NodeDetails node : universeDetails.nodeDetailsSet) {
if (node.isMaster || node.isTserver) {
node.disksAreMountedByUUID = true;
}
}
u.setUniverseDetails(universeDetails);
});
getRunnableTask().runSubTasks();

metricService.markSourceInactive(params().customerUUID, params().getUniverseUUID());
} catch (Throwable t) {
log.error("Error executing task {} with error='{}'.", getName(), t.getMessage(), t);
throw t;
@@ -130,21 +62,4 @@ public void run() {
}
log.info("Finished {} task.", getName());
}

private Collection<NodeDetails> getActiveUniverseNodes(Collection<NodeDetails> universeNodes) {
Collection<NodeDetails> activeNodes = new HashSet<>();
for (NodeDetails node : universeNodes) {
NodeTaskParams nodeParams = new NodeTaskParams();
nodeParams.setUniverseUUID(taskParams().getUniverseUUID());
nodeParams.nodeName = node.nodeName;
nodeParams.nodeUuid = node.nodeUuid;
nodeParams.azUuid = node.azUuid;
nodeParams.placementUuid = node.placementUuid;

if (instanceExists(nodeParams)) {
activeNodes.add(node);
}
}
return activeNodes;
}
}
@@ -0,0 +1,73 @@
package com.yugabyte.yw.commissioner.tasks;

import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
import com.yugabyte.yw.common.XClusterUniverseService;
import com.yugabyte.yw.models.Customer;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.XClusterConfig;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PauseXClusterUniverses extends XClusterConfigTaskBase {

@Inject
protected PauseXClusterUniverses(
BaseTaskDependencies baseTaskDependencies, XClusterUniverseService xClusterUniverseService) {
super(baseTaskDependencies, xClusterUniverseService);
}

@Override
public void run() {
log.info("Running {}", getName());

XClusterConfig xClusterConfig = getXClusterConfigFromTaskParams();
Universe sourceUniverse = Universe.getOrBadRequest(xClusterConfig.getSourceUniverseUUID());
Universe targetUniverse = Universe.getOrBadRequest(xClusterConfig.getTargetUniverseUUID());

try {
// Lock the source universe.
lockAndFreezeUniverseForUpdate(
sourceUniverse.getUniverseUUID(), sourceUniverse.getVersion(), null /* Txn callback */);
try {
// Lock the target universe.
lockAndFreezeUniverseForUpdate(
targetUniverse.getUniverseUUID(), targetUniverse.getVersion(), null /* Txn callback */);

// Used in createUpdateWalRetentionTasks.
taskParams().setUniverseUUID(sourceUniverse.getUniverseUUID());
taskParams().clusters = sourceUniverse.getUniverseDetails().clusters;
createUpdateWalRetentionTasks(sourceUniverse, XClusterUniverseAction.PAUSE);

createSetReplicationPausedTask(xClusterConfig, true /* pause */);

createPauseUniverseTasks(
sourceUniverse, Customer.get(sourceUniverse.getCustomerId()).getUuid());

taskParams().setUniverseUUID(targetUniverse.getUniverseUUID());
createPauseUniverseTasks(
targetUniverse, Customer.get(targetUniverse.getCustomerId()).getUuid());

createMarkUniverseUpdateSuccessTasks(sourceUniverse.getUniverseUUID())
.setSubTaskGroupType(SubTaskGroupType.PauseUniverse);

createMarkUniverseUpdateSuccessTasks(targetUniverse.getUniverseUUID())
.setSubTaskGroupType(SubTaskGroupType.PauseUniverse);

getRunnableTask().runSubTasks();

} finally {
// Unlock the target universe.
unlockUniverseForUpdate(targetUniverse.getUniverseUUID());
}
} catch (Throwable t) {
log.error("{} hit error : {}", getName(), t.getMessage());
throw t;
} finally {
// Unlock the source universe.
unlockUniverseForUpdate(sourceUniverse.getUniverseUUID());
}
log.info("Completed {}", getName());
}
}
@@ -10,25 +10,13 @@

package com.yugabyte.yw.commissioner.tasks;

import static com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType.RotatingCert;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.Common.CloudType;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
import com.yugabyte.yw.common.certmgmt.CertConfigType;
import com.yugabyte.yw.forms.CertsRotateParams;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import com.yugabyte.yw.models.CertificateInfo;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.helpers.NodeDetails;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;

@@ -59,101 +47,13 @@ public void run() {
// Update the universe DB with the update to be performed and set the 'updateInProgress' flag
// to prevent other updates from happening.
Universe universe = lockAndFreezeUniverseForUpdate(-1, null /* Txn callback */);
UniverseDefinitionTaskParams universeDetails = universe.getUniverseDetails();
Collection<NodeDetails> nodes = universe.getNodes();

if (!universeDetails.isImportedUniverse()) {
// Create tasks to resume the existing nodes.
createResumeServerTasks(universe).setSubTaskGroupType(SubTaskGroupType.ResumeUniverse);
}

List<NodeDetails> tserverNodeList = universe.getTServers();
List<NodeDetails> masterNodeList = universe.getMasters();

if (universeDetails.getPrimaryCluster().userIntent.providerType == CloudType.azu) {
createServerInfoTasks(nodes).setSubTaskGroupType(SubTaskGroupType.Provisioning);
}

// Optimistically rotate node-to-node server certificates before starting DB processes
// Also see CertsRotate
if (universeDetails.rootCA != null) {
CertificateInfo rootCert = CertificateInfo.get(universeDetails.rootCA);

if (rootCert == null) {
log.error("Root certificate not found for {}", universe.getUniverseUUID());
} else if (rootCert.getCertType() == CertConfigType.SelfSigned) {
SubTaskGroupType certRotate = RotatingCert;
taskParams().rootCA = universeDetails.rootCA;
taskParams().setClientRootCA(universeDetails.getClientRootCA());
createCertUpdateTasks(
masterNodeList,
tserverNodeList,
certRotate,
CertsRotateParams.CertRotationType.ServerCert,
CertsRotateParams.CertRotationType.None);
createUniverseSetTlsParamsTask(certRotate);
}
}

// Make sure clock skew is low enough on the master nodes.
createWaitForClockSyncTasks(universe, masterNodeList)
.setSubTaskGroupType(SubTaskGroupType.StartingMasterProcess);

createStartMasterProcessTasks(masterNodeList);
for (NodeDetails nodeDetails : masterNodeList) {
createWaitForServerReady(nodeDetails, ServerType.MASTER)
.setSubTaskGroupType(SubTaskGroupType.StartingNodeProcesses);
}

// Make sure clock skew is low enough on the tserver nodes.
createWaitForClockSyncTasks(universe, tserverNodeList)
.setSubTaskGroupType(SubTaskGroupType.StartingNodeProcesses);
createResumeUniverseTasks(universe, params().customerUUID);

createStartTServerTasks(tserverNodeList)
.setSubTaskGroupType(SubTaskGroupType.StartingNodeProcesses);
createWaitForServersTasks(tserverNodeList, ServerType.TSERVER)
.setSubTaskGroupType(SubTaskGroupType.StartingNodeProcesses);
for (NodeDetails nodeDetails : tserverNodeList) {
createWaitForServerReady(nodeDetails, ServerType.TSERVER)
.setSubTaskGroupType(SubTaskGroupType.StartingNodeProcesses);
}

if (universe.isYbcEnabled()) {
createStartYbcTasks(tserverNodeList)
.setSubTaskGroupType(SubTaskGroupType.StartingNodeProcesses);

// Wait for yb-controller to be responsive on each node.
createWaitForYbcServerTask(new HashSet<>(tserverNodeList))
.setSubTaskGroupType(SubTaskGroupType.ConfigureUniverse);
}

// Set the node state to live.
Set<NodeDetails> nodesToMarkLive =
nodes.stream()
.filter(node -> node.isMaster || node.isTserver)
.collect(Collectors.toSet());
createSetNodeStateTasks(nodesToMarkLive, NodeDetails.NodeState.Live)
.setSubTaskGroupType(SubTaskGroupType.ConfigureUniverse);

// Create alert definition files.
createUnivManageAlertDefinitionsTask(true)
.setSubTaskGroupType(SubTaskGroupType.ResumeUniverse);

createSwamperTargetUpdateTask(false);
// Mark universe task state to success.
createMarkUniverseUpdateSuccessTasks().setSubTaskGroupType(SubTaskGroupType.ResumeUniverse);

// Run all the tasks.
getRunnableTask().runSubTasks();

saveUniverseDetails(
u -> {
UniverseDefinitionTaskParams details = u.getUniverseDetails();
details.universePaused = false;
u.setUniverseDetails(details);
});

metricService.markSourceActive(params().customerUUID, params().getUniverseUUID());
} catch (Throwable t) {
log.error("Error executing task {} with error='{}'.", getName(), t.getMessage(), t);
throw t;
@@ -0,0 +1,78 @@
// Copyright (c) YugaByte, Inc.

package com.yugabyte.yw.commissioner.tasks;

import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
import com.yugabyte.yw.common.XClusterUniverseService;
import com.yugabyte.yw.models.Customer;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.XClusterConfig;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ResumeXClusterUniverses extends XClusterConfigTaskBase {

@Inject
protected ResumeXClusterUniverses(
BaseTaskDependencies baseTaskDependencies, XClusterUniverseService xClusterUniverseService) {
super(baseTaskDependencies, xClusterUniverseService);
}

@Override
public void run() {
log.info("Running {}", getName());

XClusterConfig xClusterConfig = getXClusterConfigFromTaskParams();
Universe sourceUniverse = Universe.getOrBadRequest(xClusterConfig.getSourceUniverseUUID());
Universe targetUniverse = Universe.getOrBadRequest(xClusterConfig.getTargetUniverseUUID());

try {
// Lock the source universe.
lockAndFreezeUniverseForUpdate(
sourceUniverse.getUniverseUUID(), sourceUniverse.getVersion(), null /* Txn callback */);
try {
// Lock the target universe.
lockAndFreezeUniverseForUpdate(
targetUniverse.getUniverseUUID(), targetUniverse.getVersion(), null /* Txn callback */);

taskParams().setUniverseUUID(sourceUniverse.getUniverseUUID());
taskParams().clusters = sourceUniverse.getUniverseDetails().clusters;
createResumeUniverseTasks(
sourceUniverse, Customer.get(sourceUniverse.getCustomerId()).getUuid());

taskParams().setUniverseUUID(targetUniverse.getUniverseUUID());
taskParams().clusters = targetUniverse.getUniverseDetails().clusters;
createResumeUniverseTasks(
targetUniverse, Customer.get(targetUniverse.getCustomerId()).getUuid());

createSetReplicationPausedTask(xClusterConfig, false /* pause */);
createWaitForReplicationDrainTask(xClusterConfig);

// Used in createUpdateWalRetentionTasks.
taskParams().setUniverseUUID(sourceUniverse.getUniverseUUID());
taskParams().clusters = sourceUniverse.getUniverseDetails().clusters;
createUpdateWalRetentionTasks(sourceUniverse, XClusterUniverseAction.RESUME);

createMarkUniverseUpdateSuccessTasks(targetUniverse.getUniverseUUID())
.setSubTaskGroupType(SubTaskGroupType.ResumeUniverse);

createMarkUniverseUpdateSuccessTasks(sourceUniverse.getUniverseUUID())
.setSubTaskGroupType(SubTaskGroupType.ResumeUniverse);

getRunnableTask().runSubTasks();
} finally {
// Unlock the target universe.
unlockUniverseForUpdate(targetUniverse.getUniverseUUID());
}
} catch (Throwable t) {
log.error("{} hit error : {}", getName(), t.getMessage());
throw t;
} finally {
// Unlock the source universe.
unlockUniverseForUpdate(sourceUniverse.getUniverseUUID());
}
log.info("Completed {}", getName());
}
}