Skip to content

Commit

Permalink
Create datasource index explicitly
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed May 4, 2023
1 parent 3f7b6e9 commit de8f30a
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.time.Instant;

import lombok.extern.log4j.Log4j2;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.ActionFilters;
Expand Down Expand Up @@ -71,18 +73,26 @@ public PutDatasourceTransportAction(
@Override
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
try {
Datasource datasource = Datasource.Builder.build(request);
IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
.id(datasource.getName())
.source(datasource.toXContent(JsonXContent.contentBuilder(), null))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.opType(DocWriteRequest.OpType.CREATE);
client.index(indexRequest, getIndexResponseListener(datasource, listener));
StepListener<Void> createIndexStep = new StepListener<>();
datasourceFacade.createIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> putDatasource(request, listener), exception -> listener.onFailure(exception));
} catch (Exception e) {
listener.onFailure(e);
}
}

@VisibleForTesting
protected void putDatasource(final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener)
throws IOException {
Datasource datasource = Datasource.Builder.build(request);
IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
.id(datasource.getName())
.source(datasource.toXContent(JsonXContent.contentBuilder(), null))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.opType(DocWriteRequest.OpType.CREATE);
client.index(indexRequest, getIndexResponseListener(datasource, listener));
}

@VisibleForTesting
protected ActionListener<IndexResponse> getIndexResponseListener(
final Datasource datasource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@

package org.opensearch.geospatial.ip2geo.common;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetItemResponse;
Expand All @@ -28,7 +38,9 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -48,12 +60,64 @@
@Log4j2
public class DatasourceFacade {
private static final Integer MAX_SIZE = 1000;
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1);
private static final Tuple<String, String> INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all");
private static final Tuple<String, Boolean> INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true);
private final Client client;
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;

public DatasourceFacade(final Client client, final ClusterSettings clusterSettings) {
public DatasourceFacade(final Client client, final ClusterService clusterService) {
this.client = client;
this.clusterSettings = clusterSettings;
this.clusterService = clusterService;
this.clusterSettings = clusterService.getClusterSettings();
}

/**
* Create a datasource index of single shard with auto expand replicas to all nodes
*
* We want the index to expand to all replica so that datasource query request can be executed locally
* for faster ingestion time.
*/
public void createIndexIfNotExists(final StepListener<Void> stepListener) {
if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) {
stepListener.onResponse(null);
return;
}
final Map<String, Object> indexSettings = new HashMap<>();
indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(indexSettings);
client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(final CreateIndexResponse createIndexResponse) {
stepListener.onResponse(null);
}

@Override
public void onFailure(final Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
log.info("index[{}] already exist", DatasourceExtension.JOB_INDEX_NAME);
stepListener.onResponse(null);
return;
}
stepListener.onFailure(e);
}
});
}

private String getIndexMapping() {
try {
try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void createIndexIfNotExists(final String indexName) {
*/
private String getIndexMapping() {
try {
try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_geoip.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ public class Datasource implements Writeable, ScheduledJobParameter {
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD);

}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ private String setupIndex(final DatasourceManifest manifest, final Datasource da
* @return
*/
private boolean shouldUpdate(final Datasource datasource, final DatasourceManifest manifest) {
if (datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) {
if (datasource.getDatabase().getUpdatedAt() != null
&& datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
new Ip2GeoProcessor.Factory(
parameters.client,
parameters.ingestService,
new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService().getClusterSettings()),
new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService()),
new GeoIpDataFacade(parameters.ingestService.getClusterService(), parameters.client)
)
)
Expand Down Expand Up @@ -128,7 +128,7 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
GeoIpDataFacade geoIpDataFacade = new GeoIpDataFacade(clusterService, client);
DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService.getClusterSettings());
DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService);
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
clusterService,
client,
Expand Down
80 changes: 74 additions & 6 deletions src/main/resources/mappings/ip2geo_datasource.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,77 @@
{
"dynamic": false,
"properties": {
"_cidr": {
"type": "ip_range",
"doc_values": false
"properties" : {
"database" : {
"properties" : {
"fields" : {
"type" : "text"
},
"md5_hash" : {
"type" : "text"
},
"provider" : {
"type" : "text"
},
"updated_at" : {
"type" : "long"
},
"valid_for_in_days" : {
"type" : "long"
}
}
},
"enabled_time" : {
"type" : "long"
},
"endpoint" : {
"type" : "text"
},
"id" : {
"type" : "text"
},
"indices" : {
"type" : "text"
},
"last_update_time" : {
"type" : "long"
},
"schedule" : {
"properties" : {
"interval" : {
"properties" : {
"period" : {
"type" : "long"
},
"start_time" : {
"type" : "long"
},
"unit" : {
"type" : "text"
}
}
}
}
},
"state" : {
"type" : "text"
},
"update_enabled" : {
"type" : "boolean"
},
"update_stats" : {
"properties" : {
"last_failed_at" : {
"type" : "long"
},
"last_processing_time_in_millis" : {
"type" : "long"
},
"last_skipped_at" : {
"type" : "long"
},
"last_succeeded_at" : {
"type" : "long"
}
}
}
}
}
}
9 changes: 9 additions & 0 deletions src/main/resources/mappings/ip2geo_geoip.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"dynamic": false,
"properties": {
"_cidr": {
"type": "ip_range",
"doc_values": false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
Request request,
ActionListener<Response> listener
) {
listener.onResponse((Response) executeVerifier.get().apply(action, request));
try {
listener.onResponse((Response) executeVerifier.get().apply(action, request));
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testStreamInOut() throws Exception {
String domain = GeospatialTestHelper.randomLowerCaseString();
PutDatasourceRequest request = new PutDatasourceRequest(datasourceName);
request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain));
request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(30) + 1));
request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29) + 1));

// Run
BytesStreamOutput output = new BytesStreamOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import static org.mockito.Mockito.verify;

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -44,7 +46,7 @@ public void init() {
);
}

public void testDoExecute() throws Exception {
public void testDoExecute_whenValidInput_thenSucceed() throws Exception {
Task task = mock(Task.class);
PutDatasourceRequest request = new PutDatasourceRequest("test");
request.setEndpoint(sampleManifestUrl());
Expand All @@ -59,7 +61,18 @@ public void testDoExecute() throws Exception {
assertEquals(DocWriteRequest.OpType.CREATE, indexRequest.opType());
return null;
});

// Run
action.doExecute(task, request, listener);

// Verify
ArgumentCaptor<StepListener> captor = ArgumentCaptor.forClass(StepListener.class);
verify(datasourceFacade).createIndexIfNotExists(captor.capture());

// Run
captor.getValue().onResponse(null);

// Verify
verify(verifyingClient).index(any(IndexRequest.class), any(ActionListener.class));
verify(listener).onResponse(new AcknowledgedResponse(true));
}
Expand Down
Loading

0 comments on commit de8f30a

Please sign in to comment.