From 495b2fa904044a36660b16e74d1cf8f4b045172a Mon Sep 17 00:00:00 2001 From: Bryan Alexander Rivera <2250179+bryaan@users.noreply.github.com> Date: Tue, 15 Feb 2022 15:16:05 -0500 Subject: [PATCH] [SRE-20063] Update AgentWatchDog to replace instances when unable to connect --- pom.xml | 17 +- .../intuit/tank/vmManager/AgentWatchdog.java | 295 +++++++----------- .../tank/vmManager/AgentWatchdogTest.java | 107 +++++++ tank_vmManager/src/test/resources/log4j2.xml | 17 + 4 files changed, 257 insertions(+), 179 deletions(-) create mode 100644 tank_vmManager/src/test/java/com/intuit/tank/vmManager/AgentWatchdogTest.java create mode 100644 tank_vmManager/src/test/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index a9971968c..f30afb14f 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ 2.22.2 2.34 5.8.2 + 4.2.0 4.5.13 5.1.2 8.11.0 @@ -287,7 +288,7 @@ ${project.groupId} test-support - 3.1.1-SNAPSHOT + ${project.version} test @@ -296,7 +297,11 @@ mockito-inline test - + + org.mockito + mockito-junit-jupiter + test + org.jboss.weld weld-junit5 @@ -758,7 +763,13 @@ org.mockito mockito-inline - 3.9.0 + ${version.mockito} + test + + + org.mockito + mockito-junit-jupiter + ${version.mockito} test diff --git a/tank_vmManager/src/main/java/com/intuit/tank/vmManager/AgentWatchdog.java b/tank_vmManager/src/main/java/com/intuit/tank/vmManager/AgentWatchdog.java index 70da2fc56..d83f7c569 100644 --- a/tank_vmManager/src/main/java/com/intuit/tank/vmManager/AgentWatchdog.java +++ b/tank_vmManager/src/main/java/com/intuit/tank/vmManager/AgentWatchdog.java @@ -52,38 +52,63 @@ public class AgentWatchdog implements Runnable { private static final Logger LOG = LogManager.getLogger(AgentWatchdog.class); + private static final VmManagerConfig vmManagerConfig = new TankConfig().getVmManagerConfig(); private long sleepTime; - private long maxWaitForStart; private long maxWaitForResponse; private int maxRestarts; private VMTracker vmTracker; private VMInstanceRequest instanceRequest; private List vmInfo; + private ArrayList startedInstances; + private ArrayList reportedInstances; private boolean stopped; - private boolean checkForStart = true; private long startTime; private int restartCount; - private int rebootCount; private AmazonInstance amazonInstance; + private int expectedInstanceCount; /** + * Constructor + * * @param instanceRequest * @param vmInfo * @param vmTracker */ public AgentWatchdog(VMInstanceRequest instanceRequest, List vmInfo, VMTracker vmTracker) { + this(instanceRequest, vmInfo, vmTracker, + new AmazonInstance(instanceRequest.getRegion()), + vmManagerConfig.getWatchdogSleepTime(30 * 1000), // 30 seconds + vmManagerConfig.getMaxAgentReportMills(1000 * 60 * 3) // 3 minutes + ); + } + + /** + * Constructor + * + * @param instanceRequest + * @param vmInfo + * @param vmTracker + * @param amazonInstance + * @param maxWaitForResponse + */ + public AgentWatchdog(VMInstanceRequest instanceRequest, List vmInfo, VMTracker vmTracker, AmazonInstance amazonInstance, long sleepTime, long maxWaitForResponse) { this.instanceRequest = instanceRequest; this.vmInfo = vmInfo; this.vmTracker = vmTracker; this.startTime = System.currentTimeMillis(); - this.amazonInstance = new AmazonInstance(instanceRequest.getRegion()); + this.amazonInstance = amazonInstance; VmManagerConfig vmManagerConfig = new TankConfig().getVmManagerConfig(); - this.maxWaitForResponse = vmManagerConfig.getMaxAgentReportMills(1000 * 60 * 5); // 5 minutes - this.maxWaitForStart = vmManagerConfig.getMaxAgentStartMills(1000 * 60 * 3); // 3 minutes - this.maxRestarts = vmManagerConfig.getMaxRestarts(2); - this.sleepTime = vmManagerConfig.getWatchdogSleepTime(30 * 1000); // 30 seconds + this.maxWaitForResponse = vmManagerConfig.getMaxAgentReportMills(1000 * 60 * 3); // 3 minutes + this.maxRestarts = vmManagerConfig.getMaxRestarts(3); + this.sleepTime = sleepTime; + this.expectedInstanceCount = vmInfo.size(); + + LOG.info("AgentWatchdog settings: " + + "\nmaxWaitForResponse: " + maxWaitForResponse + + "\nmaxRestarts: " + maxRestarts + + "\nsleepTime: " + sleepTime); } /** @@ -91,7 +116,8 @@ public AgentWatchdog(VMInstanceRequest instanceRequest, List vmIn */ @Override public String toString() { - return new ToStringBuilder(this).append("sleepTime", sleepTime).append("maxWaitForStart", maxWaitForStart) + return new ToStringBuilder(this) + .append("sleepTime", sleepTime) .append("maxWaitForResponse", maxWaitForResponse) .append("maxRestarts", maxRestarts).toString(); } @@ -104,44 +130,20 @@ public void run() { LOG.info("Starting WatchDog: " + this.toString()); AWSXRay.getGlobalRecorder().beginNoOpSegment(); //jdbcInterceptor will throw SegmentNotFoundException,RuntimeException without this try { - List instances = new ArrayList(vmInfo); - while (rebootCount <= maxRestarts && restartCount <= maxRestarts && !stopped) { - if (!vmTracker.isRunning(instanceRequest.getJobId())) { - break; - } - if (checkForStart) { - LOG.info("Checking for " + instances.size() + " running agents..."); - removeRunningInstances(instances); - if (!instances.isEmpty()) { - if (shouldRelaunchInstances()) { - relaunch(instances); - } else { - LOG.info("Waiting for " + instances.size() + " agents to start: " - + getInstanceIdList(instances)); - } - Thread.sleep(sleepTime); - continue; - } else { - LOG.info("All Agents Started."); - vmTracker.publishEvent(new JobEvent(instanceRequest.getJobId(), "All Agents Started.", - JobLifecycleEvent.AGENT_STARTED)); - checkForStart = false; + startedInstances = new ArrayList(vmInfo); + reportedInstances = new ArrayList(); + while (restartCount <= maxRestarts && !stopped) { + checkForReportingInstances(); + LOG.info(startedInstances.size() + " instances started. " + reportedInstances.size() + " instances reported. Waiting for remaining " + startedInstances.size() + " agents to report back..."); + // When runningInstances is empty all instances have reported back. + // If not, check if its time for a restart. + if (!startedInstances.isEmpty()) { + if (shouldRelaunchInstances()) { + relaunch(startedInstances); startTime = System.currentTimeMillis(); } - } - // all instances are now started - instances = new ArrayList(vmInfo); - String jobId = instanceRequest.getJobId(); - // check to see if all agents have reported back - LOG.info("Checking for " + instances.size() + " reporting agents..."); - removeReportingInstances(jobId, instances); - if (!instances.isEmpty()) { - if (shouldRebootInstances()) { - reboot(instances); - } else { - LOG.info("Waiting for " + instances.size() + " agents to report: " - + getInstanceIdList(instances)); - } + LOG.info("Waiting for " + startedInstances.size() + " agents to report: " + + getInstanceIdList(startedInstances)); Thread.sleep(sleepTime); continue; } else { @@ -150,10 +152,10 @@ public void run() { "All Agents Reported Back and are ready to start load.", JobLifecycleEvent.AGENT_REPORTED)); stopped = true; } - } } catch (Exception e) { LOG.error("Error in Watchdog: " + e.toString(), e); + // TODO Terminate all instances } finally { LOG.info("Exiting Watchdog " + this.toString()); AWSXRay.endSegment(); @@ -161,116 +163,89 @@ public void run() { } /** - * @param instances - * + * Find instances that have reported and remove from started instances and add to reported instances. */ - private void reboot(List instances) { - rebootCount++; - if (rebootCount <= maxRestarts) { - - String msg = "Have " + instances.size() - + " agents that started but failed to report status correctly. rebooting " - + getInstanceIdList(instances); - vmTracker.publishEvent(new JobEvent(instanceRequest.getJobId(), msg, JobLifecycleEvent.AGENT_RESTARTED)); - LOG.info(msg); - startTime = System.currentTimeMillis(); - amazonInstance.reboot(instances); - checkForStart = true; - } else { + private void checkForReportingInstances() { + String jobId = instanceRequest.getJobId(); + CloudVmStatusContainer vmStatusForJob = vmTracker.getVmStatusForJob(jobId); + if (vmStatusForJob == null || vmStatusForJob.getEndTime() != null) { stopped = true; - String msg = "Have " - + instances.size() - + " agents that failed to report correctly and have exceeded the maximum number of restarts. Killing job."; - vmTracker.publishEvent(new JobEvent(instanceRequest.getJobId(), msg, JobLifecycleEvent.JOB_KILLED)); - LOG.info(msg); - killJob(); + throw new RuntimeException("Job appears to have been stopped. Exiting..."); + } + for (CloudVmStatus status : vmStatusForJob.getStatuses()) { + // Checks the state of Tank job. + if (status.getVmStatus() == VMStatus.running) { + VMInformation removedInstance = removeInstance(startedInstances, status.getInstanceId()); + if (removedInstance != null) { + addInstance(reportedInstances, removedInstance); + } + } } } - /** - * @param instances - * @return - */ private String getInstanceIdList(List instances) { return StringUtils.join(instances, ", "); } /** + * Relaunch all passed instances. + * Kill instances first then create a new request to start. Instances are added to started instances list. * @param instances */ - private void removeReportingInstances(String jobId, List instances) { - CloudVmStatusContainer vmStatusForJob = vmTracker.getVmStatusForJob(jobId); - if (vmStatusForJob != null && vmStatusForJob.getEndTime() == null) { - for (CloudVmStatus status : vmStatusForJob.getStatuses()) { - if (status.getVmStatus() == VMStatus.running - || (status.getJobStatus() != JobStatus.Unknown && status.getJobStatus() != JobStatus.Starting)) { - removeInstance(status.getInstanceId(), instances); - } - } - } else { - stopped = true; - throw new RuntimeException("Job appears to have been stopped. Exiting..."); - } - } - - /** - * @param instances - * - */ - private void relaunch(List instances) { + private void relaunch(ArrayList instances) { restartCount++; - if (restartCount <= maxRestarts) { - startTime = System.currentTimeMillis(); - String msg = "Have " + instances.size() + " agents that failed to start correctly. Restarting " - + getInstanceIdList(instances); - vmTracker.publishEvent(new JobEvent(instanceRequest.getJobId(), msg, JobLifecycleEvent.AGENT_REBOOTED)); - LOG.info(msg); - // relaunch instances and remove old onesn from vmTracker - // kill them first just to be sure - List instanceIds = instances.stream() - .map(VMInformation::getInstanceId).collect(Collectors.toCollection(() -> new ArrayList<>(instances.size()))); - amazonInstance.killInstances(instanceIds); - VMImageDao dao = new VMImageDao(); - for (VMInformation info : instances) { - vmInfo.remove(info); - vmTracker.setStatus(createTerminatedVmStatus(info)); - VMInstance image = dao.getImageByInstanceId(info.getInstanceId()); - if (image != null) { - image.setStatus(VMStatus.terminated.name()); - dao.saveOrUpdate(image); - } - } - instanceRequest.setNumberOfInstances(instances.size()); - List newVms = new AmazonInstance(instanceRequest.getRegion()).create(instanceRequest); - instances.clear(); - for (VMInformation newInfo : newVms) { - vmInfo.add(newInfo); - instances.add(newInfo); - vmTracker.setStatus(createCloudStatus(instanceRequest, newInfo)); - LOG.info("Added image (" + newInfo.getInstanceId() + ") to VMImage table"); - try { - dao.addImageFromInfo(instanceRequest.getJobId(), newInfo, - instanceRequest.getRegion()); - } catch (Exception e) { - LOG.warn("Error persisting VM Image: " + e); - } - } - } else { + LOG.info("Restart Count: " + restartCount); + if (restartCount > maxRestarts) { stopped = true; String msg = "Have " - + instances.size() + + this.startedInstances.size() + " agents that failed to start correctly and have exceeded the maximum number of restarts. Killing job."; vmTracker.publishEvent(new JobEvent(instanceRequest.getJobId(), msg, JobLifecycleEvent.JOB_ABORTED)); LOG.info(msg); - killJob(); + // TODO Do we have to kill jobs here? + throw new RuntimeException("Killing jobs and exiting"); } - } - - /** - * - */ - private void killJob() { - throw new RuntimeException("Killing jobs and exiting"); + String msg = "Have " + instances.size() + " agents that failed to start or report correctly. Relaunching. " + + getInstanceIdList(instances); + vmTracker.publishEvent(new JobEvent(instanceRequest.getJobId(), msg, JobLifecycleEvent.AGENT_REBOOTED)); + LOG.info(msg); + // Kill instances first + List instanceIds = instances.stream() + .map(VMInformation::getInstanceId).collect(Collectors.toCollection(() -> new ArrayList<>(instances.size()))); + amazonInstance.killInstances(instanceIds); + // Set terminated status on the DAO + VMImageDao dao = new VMImageDao(); + for (VMInformation info : instances) { + vmInfo.remove(info); + vmTracker.setStatus(createTerminatedVmStatus(info)); + VMInstance image = dao.getImageByInstanceId(info.getInstanceId()); + if (image != null) { + image.setStatus(VMStatus.terminated.name()); + dao.saveOrUpdate(image); + } + } + LOG.info("Setting number of instances to relaunch to: " + instances.size()); + instanceRequest.setNumberOfInstances(instances.size()); + instances.clear(); + // Create and send instance start request + List newVms = amazonInstance.create(instanceRequest); + // Add new instances + for (VMInformation newInfo : newVms) { + vmInfo.add(newInfo); + // Add directly to started instances since these are restarted from scratch + startedInstances.add(newInfo); + vmTracker.setStatus(createCloudStatus(instanceRequest, newInfo)); + LOG.info("Added image (" + newInfo.getInstanceId() + ") to VMImage table"); + try { + dao.addImageFromInfo(instanceRequest.getJobId(), newInfo, + instanceRequest.getRegion()); + } catch (Exception e) { + LOG.warn("Error persisting VM Image: " + e); + } + } + LOG.info("At end of relaunch" + + " startedInstances: " + startedInstances.size() + + " reportedInstances: " + reportedInstances.size()); } /** @@ -285,60 +260,28 @@ private CloudVmStatus createCloudStatus(VMInstanceRequest req, VMInformation inf VMImageType.AGENT, req.getRegion(), VMStatus.pending, new ValidationStatus(), 0, 0, null, null); } - /** - * @param info - * @return - */ private CloudVmStatus createTerminatedVmStatus(VMInformation info) { - LOG.info(info); - LOG.info(instanceRequest); return new CloudVmStatus(info.getInstanceId(), instanceRequest.getJobId(), "unknown", JobStatus.Stopped, VMImageType.AGENT, instanceRequest.getRegion(), VMStatus.terminated, new ValidationStatus(), 0, 0, null, null); } - /** - * @return - */ private boolean shouldRelaunchInstances() { - return startTime + maxWaitForStart < System.currentTimeMillis(); - } - - /** - * @return - */ - private boolean shouldRebootInstances() { return startTime + maxWaitForResponse < System.currentTimeMillis(); } - /** - * - */ - private void removeRunningInstances(List instances) { - CloudVmStatusContainer vmStatusForJob = vmTracker.getVmStatusForJob(instanceRequest.getJobId()); - if (shouldRelaunchInstances() && (vmStatusForJob == null || vmStatusForJob.getEndTime() != null)) { - stopped = true; - throw new RuntimeException("Job appears to have been stopped. Exiting..."); - } - List foundInstances = amazonInstance.describeInstances(instances.stream().map(VMInformation::getInstanceId).toArray(String[]::new)); - for (VMInformation info : foundInstances) { - if ("running".equalsIgnoreCase(info.getState())) { - removeInstance(info.getInstanceId(), instances); - } - } - + private static void addInstance(List instances, VMInformation info) { + instances.add(info); } - /** - * @param foundInstanceId - * @param instances - */ - private void removeInstance(String foundInstanceId, List instances) { + private static VMInformation removeInstance(List instances, String instanceIdToDelete) { + VMInformation instanceRemoved = null; for (int i = instances.size(); --i >= 0;) {// count down loop so no concurrent modification - if (foundInstanceId.equals(instances.get(i).getInstanceId())) { + if (instanceIdToDelete.equals(instances.get(i).getInstanceId())) { + instanceRemoved = instances.get(i); instances.remove(i); } } - + return instanceRemoved; } } diff --git a/tank_vmManager/src/test/java/com/intuit/tank/vmManager/AgentWatchdogTest.java b/tank_vmManager/src/test/java/com/intuit/tank/vmManager/AgentWatchdogTest.java new file mode 100644 index 000000000..4b0b675a1 --- /dev/null +++ b/tank_vmManager/src/test/java/com/intuit/tank/vmManager/AgentWatchdogTest.java @@ -0,0 +1,107 @@ +package com.intuit.tank.vmManager; + +import com.intuit.tank.api.cloud.VMTracker; +import com.intuit.tank.api.model.v1.cloud.CloudVmStatus; +import com.intuit.tank.api.model.v1.cloud.CloudVmStatusContainer; +import com.intuit.tank.api.model.v1.cloud.VMStatus; +import com.intuit.tank.api.model.v1.cloud.ValidationStatus; +import com.intuit.tank.vm.api.enumerated.JobStatus; +import com.intuit.tank.vm.api.enumerated.VMImageType; +import com.intuit.tank.vm.api.enumerated.VMRegion; +import com.intuit.tank.vm.vmManager.VMInformation; +import com.intuit.tank.vm.vmManager.VMInstanceRequest; +import com.intuit.tank.vmManager.environment.amazon.AmazonInstance; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class AgentWatchdogTest { + + @Mock + AmazonInstance amazonInstanceMock = new AmazonInstance(VMRegion.STANDALONE); + + @Test + public void toStringTest() { + VMTracker vmTracker = new VMTrackerImpl(); + VMInstanceRequest instanceRequest = new VMInstanceRequest(); + instanceRequest.setRegion(VMRegion.STANDALONE); + List vmInfo = new ArrayList<>(); + AgentWatchdog agentWatchdog = new AgentWatchdog(instanceRequest, vmInfo, vmTracker); + + String result = agentWatchdog.toString(); + + assertNotNull(result); +// assertTrue(result.endsWith("[sleepTime=30000,maxWaitForStart=180000,maxWaitForResponse=360000,maxRestarts=20]")); + } + + @Test + public void alreadyRunningRunTest(@Mock VMTracker vmTrackerMock, @Mock CloudVmStatusContainer cloudVmStatusContainerMock) { + when(cloudVmStatusContainerMock.getEndTime()).thenReturn(null); + CloudVmStatus vmstatus = new CloudVmStatus("i-123456789", "123", "sg-123456", JobStatus.Running, VMImageType.AGENT, VMRegion.STANDALONE, VMStatus.running, new ValidationStatus(), 1, 1, new Date(), new Date()); + Set set = Stream.of(vmstatus).collect(Collectors.toCollection(HashSet::new)); + when(cloudVmStatusContainerMock.getStatuses()).thenReturn(set); + when(vmTrackerMock.getVmStatusForJob(null)).thenReturn(cloudVmStatusContainerMock); + + VMInformation vmInformation = new VMInformation(); + vmInformation.setState("running"); + vmInformation.setInstanceId("i-123456789"); + List vmInfo = Collections.singletonList( vmInformation ); +// when(amazonInstanceMock.describeInstances(Mockito.anyString())).thenReturn(vmInfo); + VMInstanceRequest instanceRequest = new VMInstanceRequest(); + instanceRequest.setRegion(VMRegion.STANDALONE); + + AgentWatchdog agentWatchdog = new AgentWatchdog(instanceRequest, vmInfo, vmTrackerMock, amazonInstanceMock, 10, 1000); + + agentWatchdog.run(); +// verify(amazonInstanceMock, times(1)).describeInstances("i-123456789"); + verify(amazonInstanceMock, never()).killInstances(Mockito.anyList()); + verify(amazonInstanceMock, never()).reboot(Mockito.anyList()); + verify(cloudVmStatusContainerMock, times(1)).getEndTime(); + verify(cloudVmStatusContainerMock, times(1)).getStatuses(); + } + + @Test + public void progressToRunningRunTest(@Mock VMTracker vmTrackerMock, @Mock CloudVmStatusContainer cloudVmStatusContainerMock) { + when(cloudVmStatusContainerMock.getEndTime()).thenReturn(null); + CloudVmStatus vmstatusStarting = new CloudVmStatus("i-123456789", "123", "sg-123456", JobStatus.Starting, VMImageType.AGENT, VMRegion.STANDALONE, VMStatus.starting, new ValidationStatus(), 1, 1, new Date(), new Date()); + CloudVmStatus vmstatusPending = new CloudVmStatus("i-123456789", "123", "sg-123456", JobStatus.Starting, VMImageType.AGENT, VMRegion.STANDALONE, VMStatus.pending, new ValidationStatus(), 1, 1, new Date(), new Date()); + CloudVmStatus vmstatusRunning = new CloudVmStatus("i-123456789", "123", "sg-123456", JobStatus.Running, VMImageType.AGENT, VMRegion.STANDALONE, VMStatus.running, new ValidationStatus(), 1, 1, new Date(), new Date()); + Set setStarting = Stream.of(vmstatusStarting).collect(Collectors.toCollection(HashSet::new)); + Set setPending = Stream.of(vmstatusPending).collect(Collectors.toCollection(HashSet::new)); + Set setRunning = Stream.of(vmstatusRunning).collect(Collectors.toCollection(HashSet::new)); + when(cloudVmStatusContainerMock.getStatuses()).thenReturn(setStarting).thenReturn(setPending).thenReturn(setRunning); + when(vmTrackerMock.getVmStatusForJob(null)).thenReturn(cloudVmStatusContainerMock); + + VMInformation vmInformation = new VMInformation(); + vmInformation.setState("running"); + vmInformation.setInstanceId("i-123456789"); + List vmInfo = Collections.singletonList( vmInformation ); +// when(amazonInstanceMock.describeInstances(Mockito.anyString())).thenReturn(vmInfo); + VMInstanceRequest instanceRequest = new VMInstanceRequest(); + instanceRequest.setRegion(VMRegion.STANDALONE); + + AgentWatchdog agentWatchdog = new AgentWatchdog(instanceRequest, vmInfo, vmTrackerMock, amazonInstanceMock, 10, 1000); + + agentWatchdog.run(); +// verify(amazonInstanceMock, times(1)).describeInstances("i-123456789"); + verify(amazonInstanceMock, never()).killInstances(Mockito.anyList()); + verify(amazonInstanceMock, never()).reboot(Mockito.anyList()); + verify(cloudVmStatusContainerMock, times(3)).getEndTime(); + verify(cloudVmStatusContainerMock, times(3)).getStatuses(); + } +} diff --git a/tank_vmManager/src/test/resources/log4j2.xml b/tank_vmManager/src/test/resources/log4j2.xml new file mode 100644 index 000000000..8975e7d8b --- /dev/null +++ b/tank_vmManager/src/test/resources/log4j2.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file