Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.15] Directly download commercial ip geolocation databases from providers (#110844) #111077

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/110844.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 110844
summary: Directly download commercial ip geolocation databases from providers
area: Ingest Node
type: feature
issues: []
2 changes: 1 addition & 1 deletion docs/reference/security/authorization/privileges.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ status of {Ilm}
This privilege is not available in {serverless-full}.

`read_pipeline`::
Read-only access to ingest pipline (get, simulate).
Read-only access to ingest pipeline (get, simulate).

`read_slm`::
All read-only {slm-init} actions, such as getting policies and checking the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

import fixture.geoip.EnterpriseGeoIpHttpFixture;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.EnterpriseGeoIpTask;
import org.elasticsearch.ingest.geoip.direct.DatabaseConfiguration;
import org.elasticsearch.ingest.geoip.direct.PutDatabaseConfigurationAction;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.ClassRule;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_LICENSE_KEY_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {

private static final String DATABASE_TYPE = "GeoIP2-City";
private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false;

@ClassRule
public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture(useFixture, DATABASE_TYPE);

protected String getEndpoint() {
return useFixture ? fixture.getAddress() : null;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(MAXMIND_LICENSE_KEY_SETTING.getKey(), "license_key");
Settings.Builder builder = Settings.builder();
builder.setSecureSettings(secureSettings)
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
if (getEndpoint() != null) {
// note: this is using the enterprise fixture for the regular downloader, too, as
// a slightly hacky way of making the regular downloader not actually download any files
builder.put(GeoIpDownloader.ENDPOINT_SETTING.getKey(), getEndpoint());
}
return builder.build();
}

@SuppressWarnings("unchecked")
protected Collection<Class<? extends Plugin>> nodePlugins() {
// the reindex plugin is (somewhat surprisingly) necessary in order to be able to delete-by-query,
// which modules/ingest-geoip does to delete old chunks
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), IngestGeoIpPlugin.class, ReindexPlugin.class);
}

@SuppressWarnings("unchecked")
public void testEnterpriseDownloaderTask() throws Exception {
/*
* This test starts the enterprise geoip downloader task, and creates a database configuration. Then it creates an ingest
* pipeline that references that database, and ingests a single document using that pipeline. It then asserts that the document
* was updated with information from the database.
* Note that the "enterprise database" is actually just a geolite database being loaded by the GeoIpHttpFixture.
*/
if (getEndpoint() != null) {
EnterpriseGeoIpDownloader.DEFAULT_MAXMIND_ENDPOINT = getEndpoint();
}
final String pipelineName = "enterprise_geoip_pipeline";
final String indexName = "enterprise_geoip_test_index";
final String sourceField = "ip";
final String targetField = "ip-city";

startEnterpriseGeoIpDownloaderTask();
configureDatabase(DATABASE_TYPE);
createGeoIpPipeline(pipelineName, DATABASE_TYPE, sourceField, targetField);
String documentId = ingestDocument(indexName, pipelineName, sourceField);
GetResponse getResponse = client().get(new GetRequest(indexName, documentId)).actionGet();
Map<String, Object> returnedSource = getResponse.getSource();
assertNotNull(returnedSource);
Object targetFieldValue = returnedSource.get(targetField);
assertNotNull(targetFieldValue);
assertThat(((Map<String, Object>) targetFieldValue).get("organization_name"), equalTo("Bredband2 AB"));
}

private void startEnterpriseGeoIpDownloaderTask() {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
persistentTasksService.sendStartRequest(
ENTERPRISE_GEOIP_DOWNLOADER,
ENTERPRISE_GEOIP_DOWNLOADER,
new EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams(),
TimeValue.MAX_VALUE,
ActionListener.wrap(r -> logger.debug("Started enterprise geoip downloader task"), e -> {
Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e;
if (t instanceof ResourceAlreadyExistsException == false) {
logger.error("failed to create enterprise geoip downloader task", e);
}
})
);
}

