Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove jitter and move index setting from DatasourceFacade to Datasou… #319

Merged
merged 1 commit into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

Expand All @@ -40,7 +38,6 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -62,9 +59,6 @@
@Log4j2
public class DatasourceFacade {
private static final Integer MAX_SIZE = 1000;
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1);
private static final Tuple<String, String> INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all");
private static final Tuple<String, Boolean> INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true);
private final Client client;
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
Expand All @@ -76,22 +70,17 @@ public DatasourceFacade(final Client client, final ClusterService clusterService
}

/**
* Create a datasource index of single shard with auto expand replicas to all nodes
* Create datasource index
*
* We want the index to expand to all replica so that datasource query request can be executed locally
* for faster ingestion time.
* @param stepListener setp listener
*/
public void createIndexIfNotExists(final StepListener<Void> stepListener) {
if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) {
stepListener.onResponse(null);
return;
}
final Map<String, Object> indexSettings = new HashMap<>();
indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(indexSettings);
.settings(DatasourceExtension.INDEX_SETTING);
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(final CreateIndexResponse createIndexResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ public class Datasource implements Writeable, ScheduledJobParameter {
* Prefix of indices having Ip2Geo data
*/
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data";
private static final long MAX_JITTER_IN_MINUTES = 5;
private static final long ONE_DAY_IN_HOURS = 24;
private static final long ONE_HOUR_IN_MINUTES = 60;

/**
* Default fields for job scheduling
Expand Down Expand Up @@ -285,21 +282,6 @@ public Long getLockDurationSeconds() {
return Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;
}

/**
* Jitter in scheduling a task
*
* We want a job to be delayed randomly with range of (0, 5) minutes for the
* next execution time.
*
* @see ScheduledJobParameter#getJitter()
*
* @return the jitter
*/
@Override
public Double getJitter() {
return MAX_JITTER_IN_MINUTES / ((double) schedule.getInterval() * ONE_DAY_IN_HOURS * ONE_HOUR_IN_MINUTES);
}

/**
* Enable auto update of GeoIP data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.geospatial.ip2geo.jobscheduler;

import java.util.Map;

import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
Expand All @@ -21,6 +23,20 @@ public class DatasourceExtension implements JobSchedulerExtension {
* Job index name for a datasource
*/
public static final String JOB_INDEX_NAME = ".scheduler_geospatial_ip2geo_datasource";
/**
* Job index setting
*
* We want it to be single shard so that job can be run only in a single node by job scheduler.
* We want it to expand to all replicas so that querying to this index can be done locally to reduce latency.
*/
public static final Map<String, Object> INDEX_SETTING = Map.of(
"index.number_of_shards",
1,
"index.auto_expand_replicas",
"0-all",
"index.hidden",
true
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need constants for keys, previously each separate variable as like such constant, but we lost it with the map

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INDEX_SETTING itself is a constant so I didn't make value in it as another constant.


@Override
public String getJobType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.Arrays;
import java.util.Locale;

import org.opensearch.common.Randomness;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
Expand Down Expand Up @@ -77,14 +76,6 @@ public void testGetIndexNameFor() {
);
}

public void testGetJitter() {
Datasource datasource = new Datasource();
datasource.setSchedule(new IntervalSchedule(Instant.now(), Randomness.get().ints(1, 31).findFirst().getAsInt(), ChronoUnit.DAYS));
long intervalInMinutes = datasource.getSchedule().getInterval() * 60l * 24l;
double sixMinutes = 6;
assertTrue(datasource.getJitter() * intervalInMinutes <= sixMinutes);
}

public void testIsExpired() {
Datasource datasource = new Datasource();
// never expire if validForInDays is null
Expand Down