From b039d1a77a80ea9b3547e0ab5fd1804b64548b85 Mon Sep 17 00:00:00 2001 From: Naorem Khogendro Singh Date: Fri, 19 Jul 2024 14:47:02 -0700 Subject: [PATCH] [PLAT-14366] Basic local provider test for master auto failover Summary: Local provider test for auto-master failover. Test Plan: Ran the failover in a docker container. ``` [info] Test run com.yugabyte.yw.commissioner.tasks.local.AutoMasterFailoverLocalTest finished: 0 failed, 0 ignored, 1 total, 364.275s [info] Passed: Total 1, Failed 0, Errors 0, Passed 1 ``` Reviewers: sanketh, cwang, yshchetinin Reviewed By: cwang Subscribers: yugaware Differential Revision: https://phorge.dev.yugabyte.com/D36726 --- .../opscli/ybops/cloud/common/method.py | 8 +- .../yw/commissioner/AutoMasterFailover.java | 31 ++- .../AutoMasterFailoverScheduler.java | 180 ++++++++++++------ .../tasks/SyncMasterAddresses.java | 31 ++- .../commissioner/tasks/UniverseTaskBase.java | 50 +++++ .../subtasks/CheckLeaderlessTablets.java | 1 + .../java/com/yugabyte/yw/common/AppInit.java | 16 +- .../yugabyte/yw/common/LocalNodeManager.java | 28 ++- .../yw/common/LocalNodeUniverseManager.java | 2 - .../yw/common/config/GlobalConfKeys.java | 8 + .../com/yugabyte/yw/models/CustomerTask.java | 12 ++ .../com/yugabyte/yw/models/JobSchedule.java | 18 ++ .../yw/models/helpers/schedule/JobConfig.java | 7 +- .../yugabyte/yw/scheduler/JobScheduler.java | 152 ++++++++++----- managed/src/main/resources/reference.conf | 2 +- .../local/AutoMasterFailoverLocalTest.java | 156 +++++++++++++++ .../local/LocalProviderUniverseTestBase.java | 67 ++++++- .../yw/scheduler/JobSchedulerTest.java | 11 +- 18 files changed, 626 insertions(+), 154 deletions(-) create mode 100644 managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/AutoMasterFailoverLocalTest.java diff --git a/managed/devops/opscli/ybops/cloud/common/method.py b/managed/devops/opscli/ybops/cloud/common/method.py index d5a80431627f..b4ba3f590844 100644 --- a/managed/devops/opscli/ybops/cloud/common/method.py +++ b/managed/devops/opscli/ybops/cloud/common/method.py @@ -854,8 +854,8 @@ def callback(self, args): raise YBOpsRecoverableError("Could not connect({}) into node {}:{} using username {}" .format(host_port_user["connection_type"], host_port_user["host"], - host_port_user["user"], - host_port_user["port"])) + host_port_user["port"], + host_port_user["user"])) def get_device_names(self, args, host_info=None): return self.cloud.get_device_names(args) @@ -2183,5 +2183,5 @@ def callback(self, args): raise YBOpsRecoverableError("Could not connect({}) into node {}:{} using username {}" .format(host_port_user["connection_type"], host_port_user["host"], - host_port_user["user"], - host_port_user["port"])) + host_port_user["port"], + host_port_user["user"])) diff --git a/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailover.java b/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailover.java index 784a61dae710..bf443b5bd8d4 100644 --- a/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailover.java +++ b/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailover.java @@ -26,9 +26,6 @@ import com.yugabyte.yw.models.helpers.NodeDetails.NodeState; import com.yugabyte.yw.models.helpers.TaskType; import com.yugabyte.yw.models.helpers.schedule.JobConfig.RuntimeParams; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -37,7 +34,9 @@ import java.util.UUID; import lombok.Builder; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.yb.client.GetMasterHeartbeatDelaysResponse; import org.yb.client.YBClient; import org.yb.util.ServerInfo; @@ -55,6 +54,7 @@ public class AutoMasterFailover extends UniverseDefinitionTaskBase { @Builder @Getter + @ToString // The fail-over action to be performed as a result of the detection. static class Action { @Builder.Default ActionType actionType = ActionType.NONE; @@ -413,7 +413,7 @@ public Action getAllowedMasterFailoverAction(Customer customer, Universe univers @VisibleForTesting Map getFollowerLagMs(String ip, int port) { String endpoint = String.format(FOLLOWER_LAG_URL_FORMAT, ip, port); - log.info("Getting follower lag for endpoint {} {}", endpoint, nodeUIApiHelper); + log.info("Getting follower lag for endpoint {}", endpoint); try { JsonNode currentNodeMetricsJson = nodeUIApiHelper.getRequest(endpoint); JsonNode errors = currentNodeMetricsJson.get("error"); @@ -439,21 +439,6 @@ Map getFollowerLagMs(String ip, int port) { private CustomerTask submitMasterFailoverTask( Customer customer, Universe universe, String failedNodeName) { - CustomerTask lastTask = CustomerTask.getLastTaskByTargetUuid(universe.getUniverseUUID()); - if (lastTask != null && lastTask.getCompletionTime() != null) { - // Cooldown is calculated from the last task. - Duration cooldownPeriod = - confGetter.getConfForScope(universe, UniverseConfKeys.autoMasterFailoverCooldown); - Instant restrictionEndTime = - lastTask - .getCompletionTime() - .toInstant() - .plus(cooldownPeriod.getSeconds(), ChronoUnit.SECONDS); - if (restrictionEndTime.isAfter(Instant.now())) { - log.info("Universe {} is cooling down", universe.getUniverseUUID()); - return null; - } - } NodeDetails node = universe.getNode(failedNodeName); NodeDetails possibleReplacementCandidate = findReplacementMaster(universe, node); if (possibleReplacementCandidate == null) { @@ -467,6 +452,14 @@ private CustomerTask submitMasterFailoverTask( "Found a possible replacement master candidate {} for universe {}", possibleReplacementCandidate.getNodeName(), universe.getUniverseUUID()); + Set leaderlessTablets = getLeaderlessTablets(universe.getUniverseUUID()); + if (CollectionUtils.isNotEmpty(leaderlessTablets)) { + log.error( + "Leaderless tablets {} found for universe {}", + Iterables.limit(leaderlessTablets, 10), + universe.getUniverseUUID()); + return null; + } NodeTaskParams taskParams = new NodeTaskParams(); taskParams.setUniverseUUID(universe.getUniverseUUID()); taskParams.nodeName = failedNodeName; diff --git a/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailoverScheduler.java b/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailoverScheduler.java index c0afbd0aa792..8405b97ad973 100644 --- a/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailoverScheduler.java +++ b/managed/src/main/java/com/yugabyte/yw/commissioner/AutoMasterFailoverScheduler.java @@ -10,18 +10,24 @@ import com.yugabyte.yw.commissioner.Common.CloudType; import com.yugabyte.yw.common.PlatformExecutorFactory; import com.yugabyte.yw.common.PlatformScheduler; +import com.yugabyte.yw.common.Util; +import com.yugabyte.yw.common.config.GlobalConfKeys; import com.yugabyte.yw.common.config.RuntimeConfGetter; import com.yugabyte.yw.common.config.UniverseConfKeys; import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.UserIntent; import com.yugabyte.yw.models.Customer; +import com.yugabyte.yw.models.CustomerTask; import com.yugabyte.yw.models.JobSchedule; import com.yugabyte.yw.models.TaskInfo; import com.yugabyte.yw.models.Universe; +import com.yugabyte.yw.models.helpers.TaskType; import com.yugabyte.yw.models.helpers.schedule.JobConfig; import com.yugabyte.yw.models.helpers.schedule.JobConfig.RuntimeParams; import com.yugabyte.yw.models.helpers.schedule.ScheduleConfig; import com.yugabyte.yw.scheduler.JobScheduler; import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -40,13 +46,11 @@ @Singleton @Slf4j public class AutoMasterFailoverScheduler { - // Polling interval for checking if a universe has the detection schedule created. - private static final String AUTO_MASTER_FAILOVER_POLLER_INTERVAL = - "yb.auto_master_failover.poller_interval"; - private static final String AUTO_MASTER_FAILOVER_POOL_NAME = "auto_master_failover.executor"; private static final String AUTO_MASTER_FAILOVER_SCHEDULE_NAME_FORMAT = "AutoMasterFailover_%s"; private static final String DETECT_MASTER_FAILURE_SCHEDULE_NAME_FORMAT = "DetectMasterFailure_%s"; + private static final String SUPPORTED_DB_STABLE_VERSION = "2.20.3.0-b10"; + private static final String SUPPORTED_DB_PREVIEW_VERSION = "2.21.0.0-b309"; private final RuntimeConfGetter confGetter; private final PlatformExecutorFactory platformExecutorFactory; @@ -117,10 +121,10 @@ public CompletableFuture executeJob(RuntimeParams runtime) { .ifPresent( tf -> { if (tf.getTaskState() == TaskInfo.State.Success) { - // Task executed successfully. Delete the fail-over schedule. + // Task executed successfully. Disable the schedule for tracking. runtime .getJobScheduler() - .deleteSchedule(runtime.getJobSchedule().getUuid()); + .disableSchedule(runtime.getJobSchedule().getUuid(), true); } else { // Fail the job and keep the schedule to keep track of the failed // counts. @@ -142,7 +146,7 @@ public CompletableFuture executeJob(RuntimeParams runtime) { public void init() { Duration pollingInterval = - confGetter.getStaticConf().getDuration(AUTO_MASTER_FAILOVER_POLLER_INTERVAL); + confGetter.getGlobalConf(GlobalConfKeys.autoMasterFailoverPollerInterval); platformScheduler.schedule( getClass().getSimpleName(), pollingInterval, pollingInterval, this::createSchedules); failoverExecutor = @@ -169,19 +173,51 @@ private void createSchedules() { u -> { UserIntent userIntent = u.getUniverseDetails().getPrimaryCluster().userIntent; - if (userIntent != null - && userIntent.providerType != CloudType.kubernetes) { - try { - createDetectMasterFailureSchedule(c, u); - } catch (Exception e) { - log.error( - "Error in creating master failure detection schedule for universe" - + " {} - {}", - u.getUniverseUUID(), - e.getMessage()); - } finally { - universeUuids.add(u.getUniverseUUID()); - } + if (userIntent == null + || userIntent.providerType == CloudType.kubernetes) { + return; + } + boolean isFailoverEnabled = + confGetter.getConfForScope( + u, UniverseConfKeys.enableAutoMasterFailover); + if (!isFailoverEnabled) { + log.debug( + "Automated master failover for universe {} is disabled", + u.getUniverseUUID()); + return; + } + if (u.getUniverseDetails().universePaused) { + log.debug( + "Automated master failover for universe {} is paused", + u.getUniverseUUID()); + return; + } + String ybDbVersion = userIntent.ybSoftwareVersion; + if (Util.compareYBVersions( + ybDbVersion, + SUPPORTED_DB_STABLE_VERSION, + SUPPORTED_DB_PREVIEW_VERSION, + true) + < 0) { + log.info( + "Auto master failover not supported in current version {}", + ybDbVersion); + log.info( + "Supported versions are from {} (stable) and {} (preview)", + SUPPORTED_DB_STABLE_VERSION, + SUPPORTED_DB_PREVIEW_VERSION); + return; + } + try { + createDetectMasterFailureSchedule(c, u); + } catch (Exception e) { + log.error( + "Error in creating master failure detection schedule for universe" + + " {} - {}", + u.getUniverseUUID(), + e.getMessage()); + } finally { + universeUuids.add(u.getUniverseUUID()); } })); try { @@ -199,41 +235,22 @@ private void createSchedules() { /** Create a schedule to detect master failure for the given universe */ private void createDetectMasterFailureSchedule(Customer customer, Universe universe) { - boolean isFailoverEnabled = - confGetter.getConfForScope(universe, UniverseConfKeys.enableAutoMasterFailover); - if (!isFailoverEnabled) { - log.debug( - "Skipping automated master failover for universe {} because it is disabled", - universe.getUniverseUUID(), - isFailoverEnabled); - // Schedule is no longer needed. - jobScheduler - .maybeGetSchedule(customer.getUuid(), getDetectMasterFailureScheduleName(universe)) - .ifPresent(s -> jobScheduler.deleteSchedule(s.getUuid())); - return; - } - if (universe.getUniverseDetails().universePaused) { - log.debug( - "Skipping automated master failover for universe {} because it is paused", - universe.getUniverseUUID()); - // Schedule is no longer needed. - jobScheduler - .maybeGetSchedule(customer.getUuid(), getDetectMasterFailureScheduleName(universe)) - .ifPresent(s -> jobScheduler.deleteSchedule(s.getUuid())); - return; - } log.trace( "Creating master failure detection schedule for universe {} if it is absent", universe.getUniverseUUID()); Duration detectionInterval = confGetter.getConfForScope(universe, UniverseConfKeys.autoMasterFailoverDetectionInterval); + log.debug( + "Master failover detection interval is set to {} seconds for universe {}", + detectionInterval.getSeconds(), + universe.getUniverseUUID()); String scheduleName = getDetectMasterFailureScheduleName(universe); Optional optional = jobScheduler.maybeGetSchedule(customer.getUuid(), scheduleName); if (optional.isPresent()) { ScheduleConfig scheduleConfig = optional.get().getScheduleConfig(); if (!detectionInterval.equals(scheduleConfig.getInterval())) { - log.debug( + log.info( "Failover detection schedule has changed from {} to {}", scheduleConfig.getInterval(), detectionInterval); @@ -242,7 +259,7 @@ private void createDetectMasterFailureSchedule(Customer customer, Universe unive scheduleConfig.toBuilder().interval(detectionInterval).build()); } } else { - log.debug( + log.info( "Creating master failure detection schedule for universe {}", universe.getUniverseUUID()); JobSchedule jobSchedule = new JobSchedule(); jobSchedule.setCustomerUuid(customer.getUuid()); @@ -270,13 +287,6 @@ private void detectMasterFailure( // Let the creator of this schedule handle the life-cycle. return; } - if (universe.getUniverseDetails().universePaused) { - log.debug( - "Skipping automated master failover for universe {} because it is paused", - universe.getUniverseUUID()); - // Let the creator of this schedule handle the life-cycle. - return; - } if (universe.universeIsLocked()) { log.info( "Skipping master failover for universe {} because it is already being updated", @@ -287,26 +297,77 @@ private void detectMasterFailure( String scheduleName = getAutoMasterFailoverScheduleName(universe); Action action = autoMasterFailover.getAllowedMasterFailoverAction(customer, universe); if (action.getActionType() == ActionType.NONE) { - // No fail-over action can be performed. Remove the schedule to restart fresh. + // No fail-over action can be performed. Disable to keep track of the last run. jobScheduler .maybeGetSchedule(customer.getUuid(), scheduleName) - .ifPresent(s -> jobScheduler.deleteSchedule(s.getUuid())); + .ifPresent(s -> jobScheduler.disableSchedule(s.getUuid(), true)); return; } - log.debug("Detected master failure for universe {}", universe.getUniverseUUID()); + log.info( + "Detected master failure for universe {}, next action {}", + universe.getUniverseUUID(), + action); + if (action.getActionType() == ActionType.SUBMIT + && action.getTaskType() == TaskType.MasterFailover) { + Optional optional = + CustomerTask.maybeGetLastTaskByTargetUuidTaskType( + universe.getUniverseUUID(), CustomerTask.TaskType.MasterFailover); + if (optional.isPresent()) { + // Cooldown for a new master failover is calculated from the master failover task. + Duration cooldownPeriod = + confGetter.getConfForScope(universe, UniverseConfKeys.autoMasterFailoverCooldown); + log.debug( + "Cooldown period is set to {} seconds for universe {}", + cooldownPeriod.getSeconds(), + universe.getUniverseUUID()); + Instant restrictionEndTime = + optional + .get() + .getCompletionTime() + .toInstant() + .plus(cooldownPeriod.getSeconds(), ChronoUnit.SECONDS); + Instant now = Instant.now(); + if (restrictionEndTime.isAfter(now)) { + long diffSecs = now.until(restrictionEndTime, ChronoUnit.SECONDS); + log.info( + "Universe {} is cooling down for {} seconds", universe.getUniverseUUID(), diffSecs); + jobScheduler + .maybeGetSchedule(customer.getUuid(), scheduleName) + .ifPresent(s -> jobScheduler.disableSchedule(s.getUuid(), true)); + return; + } + } + } Duration taskInterval = confGetter.getConfForScope(universe, UniverseConfKeys.autoMasterFailoverTaskInterval); + log.debug( + "Task interval period is set to {} seconds for universe {}", + taskInterval.getSeconds(), + universe.getUniverseUUID()); Optional optional = jobScheduler.maybeGetSchedule(customer.getUuid(), scheduleName); if (optional.isPresent()) { ScheduleConfig scheduleConfig = optional.get().getScheduleConfig(); - if (!taskInterval.equals(scheduleConfig.getInterval())) { + if (scheduleConfig.isDisabled()) { + jobScheduler.disableSchedule(optional.get().getUuid(), false); + log.info( + "Enabled schedule for action {} to perform on universe {} in {} seconds", + action, + universe.getUniverseUUID(), + taskInterval.getSeconds()); + } else if (!taskInterval.equals(scheduleConfig.getInterval())) { log.debug( "Task submission schedule has changed from {} to {}", scheduleConfig.getInterval(), taskInterval); jobScheduler.updateSchedule( - optional.get().getUuid(), scheduleConfig.toBuilder().interval(taskInterval).build()); + optional.get().getUuid(), + scheduleConfig.toBuilder().disabled(false).interval(taskInterval).build()); + log.info( + "Updated schedule for action {} to perform on universe {} in {} seconds", + action, + universe.getUniverseUUID(), + taskInterval.getSeconds()); } } else { JobSchedule jobSchedule = new JobSchedule(); @@ -320,6 +381,11 @@ private void detectMasterFailure( .failoverJobType(FailoverJobType.MASTER_FAILOVER) .build()); jobScheduler.submitSchedule(jobSchedule); + log.info( + "Scheduled action {} to perform on universe {} in {} seconds", + action, + universe.getUniverseUUID(), + taskInterval.getSeconds()); } } } diff --git a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/SyncMasterAddresses.java b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/SyncMasterAddresses.java index 6dfcb89a2e03..4f469e68193f 100644 --- a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/SyncMasterAddresses.java +++ b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/SyncMasterAddresses.java @@ -3,12 +3,13 @@ package com.yugabyte.yw.commissioner.tasks; import com.yugabyte.yw.commissioner.BaseTaskDependencies; +import com.yugabyte.yw.commissioner.Common.CloudType; import com.yugabyte.yw.commissioner.ITask.Retryable; import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType; +import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.Cluster; import com.yugabyte.yw.models.Universe; import com.yugabyte.yw.models.helpers.NodeDetails; import com.yugabyte.yw.models.helpers.NodeDetails.MasterState; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -21,6 +22,8 @@ public class SyncMasterAddresses extends UniverseDefinitionTaskBase { private final Set nodesToStop = ConcurrentHashMap.newKeySet(); + private boolean isLocalProvider = false; + @Inject protected SyncMasterAddresses(BaseTaskDependencies baseTaskDependencies) { super(baseTaskDependencies); @@ -30,19 +33,26 @@ protected SyncMasterAddresses(BaseTaskDependencies baseTaskDependencies) { protected void createPrecheckTasks(Universe universe) { super.createPrecheckTasks(universe); if (isFirstTry()) { - Set masterNodes = new HashSet<>(universe.getMasters()); Set nonMasterNodes = universe.getNodes().stream() - .filter(n -> !masterNodes.contains(n)) + .filter(n -> !n.isMaster && n.cloudInfo.private_ip != null) .filter(n -> n.autoSyncMasterAddrs) .collect(Collectors.toSet()); - createCheckProcessStateTask( - universe, - nonMasterNodes, - ServerType.MASTER, - false /* ensureRunning */, - n -> nodesToStop.add(n.nodeName)) - .setSubTaskGroupType(SubTaskGroupType.ValidateConfigurations); + Cluster primaryCluster = universe.getUniverseDetails().getPrimaryCluster(); + if (primaryCluster.userIntent.providerType == CloudType.local) { + isLocalProvider = true; + nonMasterNodes.stream().map(NodeDetails::getNodeName).forEach(n -> nodesToStop.add(n)); + } else { + // Disable this on local provider as the check cannot be done properly as there are multiple + // master processes. + createCheckProcessStateTask( + universe, + nonMasterNodes, + ServerType.MASTER, + false /* ensureRunning */, + n -> nodesToStop.add(n.nodeName)) + .setSubTaskGroupType(SubTaskGroupType.ValidateConfigurations); + } } } @@ -67,6 +77,7 @@ public void run() { ServerType.MASTER, params -> { params.deconfigure = true; + params.isIgnoreError = isLocalProvider; }) .setSubTaskGroupType(SubTaskGroupType.StoppingNodeProcesses); createMasterAddressUpdateTask( diff --git a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/UniverseTaskBase.java b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/UniverseTaskBase.java index 817eac76503b..170769ca2a13 100644 --- a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/UniverseTaskBase.java +++ b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/UniverseTaskBase.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.api.client.util.Throwables; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -416,6 +417,8 @@ public enum ServerType { } public static final String DUMP_ENTITIES_URL_SUFFIX = "/dump-entities"; + public static final String TABLET_REPLICATION_URL_SUFFIX = "/api/v1/tablet-replication"; + public static final String LEADERLESS_TABLETS_KEY = "leaderless_tablets"; @Inject protected UniverseTaskBase(BaseTaskDependencies baseTaskDependencies) { @@ -2302,6 +2305,53 @@ public DumpEntitiesResponse dumpDbEntities(Universe universe) { return waitForCheck.retryWithBackoff(1, 2, 10); } + /** + * Fetch leaderless tablets for the universe. + * + * @param universeUuid the universe UUID. + * @return the set of leaderless tablet UUIDs. + */ + public Set getLeaderlessTablets(UUID universeUuid) { + Universe universe = Universe.getOrBadRequest(universeUuid); + String masterAddresses = universe.getMasterAddresses(); + String certificate = universe.getCertificateNodetoNode(); + try (YBClient client = ybService.getClient(masterAddresses, certificate)) { + HostAndPort leaderMasterHostAndPort = client.getLeaderMasterHostAndPort(); + if (leaderMasterHostAndPort == null) { + throw new RuntimeException( + "Could not find the master leader address in universe " + + taskParams().getUniverseUUID()); + } + int httpPort = universe.getUniverseDetails().communicationPorts.masterHttpPort; + HostAndPort hostAndPort = HostAndPort.fromParts(leaderMasterHostAndPort.getHost(), httpPort); + String url = + String.format("http://%s%s", hostAndPort.toString(), TABLET_REPLICATION_URL_SUFFIX); + log.debug("Making url request to endpoint: {}", url); + JsonNode response = nodeUIApiHelper.getRequest(url); + log.debug("Received {}", response); + JsonNode errors = response.get("error"); + if (errors != null) { + throw new RuntimeException("Received error: " + errors.asText()); + } + ArrayNode leaderlessTablets = (ArrayNode) response.get(LEADERLESS_TABLETS_KEY); + if (leaderlessTablets == null) { + throw new RuntimeException( + "Unexpected response, no " + LEADERLESS_TABLETS_KEY + " in it: " + response); + } + Set result = new HashSet<>(); + for (JsonNode leaderlessTabletInfo : leaderlessTablets) { + result.add(leaderlessTabletInfo.get("tablet_uuid").asText()); + } + return result; + } catch (RuntimeException e) { + log.error("Error in getting leaderless tablets {}", e.getMessage()); + throw e; + } catch (Exception e) { + log.error("Error in getting leaderless tablets {}", e.getMessage()); + throw new RuntimeException(e); + } + } + /** * For a given node, finds the tablets assigned to its tserver (if relevant). * diff --git a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/subtasks/CheckLeaderlessTablets.java b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/subtasks/CheckLeaderlessTablets.java index df36b7c6f8e4..f7565a188d3e 100644 --- a/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/subtasks/CheckLeaderlessTablets.java +++ b/managed/src/main/java/com/yugabyte/yw/commissioner/tasks/subtasks/CheckLeaderlessTablets.java @@ -94,6 +94,7 @@ public void run() { } } + // TODO Remove this to use the similar method in UniverseTaskBase. private List doGetLeaderlessTablets(YBClient client, int httpPort) { HostAndPort leaderMasterHostAndPort = client.getLeaderMasterHostAndPort(); diff --git a/managed/src/main/java/com/yugabyte/yw/common/AppInit.java b/managed/src/main/java/com/yugabyte/yw/common/AppInit.java index 02440186b6c1..44e2b46ce3e0 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/AppInit.java +++ b/managed/src/main/java/com/yugabyte/yw/common/AppInit.java @@ -55,6 +55,7 @@ import io.prometheus.client.hotspot.DefaultExports; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import play.Application; import play.Environment; @@ -70,6 +71,8 @@ public class AppInit { Gauge.build("yba_init_time_seconds", "Last YBA startup time in seconds.") .register(CollectorRegistry.defaultRegistry); + private static final AtomicBoolean IS_H2_DB = new AtomicBoolean(false); + @Inject public AppInit( Environment environment, @@ -120,7 +123,13 @@ public AppInit( try { log.info("Yugaware Application has started"); - if (!environment.isTest()) { + if (environment.isTest()) { + String dbDriverKey = "db.default.driver"; + if (config.hasPath(dbDriverKey)) { + String driver = config.getString(dbDriverKey); + IS_H2_DB.set(driver.contains("org.h2.Driver")); + } + } else { // only start thread dump collection for YBM at this time if (config.getBoolean("yb.cloud.enabled")) { threadDumpPublisher.start(); @@ -310,4 +319,9 @@ public AppInit( throw t; } } + + // Workaround for some tests with H2 database. + public static boolean isH2Db() { + return IS_H2_DB.get(); + } } diff --git a/managed/src/main/java/com/yugabyte/yw/common/LocalNodeManager.java b/managed/src/main/java/com/yugabyte/yw/common/LocalNodeManager.java index 0031a7b8f126..6752bd24fbfe 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/LocalNodeManager.java +++ b/managed/src/main/java/com/yugabyte/yw/common/LocalNodeManager.java @@ -33,6 +33,7 @@ import com.yugabyte.yw.common.gflags.SpecificGFlags; import com.yugabyte.yw.common.utils.FileUtils; import com.yugabyte.yw.forms.UniverseDefinitionTaskParams; +import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.UserIntent; import com.yugabyte.yw.models.Provider; import com.yugabyte.yw.models.ProviderDetails; import com.yugabyte.yw.models.Universe; @@ -224,12 +225,33 @@ private void terminateProcessAndSubprocesses(long pid) { }); } + // This does not clear the process map. public void killProcess(String nodeName, UniverseTaskBase.ServerType serverType) throws IOException, InterruptedException { NodeInfo nodeInfo = nodesByNameMap.get(nodeName); - Process process = nodeInfo.processMap.get(serverType); - log.debug("Destroying process with pid {} for {}", process.pid(), nodeInfo.ip); - killProcess(process.pid()); + if (nodeInfo != null) { + Process process = nodeInfo.processMap.get(serverType); + if (process != null) { + log.debug("Destroying process with pid {} for {}", process.pid(), nodeInfo.ip); + killProcess(process.pid()); + } + } + } + + public void startProcess( + UUID universeUuid, String nodeName, UniverseTaskBase.ServerType serverType) { + Universe universe = Universe.getOrBadRequest(universeUuid); + NodeInfo nodeInfo = nodesByNameMap.get(nodeName); + UserIntent userIntent = universe.getCluster(nodeInfo.placementUUID).userIntent; + startProcessForNode(userIntent, serverType, nodeInfo); + } + + public boolean isProcessRunning(String nodeName, UniverseTaskBase.ServerType serverType) { + NodeInfo nodeInfo = nodesByNameMap.get(nodeName); + if (nodeInfo == null) { + return false; + } + return nodeInfo.processMap.containsKey(serverType); } public void checkAllProcessesAlive() { diff --git a/managed/src/main/java/com/yugabyte/yw/common/LocalNodeUniverseManager.java b/managed/src/main/java/com/yugabyte/yw/common/LocalNodeUniverseManager.java index 114e797bebed..6e2226df8d50 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/LocalNodeUniverseManager.java +++ b/managed/src/main/java/com/yugabyte/yw/common/LocalNodeUniverseManager.java @@ -33,8 +33,6 @@ @Singleton @Slf4j public class LocalNodeUniverseManager { - - private static final String YSQL_PASSWORD = "Pass@123"; @Inject LocalNodeManager localNodeManager; public ShellResponse runYsqlCommand( diff --git a/managed/src/main/java/com/yugabyte/yw/common/config/GlobalConfKeys.java b/managed/src/main/java/com/yugabyte/yw/common/config/GlobalConfKeys.java index 4848747dbe17..2bc0f89b7b74 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/config/GlobalConfKeys.java +++ b/managed/src/main/java/com/yugabyte/yw/common/config/GlobalConfKeys.java @@ -1423,4 +1423,12 @@ public class GlobalConfKeys extends RuntimeConfigKeysModule { "Map LDAP/OIDC groups to custom roles defined by RBAC.", ConfDataType.BooleanType, ImmutableList.of(ConfKeyTags.INTERNAL)); + public static final ConfKeyInfo autoMasterFailoverPollerInterval = + new ConfKeyInfo<>( + "yb.auto_master_failover.poller_interval", + ScopeType.GLOBAL, + "Universe Poller Interval for Master Failover", + "Poller interval for universes to schedule master failover", + ConfDataType.DurationType, + ImmutableList.of(ConfKeyTags.INTERNAL)); } diff --git a/managed/src/main/java/com/yugabyte/yw/models/CustomerTask.java b/managed/src/main/java/com/yugabyte/yw/models/CustomerTask.java index ead08693c6ca..e00d2f8f91f9 100644 --- a/managed/src/main/java/com/yugabyte/yw/models/CustomerTask.java +++ b/managed/src/main/java/com/yugabyte/yw/models/CustomerTask.java @@ -876,6 +876,18 @@ public static CustomerTask getLastTaskByTargetUuid(UUID targetUUID) { } } + public static Optional maybeGetLastTaskByTargetUuidTaskType( + UUID targetUUID, TaskType taskType) { + return find.query() + .where() + .eq("target_uuid", targetUUID) + .eq("type", taskType) + .isNotNull("completion_time") + .orderBy("completion_time desc") + .setMaxRows(1) + .findOneOrEmpty(); + } + @JsonIgnore public String getNotificationTargetName() { if (getType().equals(TaskType.Create) && getTargetType().equals(TargetType.Backup)) { diff --git a/managed/src/main/java/com/yugabyte/yw/models/JobSchedule.java b/managed/src/main/java/com/yugabyte/yw/models/JobSchedule.java index 63f184a89cda..a8e3cbbe6f7e 100644 --- a/managed/src/main/java/com/yugabyte/yw/models/JobSchedule.java +++ b/managed/src/main/java/com/yugabyte/yw/models/JobSchedule.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.yugabyte.yw.common.AppInit; import com.yugabyte.yw.common.PlatformServiceException; import com.yugabyte.yw.models.helpers.schedule.JobConfig; import com.yugabyte.yw.models.helpers.schedule.JobConfig.JobConfigWrapper; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; @@ -131,6 +133,17 @@ public static Optional maybeGet(UUID customerUuid, String name) { public static List getNextEnabled(Duration window) { Date nextTime = Date.from(Instant.now().plus(window.getSeconds(), ChronoUnit.SECONDS)); + if (AppInit.isH2Db()) { + return DB + .createQuery(JobSchedule.class) + .where() + .le("nextStartTime", nextTime) + .findList() + .stream() + .filter(s -> !s.getScheduleConfig().isDisabled()) + .map(JobSchedule::getUuid) + .collect(Collectors.toList()); + } return DB.createQuery(JobSchedule.class) .where() .le("nextStartTime", nextTime) @@ -143,6 +156,11 @@ public static List getAll() { } public static List getAll(Class jobConfigClass) { + if (AppInit.isH2Db()) { + return DB.createQuery(JobSchedule.class).findList().stream() + .filter(s -> s.getJobConfig().getClass() == jobConfigClass) + .collect(Collectors.toList()); + } return DB.createQuery(JobSchedule.class) .where() .eq("job_config::jsonb->>'classname'", jobConfigClass.getName()) diff --git a/managed/src/main/java/com/yugabyte/yw/models/helpers/schedule/JobConfig.java b/managed/src/main/java/com/yugabyte/yw/models/helpers/schedule/JobConfig.java index ac4d41913c02..68b16d3f374d 100644 --- a/managed/src/main/java/com/yugabyte/yw/models/helpers/schedule/JobConfig.java +++ b/managed/src/main/java/com/yugabyte/yw/models/helpers/schedule/JobConfig.java @@ -51,11 +51,16 @@ public interface JobConfig extends Serializable { * Calculate the next start time based for the job schedule. * * @param jobSchedule the current job schedule. + * @param restart restart from the current time if true. * @return the next execution time. */ @JsonIgnore - default Date createNextStartTime(JobSchedule jobSchedule) { + default Date createNextStartTime(JobSchedule jobSchedule, boolean restart) { ScheduleConfig scheduleConfig = jobSchedule.getScheduleConfig(); + if (restart) { + return Date.from( + Instant.now().plus(scheduleConfig.getInterval().getSeconds(), ChronoUnit.SECONDS)); + } Date lastTime = null; if (scheduleConfig.getType() == ScheduleType.FIXED_RATE) { lastTime = jobSchedule.getLastStartTime(); diff --git a/managed/src/main/java/com/yugabyte/yw/scheduler/JobScheduler.java b/managed/src/main/java/com/yugabyte/yw/scheduler/JobScheduler.java index ef1c50dba639..6d49a11cdf53 100644 --- a/managed/src/main/java/com/yugabyte/yw/scheduler/JobScheduler.java +++ b/managed/src/main/java/com/yugabyte/yw/scheduler/JobScheduler.java @@ -8,6 +8,7 @@ import com.google.inject.Injector; import com.yugabyte.yw.common.PlatformExecutorFactory; import com.yugabyte.yw.common.PlatformScheduler; +import com.yugabyte.yw.common.ShutdownHookHandler; import com.yugabyte.yw.common.config.RuntimeConfGetter; import com.yugabyte.yw.models.HighAvailabilityConfig; import com.yugabyte.yw.models.JobInstance; @@ -23,14 +24,13 @@ import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.function.Predicate; import java.util.stream.Collectors; import javax.inject.Inject; @@ -53,11 +53,13 @@ public class JobScheduler { "yb.job_scheduler.instance_record_ttl"; private final Injector injector; + private final ShutdownHookHandler shutdownHookHandler; private final RuntimeConfGetter confGetter; private final PlatformExecutorFactory platformExecutorFactory; private final PlatformScheduler platformScheduler; private final DelayQueue jobInstanceQueue; - private final Set inflightJobSchedules; + // JobSchedule UUID to removable status. + private final Map inflightJobSchedules; private Duration pollerInterval; private Duration scanWindow; @@ -67,15 +69,17 @@ public class JobScheduler { @Inject public JobScheduler( Injector injector, + ShutdownHookHandler shutdownHookHandler, RuntimeConfGetter confGetter, PlatformExecutorFactory platformExecutorFactory, PlatformScheduler platformScheduler) { this.injector = injector; + this.shutdownHookHandler = shutdownHookHandler; this.confGetter = confGetter; this.platformExecutorFactory = platformExecutorFactory; this.platformScheduler = platformScheduler; this.jobInstanceQueue = new DelayQueue<>(); - this.inflightJobSchedules = ConcurrentHashMap.newKeySet(); + this.inflightJobSchedules = new ConcurrentHashMap<>(); this.pollerInterval = DEFAULT_POLLER_INTERVAL; } @@ -98,6 +102,15 @@ public void init() { JobInstance.class.getSimpleName(), 1, new ThreadFactoryBuilder().setNameFormat("job-instance-%d").build()); + // Override the default handler to shut it down faster. + shutdownHookHandler.addShutdownHook( + jobInstanceQueueExecutor, + exec -> { + if (exec != null) { + exec.shutdownNow(); + } + }, + 99 /* weight */); jobInstanceQueueExecutor.submit(this::processJobInstanceQueue); platformScheduler.schedule( JobScheduler.class.getSimpleName(), @@ -119,10 +132,10 @@ public UUID submitSchedule(JobSchedule jobSchedule) { Preconditions.checkNotNull(jobSchedule.getCustomerUuid(), "Customer UUID must be set"); Preconditions.checkNotNull(jobSchedule.getScheduleConfig(), "Schedule config must be set"); Preconditions.checkNotNull(jobSchedule.getJobConfig(), "Job config must be set"); - Instant nextMaxPollTime = Instant.now().plus(pollerInterval.getSeconds(), ChronoUnit.SECONDS); - Date nextTime = createNextStartTime(jobSchedule); + Date nextTime = createNextStartTime(jobSchedule, true); jobSchedule.setNextStartTime(nextTime); jobSchedule.save(); + Instant nextMaxPollTime = Instant.now().plus(pollerInterval.getSeconds(), ChronoUnit.SECONDS); if (nextMaxPollTime.isAfter(nextTime.toInstant())) { // If the next execution time is arriving too soon, add it in the memory as well. addJobInstanceIfAbsent(jobSchedule.getUuid()); @@ -156,12 +169,14 @@ public Optional maybeGetSchedule(UUID customerUuid, String name) { * * @param uuid the UUID of the schedule. */ - public void deleteSchedule(UUID uuid) { + public synchronized void deleteSchedule(UUID uuid) { JobSchedule.maybeGet(uuid) .ifPresent( jobSchedule -> { jobSchedule.delete(); - removeJobInstanceIfPresent(uuid); + if (!removeJobInstanceIfPresent(uuid)) { + inflightJobSchedules.remove(uuid); + } }); } @@ -171,7 +186,7 @@ public void deleteSchedule(UUID uuid) { * @param uuid the UUID of the schedule. * @param scheduleConfig the new schedule config. */ - public void updateSchedule(UUID uuid, ScheduleConfig scheduleConfig) { + public synchronized void updateSchedule(UUID uuid, ScheduleConfig scheduleConfig) { Preconditions.checkNotNull(scheduleConfig, "Schedule config must be set"); JobSchedule jobSchedule = JobSchedule.getOrBadRequest(uuid); jobSchedule.updateScheduleConfig(scheduleConfig); @@ -184,10 +199,28 @@ public void updateSchedule(UUID uuid, ScheduleConfig scheduleConfig) { * @param uuid the UUID of the schedule. * @param isDisable true to disable else false. */ - public void disableSchedule(UUID uuid, boolean disable) { + public synchronized void disableSchedule(UUID uuid, boolean disable) { JobSchedule jobSchedule = JobSchedule.getOrBadRequest(uuid); - jobSchedule.updateScheduleConfig( - jobSchedule.getScheduleConfig().toBuilder().disabled(disable).build()); + boolean wasDisabled = jobSchedule.getScheduleConfig().isDisabled(); + if (wasDisabled ^ disable) { + if (disable) { + jobSchedule.updateScheduleConfig( + jobSchedule.getScheduleConfig().toBuilder().disabled(disable).build()); + removeJobInstanceIfPresent(jobSchedule.getUuid()); + } else { + // Reset the next start time on enabling the schedule. + jobSchedule.setNextStartTime(createNextStartTime(jobSchedule, true)); + jobSchedule.setScheduleConfig( + jobSchedule.getScheduleConfig().toBuilder().disabled(disable).build()); + jobSchedule.update(); + Instant nextMaxPollTime = + Instant.now().plus(pollerInterval.getSeconds(), ChronoUnit.SECONDS); + if (nextMaxPollTime.isAfter(jobSchedule.getNextStartTime().toInstant())) { + // If the next execution time is arriving too soon, add it in the memory as well. + addJobInstanceIfAbsent(jobSchedule.getUuid()); + } + } + } } /** @@ -211,13 +244,13 @@ private void handleRestart() { // Create the next start time for the schedule. An implementation of JobConfig can choose to // override the default behavior. - private Date createNextStartTime(JobSchedule jobSchedule) { - return jobSchedule.getJobConfig().createNextStartTime(jobSchedule); + private Date createNextStartTime(JobSchedule jobSchedule, boolean restart) { + return jobSchedule.getJobConfig().createNextStartTime(jobSchedule, restart); } // Create and add a job instance to the queue when the schedule is picked up for execution soon. private synchronized void addJobInstanceIfAbsent(UUID jobScheduleUuid) { - if (!inflightJobSchedules.contains(jobScheduleUuid)) { + if (!inflightJobSchedules.containsKey(jobScheduleUuid)) { // Get the record again if the UUID was fetched right before the record deletion but after the // removal from the in-flight tracker. JobSchedule.maybeGet(jobScheduleUuid) @@ -228,25 +261,29 @@ private synchronized void addJobInstanceIfAbsent(UUID jobScheduleUuid) { jobInstance.setJobScheduleUuid(s.getUuid()); jobInstance.setStartTime(s.getNextStartTime()); jobInstance.save(); - inflightJobSchedules.add(s.getUuid()); + inflightJobSchedules.put(s.getUuid(), true /* Removable */); jobInstanceQueue.add(jobInstance); }); } } - private synchronized void removeJobInstanceIfPresent(UUID jobScheduleUuid) { - if (inflightJobSchedules.contains(jobScheduleUuid)) { + private synchronized boolean removeJobInstanceIfPresent(UUID jobScheduleUuid) { + if (inflightJobSchedules.getOrDefault(jobScheduleUuid, false)) { // Delete only the records which were found in the queue. removeQueuedJobInstances( jobInstance -> jobInstance.getJobScheduleUuid().equals(jobScheduleUuid)) .stream() + .filter(jobInstance -> jobInstance.getState() == State.SCHEDULED) .forEach(JobInstance::delete); inflightJobSchedules.remove(jobScheduleUuid); + return true; } + return false; } // Return the job instances which were found in the queue. - private List removeQueuedJobInstances(Predicate predicate) { + private synchronized List removeQueuedJobInstances( + Predicate predicate) { List tobeRemovedJobInstances = new ArrayList<>(); Iterator iter = jobInstanceQueue.iterator(); while (iter.hasNext()) { @@ -264,7 +301,7 @@ private List removeQueuedJobInstances(Predicate predic private void poll(Duration scanWindow) { try { JobSchedule.getNextEnabled(scanWindow).stream() - .filter(uuid -> !inflightJobSchedules.contains(uuid)) + .filter(uuid -> !inflightJobSchedules.containsKey(uuid)) .forEach( uuid -> { try { @@ -283,7 +320,7 @@ private void poll(Duration scanWindow) { private void processJobInstanceQueue() { while (!jobInstanceQueueExecutor.isShutdown()) { - Future future = null; + CompletableFuture future = null; JobInstance jobInstance = null; try { jobInstance = jobInstanceQueue.take(); @@ -300,18 +337,6 @@ private void processJobInstanceQueue() { } } - private synchronized void finalizeJob(JobSchedule jobSchedule, JobInstance jobInstance) { - if (inflightJobSchedules.contains(jobSchedule.getUuid())) { - Date endTime = new Date(); - jobSchedule.setLastEndTime(endTime); - jobInstance.setEndTime(endTime); - jobSchedule.setNextStartTime(createNextStartTime(jobSchedule)); - jobSchedule.update(); - jobInstance.update(); - inflightJobSchedules.remove(jobSchedule.getUuid()); - } - } - @VisibleForTesting CompletableFuture executeJobInstance(JobInstance instance) { if (HighAvailabilityConfig.isFollower()) { @@ -328,8 +353,17 @@ CompletableFuture executeJobInstance(JobInstance instance) { } JobInstance jobInstance = jobInstanceOptional.get(); JobSchedule jobSchedule = JobSchedule.getOrBadRequest(jobInstance.getJobScheduleUuid()); + synchronized (this) { + if (inflightJobSchedules.computeIfPresent(jobSchedule.getUuid(), (k, v) -> false) == null) { + log.debug( + "Ignoring job {} for schedule {} as it is already removed", + jobInstance.getUuid(), + jobInstance.getJobScheduleUuid()); + return null; + } + } if (jobInstance.getState() != State.SCHEDULED) { - finalizeJob(jobSchedule, jobInstance); + updateFinalState(jobSchedule, jobInstance, null); log.debug( "Skipping job {} for schedule {} as it is not scheduled", jobInstance.getUuid(), @@ -338,7 +372,7 @@ CompletableFuture executeJobInstance(JobInstance instance) { } if (jobSchedule.getScheduleConfig().isDisabled()) { jobInstance.setState(State.SKIPPED); - finalizeJob(jobSchedule, jobInstance); + updateFinalState(jobSchedule, jobInstance, null); log.debug( "Skipping job {} as the schedule {} is disabled", jobInstance.getUuid(), @@ -382,33 +416,47 @@ CompletableFuture executeJobInstance(JobInstance instance) { } catch (Exception e) { jobInstance.setState(State.FAILED); updateFinalState(jobSchedule, jobInstance, e); - } finally { - log.debug( - "Run completed for job instance {} for schedule {} with result {}", - jobInstance.getUuid(), - jobSchedule.getUuid(), - jobInstance.getState()); } return future; } - private void updateFinalState(JobSchedule jobSchedule, JobInstance jobInstance, Throwable t) { + private synchronized void updateFinalState( + JobSchedule jobSchedule, JobInstance jobInstance, Throwable t) { if (t != null) { log.info( - "Error in excuting kob instance {} for schedule {} - {}", + "Error in excuting job instance {} for schedule {} - {}", jobInstance.getUuid(), jobSchedule.getUuid(), t.getMessage()); } - if (jobInstance.getState() != State.SKIPPED) { - jobSchedule.setExecutionCount(jobSchedule.getExecutionCount() + 1); - } - if (jobInstance.getState() == State.SUCCESS) { - jobSchedule.setFailedCount(0); - } else if (jobInstance.getState() == State.FAILED) { - jobSchedule.setFailedCount(jobSchedule.getFailedCount() + 1); + if (inflightJobSchedules.containsKey(jobSchedule.getUuid())) { + JobSchedule.State jobScheduleState = jobSchedule.getState(); + State jobInstanceState = jobInstance.getState(); + jobSchedule.refresh(); + jobInstance.refresh(); + jobSchedule.setState(jobScheduleState); + jobInstance.setState(jobInstanceState); + if (jobInstance.getState() != State.SKIPPED) { + jobSchedule.setExecutionCount(jobSchedule.getExecutionCount() + 1); + } + if (jobInstance.getState() == State.SUCCESS) { + jobSchedule.setFailedCount(0); + } else if (jobInstance.getState() == State.FAILED) { + jobSchedule.setFailedCount(jobSchedule.getFailedCount() + 1); + } + jobSchedule.setState(JobSchedule.State.INACTIVE); + Date endTime = new Date(); + jobSchedule.setLastEndTime(endTime); + jobSchedule.setNextStartTime(createNextStartTime(jobSchedule, false)); + jobSchedule.update(); + jobInstance.setEndTime(endTime); + jobInstance.update(); + inflightJobSchedules.remove(jobSchedule.getUuid()); } - jobSchedule.setState(JobSchedule.State.INACTIVE); - finalizeJob(jobSchedule, jobInstance); + log.debug( + "Run completed for job instance {} for schedule {} with result {}", + jobInstance.getUuid(), + jobSchedule.getUuid(), + jobInstance.getState()); } } diff --git a/managed/src/main/resources/reference.conf b/managed/src/main/resources/reference.conf index d58ecaf5dd4c..63cfb940316c 100644 --- a/managed/src/main/resources/reference.conf +++ b/managed/src/main/resources/reference.conf @@ -482,7 +482,7 @@ yb { auto_master_failover { poller_interval = 2 minutes detect_interval = 1 minute - task_interval = 5 minutes + task_interval = 1 minute enabled = false max_master_follower_lag = 30 minutes max_master_heartbeat_delay = 10 minutes diff --git a/managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/AutoMasterFailoverLocalTest.java b/managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/AutoMasterFailoverLocalTest.java new file mode 100644 index 000000000000..84b9cccb71e7 --- /dev/null +++ b/managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/AutoMasterFailoverLocalTest.java @@ -0,0 +1,156 @@ +// Copyright (c) YugaByte, Inc. + +package com.yugabyte.yw.commissioner.tasks.local; + +import static org.junit.Assert.assertEquals; + +import com.google.inject.Inject; +import com.yugabyte.yw.commissioner.tasks.UniverseTaskBase; +import com.yugabyte.yw.common.LocalNodeManager; +import com.yugabyte.yw.common.config.GlobalConfKeys; +import com.yugabyte.yw.common.config.UniverseConfKeys; +import com.yugabyte.yw.common.gflags.SpecificGFlags; +import com.yugabyte.yw.common.utils.Pair; +import com.yugabyte.yw.forms.UniverseDefinitionTaskParams; +import com.yugabyte.yw.models.CustomerTask; +import com.yugabyte.yw.models.TaskInfo; +import com.yugabyte.yw.models.Universe; +import com.yugabyte.yw.models.helpers.NodeDetails; +import com.yugabyte.yw.models.helpers.TaskType; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Test; + +public class AutoMasterFailoverLocalTest extends LocalProviderUniverseTestBase { + @Inject LocalNodeManager localNodeManager; + + @Before + @Override + public void setUp() { + super.setUp(); + jobScheduler.init(); + autoMasterFailoverScheduler.init(); + } + + @Override + protected Pair getIpRange() { + return new Pair<>(30, 60); + } + + private void enableMasterFailover(Universe universe) { + settableRuntimeConfigFactory + .globalRuntimeConf() + .setValue(GlobalConfKeys.autoMasterFailoverPollerInterval.getKey(), "10s"); + settableRuntimeConfigFactory + .forUniverse(universe) + .setValue(UniverseConfKeys.autoMasterFailoverTaskInterval.getKey(), "5s"); + settableRuntimeConfigFactory + .forUniverse(universe) + .setValue(UniverseConfKeys.autoMasterFailoverCooldown.getKey(), "5s"); + settableRuntimeConfigFactory + .forUniverse(universe) + .setValue(UniverseConfKeys.autoMasterFailoverDetectionInterval.getKey(), "5s"); + settableRuntimeConfigFactory + .forUniverse(universe) + .setValue(UniverseConfKeys.enableAutoMasterFailover.getKey(), "true"); + } + + // Pick the tserver only nodes to stop the process. + private List pickEligibleTservers(Universe universe, int count, UUID excludeAz) { + List tservers = new ArrayList<>(); + Map> nodes = + universe.getNodes().stream() + .filter(n -> excludeAz == null || !excludeAz.equals(n.azUuid)) + .collect(Collectors.groupingBy(n -> n.azUuid)); + for (Map.Entry> entry : nodes.entrySet()) { + if (tservers.size() >= count) { + break; + } + if (entry.getValue().size() < 2) { + continue; + } + List tserverOnlyNodes = + entry.getValue().stream() + .filter(n -> n.isTserver && !n.isMaster) + .collect(Collectors.toList()); + if (tserverOnlyNodes.size() > 0) { + tservers.add(tserverOnlyNodes.get(0)); + } + } + return tservers; + } + + // Pick a master node to stop both tserver and master processes. + private NodeDetails pickEligibleMasterNode(Universe universe) + throws IOException, InterruptedException { + Map> nodes = + universe.getNodes().stream().collect(Collectors.groupingBy(n -> n.azUuid)); + String leaderHost = getMasterLeader(universe); + for (Map.Entry> entry : nodes.entrySet()) { + if (entry.getValue().size() < 2) { + continue; + } + int numMasters = (int) entry.getValue().stream().filter(n -> n.isMaster).count(); + if (numMasters == 0 || numMasters == entry.getValue().size()) { + // Ignore AZs without any master replacement node. + continue; + } + // Ignore master leader to shorten time. + Optional optional = + entry.getValue().stream() + .filter(n -> n.isMaster && !Objects.equals(leaderHost, n.cloudInfo.private_ip)) + .findFirst(); + if (!optional.isPresent()) { + continue; + } + return optional.get(); + } + throw new IllegalStateException("No eligible master found"); + } + + @Test + public void testAutoMasterFailover() throws InterruptedException, IOException { + UniverseDefinitionTaskParams.UserIntent userIntent = + getDefaultUserIntent("test-universe-1", false, 3, 6); + userIntent.specificGFlags = SpecificGFlags.construct(GFLAGS, GFLAGS); + Universe universe = createUniverse(userIntent); + CustomerTask customerTask = CustomerTask.getLastTaskByTargetUuid(universe.getUniverseUUID()); + enableMasterFailover(universe); + NodeDetails pickedMasterNode = pickEligibleMasterNode(universe); + List pickedTserverNodes = + pickEligibleTservers(universe, 1 /* count */, pickedMasterNode.azUuid /* exclude AZ */); + assertEquals(1, pickedTserverNodes.size()); + // Kill tserver process on a different node along with master and tserver on another node. + for (NodeDetails pickedTserverNode : pickedTserverNodes) { + killProcessesOnNode(universe.getUniverseUUID(), pickedTserverNode.getNodeName()); + } + killProcessesOnNode(universe.getUniverseUUID(), pickedMasterNode.getNodeName()); + TaskInfo taskInfo = + waitForNextTask( + universe.getUniverseUUID(), customerTask.getTaskUUID(), Duration.ofMinutes(5)); + assertEquals(TaskInfo.State.Success, taskInfo.getTaskState()); + assertEquals(TaskType.MasterFailover, taskInfo.getTaskType()); + // Start all the killed processes. + for (NodeDetails pickedTserverNode : pickedTserverNodes) { + startProcessesOnNode( + universe.getUniverseUUID(), pickedTserverNode, UniverseTaskBase.ServerType.TSERVER); + } + startProcessesOnNode( + universe.getUniverseUUID(), pickedMasterNode, UniverseTaskBase.ServerType.TSERVER); + startProcessesOnNode( + universe.getUniverseUUID(), pickedMasterNode, UniverseTaskBase.ServerType.MASTER); + taskInfo = + waitForNextTask(universe.getUniverseUUID(), taskInfo.getTaskUUID(), Duration.ofMinutes(5)); + assertEquals(TaskType.SyncMasterAddresses, taskInfo.getTaskType()); + assertEquals(TaskInfo.State.Success, taskInfo.getTaskState()); + assertEquals(false, isMasterProcessRunning(pickedMasterNode.getNodeName())); + } +} diff --git a/managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/LocalProviderUniverseTestBase.java b/managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/LocalProviderUniverseTestBase.java index 6a9bda3ea4ea..a4cae753d90d 100644 --- a/managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/LocalProviderUniverseTestBase.java +++ b/managed/src/test/java/com/yugabyte/yw/commissioner/tasks/local/LocalProviderUniverseTestBase.java @@ -13,12 +13,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Stopwatch; import com.google.common.net.HostAndPort; import com.yugabyte.yw.cloud.PublicCloudConstants; +import com.yugabyte.yw.commissioner.AutoMasterFailoverScheduler; import com.yugabyte.yw.commissioner.Commissioner; import com.yugabyte.yw.commissioner.Common; import com.yugabyte.yw.commissioner.tasks.CommissionerBaseTest; import com.yugabyte.yw.commissioner.tasks.UniverseTaskBase; +import com.yugabyte.yw.commissioner.tasks.UniverseTaskBase.ServerType; import com.yugabyte.yw.commissioner.tasks.subtasks.CheckClusterConsistency; import com.yugabyte.yw.common.ApiUtils; import com.yugabyte.yw.common.LocalNodeManager; @@ -54,6 +57,7 @@ import com.yugabyte.yw.models.AccessKey; import com.yugabyte.yw.models.AvailabilityZone; import com.yugabyte.yw.models.Customer; +import com.yugabyte.yw.models.CustomerTask; import com.yugabyte.yw.models.InstanceType; import com.yugabyte.yw.models.Provider; import com.yugabyte.yw.models.ProviderDetails; @@ -67,6 +71,7 @@ import com.yugabyte.yw.models.helpers.NodeDetails; import com.yugabyte.yw.models.helpers.TaskType; import com.yugabyte.yw.models.helpers.provider.LocalCloudInfo; +import com.yugabyte.yw.scheduler.JobScheduler; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -77,6 +82,7 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.text.SimpleDateFormat; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -128,9 +134,9 @@ public abstract class LocalProviderUniverseTestBase extends PlatformGuiceApplica private static final String DEFAULT_BASE_DIR = "/tmp/local"; protected static String YBC_VERSION; - public static String DB_VERSION = "2.20.1.3-b3"; + public static String DB_VERSION = "2.20.5.0-b72"; private static final String DOWNLOAD_URL = - "https://downloads.yugabyte.com/releases/2.20.1.3/" + "yugabyte-2.20.1.3-b3-%s-%s.tar.gz"; + "https://downloads.yugabyte.com/releases/2.20.5.0/" + "yugabyte-2.20.5.0-b72-%s-%s.tar.gz"; private static final String YBC_BASE_S3_URL = "https://downloads.yugabyte.com/ybc/"; private static final String YBC_BIN_ENV_KEY = "YBC_PATH"; private static final boolean KEEP_FAILED_UNIVERSE = true; @@ -202,6 +208,8 @@ public Map getYbcGFlags(UniverseDefinitionTaskParams.UserIntent protected Commissioner commissioner; protected SettableRuntimeConfigFactory settableRuntimeConfigFactory; protected RuntimeConfService runtimeConfService; + protected JobScheduler jobScheduler; + protected AutoMasterFailoverScheduler autoMasterFailoverScheduler; @BeforeClass public static void setUpEnv() { @@ -404,6 +412,8 @@ private void injectDependencies() { commissioner = app.injector().instanceOf(Commissioner.class); settableRuntimeConfigFactory = app.injector().instanceOf(SettableRuntimeConfigFactory.class); runtimeConfService = app.injector().instanceOf(RuntimeConfService.class); + jobScheduler = app.injector().instanceOf(JobScheduler.class); + autoMasterFailoverScheduler = app.injector().instanceOf(AutoMasterFailoverScheduler.class); } @Before @@ -562,8 +572,14 @@ protected UniverseDefinitionTaskParams.UserIntent getDefaultUserIntent() { protected UniverseDefinitionTaskParams.UserIntent getDefaultUserIntent( String univName, boolean disableTls) { + return getDefaultUserIntent(univName, disableTls, 3, 3); + } + + protected UniverseDefinitionTaskParams.UserIntent getDefaultUserIntent( + String univName, boolean disableTls, int rf, int numNodes) { UniverseDefinitionTaskParams.UserIntent userIntent = - ApiUtils.getTestUserIntent(region, provider, instanceType, 3); + ApiUtils.getTestUserIntent(region, provider, instanceType, numNodes); + userIntent.replicationFactor = rf; userIntent.universeName = "test-universe"; if (univName != null) { userIntent.universeName = univName; @@ -1036,4 +1052,49 @@ private void dumpToLog(Universe... universes) throws InterruptedException { protected String getBackupBaseDirectory() { return String.format("%s/%s/%s", baseDir, subDir, testName); } + + protected void killProcessesOnNode(UUID universeUuid, String nodeName) + throws IOException, InterruptedException { + Universe universe = Universe.getOrBadRequest(universeUuid); + NodeDetails node = universe.getNode(nodeName); + if (node.isTserver) { + localNodeManager.killProcess(nodeName, ServerType.TSERVER); + } + if (node.isMaster) { + localNodeManager.killProcess(nodeName, ServerType.MASTER); + } + } + + protected void startProcessesOnNode( + UUID universeUuid, NodeDetails node, UniverseTaskBase.ServerType serverType) + throws IOException, InterruptedException { + localNodeManager.startProcess(universeUuid, node.getNodeName(), serverType); + } + + protected boolean isMasterProcessRunning(String nodeName) { + return localNodeManager.isProcessRunning(nodeName, ServerType.MASTER); + } + + // This method waits for the next task to complete. + protected TaskInfo waitForNextTask(UUID universeUuid, UUID lastTaskUuid, Duration timeout) + throws InterruptedException { + Stopwatch stopwatch = Stopwatch.createStarted(); + do { + Universe universe = Universe.getOrBadRequest(universeUuid); + UniverseDefinitionTaskParams details = universe.getUniverseDetails(); + if (details.placementModificationTaskUuid != null + && !lastTaskUuid.equals(details.placementModificationTaskUuid)) { + // A new task has already started, wait for it to complete. + TaskInfo taskInfo = TaskInfo.getOrBadRequest(details.placementModificationTaskUuid); + return CommissionerBaseTest.waitForTask(taskInfo.getTaskUUID()); + } + CustomerTask customerTask = CustomerTask.getLastTaskByTargetUuid(universeUuid); + if (!lastTaskUuid.equals(customerTask.getTaskUUID())) { + // Last task has already completed. + return TaskInfo.getOrBadRequest(customerTask.getTaskUUID()); + } + Thread.sleep(1000); + } while (stopwatch.elapsed().compareTo(timeout) < 0); + throw new RuntimeException("Timed-out waiting for next task to start"); + } } diff --git a/managed/src/test/java/com/yugabyte/yw/scheduler/JobSchedulerTest.java b/managed/src/test/java/com/yugabyte/yw/scheduler/JobSchedulerTest.java index 05309b60fefb..0b31c71d82de 100644 --- a/managed/src/test/java/com/yugabyte/yw/scheduler/JobSchedulerTest.java +++ b/managed/src/test/java/com/yugabyte/yw/scheduler/JobSchedulerTest.java @@ -16,6 +16,7 @@ import com.yugabyte.yw.common.PlatformExecutorFactory; import com.yugabyte.yw.common.PlatformScheduler; import com.yugabyte.yw.common.PlatformServiceException; +import com.yugabyte.yw.common.ShutdownHookHandler; import com.yugabyte.yw.common.config.RuntimeConfGetter; import com.yugabyte.yw.models.Customer; import com.yugabyte.yw.models.JobInstance; @@ -41,6 +42,7 @@ @RunWith(MockitoJUnitRunner.class) public class JobSchedulerTest extends FakeDBApplication { private Injector injector; + private ShutdownHookHandler shutdownHookHandler; private Customer customer; private RuntimeConfGetter confGetter; private PlatformExecutorFactory platformExecutorFactory; @@ -51,11 +53,18 @@ public class JobSchedulerTest extends FakeDBApplication { public void setUp() { customer = ModelFactory.testCustomer(); injector = app.injector().instanceOf(Injector.class); + shutdownHookHandler = app.injector().instanceOf(ShutdownHookHandler.class); confGetter = app.injector().instanceOf(RuntimeConfGetter.class); platformExecutorFactory = app.injector().instanceOf(PlatformExecutorFactory.class); platformScheduler = app.injector().instanceOf(PlatformScheduler.class); jobScheduler = - spy(new JobScheduler(injector, confGetter, platformExecutorFactory, platformScheduler)); + spy( + new JobScheduler( + injector, + shutdownHookHandler, + confGetter, + platformExecutorFactory, + platformScheduler)); } @SuppressWarnings("serial")