Skip to content

Commit

Permalink
Adjust auto-disable connection logic for spam (#12288)
Browse files Browse the repository at this point in the history
* Adjust auto-disable connection logic for spam

* refactor for readability
  • Loading branch information
terencecho authored Apr 27, 2022
1 parent 68b5191 commit c856d79
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public AutoDisableConnectionOutput autoDisableFailingConnection(final AutoDisabl
final long currTimestampInSeconds = input.getCurrTimestamp().getEpochSecond();
final Job lastJob = jobPersistence.getLastReplicationJob(input.getConnectionId())
.orElseThrow(() -> new Exception("Auto-Disable Connection should not have been attempted if can't get latest replication job."));
final Job firstJob = jobPersistence.getFirstReplicationJob(input.getConnectionId())
.orElseThrow(() -> new Exception("Auto-Disable Connection should not have been attempted if no replication job has been run."));

final List<JobWithStatusAndTimestamp> jobs = jobPersistence.listJobStatusAndTimestampWithConnection(input.getConnectionId(),
REPLICATION_TYPES, input.getCurrTimestamp().minus(maxDaysOfOnlyFailedJobs, DAYS));
Expand All @@ -73,13 +75,16 @@ public AutoDisableConnectionOutput autoDisableFailingConnection(final AutoDisabl
}
}

final boolean warningPreviouslySentForMaxDays =
warningPreviouslySentForMaxDays(numFailures, successTimestamp, maxDaysOfOnlyFailedJobsBeforeWarning, firstJob, jobs);

if (numFailures == 0) {
return new AutoDisableConnectionOutput(false);
} else if (numFailures >= configs.getMaxFailedJobsInARowBeforeConnectionDisable()) {
// disable connection if max consecutive failed jobs limit has been hit
disableConnection(input.getConnectionId(), lastJob);
return new AutoDisableConnectionOutput(true);
} else if (numFailures == maxFailedJobsInARowBeforeConnectionDisableWarning) {
} else if (numFailures == maxFailedJobsInARowBeforeConnectionDisableWarning && !warningPreviouslySentForMaxDays) {
// warn if number of consecutive failures hits 50% of MaxFailedJobsInARow
jobNotifier.autoDisableConnectionWarning(lastJob);
// explicitly send to email if customer.io api key is set, since email notification cannot be set by
Expand All @@ -91,8 +96,6 @@ public AutoDisableConnectionOutput autoDisableFailingConnection(final AutoDisabl
// calculate the number of days this connection first tried a replication job, used to ensure not to
// disable or warn for `maxDaysOfOnlyFailedJobs` if the first job is younger than
// `maxDaysOfOnlyFailedJobs` days, This avoids cases such as "the very first job run was a failure".
final Job firstJob = jobPersistence.getFirstReplicationJob(input.getConnectionId())
.orElseThrow(() -> new Exception("Auto-Disable Connection should not have been attempted if no replication job has been run."));
final int numDaysSinceFirstReplicationJob = getDaysSinceTimestamp(currTimestampInSeconds, firstJob.getCreatedAtInSecond());
final boolean firstReplicationOlderThanMaxDisableDays = numDaysSinceFirstReplicationJob >= maxDaysOfOnlyFailedJobs;
final boolean noPreviousSuccess = successTimestamp.isEmpty();
Expand All @@ -103,14 +106,22 @@ public AutoDisableConnectionOutput autoDisableFailingConnection(final AutoDisabl
return new AutoDisableConnectionOutput(true);
}

// skip warning if previously sent
if (warningPreviouslySentForMaxDays || numFailures > maxFailedJobsInARowBeforeConnectionDisableWarning) {
return new AutoDisableConnectionOutput(false);
}

final boolean firstReplicationOlderThanMaxDisableWarningDays = numDaysSinceFirstReplicationJob >= maxDaysOfOnlyFailedJobsBeforeWarning;
final boolean successOlderThanPrevFailureByMaxWarningDays = // set to true if no previous success is found
noPreviousSuccess || getDaysSinceTimestamp(currTimestampInSeconds, successTimestamp.get()) >= maxDaysOfOnlyFailedJobsBeforeWarning;

// send warning if there are only failed jobs in the past maxDaysOfOnlyFailedJobsBeforeWarning days
// _unless_ a warning should have already been sent in the previous failure
if (firstReplicationOlderThanMaxDisableWarningDays && successOlderThanPrevFailureByMaxWarningDays) {
sendWarningIfNotPreviouslySent(successTimestamp, maxDaysOfOnlyFailedJobsBeforeWarning, firstJob, lastJob, jobs, numFailures);
jobNotifier.autoDisableConnectionWarning(lastJob);
// explicitly send to email if customer.io api key is set, since email notification cannot be set by
// configs through UI yet
jobNotifier.notifyJobByEmail(null, CONNECTION_DISABLED_WARNING_NOTIFICATION, lastJob);
}

} catch (final Exception e) {
Expand All @@ -120,33 +131,20 @@ public AutoDisableConnectionOutput autoDisableFailingConnection(final AutoDisabl
return new AutoDisableConnectionOutput(false);
}

private void sendWarningIfNotPreviouslySent(final Optional<Long> successTimestamp,
final int maxDaysOfOnlyFailedJobsBeforeWarning,
final Job firstJob,
final Job lastJob,
final List<JobWithStatusAndTimestamp> jobs,
final int numFailures) {
if (numFailures > 1 && checkIfWarningPreviouslySent(successTimestamp, maxDaysOfOnlyFailedJobsBeforeWarning, firstJob, jobs)) {
return;
}
jobNotifier.autoDisableConnectionWarning(lastJob);
// explicitly send to email if customer.io api key is set, since email notification cannot be set by
// configs through UI yet
jobNotifier.notifyJobByEmail(null, CONNECTION_DISABLED_WARNING_NOTIFICATION, lastJob);
}

// Checks to see if warning should have been sent in the previous failure, if so skip sending of
// warning to avoid spam
// Assume warning has been sent if either of the following is true:
// 1. no success found in the time span and the previous failure occurred
// maxDaysOfOnlyFailedJobsBeforeWarning days after the first job
// 2. success found and the previous failure occurred maxDaysOfOnlyFailedJobsBeforeWarning days
// after that success
private boolean checkIfWarningPreviouslySent(final Optional<Long> successTimestamp,
final int maxDaysOfOnlyFailedJobsBeforeWarning,
final Job firstJob,
final List<JobWithStatusAndTimestamp> jobs) {
if (jobs.size() <= 1)
private boolean warningPreviouslySentForMaxDays(final int numFailures,
final Optional<Long> successTimestamp,
final int maxDaysOfOnlyFailedJobsBeforeWarning,
final Job firstJob,
final List<JobWithStatusAndTimestamp> jobs) {
// no previous warning sent if there was no previous failure
if (numFailures <= 1 || jobs.size() <= 1)
return false;

// get previous failed job (skipping first job since that's considered "current" job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ void setUp() throws IOException {
Mockito.when(mFeatureFlags.autoDisablesFailingConnections()).thenReturn(true);
Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(MAX_DAYS_OF_ONLY_FAILED_JOBS);
Mockito.when(mJobPersistence.getLastReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
}

// test warnings
Expand Down Expand Up @@ -118,7 +119,6 @@ public void testWarningNotificationsForAutoDisablingMaxDaysOfFailure() throws IO
.thenReturn(Collections.singletonList(FAILED_JOB));

Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(
CURR_INSTANT.getEpochSecond() - TimeUnit.DAYS.toSeconds(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_WARNING));

Expand All @@ -140,7 +140,27 @@ public void testWarningNotificationsDoesNotSpam() throws IOException {
CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(jobs);

Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(mJobCreateOrUpdatedInSeconds);
Mockito.when(mJob.getUpdatedAtInSecond()).thenReturn(mJobCreateOrUpdatedInSeconds);

final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
assertThat(output.isDisabled()).isFalse();
assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
Mockito.verify(mJobNotifier, Mockito.never()).autoDisableConnection(Mockito.any());
Mockito.verify(mJobNotifier, Mockito.never()).autoDisableConnectionWarning(Mockito.any());
}

@Test
@DisplayName("Test that a notification warning is not sent after one was just sent for consecutive failures")
public void testWarningNotificationsDoesNotSpamAfterConsecutiveFailures() throws IOException {
final List<JobWithStatusAndTimestamp> jobs = new ArrayList<>(Collections.nCopies(MAX_FAILURE_JOBS_IN_A_ROW - 1, FAILED_JOB));
final long mJobCreateOrUpdatedInSeconds = CURR_INSTANT.getEpochSecond() - TimeUnit.DAYS.toSeconds(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_WARNING);

Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(MAX_DAYS_OF_ONLY_FAILED_JOBS);
Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(jobs);

Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(mJobCreateOrUpdatedInSeconds);
Mockito.when(mJob.getUpdatedAtInSecond()).thenReturn(mJobCreateOrUpdatedInSeconds);

Expand All @@ -160,7 +180,6 @@ public void testOnlyFailuresButFirstJobYoungerThanMaxDaysWarning() throws IOExce
.thenReturn(Collections.singletonList(FAILED_JOB));

Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(CURR_INSTANT.getEpochSecond());

final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
Expand Down Expand Up @@ -199,7 +218,6 @@ public void testLessThanMaxFailuresInARow() throws IOException {
jobs.add(SUCCEEDED_JOB);

Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(jobs);
Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(
Expand Down Expand Up @@ -237,7 +255,6 @@ public void testOnlyFailuresInMaxDays() throws IOException, JsonValidationExcept
.thenReturn(Collections.singletonList(FAILED_JOB));

Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(
CURR_INSTANT.getEpochSecond() - TimeUnit.DAYS.toSeconds(MAX_DAYS_OF_ONLY_FAILED_JOBS));
Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync);
Expand Down

0 comments on commit c856d79

Please sign in to comment.