Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Sep 4, 2024
1 parent 83d3687 commit 1e489b5
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,53 @@
/** Scheduler interface for scheduling asynchronous query jobs. */
public interface AsyncQueryScheduler {

/** Schedules a new job. */
/**
* Schedules a new job in the system. This method creates a new job entry based on the provided
* request parameters.
*
* <p>Use cases: - Creating a new periodic query execution - Setting up a scheduled data refresh
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/** Updates an existing job with new parameters. */
/**
* Updates an existing job with new parameters. This method modifies the configuration of an
* already scheduled job.
*
* <p>Use cases: - Changing the schedule of an existing job - Modifying query parameters of a
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/** Unschedules a job by marking it as disabled and updating its last update time. */
/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
* used when you want to temporarily stop a job from running but keep its configuration and
* history in the system.
*
* <p>Use cases: - Pausing a job that's causing issues without losing its configuration -
* Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
*/
void unscheduleJob(String jobId);

/** Removes a job. */
/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
* its associated data from the system.
*
* <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
*/
void removeJob(String jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ public void createDropIndexQueryWithScheduler() {
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).unscheduleJob(indexName);
}

Expand Down Expand Up @@ -274,12 +273,10 @@ public void createVacuumIndexQueryWithScheduler() {
assertNull(response.getSessionId());
verifyGetQueryIdCalled();

// Verifying that the index is deleted
verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).removeJob(indexName);
}

Expand Down Expand Up @@ -342,7 +339,6 @@ public void createAlterIndexQueryWithScheduler() {
FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
assertFalse(flintIndexOptions.autoRefresh());

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).unscheduleJob(indexName);

verifyCreateIndexDMLResultCalled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public void setUp() {
asyncQueryScheduler);
}

// Helper method to create FlintIndexMetadata with latest ID
private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() {
return FlintIndexMetadata.builder()
.latestId(LATEST_ID)
Expand All @@ -72,15 +71,13 @@ private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() {
.build();
}

// Helper method to create FlintIndexMetadata without latest ID
private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() {
return FlintIndexMetadata.builder()
.opensearchIndexName(INDEX_NAME)
.flintIndexOptions(new FlintIndexOptions())
.build();
}

// Helper method to create FlintIndexMetadata with external scheduler
private FlintIndexMetadata createFlintIndexMetadataWithExternalScheduler() {
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.SCHEDULER_MODE, "external");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ public void testDoRefreshThrowsException() {
spyJobRunner = spy(jobRunner);
spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);

// Create a ScheduledAsyncQueryJobRequest with valid parameters
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId("testJob")
Expand All @@ -131,10 +130,8 @@ public void testDoRefreshThrowsException() {
.queryLang(LangType.SQL)
.build();

// Mock the doRefresh method to throw an exception
doThrow(new RuntimeException("Test exception")).when(spyJobRunner).doRefresh(request);

// Capture the log output
Logger logger = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
Appender mockAppender = mock(Appender.class);
when(mockAppender.getName()).thenReturn("MockAppender");
Expand All @@ -151,7 +148,6 @@ public void testDoRefreshThrowsException() {
Runnable runnable = captor.getValue();
runnable.run();

// Verify that the doRefresh method was called and the exception was logged
verify(spyJobRunner).doRefresh(eq(request));
verify(mockAppender).append(any(LogEvent.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,14 @@ public void setup() {

@Test
public void testParseValidScheduleString() {
String scheduleStr = "5 minutes";
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 5, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(5, "5 minutes");
}

@Test
public void testParseValidScheduleStringWithDifferentUnits() {
String scheduleStr = "2 hours";
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 120, ChronoUnit.MINUTES), schedule);

scheduleStr = "1 day";
schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1440, ChronoUnit.MINUTES), schedule);

scheduleStr = "3 weeks";
schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 30240, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(120, "2 hours");
verifyParseSchedule(1440, "1 day");
verifyParseSchedule(30240, "3 weeks");
}

@Test
Expand All @@ -61,7 +47,6 @@ public void testParseNullSchedule() {
public void testParseScheduleObject() {
IntervalSchedule expectedSchedule = new IntervalSchedule(startTime, 10, ChronoUnit.MINUTES);
Schedule schedule = IntervalScheduleParser.parse(expectedSchedule, startTime);

assertEquals(expectedSchedule, schedule);
}

Expand All @@ -79,21 +64,15 @@ public void testParseInvalidScheduleString() {

@Test
public void testParseUnsupportedUnits() {
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 year", startTime),
"Expected IllegalArgumentException but no exception was thrown");

assertEquals("Years cannot be converted to minutes accurately.", exception.getMessage());

exception =
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 month", startTime),
"Expected IllegalArgumentException but no exception was thrown");

assertEquals("Months cannot be converted to minutes accurately.", exception.getMessage());
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 year", startTime),
"Expected IllegalArgumentException but no exception was thrown");

assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 month", startTime),
"Expected IllegalArgumentException but no exception was thrown");
}

@Test
Expand All @@ -110,44 +89,34 @@ public void testParseNonStringSchedule() {

@Test
public void testParseScheduleWithNanoseconds() {
String scheduleStr = "60000000000 nanoseconds"; // Equivalent to 1 minute
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(1, "60000000000 nanoseconds");
}

@Test
public void testParseScheduleWithMilliseconds() {
String scheduleStr = "60000 milliseconds"; // Equivalent to 1 minute
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(1, "60000 milliseconds");
}

@Test
public void testParseScheduleWithMicroseconds() {
String scheduleStr = "60000000 microseconds"; // Equivalent to 1 minute
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(1, "60000000 microseconds");
}

@Test
public void testUnsupportedTimeUnit() {
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"),
"Expected IllegalArgumentException but no exception was thrown");

assertEquals("Unsupported time unit: unsupportedunit", exception.getMessage());
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"),
"Expected IllegalArgumentException but no exception was thrown");
}

@Test
public void testParseScheduleWithSeconds() {
String scheduleStr = "120 seconds"; // Equivalent to 2 minutes
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
verifyParseSchedule(2, "120 seconds");
}

assertEquals(new IntervalSchedule(startTime, 2, ChronoUnit.MINUTES), schedule);
private void verifyParseSchedule(int expectedMinutes, String scheduleStr) {
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
assertEquals(new IntervalSchedule(startTime, expectedMinutes, ChronoUnit.MINUTES), schedule);
}
}

0 comments on commit 1e489b5

Please sign in to comment.