Skip to content

Commit

Permalink
Add new fields in datasource (opensearch-project#325)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 21, 2023
1 parent 91b99d8 commit 7a83df4
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(FIELD_NAME_NAME.getPreferredName(), datasource.getName());
builder.field(FIELD_NAME_STATE.getPreferredName(), datasource.getState());
builder.field(FIELD_NAME_ENDPOINT.getPreferredName(), datasource.getEndpoint());
builder.field(FIELD_NAME_UPDATE_INTERVAL.getPreferredName(), datasource.getSchedule().getInterval());
builder.field(FIELD_NAME_UPDATE_INTERVAL.getPreferredName(), datasource.getUserSchedule().getInterval());
builder.timeField(
FIELD_NAME_NEXT_UPDATE_AT.getPreferredName(),
FIELD_NAME_NEXT_UPDATE_AT_READABLE.getPreferredName(),
datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
datasource.getUserSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
);
builder.field(FIELD_NAME_DATABASE.getPreferredName(), datasource.getDatabase());
builder.field(FIELD_NAME_UPDATE_STATS.getPreferredName(), datasource.getUpdateStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ private void updateIfChanged(final UpdateDatasourceRequest request, final Dataso
}

if (isUpdateIntervalChanged(request, datasource)) {
datasource.setSchedule(
new IntervalSchedule(datasource.getSchedule().getStartTime(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS)
datasource.setUserSchedule(
new IntervalSchedule(
datasource.getUserSchedule().getStartTime(),
(int) request.getUpdateInterval().getDays(),
ChronoUnit.DAYS
)
);
isChanged = true;
}
Expand Down Expand Up @@ -163,7 +167,7 @@ private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasour

long updateInterval = isUpdateIntervalChanged(request, datasource)
? request.getUpdateInterval().days()
: datasource.getSchedule().getInterval();
: datasource.getUserSchedule().getInterval();

if (updateInterval >= validForInDays) {
throw new InvalidParameterException(
Expand All @@ -177,6 +181,7 @@ private boolean isEndpointChanged(final UpdateDatasourceRequest request, final D
}

private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
return request.getUpdateInterval() != null && (int) request.getUpdateInterval().days() != datasource.getSchedule().getInterval();
return request.getUpdateInterval() != null
&& (int) request.getUpdateInterval().days() != datasource.getUserSchedule().getInterval();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,28 @@ public class Datasource implements Writeable, ScheduledJobParameter {
private static final ParseField ENABLED_FIELD = new ParseField("update_enabled");
private static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time");
private static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time_field");
private static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
/**
* Schedule that user set
*/
private static final ParseField USER_SCHEDULE_FIELD = new ParseField("user_schedule");
/**
* System schedule which will be used by job scheduler
*
* If datasource is going to get expired before next update, we want to run clean up task before the next update
* by changing system schedule.
*
* If datasource is restored from snapshot, we want to run clean up task immediately to handle expired datasource
* by changing system schedule.
*
* For every task run, we revert system schedule back to user schedule.
*/
private static final ParseField SYSTEM_SCHEDULE_FIELD = new ParseField("system_schedule");
/**
* {@link DatasourceTask} that DatasourceRunner will execute in next run
*
* For every task run, we revert task back to {@link DatasourceTask#ALL}
*/
private static final ParseField TASK_FIELD = new ParseField("task");
private static final ParseField ENABLED_TIME_FIELD = new ParseField("enabled_time");
private static final ParseField ENABLED_TIME_FIELD_READABLE = new ParseField("enabled_time_field");

Expand Down Expand Up @@ -97,10 +118,22 @@ public class Datasource implements Writeable, ScheduledJobParameter {
*/
private boolean isEnabled;
/**
* @param schedule Schedule for a GeoIP data update
* @return Schedule for the job scheduler
* @param userSchedule Schedule that user provided
* @return Schedule that user provided
*/
private IntervalSchedule userSchedule;

/**
* @param systemSchedule Schedule that job scheduler use
* @return Schedule that job scheduler use
*/
private IntervalSchedule schedule;
private IntervalSchedule systemSchedule;

/**
* @param task Task that {@link DatasourceRunner} will execute
* @return Task that {@link DatasourceRunner} will execute
*/
private DatasourceTask task;

/**
* Additional variables for datasource
Expand Down Expand Up @@ -143,18 +176,22 @@ public class Datasource implements Writeable, ScheduledJobParameter {
Instant lastUpdateTime = Instant.ofEpochMilli((long) args[1]);
Instant enabledTime = args[2] == null ? null : Instant.ofEpochMilli((long) args[2]);
boolean isEnabled = (boolean) args[3];
IntervalSchedule schedule = (IntervalSchedule) args[4];
String endpoint = (String) args[5];
DatasourceState state = DatasourceState.valueOf((String) args[6]);
List<String> indices = (List<String>) args[7];
Database database = (Database) args[8];
UpdateStats updateStats = (UpdateStats) args[9];
IntervalSchedule userSchedule = (IntervalSchedule) args[4];
IntervalSchedule systemSchedule = (IntervalSchedule) args[5];
DatasourceTask task = DatasourceTask.valueOf((String) args[6]);
String endpoint = (String) args[7];
DatasourceState state = DatasourceState.valueOf((String) args[8]);
List<String> indices = (List<String>) args[9];
Database database = (Database) args[10];
UpdateStats updateStats = (UpdateStats) args[11];
Datasource parameter = new Datasource(
name,
lastUpdateTime,
enabledTime,
isEnabled,
schedule,
userSchedule,
systemSchedule,
task,
endpoint,
state,
indices,
Expand All @@ -170,7 +207,9 @@ public class Datasource implements Writeable, ScheduledJobParameter {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_UPDATE_TIME_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), ENABLED_TIME_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), SCHEDULE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), USER_SCHEDULE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), SYSTEM_SCHEDULE_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
Expand All @@ -190,6 +229,8 @@ public Datasource(final String name, final IntervalSchedule schedule, final Stri
null,
false,
schedule,
schedule,
DatasourceTask.ALL,
endpoint,
DatasourceState.CREATING,
new ArrayList<>(),
Expand All @@ -203,7 +244,9 @@ public Datasource(final StreamInput in) throws IOException {
lastUpdateTime = toInstant(in.readVLong());
enabledTime = toInstant(in.readOptionalVLong());
isEnabled = in.readBoolean();
schedule = new IntervalSchedule(in);
userSchedule = new IntervalSchedule(in);
systemSchedule = new IntervalSchedule(in);
task = DatasourceTask.valueOf(in.readString());
endpoint = in.readString();
state = DatasourceState.valueOf(in.readString());
indices = in.readStringList();
Expand All @@ -217,7 +260,9 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(lastUpdateTime.toEpochMilli());
out.writeOptionalVLong(enabledTime == null ? null : enabledTime.toEpochMilli());
out.writeBoolean(isEnabled);
schedule.writeTo(out);
userSchedule.writeTo(out);
systemSchedule.writeTo(out);
out.writeString(task.name());
out.writeString(endpoint);
out.writeString(state.name());
out.writeStringCollection(indices);
Expand All @@ -242,7 +287,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
);
}
builder.field(ENABLED_FIELD.getPreferredName(), isEnabled);
builder.field(SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(USER_SCHEDULE_FIELD.getPreferredName(), userSchedule);
builder.field(SYSTEM_SCHEDULE_FIELD.getPreferredName(), systemSchedule);
builder.field(TASK_FIELD.getPreferredName(), task.name());
builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint);
builder.field(STATE_FIELD.getPreferredName(), state.name());
builder.field(INDICES_FIELD.getPreferredName(), indices);
Expand All @@ -269,7 +316,7 @@ public Instant getEnabledTime() {

@Override
public IntervalSchedule getSchedule() {
return schedule;
return systemSchedule;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.jobscheduler;

/**
* Task that {@link DatasourceRunner} will run
*/
public enum DatasourceTask {
/**
* Do everything
*/
ALL,

/**
* Only delete unused indices
*/
DELETE_UNUSED_INDICES
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
Expand Down Expand Up @@ -143,6 +144,13 @@ protected DatasourceState randomState() {
.get(Randomness.createSecure().nextInt(DatasourceState.values().length - 1));
}

protected DatasourceTask randomTask() {
return Arrays.stream(DatasourceTask.values())
.sequential()
.collect(Collectors.toList())
.get(Randomness.createSecure().nextInt(DatasourceTask.values().length - 1));
}

protected String randomIpAddress() {
return String.format(
Locale.ROOT,
Expand Down Expand Up @@ -183,7 +191,9 @@ protected Datasource randomDatasource() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setUserSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public void testToXContent_whenValidInput_thenSucceed() throws Exception {
assertTrue(json.contains(String.format(Locale.ROOT, "\"name\":\"%s\"", datasource.getName())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"state\":\"%s\"", datasource.getState())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"endpoint\":\"%s\"", datasource.getEndpoint())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"update_interval_in_days\":%d", datasource.getSchedule().getInterval())));
assertTrue(
json.contains(String.format(Locale.ROOT, "\"update_interval_in_days\":%d", datasource.getUserSchedule().getInterval()))
);
assertTrue(json.contains(String.format(Locale.ROOT, "\"next_update_at_in_epoch_millis\"")));
assertTrue(json.contains(String.format(Locale.ROOT, "\"provider\":\"%s\"", datasource.getDatabase().getProvider())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"sha256_hash\":\"%s\"", datasource.getDatabase().getSha256Hash())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testInternalDoExecute_whenValidInput_thenSucceed() {
verify(datasourceFacade).putDatasource(datasourceCaptor.capture(), actionListenerCaptor.capture());
assertEquals(request.getName(), datasourceCaptor.getValue().getName());
assertEquals(request.getEndpoint(), datasourceCaptor.getValue().getEndpoint());
assertEquals(request.getUpdateInterval().days(), datasourceCaptor.getValue().getSchedule().getInterval());
assertEquals(request.getUpdateInterval().days(), datasourceCaptor.getValue().getUserSchedule().getInterval());

// Run next listener.onResponse
actionListenerCaptor.getValue().onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testDoExecute_whenValidInput_thenUpdate() {
verify(datasourceFacade).updateDatasource(datasource);
verify(datasourceUpdateService).getHeaderFields(request.getEndpoint());
assertEquals(request.getEndpoint(), datasource.getEndpoint());
assertEquals(request.getUpdateInterval().days(), datasource.getSchedule().getInterval());
assertEquals(request.getUpdateInterval().days(), datasource.getUserSchedule().getInterval());
verify(listener).onResponse(new AcknowledgedResponse(true));
verify(ip2GeoLockService).releaseLock(eq(lockModel));
}
Expand All @@ -119,7 +119,7 @@ public void testDoExecute_whenValidInput_thenUpdate() {
public void testDoExecute_whenNoChangesInValues_thenNoUpdate() {
Datasource datasource = randomDatasource();
UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName());
request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval()));
request.setUpdateInterval(TimeValue.timeValueDays(datasource.getUserSchedule().getInterval()));
request.setEndpoint(datasource.getEndpoint());

Task task = mock(Task.class);
Expand Down

0 comments on commit 7a83df4

Please sign in to comment.