Skip to content

Commit

Permalink
Add DatabaseRegistry for locally managing databases managed by GeoIpD…
Browse files Browse the repository at this point in the history
…ownloader (#69540)

This component is responsible for making the databases maintained by GeoIpDownloader
available for ingest processors.

Also provides a lookup mechanism for geoip processors with fallback to {@link LocalDatabases}.
All databases are downloaded into a geoip tmp directory, which is created at node startup.

The following high level steps are executed after each cluster state update:
1) Check which databases are available in {@link GeoIpTaskState},
   which is part of the geoip downloader persistent task.
2) For each database check whether it has changed (by comparing the
   local and remote md5 hashes) or is missing locally.
3) For each database identified in step 2 start downloading the database
   chunks. Each chunk is appended to a tmp file (inside geoip tmp dir) and
   after all chunks have been downloaded, the database is uncompressed and
   renamed to the final filename. After this the database is loaded and
   if there is an old instance of this database then that is closed.
4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.

Relates to #68920
  • Loading branch information
martijnvg authored Mar 4, 2021
1 parent 9370b1c commit 6c35c25
Show file tree
Hide file tree
Showing 12 changed files with 1,098 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
Expand Down Expand Up @@ -41,11 +49,20 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1)
public class GeoIpDownloaderIT extends AbstractGeoIpIT {
Expand Down Expand Up @@ -128,6 +145,130 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

/**
 * Exercises a pipeline with three geoip processors (city, country, ASN) through the
 * simulate API before and after the downloader is enabled.
 * <p>
 * Before enabling, the simulated document is expected to resolve to the city "Tumba".
 * After enabling, the three database files must show up in every node's
 * {@code geoip-databases} tmp directory and the same document must resolve to "Linköping".
 * Finally the downloader is disabled and the tmp directories must become empty again.
 */
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
    // setup: register a pipeline with one geoip processor per database file
    final BytesReference pipelineSource;
    try (XContentBuilder pipeline = JsonXContent.contentBuilder()) {
        pipeline.startObject().startArray("processors");
        appendGeoIpProcessor(pipeline, "ip-city", "GeoLite2-City.mmdb");
        appendGeoIpProcessor(pipeline, "ip-country", "GeoLite2-Country.mmdb");
        appendGeoIpProcessor(pipeline, "ip-asn", "GeoLite2-ASN.mmdb");
        pipeline.endArray().endObject();
        pipelineSource = BytesReference.bytes(pipeline);
    }
    assertAcked(client().admin().cluster().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get());

    // verify before updating dbs
    final BytesReference docsSource;
    try (XContentBuilder docs = JsonXContent.contentBuilder()) {
        docs.startObject().startArray("docs");
        docs.startObject().field("_index", "my-index");
        docs.startObject("_source").field("ip", "89.160.20.128").endObject();
        docs.endObject();
        docs.endArray().endObject();
        docsSource = BytesReference.bytes(docs);
    }
    final SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(docsSource, XContentType.JSON);
    simulateRequest.setId("_id");
    assertSimulatedGeoIpFields(simulateRequest, "Tumba");

    // Enable downloader:
    updateDownloaderEnabledSetting(true);

    final List<Path> tmpDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
        .map(environment -> environment.tmpFile().resolve("geoip-databases"))
        .collect(Collectors.toList());
    // the geoip tmp directory has to be present on every node
    for (Path tmpDir : tmpDirs) {
        assertThat(Files.exists(tmpDir), is(true));
    }
    assertBusy(() -> {
        for (Path tmpDir : tmpDirs) {
            assertThat(
                listTmpDirEntries(tmpDir),
                containsInAnyOrder(
                    endsWith("GeoLite2-City.mmdb"),
                    endsWith("GeoLite2-Country.mmdb"),
                    endsWith("GeoLite2-ASN.mmdb")
                )
            );
        }
    });

    // Verify after updating dbs:
    assertBusy(() -> assertSimulatedGeoIpFields(simulateRequest, "Linköping"));

    // Disable downloader:
    updateDownloaderEnabledSetting(false);

    assertBusy(() -> {
        for (Path tmpDir : tmpDirs) {
            assertThat(listTmpDirEntries(tmpDir), empty());
        }
    });
}

/**
 * Writes a single {@code {"geoip": {...}}} processor object to the builder, reading from the
 * {@code ip} field and writing to the given target field using the given database file.
 */
private static void appendGeoIpProcessor(XContentBuilder builder, String targetField, String databaseFile) throws IOException {
    builder.startObject()
        .startObject("geoip")
        .field("field", "ip")
        .field("target_field", targetField)
        .field("database_file", databaseFile)
        .endObject()
        .endObject();
}

/**
 * Runs the simulate request and asserts that exactly one result comes back for pipeline
 * {@code _id}, carrying the expected city name together with the fixed ASN organization
 * ("Bredband2 AB") and country ("Sweden") values.
 */
private void assertSimulatedGeoIpFields(SimulatePipelineRequest request, String expectedCityName) {
    SimulatePipelineResponse response = client().admin().cluster().simulatePipeline(request).actionGet();
    assertThat(response.getPipelineId(), equalTo("_id"));
    assertThat(response.getResults().size(), equalTo(1));
    SimulateDocumentBaseResult first = (SimulateDocumentBaseResult) response.getResults().get(0);
    assertThat(first.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo(expectedCityName));
    assertThat(first.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
    assertThat(first.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
}

/**
 * Returns the string form of every path directly inside the given directory; the directory
 * stream is closed before returning.
 */
private static List<String> listTmpDirEntries(Path dir) throws IOException {
    try (Stream<Path> entries = Files.list(dir)) {
        return entries.map(Path::toString).collect(Collectors.toList());
    }
}

/**
 * Sets the downloader's enabled flag as a persistent cluster setting and asserts that the
 * update was acknowledged.
 */
private void updateDownloaderEnabledSetting(boolean enabled) {
    Settings.Builder persistent = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), enabled);
    assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(persistent));
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private void parseDatabase(Path tempFile) throws IOException {
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
Expand Down
Loading

0 comments on commit 6c35c25

Please sign in to comment.