private void configureDatabase(String databaseType) throws Exception {
admin().cluster()
.execute(
PutDatabaseConfigurationAction.INSTANCE,
new PutDatabaseConfigurationAction.Request(
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
new DatabaseConfiguration("test", databaseType, new DatabaseConfiguration.Maxmind("test_account"))
)
)
.actionGet();
ensureGreen(GeoIpDownloader.DATABASES_INDEX);
assertBusy(() -> {
SearchResponse searchResponse = client().search(new SearchRequest(GeoIpDownloader.DATABASES_INDEX)).actionGet();
try {
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
} finally {
searchResponse.decRef();
}
});
}

private void createGeoIpPipeline(String pipelineName, String databaseType, String sourceField, String targetField) throws IOException {
final BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.field("description", "test");
builder.startArray("processors");
{
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", sourceField);
builder.field("target_field", targetField);
builder.field("database_file", databaseType + ".mmdb");
}
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();
bytes = BytesReference.bytes(builder);
}
assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(pipelineName, bytes, XContentType.JSON)).actionGet());
}

private String ingestDocument(String indexName, String pipelineName, String sourceField) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(
new IndexRequest(indexName).source("{\"" + sourceField + "\": \"89.160.20.128\"}", XContentType.JSON).setPipeline(pipelineName)
);
BulkResponse response = client().bulk(bulkRequest).actionGet();
BulkItemResponse[] bulkItemResponses = response.getItems();
assertThat(bulkItemResponses.length, equalTo(1));
assertThat(bulkItemResponses[0].status(), equalTo(RestStatus.CREATED));
return bulkItemResponses[0].getId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ public void testInvalidTimestamp() throws Exception {
updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
assertBusy(() -> {
GeoIpTaskState state = getGeoIpTaskState();
assertEquals(
Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb"),
state.getDatabases().keySet()
assertThat(
state.getDatabases().keySet(),
containsInAnyOrder("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb")
);
}, 2, TimeUnit.MINUTES);

Expand Down Expand Up @@ -227,9 +227,9 @@ public void testGeoIpDatabasesDownload() throws Exception {
updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
assertBusy(() -> {
GeoIpTaskState state = getGeoIpTaskState();
assertEquals(
Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb"),
state.getDatabases().keySet()
assertThat(
state.getDatabases().keySet(),
containsInAnyOrder("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb")
);
putGeoIpPipeline(); // This is to work around the race condition described in #92888
}, 2, TimeUnit.MINUTES);
Expand All @@ -238,9 +238,9 @@ public void testGeoIpDatabasesDownload() throws Exception {
assertBusy(() -> {
try {
GeoIpTaskState state = (GeoIpTaskState) getTask().getState();
assertEquals(
Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb"),
state.getDatabases().keySet()
assertThat(
state.getDatabases().keySet(),
containsInAnyOrder("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb")
);
GeoIpTaskState.Metadata metadata = state.getDatabases().get(id);
int size = metadata.lastChunk() - metadata.firstChunk() + 1;
Expand Down Expand Up @@ -301,9 +301,9 @@ public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception {
assertNotNull(getTask().getState()); // removing all geoip processors should not result in the task being stopped
assertBusy(() -> {
GeoIpTaskState state = getGeoIpTaskState();
assertEquals(
Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb"),
state.getDatabases().keySet()
assertThat(
state.getDatabases().keySet(),
containsInAnyOrder("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb")
);
});
}
Expand Down Expand Up @@ -337,9 +337,9 @@ public void testDoNotDownloadDatabaseOnPipelineCreation() throws Exception {
assertAcked(indicesAdmin().prepareUpdateSettings(indexIdentifier).setSettings(indexSettings).get());
assertBusy(() -> {
GeoIpTaskState state = getGeoIpTaskState();
assertEquals(
Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb"),
state.getDatabases().keySet()
assertThat(
state.getDatabases().keySet(),
containsInAnyOrder("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "MyCustomGeoLite2-City.mmdb")
);
}, 2, TimeUnit.MINUTES);

Expand Down
1 change: 1 addition & 0 deletions modules/ingest-geoip/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
requires com.maxmind.geoip2;
requires com.maxmind.db;

exports org.elasticsearch.ingest.geoip.direct to org.elasticsearch.server;
exports org.elasticsearch.ingest.geoip.stats to org.elasticsearch.server;
}
Loading
Loading