Skip to content

Commit

Permalink
introduce method to query jobs database by end timestamp of attempts (#…
Browse files Browse the repository at this point in the history
…6505)

* introduce method to query jobs database by end timestamp of attempts

* do not truncate milliseconds

* format changes
  • Loading branch information
subodh1810 authored Sep 29, 2021
1 parent 6b4c6a1 commit 7d1a67f
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,10 @@ public void failAttempt(long jobId, int attemptNumber) throws IOException {
updateJobStatusIfNotInTerminalState(ctx, jobId, JobStatus.INCOMPLETE, now);

ctx.execute(
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? WHERE job_id = ? AND attempt_number = ?",
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? , ended_at = ? WHERE job_id = ? AND attempt_number = ?",
Sqls.toSqlName(AttemptStatus.FAILED),
now,
now,
jobId,
attemptNumber);
return null;
Expand All @@ -265,9 +266,10 @@ public void succeedAttempt(long jobId, int attemptNumber) throws IOException {
updateJobStatus(ctx, jobId, JobStatus.SUCCEEDED, now);

ctx.execute(
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? WHERE job_id = ? AND attempt_number = ?",
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? , ended_at = ? WHERE job_id = ? AND attempt_number = ?",
Sqls.toSqlName(AttemptStatus.SUCCEEDED),
now,
now,
jobId,
attemptNumber);
return null;
Expand Down Expand Up @@ -408,6 +410,16 @@ public Optional<Job> getNextJob() throws IOException {
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class))));
}

@Override
public List<Job> listJobs(ConfigType configType, Instant attemptEndedAtTimestamp) throws IOException {
final LocalDateTime timeConvertedIntoLocalDateTime = LocalDateTime.ofInstant(attemptEndedAtTimestamp, ZoneOffset.UTC);
return database.query(ctx -> getJobsFromResult(ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " +
"CAST(config_type AS VARCHAR) = ? AND " +
" attempts.ended_at > ? ORDER BY jobs.created_at ASC, attempts.created_at ASC", Sqls.toSqlName(configType),
timeConvertedIntoLocalDateTime)));
}

