Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create datasource index explicitly #283

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.time.Instant;

import lombok.extern.log4j.Log4j2;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.ActionFilters;
Expand Down Expand Up @@ -71,18 +73,26 @@ public PutDatasourceTransportAction(
@Override
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
try {
Datasource datasource = Datasource.Builder.build(request);
IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
.id(datasource.getName())
.source(datasource.toXContent(JsonXContent.contentBuilder(), null))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.opType(DocWriteRequest.OpType.CREATE);
client.index(indexRequest, getIndexResponseListener(datasource, listener));
StepListener<Void> createIndexStep = new StepListener<>();
datasourceFacade.createIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> putDatasource(request, listener), exception -> listener.onFailure(exception));
} catch (Exception e) {
listener.onFailure(e);
}
}

@VisibleForTesting
protected void putDatasource(final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener)
throws IOException {
Datasource datasource = Datasource.Builder.build(request);
IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
.id(datasource.getName())
.source(datasource.toXContent(JsonXContent.contentBuilder(), null))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.opType(DocWriteRequest.OpType.CREATE);
client.index(indexRequest, getIndexResponseListener(datasource, listener));
}

@VisibleForTesting
protected ActionListener<IndexResponse> getIndexResponseListener(
final Datasource datasource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@

package org.opensearch.geospatial.ip2geo.common;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetItemResponse;
Expand All @@ -28,7 +38,9 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -48,12 +60,64 @@
@Log4j2
public class DatasourceFacade {
private static final Integer MAX_SIZE = 1000;
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1);
private static final Tuple<String, String> INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all");
private static final Tuple<String, Boolean> INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true);
private final Client client;
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;

public DatasourceFacade(final Client client, final ClusterSettings clusterSettings) {
public DatasourceFacade(final Client client, final ClusterService clusterService) {
this.client = client;
this.clusterSettings = clusterSettings;
this.clusterService = clusterService;
this.clusterSettings = clusterService.getClusterSettings();
}

/**
* Create a datasource index of single shard with auto expand replicas to all nodes
*
* We want the index to expand to all replica so that datasource query request can be executed locally
* for faster ingestion time.
*/
public void createIndexIfNotExists(final StepListener<Void> stepListener) {
if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) {
heemin32 marked this conversation as resolved.
Show resolved Hide resolved
stepListener.onResponse(null);
return;
}
final Map<String, Object> indexSettings = new HashMap<>();
indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(indexSettings);
client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(final CreateIndexResponse createIndexResponse) {
stepListener.onResponse(null);
}

@Override
public void onFailure(final Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
log.info("index[{}] already exist", DatasourceExtension.JOB_INDEX_NAME);
stepListener.onResponse(null);
return;
}
stepListener.onFailure(e);
}
});
}

private String getIndexMapping() {
try {
try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void createIndexIfNotExists(final String indexName) {
*/
private String getIndexMapping() {
try {
try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_geoip.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ public class Datasource implements Writeable, ScheduledJobParameter {
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD);

}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ private String setupIndex(final DatasourceManifest manifest, final Datasource da
* @return
*/
private boolean shouldUpdate(final Datasource datasource, final DatasourceManifest manifest) {
if (datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) {
if (datasource.getDatabase().getUpdatedAt() != null
&& datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
new Ip2GeoProcessor.Factory(
parameters.client,
parameters.ingestService,
new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService().getClusterSettings()),
new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService()),
new GeoIpDataFacade(parameters.ingestService.getClusterService(), parameters.client)
)
)
Expand Down Expand Up @@ -128,7 +128,7 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
GeoIpDataFacade geoIpDataFacade = new GeoIpDataFacade(clusterService, client);
DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService.getClusterSettings());
DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService);
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
clusterService,
client,
Expand Down
80 changes: 74 additions & 6 deletions src/main/resources/mappings/ip2geo_datasource.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,77 @@
{
"dynamic": false,
"properties": {
"_cidr": {
"type": "ip_range",
"doc_values": false
"properties" : {
"database" : {
"properties" : {
"fields" : {
"type" : "text"
},
"sha256_hash" : {
"type" : "text"
},
"provider" : {
"type" : "text"
},
"updated_at_in_epoch_millis" : {
"type" : "long"
},
"valid_for_in_days" : {
"type" : "long"
}
}
},
"enabled_time" : {
"type" : "long"
},
"endpoint" : {
"type" : "text"
},
"name" : {
"type" : "text"
},
"indices" : {
"type" : "text"
},
"last_update_time" : {
"type" : "long"
},
"schedule" : {
"properties" : {
"interval" : {
"properties" : {
"period" : {
"type" : "long"
},
"start_time" : {
"type" : "long"
},
"unit" : {
"type" : "text"
}
}
}
}
},
"state" : {
"type" : "text"
},
"update_enabled" : {
"type" : "boolean"
},
"update_stats" : {
"properties" : {
"last_failed_at_in_epoch_millis" : {
"type" : "long"
},
"last_processing_time_in_millis" : {
"type" : "long"
},
"last_skipped_at_in_epoch_millis" : {
"type" : "long"
},
"last_succeeded_at_in_epoch_millis" : {
"type" : "long"
}
}
}
}
}
}
9 changes: 9 additions & 0 deletions src/main/resources/mappings/ip2geo_geoip.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"dynamic": false,
"properties": {
"_cidr": {
"type": "ip_range",
"doc_values": false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
Request request,
ActionListener<Response> listener
) {
listener.onResponse((Response) executeVerifier.get().apply(action, request));
try {
listener.onResponse((Response) executeVerifier.get().apply(action, request));
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testStreamInOut() throws Exception {
String domain = GeospatialTestHelper.randomLowerCaseString();
PutDatasourceRequest request = new PutDatasourceRequest(datasourceName);
request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain));
request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(30) + 1));
request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29) + 1));

// Run
BytesStreamOutput output = new BytesStreamOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import static org.mockito.Mockito.verify;

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -44,7 +46,7 @@ public void init() {
);
}

public void testDoExecute() throws Exception {
public void testDoExecute_whenValidInput_thenSucceed() throws Exception {
Task task = mock(Task.class);
PutDatasourceRequest request = new PutDatasourceRequest("test");
request.setEndpoint(sampleManifestUrl());
Expand All @@ -59,7 +61,18 @@ public void testDoExecute() throws Exception {
assertEquals(DocWriteRequest.OpType.CREATE, indexRequest.opType());
return null;
});

// Run
action.doExecute(task, request, listener);

// Verify
ArgumentCaptor<StepListener> captor = ArgumentCaptor.forClass(StepListener.class);
verify(datasourceFacade).createIndexIfNotExists(captor.capture());

// Run
captor.getValue().onResponse(null);

// Verify
verify(verifyingClient).index(any(IndexRequest.class), any(ActionListener.class));
verify(listener).onResponse(new AcknowledgedResponse(true));
}
Expand Down
Loading