Skip to content

Commit

Permalink
Use _primary in query preference
Browse files Browse the repository at this point in the history
1. Use _primary preference to get datasource metadata so that it can read the latest data. RefreshPolicy.IMMEDIATE won't refresh replica shards immediately according to #346
2. Update datasource metadata index mapping

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jun 30, 2023
1 parent 0f0297e commit efe125a
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.constants;

/**
* Collection of query preference
*/
public class QueryPreference {
public static final String PRIMARY = "_primary";
public static final String LOCAL = "_local";
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.constants.QueryPreference;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
Expand Down Expand Up @@ -156,6 +157,7 @@ private IndexRequest toIndexRequest(Datasource datasource) {
indexRequest.index(DatasourceExtension.JOB_INDEX_NAME);
indexRequest.id(datasource.getName());
indexRequest.opType(DocWriteRequest.OpType.INDEX);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
return indexRequest;
} catch (IOException e) {
Expand Down Expand Up @@ -215,7 +217,7 @@ public void deleteDatasource(final Datasource datasource) {
* @throws IOException exception
*/
public Datasource getDatasource(final String name) throws IOException {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(QueryPreference.PRIMARY);
GetResponse response;
try {
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)));
Expand All @@ -242,7 +244,7 @@ public Datasource getDatasource(final String name) throws IOException {
* @param actionListener the action listener
*/
public void getDatasource(final String name, final ActionListener<Datasource> actionListener) {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(QueryPreference.PRIMARY);
StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() {
@Override
public void onResponse(final GetResponse response) {
Expand Down Expand Up @@ -280,6 +282,7 @@ public void getDatasources(final String[] names, final ActionListener<List<Datas
client,
() -> client.prepareMultiGet()
.add(DatasourceExtension.JOB_INDEX_NAME, names)
.setPreference(QueryPreference.PRIMARY)
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener))
);
}
Expand All @@ -293,6 +296,7 @@ public void getAllDatasources(final ActionListener<List<Datasource>> actionListe
client,
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setPreference(QueryPreference.PRIMARY)
.setSize(MAX_SIZE)
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener))
);
Expand All @@ -306,6 +310,7 @@ public List<Datasource> getAllDatasources() {
client,
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setPreference(QueryPreference.PRIMARY)
.setSize(MAX_SIZE)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.constants.IndexSetting;
import org.opensearch.geospatial.constants.QueryPreference;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.shared.Constants;
Expand Down Expand Up @@ -248,7 +249,7 @@ public Map<String, Object> getGeoIpData(final String indexName, final String ip)
() -> client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setPreference(QueryPreference.LOCAL)
.setRequestCache(true)
.get(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);
Expand Down
149 changes: 102 additions & 47 deletions src/main/resources/mappings/ip2geo_datasource.json
Original file line number Diff line number Diff line change
@@ -1,75 +1,130 @@
{
"properties" : {
"database" : {
"properties" : {
"fields" : {
"type" : "text"
"properties": {
"database": {
"properties": {
"fields": {
"type": "text"
},
"sha256_hash" : {
"type" : "text"
"provider": {
"type": "text"
},
"provider" : {
"type" : "text"
"sha256_hash": {
"type": "text"
},
"updated_at_in_epoch_millis" : {
"type" : "long"
"updated_at_in_epoch_millis": {
"type": "long"
},
"valid_for_in_days" : {
"type" : "long"
"valid_for_in_days": {
"type": "long"
}
}
},
"enabled_time" : {
"type" : "long"
"enabled_time": {
"type": "long"
},
"endpoint" : {
"type" : "text"
"endpoint": {
"type": "text"
},
"name" : {
"type" : "text"
"indices": {
"type": "text"
},
"indices" : {
"type" : "text"
"last_update_time": {
"type": "long"
},
"last_update_time" : {
"type" : "long"
"name": {
"type": "text"
},
"schedule" : {
"properties" : {
"interval" : {
"properties" : {
"period" : {
"type" : "long"
"schedule": {
"properties": {
"interval": {
"properties": {
"period": {
"type": "long"
},
"start_time" : {
"type" : "long"
"start_time": {
"type": "long"
},
"unit" : {
"type" : "text"
"unit": {
"type": "text"
}
}
}
}
},
"state" : {
"type" : "text"
"state": {
"type": "text"
},
"update_enabled" : {
"type" : "boolean"
"system_schedule": {
"properties": {
"interval": {
"properties": {
"period": {
"type": "long"
},
"start_time": {
"type": "long"
},
"unit": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
},
"task": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"update_stats" : {
"properties" : {
"last_failed_at_in_epoch_millis" : {
"type" : "long"
"update_enabled": {
"type": "boolean"
},
"update_stats": {
"properties": {
"last_failed_at_in_epoch_millis": {
"type": "long"
},
"last_processing_time_in_millis" : {
"type" : "long"
"last_processing_time_in_millis": {
"type": "long"
},
"last_skipped_at_in_epoch_millis" : {
"type" : "long"
"last_skipped_at_in_epoch_millis": {
"type": "long"
},
"last_succeeded_at_in_epoch_millis" : {
"type" : "long"
"last_succeeded_at_in_epoch_millis": {
"type": "long"
}
}
},
"user_schedule": {
"properties": {
"interval": {
"properties": {
"period": {
"type": "long"
},
"start_time": {
"type": "long"
},
"unit": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.constants.QueryPreference;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
Expand Down Expand Up @@ -205,6 +206,7 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime
GetRequest request = (GetRequest) actionRequest;
assertEquals(datasource.getName(), request.id());
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
assertEquals(QueryPreference.PRIMARY, request.preference());
GetResponse response = getMockedGetResponse(isExist ? datasource : null);
if (exception != null) {
throw exception;
Expand Down Expand Up @@ -262,6 +264,7 @@ public void testGetDatasources_whenValidInput_thenSucceed() {
assertTrue(actionRequest instanceof MultiGetRequest);
MultiGetRequest request = (MultiGetRequest) actionRequest;
assertEquals(2, request.getItems().size());
assertEquals(QueryPreference.PRIMARY, request.preference());
for (MultiGetRequest.Item item : request.getItems()) {
assertEquals(DatasourceExtension.JOB_INDEX_NAME, item.index());
assertTrue(datasources.stream().filter(datasource -> datasource.getName().equals(item.id())).findAny().isPresent());
Expand Down Expand Up @@ -295,6 +298,7 @@ public void testGetAllDatasources_whenAsynchronous_thenSucceed() {
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]);
assertEquals(QueryBuilders.matchAllQuery(), request.source().query());
assertEquals(1000, request.source().size());
assertEquals(QueryPreference.PRIMARY, request.preference());

SearchResponse response = mock(SearchResponse.class);
when(response.getHits()).thenReturn(searchHits);
Expand Down Expand Up @@ -322,6 +326,7 @@ public void testGetAllDatasources_whenSynchronous_thenSucceed() {
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]);
assertEquals(QueryBuilders.matchAllQuery(), request.source().query());
assertEquals(1000, request.source().size());
assertEquals(QueryPreference.PRIMARY, request.preference());

SearchResponse response = mock(SearchResponse.class);
when(response.getHits()).thenReturn(searchHits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.constants.QueryPreference;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.shared.Constants;
Expand Down Expand Up @@ -236,7 +237,7 @@ public void testGetGeoIpData_whenDataExist_thenReturnTheData() {
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assert actionRequest instanceof SearchRequest;
SearchRequest request = (SearchRequest) actionRequest;
assertEquals("_local", request.preference());
assertEquals(QueryPreference.LOCAL, request.preference());
assertEquals(1, request.source().size());
assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query());

Expand Down Expand Up @@ -269,7 +270,7 @@ public void testGetGeoIpData_whenNoData_thenReturnEmpty() {
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assert actionRequest instanceof SearchRequest;
SearchRequest request = (SearchRequest) actionRequest;
assertEquals("_local", request.preference());
assertEquals(QueryPreference.LOCAL, request.preference());
assertEquals(1, request.source().size());
assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query());

Expand Down

0 comments on commit efe125a

Please sign in to comment.