private static List<Job> getJobsFromResult(Result<Record> result) {
// keeps results strictly in order so the sql query controls the sort
List<Job> jobs = new ArrayList<Job>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.State;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -125,6 +127,15 @@ public interface JobPersistence {
*/
List<Job> listJobs(Set<JobConfig.ConfigType> configTypes, String configId, int limit, int offset) throws IOException;

/**
*
* @param configType The type of job
* @param attemptEndedAtTimestamp The timestamp after which you want the jobs
* @return List of jobs that have attempts after the provided timestamp
* @throws IOException
*/
List<Job> listJobs(ConfigType configType, Instant attemptEndedAtTimestamp) throws IOException;

List<Job> listJobs(JobConfig.ConfigType configType, String configId, int limit, int offset) throws IOException;

List<Job> listJobsWithStatus(JobStatus status) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ public static void dbDown() {
}

private static Attempt createAttempt(long id, long jobId, AttemptStatus status, Path logPath) {
return new Attempt(
id,
jobId,
logPath,
null,
status,
NOW.getEpochSecond(),
NOW.getEpochSecond(),
NOW.getEpochSecond());
}

private static Attempt createUnfinishedAttempt(long id, long jobId, AttemptStatus status, Path logPath) {
return new Attempt(
id,
jobId,
Expand Down Expand Up @@ -293,6 +305,90 @@ void testExportImport() throws IOException, SQLException {
assertEquals(expected, actual);
}

@Test
@DisplayName("Should return correct set of jobs when querying on end timestamp")
void testListJobsWithTimestamp() throws IOException {
Supplier<Instant> timeSupplier = mock(Supplier.class);
// TODO : Once we fix the problem of precision loss in DefaultJobPersistence, change the test value
// to contain milliseconds as well
Instant now = Instant.parse("2021-01-01T00:00:00Z");
when(timeSupplier.get()).thenReturn(
now,
now.plusSeconds(1),
now.plusSeconds(2),
now.plusSeconds(3),
now.plusSeconds(4),
now.plusSeconds(5),
now.plusSeconds(6),
now.plusSeconds(7),
now.plusSeconds(8),
now.plusSeconds(9),
now.plusSeconds(10),
now.plusSeconds(11),
now.plusSeconds(12),
now.plusSeconds(13),
now.plusSeconds(14),
now.plusSeconds(15),
now.plusSeconds(16));
jobPersistence = new DefaultJobPersistence(database, timeSupplier, 30, 500, 10);
final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH);
jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);
final Path syncJobSecondAttemptLogPath = LOG_PATH.resolve("2");
final int syncJobAttemptNumber1 = jobPersistence.createAttempt(syncJobId, syncJobSecondAttemptLogPath);
jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber1);

final long specJobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int specJobAttemptNumber0 = jobPersistence.createAttempt(specJobId, LOG_PATH);
jobPersistence.failAttempt(specJobId, specJobAttemptNumber0);
final Path specJobSecondAttemptLogPath = LOG_PATH.resolve("2");
final int specJobAttemptNumber1 = jobPersistence.createAttempt(specJobId, specJobSecondAttemptLogPath);
jobPersistence.succeedAttempt(specJobId, specJobAttemptNumber1);

List<Job> jobs = jobPersistence.listJobs(ConfigType.SYNC, Instant.EPOCH);
assertEquals(jobs.size(), 1);
assertEquals(jobs.get(0).getId(), syncJobId);
assertEquals(jobs.get(0).getAttempts().size(), 2);
assertEquals(jobs.get(0).getAttempts().get(0).getId(), 0);
assertEquals(jobs.get(0).getAttempts().get(1).getId(), 1);

final Path syncJobThirdAttemptLogPath = LOG_PATH.resolve("3");
final int syncJobAttemptNumber2 = jobPersistence.createAttempt(syncJobId, syncJobThirdAttemptLogPath);
jobPersistence.succeedAttempt(syncJobId, syncJobAttemptNumber2);

final long newSyncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int newSyncJobAttemptNumber0 = jobPersistence.createAttempt(newSyncJobId, LOG_PATH);
jobPersistence.failAttempt(newSyncJobId, newSyncJobAttemptNumber0);
final Path newSyncJobSecondAttemptLogPath = LOG_PATH.resolve("2");
final int newSyncJobAttemptNumber1 = jobPersistence.createAttempt(newSyncJobId, newSyncJobSecondAttemptLogPath);
jobPersistence.succeedAttempt(newSyncJobId, newSyncJobAttemptNumber1);

Long maxEndedAtTimestamp = jobs.get(0).getAttempts().stream().map(c -> c.getEndedAtInSecond().orElseThrow()).max(Long::compareTo).orElseThrow();

List<Job> secondQueryJobs = jobPersistence.listJobs(ConfigType.SYNC, Instant.ofEpochSecond(maxEndedAtTimestamp));
assertEquals(secondQueryJobs.size(), 2);
assertEquals(secondQueryJobs.get(0).getId(), syncJobId);
assertEquals(secondQueryJobs.get(0).getAttempts().size(), 1);
assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getId(), 2);

assertEquals(secondQueryJobs.get(1).getId(), newSyncJobId);
assertEquals(secondQueryJobs.get(1).getAttempts().size(), 2);
assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getId(), 0);
assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getId(), 1);

Long maxEndedAtTimestampAfterSecondQuery = -1L;
for (Job c : secondQueryJobs) {
List<Attempt> attempts = c.getAttempts();
Long maxEndedAtTimestampForJob = attempts.stream().map(attempt -> attempt.getEndedAtInSecond().orElseThrow())
.max(Long::compareTo).orElseThrow();
if (maxEndedAtTimestampForJob > maxEndedAtTimestampAfterSecondQuery) {
maxEndedAtTimestampAfterSecondQuery = maxEndedAtTimestampForJob;
}
}

assertEquals(0, jobPersistence.listJobs(ConfigType.SYNC, Instant.ofEpochSecond(maxEndedAtTimestampAfterSecondQuery)).size());
}

@Test
@DisplayName("Should have valid yaml schemas in exported database")
void testYamlSchemas() throws IOException {
Expand Down Expand Up @@ -452,7 +548,7 @@ void testCreateAttempt() throws IOException {
jobId,
SPEC_JOB_CONFIG,
JobStatus.RUNNING,
Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)),
Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)),
NOW.getEpochSecond());
assertEquals(expected, actual);
}
Expand Down Expand Up @@ -487,7 +583,7 @@ void testCreateAttemptWhileAttemptAlreadyRunning() throws IOException {
jobId,
SPEC_JOB_CONFIG,
JobStatus.RUNNING,
Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)),
Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)),
NOW.getEpochSecond());
assertEquals(expected, actual);
}
Expand Down

0 comments on commit 7d1a67f

Please sign in to comment.