Commit 8726916
aschey-forpeople committed Jan 14, 2025
2 parents cc493fe + 1451e85
Showing 31 changed files with 1,641 additions and 228 deletions.
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
@@ -52,5 +52,5 @@ Please indicate if this PR does any of the following:

### Validation

- Have you fully verified and tested these changes? Is the acceptance criteria met? Please provide reproducible testing instructions, code snippets, or screenshots as applicable.
+ **Have you fully verified and tested these changes? Is the acceptance criteria met? Please provide reproducible testing instructions, code snippets, or screenshots as applicable.**

PipelineApplication.java
@@ -29,6 +29,8 @@
import gov.cms.bfd.pipeline.rda.grpc.RdaServerJob;
import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState;
import gov.cms.bfd.pipeline.sharedutils.PipelineJob;
+ import gov.cms.bfd.pipeline.sharedutils.PipelineOutcome;
+ import gov.cms.bfd.pipeline.sharedutils.ec2.AwsEc2Client;
import gov.cms.bfd.pipeline.sharedutils.s3.AwsS3ClientFactory;
import gov.cms.bfd.sharedutils.config.AppConfigurationException;
import gov.cms.bfd.sharedutils.config.AwsClientConfig;
@@ -66,6 +68,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+ import lombok.RequiredArgsConstructor;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,9 +79,13 @@
* the specified S3 bucket, parse it, and push it to the specified database server. See {@link
* #main(String[])}.
*/
+ @RequiredArgsConstructor
public final class PipelineApplication {
static final Logger LOGGER = LoggerFactory.getLogger(PipelineApplication.class);

+ /** EC2 client. */
+ private final AwsEc2Client ec2Client;
+
/** This {@link System#exit(int)} value should be used when the application exits successfully. */
static final int EXIT_CODE_SUCCESS = 0;

@@ -114,7 +121,7 @@ public final class PipelineApplication {
* variables)
*/
public static void main(String[] args) {
- int exitCode = new PipelineApplication().runPipelineAndHandleExceptions();
+ int exitCode = new PipelineApplication(new AwsEc2Client()).runPipelineAndHandleExceptions();
System.exit(exitCode);
}

@@ -127,7 +134,14 @@ public static void main(String[] args) {
@VisibleForTesting
int runPipelineAndHandleExceptions() {
try {
- runPipeline();
+ PipelineOutcome outcome = runPipeline();
+ if (outcome == PipelineOutcome.TERMINATE_INSTANCE) {
+   // When we trigger a scale-in, the instance gets terminated very quickly,
+   // so we should call this at the last minute after all log messages have been flushed.
+   // We can't schedule a scale-in in the future without causing a race condition that may
+   // prevent the next scale-out from happening.
+   ec2Client.scaleInNow();
+ }
return EXIT_CODE_SUCCESS;
} catch (FatalAppException ex) {
if (ex.getCause() != null) {
@@ -162,8 +176,9 @@ ConfigLoader createConfigLoader()
* @throws FatalAppException if app shutdown required
* @throws IOException for I/O errors
* @throws SQLException for database errors
+ * @return outcome
*/
- private void runPipeline() throws FatalAppException, SQLException, IOException {
+ private PipelineOutcome runPipeline() throws FatalAppException, SQLException, IOException {

LOGGER.info("Application starting up!");
logTempDirectory();
@@ -192,7 +207,7 @@ private void runPipeline() throws FatalAppException, SQLException, IOException {
try (HikariDataSource pooledDataSource =
PipelineApplicationState.createPooledDataSource(dataSourceFactory, appMetrics)) {
logDatabaseDetails(pooledDataSource);
- createJobsAndRunPipeline(appConfig, appMeters, appMetrics, pooledDataSource);
+ return createJobsAndRunPipeline(appConfig, appMeters, appMetrics, pooledDataSource);
}
}

@@ -305,9 +320,10 @@ private void logDatabaseDetails(HikariDataSource pooledDataSource) throws SQLExc
* @param appMeters the app meters
* @param appMetrics the {@link MetricRegistry} to receive metrics
* @param pooledDataSource our connection pool
+ * @return pipeline outcome
* @throws FatalAppException if app shutdown required
*/
- private void createJobsAndRunPipeline(
+ private PipelineOutcome createJobsAndRunPipeline(
AppConfiguration appConfig,
CompositeMeterRegistry appMeters,
MetricRegistry appMetrics,
@@ -330,12 +346,14 @@ private void createJobsAndRunPipeline(
pipelineManager.start();
LOGGER.info("Job processing started.");

- pipelineManager.awaitCompletion();
+ PipelineOutcome pipelineOutcome = pipelineManager.awaitCompletion();

if (pipelineManager.getError() != null) {
throw new FatalAppException(
"Pipeline job threw exception", pipelineManager.getError(), EXIT_CODE_JOB_FAILED);
}

+ return pipelineOutcome;
}

/**
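The new PipelineOutcome enum and the AwsEc2Client class used above live in two of the changed files that are not expanded in this view. The sketches below are assumptions reconstructed from the usages visible in this diff, not the commit's actual code: the enum values come from PipelineManager.awaitCompletion() further down, and scaleInNow() is assumed to perform an immediate Auto Scaling scale-in, resolving the instance id from instance metadata.

// PipelineOutcome.java (sketch, assumed)
public enum PipelineOutcome {
  /** Shut the service down but leave the EC2 instance running. */
  STOP_SERVICE,
  /** Scale in (terminate) the EC2 instance hosting the pipeline. */
  TERMINATE_INSTANCE
}

// AwsEc2Client.java (sketch, assumed)
import software.amazon.awssdk.imds.Ec2MetadataClient;
import software.amazon.awssdk.services.autoscaling.AutoScalingClient;
import software.amazon.awssdk.services.autoscaling.model.TerminateInstanceInAutoScalingGroupRequest;

public class AwsEc2Client {
  private final AutoScalingClient autoScaling = AutoScalingClient.create();

  /** Terminates the current instance and decrements the group's desired capacity. */
  public void scaleInNow() {
    // Look up this instance's id from the EC2 instance metadata service.
    final String instanceId;
    try (Ec2MetadataClient imds = Ec2MetadataClient.create()) {
      instanceId = imds.get("/latest/meta-data/instance-id").asString();
    }
    // Decrementing desired capacity stops the auto scaling group from
    // immediately launching a replacement instance.
    autoScaling.terminateInstanceInAutoScalingGroup(
        TerminateInstanceInAutoScalingGroupRequest.builder()
            .instanceId(instanceId)
            .shouldDecrementDesiredCapacity(true)
            .build());
  }
}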
PipelineJobRunner.java
@@ -8,16 +8,16 @@
import java.time.Instant;
import java.util.Optional;
import java.util.regex.Pattern;
- import lombok.AllArgsConstructor;
import lombok.Data;
+ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Wrapper for a {@link PipelineJob} that runs the job on a schedule. Implements {@link Runnable} so
* it can be submitted to an {@link java.util.concurrent.ExecutorService}.
*/
@Slf4j
- @AllArgsConstructor
+ @RequiredArgsConstructor
public class PipelineJobRunner implements Runnable {
/** Object that tracks the status of all job runs. */
private final Tracker tracker;
@@ -39,6 +39,7 @@ public class PipelineJobRunner implements Runnable {
*/
@Override
public void run() {
+ PipelineJobOutcome outcome = null;
try {
// This try-with-resources guarantees job's close method is called.
// Nested within the outer try because we don't want stoppingNormally to be
@@ -50,8 +51,9 @@
.map(Duration::toMillis)
.orElse(0L);
while (tracker.jobsCanRun()) {
- runJob();
- if (repeatMillis <= 0 || !tracker.jobsCanRun()) {
+ outcome = runJob();
+ boolean shouldTerminate = outcome == PipelineJobOutcome.SHOULD_TERMINATE;
+ if (shouldTerminate || repeatMillis <= 0 || !tracker.jobsCanRun()) {
break;
}
tracker.sleeping(job);
@@ -64,16 +66,17 @@
} catch (Exception ex) {
tracker.stoppingDueToException(job, ex);
} finally {
- tracker.stopped(job);
+ tracker.stopped(job, outcome);
}
}

/**
* Runs the job once and reports its outcome to the {@link Tracker}.
*
+ * @return PipelineJobOutcome outcome
* @throws Exception passed through if the job terminates with an exception
*/
- private void runJob() throws Exception {
+ private PipelineJobOutcome runJob() throws Exception {
final long id = tracker.beginningRun(job);
final Instant startTime = clock.instant();
PipelineJobOutcome outcome = null;
@@ -99,6 +102,8 @@ private void runJob() throws Exception {
if (exception != null) {
throw exception;
}

+ return outcome;
}

/** Summarizes the results of a job run. */
Expand Down Expand Up @@ -237,7 +242,8 @@ public interface Tracker {
* Notifies the tracker that a job has stopped.
*
* @param job the job that has stopped
+ * @param outcome job outcome
*/
- void stopped(PipelineJob job);
+ void stopped(PipelineJob job, PipelineJobOutcome outcome);
}
}
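The SHOULD_TERMINATE value checked above is a new member of the PipelineJobOutcome enum, another file not expanded in this view. A minimal sketch, assuming the enum's pre-existing values are unchanged:

// PipelineJobOutcome.java (sketch; existing values assumed, new value from this diff)
public enum PipelineJobOutcome {
  /** The job ran but found nothing to process. */
  NOTHING_TO_DO,
  /** The job ran and processed some data. */
  WORK_DONE,
  /** The job has decided the pipeline's EC2 instance should be scaled in. */
  SHOULD_TERMINATE
}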
PipelineManager.java
@@ -3,6 +3,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import gov.cms.bfd.pipeline.sharedutils.PipelineJob;
+ import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome;
+ import gov.cms.bfd.pipeline.sharedutils.PipelineOutcome;
import gov.cms.bfd.sharedutils.interfaces.ThrowingConsumer;
import jakarta.annotation.Nullable;
import java.time.Clock;
@@ -51,6 +53,9 @@ public class PipelineManager implements PipelineJobRunner.Tracker {
/** Recent job results for use by tests. */
private final LinkedList<PipelineJobRunner.JobRunSummary> completedJobs;

+ /** Termination requested. */
+ private boolean terminationRequested = false;
+
/**
* True if all jobs are interruptable. When false we can't interrupt the pool for faster
* shutdowns.
@@ -144,8 +149,10 @@ public synchronized void stop() {
* <p>External causes like jobs completing on their own or calls to {@link #stop} from the
* shutdown handler will cause the pool to shut down gracefully while we wait and thus allow us to
* return.
+ *
+ * @return outcome.
*/
- public void awaitCompletion() {
+ public PipelineOutcome awaitCompletion() {
// Calls to the latch are automatically synchronized on the latch.
while (latch.getCount() > 0) {
try {
@@ -170,6 +177,12 @@ public void awaitCompletion() {
}
}
log.info("pool has terminated");

+ if (this.terminationRequested) {
+   return PipelineOutcome.TERMINATE_INSTANCE;
+ } else {
+   return PipelineOutcome.STOP_SERVICE;
+ }
}

/**
Expand Down Expand Up @@ -287,8 +300,11 @@ public void stoppingNormally(PipelineJob job) {
* @param job the job that has stopped
*/
@Override
- public void stopped(PipelineJob job) {
+ public void stopped(PipelineJob job, PipelineJobOutcome outcome) {
log.info("Job stopped: " + job.getType());
+ if (outcome == PipelineJobOutcome.SHOULD_TERMINATE) {
+   this.terminationRequested = true;
+ }
latch.countDown();
}

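For context on the changes above: awaitCompletion() blocks on a CountDownLatch that each PipelineJobRunner decrements exactly once, via stopped(), when its job finishes, so termination becomes a flag that is simply read once the latch drains. A minimal self-contained illustration of that latch pattern (not BFD code):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchPatternDemo {
  public static void main(String[] args) throws InterruptedException {
    int jobCount = 3;
    // One count per runner; the stopped() callback is the only place the count goes down.
    CountDownLatch latch = new CountDownLatch(jobCount);
    ExecutorService pool = Executors.newFixedThreadPool(jobCount);
    for (int i = 0; i < jobCount; i++) {
      final int id = i;
      pool.execute(
          () -> {
            try {
              System.out.println("job " + id + " running");
            } finally {
              latch.countDown(); // the equivalent of Tracker.stopped()
            }
          });
    }
    latch.await(); // the equivalent of awaitCompletion()
    System.out.println("pool has terminated");
    pool.shutdown();
  }
}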
PipelineApplicationIT.java
@@ -8,6 +8,9 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+ import static org.mockito.Mockito.verify;
+ import static org.mockito.Mockito.verifyNoInteractions;
+ import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.common.collect.ImmutableSet;
import gov.cms.bfd.DataSourceComponents;
@@ -29,6 +32,7 @@
import gov.cms.bfd.pipeline.rda.grpc.server.RdaMessageSourceFactory;
import gov.cms.bfd.pipeline.rda.grpc.server.RdaServer;
import gov.cms.bfd.pipeline.sharedutils.PipelineJob;
+ import gov.cms.bfd.pipeline.sharedutils.ec2.AwsEc2Client;
import gov.cms.bfd.pipeline.sharedutils.s3.S3Dao;
import gov.cms.bfd.sharedutils.config.ConfigLoader;
import gov.cms.bfd.sharedutils.json.JsonConverter;
@@ -46,6 +50,11 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.extension.ExtendWith;
+ import org.mockito.Mock;
+ import org.mockito.junit.jupiter.MockitoExtension;
+ import org.mockito.junit.jupiter.MockitoSettings;
+ import org.mockito.quality.Strictness;
import software.amazon.awssdk.utils.StringUtils;

/**
@@ -56,6 +65,8 @@
* an older assembly exists (because you haven't rebuilt it), it'll run using the old code, which
* probably isn't what you want.
*/
+ @ExtendWith(MockitoExtension.class)
+ @MockitoSettings(strictness = Strictness.LENIENT)
public final class PipelineApplicationIT extends AbstractLocalStackS3Test {
/**
* Name of log file that will contain log output from the app. This has to match the value in our
Expand All @@ -76,6 +87,9 @@ public final class PipelineApplicationIT extends AbstractLocalStackS3Test {
/** Used to communicate with the localstack SQS service. */
private SqsDao sqsDao;

+ /** ec2 client. */
+ @Mock private AwsEc2Client ec2Client;
+
/**
* Locks and truncates the log file so that each test case can read only its own messages from the
* file when making assertions about log output.
@@ -134,6 +148,7 @@ public void missingBucket() {
// Verify the results match expectations
assertEquals(PipelineApplication.EXIT_CODE_JOB_FAILED, exitCode);
assertCcwRifLoadJobFailed(logLines);
+ verifyNoInteractions(ec2Client);
}

/**
@@ -165,6 +180,8 @@ public void noRifData() {
CcwRifLoadJobStatusEvent.JobStage.CheckingBucketForManifest,
CcwRifLoadJobStatusEvent.JobStage.NothingToDo),
readStatusEventsFromSQSQueue());
+ verify(ec2Client).scaleInNow();
+ verifyNoMoreInteractions(ec2Client);
} finally {
if (StringUtils.isNotBlank(bucket)) {
s3Dao.deleteTestBucket(bucket);
@@ -244,6 +261,8 @@ public void smallAmountOfRifData() {
CcwRifLoadJobStatusEvent.JobStage.ProcessingManifestDataFiles,
CcwRifLoadJobStatusEvent.JobStage.CompletedManifest),
readStatusEventsFromSQSQueue());

+ verifyNoInteractions(ec2Client);
} finally {
if (StringUtils.isNotBlank(bucket)) {
s3Dao.deleteTestBucket(bucket);
@@ -290,6 +309,7 @@ public void rdaPipeline() throws Exception {
RdaMcsClaimLoadJob.class,
"MCS job processed all claims");
});
+ verifyNoInteractions(ec2Client);
}

/**
@@ -335,6 +355,7 @@ public void rdaPipelineServerFailure() throws Exception {
RdaMcsClaimLoadJob.class,
"MCS job terminated by grpc exception");
});
+ verifyNoInteractions(ec2Client);
}

/**
@@ -371,6 +392,7 @@ public void smokeTestFailure() throws Exception {
// Verify the results match expectations
assertEquals(PipelineApplication.EXIT_CODE_SMOKE_TEST_FAILURE, exitCode);
assertASmokeTestFailureWasLogged(logLines);
+ verifyNoInteractions(ec2Client);
} finally {
if (StringUtils.isNotBlank(bucket)) {
s3Dao.deleteTestBucket(bucket);
Expand Down Expand Up @@ -556,7 +578,7 @@ private ConfigLoader createRdaJobConfig(int port) {
*/
private PipelineApplication createApplicationForTest(ConfigLoader configLoader) {
// using a spy lets us override and verify method calls
- PipelineApplication app = spy(new PipelineApplication());
+ PipelineApplication app = spy(new PipelineApplication(ec2Client));

// override the default app logic with our own config
doReturn(configLoader).when(app).createConfigLoader();
(diffs for the remaining 26 changed files were not loaded in this view)
