diff --git a/modules/ingest-geoip/build.gradle b/modules/ingest-geoip/build.gradle index ffa3e7f63ea0f..cdb6be13d0d3a 100644 --- a/modules/ingest-geoip/build.gradle +++ b/modules/ingest-geoip/build.gradle @@ -11,6 +11,8 @@ import org.apache.tools.ant.taskdefs.condition.Os apply plugin: 'elasticsearch.yaml-rest-test' apply plugin: 'elasticsearch.internal-cluster-test' +final Project fixture = project(':test:fixtures:geoip-fixture') + esplugin { description 'Ingest processor that uses lookup geo data based on IP adresses using the MaxMind geo database' classname 'org.elasticsearch.ingest.geoip.IngestGeoIpPlugin' @@ -24,6 +26,7 @@ dependencies { api('com.maxmind.db:maxmind-db:1.3.1') testImplementation 'org.elasticsearch:geolite2-databases:20191119' + internalClusterTestImplementation project(path: ":modules:reindex") } restResources { @@ -32,6 +35,31 @@ restResources { } } +def useFixture = System.getenv("geoip_use_service") != "true" + +if (useFixture) { + apply plugin: 'elasticsearch.test.fixtures' + testFixtures.useFixture(fixture.path, 'geoip-fixture') + + task "beforeInternalClusterTest" { + dependsOn ':test:fixtures:geoip-fixture:postProcessFixture' + doLast { + int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.geoip-fixture.tcp.80" + assert ephemeralPort > 0 + internalClusterTest { + nonInputProperties.systemProperty "geoip_endpoint", "http://127.0.0.1:" + ephemeralPort + } + } + } +} + +internalClusterTest { + systemProperty "es.geoip_v2_feature_flag_enabled", "true" + if (useFixture) { + dependsOn "beforeInternalClusterTest" + } +} + tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) { from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) } into "${project.buildDir}/ingest-geoip" @@ -47,21 +75,21 @@ tasks.named("bundlePlugin").configure { tasks.named("thirdPartyAudit").configure { ignoreMissingClasses( - // geoip WebServiceClient needs apache http client, but we're not using WebServiceClient: - 'org.apache.http.HttpEntity', - 'org.apache.http.HttpHost', - 'org.apache.http.HttpResponse', - 'org.apache.http.StatusLine', - 'org.apache.http.auth.UsernamePasswordCredentials', - 'org.apache.http.client.config.RequestConfig$Builder', - 'org.apache.http.client.config.RequestConfig', - 'org.apache.http.client.methods.CloseableHttpResponse', - 'org.apache.http.client.methods.HttpGet', - 'org.apache.http.client.utils.URIBuilder', - 'org.apache.http.impl.auth.BasicScheme', - 'org.apache.http.impl.client.CloseableHttpClient', - 'org.apache.http.impl.client.HttpClientBuilder', - 'org.apache.http.util.EntityUtils' + // geoip WebServiceClient needs apache http client, but we're not using WebServiceClient: + 'org.apache.http.HttpEntity', + 'org.apache.http.HttpHost', + 'org.apache.http.HttpResponse', + 'org.apache.http.StatusLine', + 'org.apache.http.auth.UsernamePasswordCredentials', + 'org.apache.http.client.config.RequestConfig$Builder', + 'org.apache.http.client.config.RequestConfig', + 'org.apache.http.client.methods.CloseableHttpResponse', + 'org.apache.http.client.methods.HttpGet', + 'org.apache.http.client.utils.URIBuilder', + 'org.apache.http.impl.auth.BasicScheme', + 'org.apache.http.impl.client.CloseableHttpClient', + 'org.apache.http.impl.client.HttpClientBuilder', + 'org.apache.http.util.EntityUtils' ) } diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java new file mode 100644 index 0000000000000..47386251f848d --- /dev/null +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.StreamsUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public abstract class AbstractGeoIpIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class); + } + + @Override + protected Settings nodeSettings(final int nodeOrdinal) { + final Path databasePath = createTempDir(); + try { + Files.createDirectories(databasePath); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), + databasePath.resolve("GeoLite2-City.mmdb")); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), + databasePath.resolve("GeoLite2-Country.mmdb")); + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), + databasePath.resolve("GeoLite2-ASN.mmdb")); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + return Settings.builder() + .put("ingest.geoip.database_path", databasePath) + .put(super.nodeSettings(nodeOrdinal)) + .build(); + } + + public static class IngestGeoIpSettingsPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope)); + } + } +} diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java new file mode 100644 index 0000000000000..c5ba9c15f8166 --- /dev/null +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import com.maxmind.geoip2.DatabaseReader; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; + +@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1) +public class GeoIpDownloaderIT extends AbstractGeoIpIT { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false); + String endpoint = System.getProperty("geoip_endpoint"); + if (endpoint != null) { + settings.put(GeoIpDownloader.ENDPOINT_SETTING.getKey(), endpoint); + } + return settings.build(); + } + + public void testGeoIpDatabasesDownload() throws Exception { + ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true)) + .get(); + assertTrue(settingsResponse.isAcknowledged()); + assertBusy(() -> { + PersistentTasksCustomMetadata.PersistentTask task = getTask(); + assertNotNull(task); + GeoIpTaskState state = (GeoIpTaskState) task.getState(); + assertNotNull(state); + assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet()); + }, 2, TimeUnit.MINUTES); + + GeoIpTaskState state = (GeoIpTaskState) getTask().getState(); + for (String id : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) { + assertBusy(() -> { + GeoIpTaskState.Metadata metadata = state.get(id); + BoolQueryBuilder queryBuilder = new BoolQueryBuilder() + .filter(new MatchQueryBuilder("name", id)) + .filter(new RangeQueryBuilder("chunk") + .from(metadata.getFirstChunk()) + .to(metadata.getLastChunk(), true)); + int size = metadata.getLastChunk() - metadata.getFirstChunk() + 1; + SearchResponse res = client().prepareSearch(GeoIpDownloader.DATABASES_INDEX) + .setSize(size) + .setQuery(queryBuilder) + .addSort("chunk", SortOrder.ASC) + .get(); + TotalHits totalHits = res.getHits().getTotalHits(); + assertEquals(TotalHits.Relation.EQUAL_TO, totalHits.relation); + assertEquals(size, totalHits.value); + assertEquals(size, res.getHits().getHits().length); + + List data = new ArrayList<>(); + + for (SearchHit hit : res.getHits().getHits()) { + data.add((byte[]) hit.getSourceAsMap().get("data")); + } + + GZIPInputStream stream = new GZIPInputStream(new MultiByteArrayInputStream(data)); + Path tempFile = createTempFile(); + try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tempFile, TRUNCATE_EXISTING, WRITE, CREATE))) { + stream.transferTo(os); + } + + parseDatabase(tempFile); + }); + } + } + + @SuppressForbidden(reason = "Maxmind API requires java.io.File") + private void parseDatabase(Path tempFile) throws IOException { + try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) { + assertNotNull(databaseReader.getMetadata()); + } + } + + private PersistentTasksCustomMetadata.PersistentTask getTask() { + return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER); + } + + private static class MultiByteArrayInputStream extends InputStream { + + private final Iterator data; + private ByteArrayInputStream current; + + private MultiByteArrayInputStream(List data) { + this.data = data.iterator(); + } + + @Override + public int read() { + if (current == null) { + if (data.hasNext() == false) { + return -1; + } + + current = new ByteArrayInputStream(data.next()); + } + int read = current.read(); + if (read == -1) { + current = null; + return read(); + } + return read; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (current == null) { + if (data.hasNext() == false) { + return -1; + } + + current = new ByteArrayInputStream(data.next()); + } + int read = current.read(b, off, len); + if (read == -1) { + current = null; + return read(b, off, len); + } + return read; + } + } +} diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java index 59279c605a69b..23e1346660ff4 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java @@ -13,69 +13,27 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.NodeRoles; -import org.elasticsearch.test.StreamsUtils; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; -import java.util.List; import static org.elasticsearch.test.NodeRoles.nonIngestNode; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase { - - public static class IngestGeoIpSettingsPlugin extends Plugin { - - @Override - public List> getSettings() { - return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope)); - } - } +public class GeoIpProcessorNonIngestNodeIT extends AbstractGeoIpIT { @Override - protected Collection> nodePlugins() { - return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class); - } - - @Override - protected Settings nodeSettings(final int nodeOrdinal) { - final Path databasePath = createTempDir(); - try { - Files.createDirectories(databasePath); - Files.copy( - new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), - databasePath.resolve("GeoLite2-City.mmdb")); - Files.copy( - new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), - databasePath.resolve("GeoLite2-Country.mmdb")); - Files.copy( - new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), - databasePath.resolve("GeoLite2-ASN.mmdb")); - } catch (final IOException e) { - throw new UncheckedIOException(e); - } - return Settings.builder() - .put("ingest.geoip.database_path", databasePath) - .put(nonIngestNode()) - .put(super.nodeSettings(nodeOrdinal)) - .build(); + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nonIngestNode()).build(); } /** diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java new file mode 100644 index 0000000000000..014be31402dd3 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.hash.MessageDigests; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.ingest.geoip.GeoIpTaskState.Metadata; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.io.InputStream; +import java.security.MessageDigest; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Main component responsible for downloading new GeoIP databases. + * New databases are downloaded in chunks and stored in .geoip_databases index + * Downloads are verified against MD5 checksum provided by the server + * Current state of all stored databases is stored in cluster state in persistent task state + */ +class GeoIpDownloader extends AllocatedPersistentTask { + + private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); + + public static final boolean GEOIP_V2_FEATURE_FLAG_ENABLED = "true".equals(System.getProperty("es.geoip_v2_feature_flag_enabled")); + + public static final Setting POLL_INTERVAL_SETTING = Setting.timeSetting("geoip.downloader.poll.interval", + TimeValue.timeValueDays(3), TimeValue.timeValueDays(1), Property.Dynamic, Property.NodeScope); + public static final Setting ENDPOINT_SETTING = Setting.simpleString("geoip.downloader.endpoint", + "https://paisano.elastic.dev/v1/geoip/database", Property.NodeScope); + + public static final String GEOIP_DOWNLOADER = "geoip-downloader"; + static final String DATABASES_INDEX = ".geoip_databases"; + static final int MAX_CHUNK_SIZE = 1024 * 1024; + + private final Client client; + private final HttpClient httpClient; + private final ThreadPool threadPool; + private final String endpoint; + + //visible for testing + protected volatile GeoIpTaskState state; + private volatile TimeValue pollInterval; + private volatile Scheduler.ScheduledCancellable scheduled; + + GeoIpDownloader(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, Settings settings, + long id, String type, String action, String description, TaskId parentTask, + Map headers) { + super(id, type, action, description, parentTask, headers); + this.httpClient = httpClient; + this.client = client; + this.threadPool = threadPool; + endpoint = ENDPOINT_SETTING.get(settings); + pollInterval = POLL_INTERVAL_SETTING.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(POLL_INTERVAL_SETTING, this::setPollInterval); + } + + public void setPollInterval(TimeValue pollInterval) { + this.pollInterval = pollInterval; + if (scheduled != null && scheduled.cancel()) { + scheduleNextRun(new TimeValue(1)); + } + } + + //visible for testing + void updateDatabases() throws IOException { + logger.info("updating geoip databases"); + List> response = fetchDatabasesOverview(); + for (Map res : response) { + processDatabase(res); + } + } + + @SuppressWarnings("unchecked") + private List fetchDatabasesOverview() throws IOException { + byte[] data = httpClient.getBytes(endpoint + "?key=11111111-1111-1111-1111-111111111111"); + try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, data)) { + return (List) parser.list(); + } + } + + //visible for testing + void processDatabase(Map databaseInfo) { + String name = databaseInfo.get("name").toString().replace(".gz", ""); + String md5 = (String) databaseInfo.get("md5_hash"); + if (state.contains(name) && Objects.equals(md5, state.get(name).getMd5())) { + updateTimestamp(name, state.get(name)); + return; + } + logger.info("updating geoip database [" + name + "]"); + String url = databaseInfo.get("url").toString(); + try (InputStream is = httpClient.get(url)) { + int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0; + int lastChunk = indexChunks(name, is, firstChunk, md5); + if (lastChunk > firstChunk) { + state = state.put(name, new Metadata(System.currentTimeMillis(), firstChunk, lastChunk - 1, md5)); + updateTaskState(); + logger.info("updated geoip database [" + name + "]"); + deleteOldChunks(name, firstChunk); + } + } catch (Exception e) { + logger.error("error updating geoip database [" + name + "]", e); + } + } + + //visible for testing + void deleteOldChunks(String name, int firstChunk) { + BoolQueryBuilder queryBuilder = new BoolQueryBuilder() + .filter(new MatchQueryBuilder("name", name)) + .filter(new RangeQueryBuilder("chunk").to(firstChunk, false)); + DeleteByQueryRequest request = new DeleteByQueryRequest(); + request.indices(DATABASES_INDEX); + request.setQuery(queryBuilder); + client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(r -> { + }, e -> logger.warn("could not delete old chunks for geoip database [" + name + "]", e))); + } + + //visible for testing + protected void updateTimestamp(String name, Metadata old) { + logger.info("geoip database [" + name + "] is up to date, updated timestamp"); + state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5())); + updateTaskState(); + } + + void updateTaskState() { + PlainActionFuture> future = PlainActionFuture.newFuture(); + updatePersistentTaskState(state, future); + state = ((GeoIpTaskState) future.actionGet().getState()); + } + + //visible for testing + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) throws IOException { + MessageDigest md = MessageDigests.md5(); + for (byte[] buf = getChunk(is); buf.length != 0; buf = getChunk(is)) { + md.update(buf); + client.prepareIndex(DATABASES_INDEX).setId(name + "_" + chunk) + .setSource(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setWaitForActiveShards(ActiveShardCount.ALL) + .get(); + chunk++; + } + String actualMd5 = MessageDigests.toHexString(md.digest()); + if (Objects.equals(expectedMd5, actualMd5) == false) { + throw new IOException("md5 checksum mismatch, expected [" + expectedMd5 + "], actual [" + actualMd5 + "]"); + } + return chunk; + } + + //visible for testing + byte[] getChunk(InputStream is) throws IOException { + byte[] buf = new byte[MAX_CHUNK_SIZE]; + int chunkSize = 0; + while (chunkSize < MAX_CHUNK_SIZE) { + int read = is.read(buf, chunkSize, MAX_CHUNK_SIZE - chunkSize); + if (read == -1) { + break; + } + chunkSize += read; + } + if (chunkSize < MAX_CHUNK_SIZE) { + buf = Arrays.copyOf(buf, chunkSize); + } + return buf; + } + + void setState(GeoIpTaskState state) { + this.state = state; + } + + void runDownloader() { + if (isCancelled() || isCompleted()) { + return; + } + try { + updateDatabases(); + } catch (Exception e) { + logger.error("exception during geoip databases update", e); + } + scheduleNextRun(pollInterval); + } + + @Override + protected void onCancelled() { + if (scheduled != null) { + scheduled.cancel(); + } + } + + private void scheduleNextRun(TimeValue time) { + scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC); + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java new file mode 100644 index 0000000000000..306489e3d798f --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Map; + +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_V2_FEATURE_FLAG_ENABLED; + +/** + * Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node. + * Also bootstraps GeoIP download task on clean cluster and handles changes to the 'geoip.downloader.enabled' setting + */ +final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor implements ClusterStateListener { + + public static final Setting ENABLED_SETTING = Setting.boolSetting("geoip.downloader.enabled", GEOIP_V2_FEATURE_FLAG_ENABLED, + Setting.Property.Dynamic, Setting.Property.NodeScope); + + private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); + + private final Client client; + private final HttpClient httpClient; + private final ClusterService clusterService; + private final ThreadPool threadPool; + private final Settings settings; + private final PersistentTasksService persistentTasksService; + + GeoIpDownloaderTaskExecutor(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, + Settings settings) { + super(GEOIP_DOWNLOADER, ThreadPool.Names.GENERIC); + this.client = client; + this.httpClient = httpClient; + this.clusterService = clusterService; + this.threadPool = threadPool; + this.settings = settings; + persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + if (ENABLED_SETTING.get(settings)) { + clusterService.addListener(this); + } + clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled); + } + + private void setEnabled(boolean enabled) { + if (enabled) { + if (clusterService.state().nodes().isLocalNodeElectedMaster()) { + startTask(() -> { + }); + } + } else { + persistentTasksService.sendRemoveRequest(GEOIP_DOWNLOADER, ActionListener.wrap(r -> { + }, e -> logger.error("failed to remove geoip task", e))); + } + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, GeoIpTaskParams params, PersistentTaskState state) { + GeoIpDownloader downloader = (GeoIpDownloader) task; + GeoIpTaskState geoIpTaskState = state == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) state; + downloader.setState(geoIpTaskState); + downloader.runDownloader(); + } + + @Override + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTasksCustomMetadata.PersistentTask taskInProgress, + Map headers) { + return new GeoIpDownloader(client, httpClient, clusterService, threadPool, settings, id, type, action, + getDescription(taskInProgress), parentTaskId, headers); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + //bootstrap downloader after first cluster start + clusterService.removeListener(this); + if (event.localNodeMaster() && ENABLED_SETTING.get(event.state().getMetadata().settings())) { + startTask(() -> clusterService.addListener(this)); + } + } + + private void startTask(Runnable onFailure) { + persistentTasksService.sendStartRequest(GEOIP_DOWNLOADER, GEOIP_DOWNLOADER, new GeoIpTaskParams(), ActionListener.wrap(r -> { + }, e -> { + if (e instanceof ResourceAlreadyExistsException == false) { + logger.error("failed to create geoip downloader task", e); + onFailure.run(); + } + })); + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskParams.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskParams.java new file mode 100644 index 0000000000000..b009f3333758b --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskParams.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTaskParams; + +import java.io.IOException; + +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; + +class GeoIpTaskParams implements PersistentTaskParams { + + public static final ObjectParser PARSER = new ObjectParser<>(GEOIP_DOWNLOADER, true, GeoIpTaskParams::new); + + GeoIpTaskParams() { + } + + GeoIpTaskParams(StreamInput in) { + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return GEOIP_DOWNLOADER; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_8_0_0; + } + + @Override + public void writeTo(StreamOutput out) { + } + + public static GeoIpTaskParams fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof GeoIpTaskParams; + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java new file mode 100644 index 0000000000000..6b0fc7ac40345 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java @@ -0,0 +1,216 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.Version; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTaskState; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; + +class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable { + + private static final ParseField DATABASES = new ParseField("databases"); + + static final GeoIpTaskState EMPTY = new GeoIpTaskState(Collections.emptyMap()); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(GEOIP_DOWNLOADER, true, + args -> { + List> databases = (List>) args[0]; + return new GeoIpTaskState(databases.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2))); + }); + + static { + PARSER.declareNamedObjects(constructorArg(), (p, c, name) -> Tuple.tuple(name, Metadata.fromXContent(p)), DATABASES); + } + + public static GeoIpTaskState fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final Map databases; + + private GeoIpTaskState(Map databases) { + this.databases = Map.copyOf(databases); + } + + GeoIpTaskState(StreamInput input) throws IOException { + databases = Collections.unmodifiableMap(input.readMap(StreamInput::readString, + in -> new Metadata(in.readLong(), in.readVInt(), in.readVInt(), in.readString()))); + } + + public GeoIpTaskState put(String name, Metadata metadata) { + HashMap newDatabases = new HashMap<>(databases); + newDatabases.put(name, metadata); + return new GeoIpTaskState(newDatabases); + } + + public Map getDatabases() { + return databases; + } + + public boolean contains(String name) { + return databases.containsKey(name); + } + + public Metadata get(String name) { + return databases.get(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GeoIpTaskState that = (GeoIpTaskState) o; + return databases.equals(that.databases); + } + + @Override + public int hashCode() { + return Objects.hash(databases); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.startObject("databases"); + for (Map.Entry e : databases.entrySet()) { + builder.field(e.getKey(), e.getValue()); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return "geoip-downloader"; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_8_0_0; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(databases, StreamOutput::writeString, (o, v) -> { + o.writeLong(v.lastUpdate); + o.writeVInt(v.firstChunk); + o.writeVInt(v.lastChunk); + o.writeString(v.md5); + }); + } + + static class Metadata implements ToXContentObject { + + static final String NAME = GEOIP_DOWNLOADER + "-metadata"; + private static final ParseField LAST_UPDATE = new ParseField("last_update"); + private static final ParseField FIRST_CHUNK = new ParseField("first_chunk"); + private static final ParseField LAST_CHUNK = new ParseField("last_chunk"); + private static final ParseField MD5 = new ParseField("md5"); + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(NAME, true, + args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3])); + + static { + PARSER.declareLong(constructorArg(), LAST_UPDATE); + PARSER.declareInt(constructorArg(), FIRST_CHUNK); + PARSER.declareInt(constructorArg(), LAST_CHUNK); + PARSER.declareString(constructorArg(), MD5); + } + + public static Metadata fromXContent(XContentParser parser) { + try { + return PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private final long lastUpdate; + private final int firstChunk; + private final int lastChunk; + private final String md5; + + Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5) { + this.lastUpdate = lastUpdate; + this.firstChunk = firstChunk; + this.lastChunk = lastChunk; + this.md5 = Objects.requireNonNull(md5); + } + + public long getLastUpdate() { + return lastUpdate; + } + + public int getFirstChunk() { + return firstChunk; + } + + public int getLastChunk() { + return lastChunk; + } + + public String getMd5() { + return md5; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Metadata metadata = (Metadata) o; + return lastUpdate == metadata.lastUpdate + && firstChunk == metadata.firstChunk + && lastChunk == metadata.lastChunk + && md5.equals(metadata.md5); + } + + @Override + public int hashCode() { + return Objects.hash(lastUpdate, firstChunk, lastChunk, md5); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(LAST_UPDATE.getPreferredName(), lastUpdate); + builder.field(FIRST_CHUNK.getPreferredName(), firstChunk); + builder.field(LAST_CHUNK.getPreferredName(), lastChunk); + builder.field(MD5.getPreferredName(), md5); + } + builder.endObject(); + return builder; + } + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/HttpClient.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/HttpClient.java new file mode 100644 index 0000000000000..7b3b4bb89d505 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/HttpClient.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.rest.RestStatus; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; + +import static java.net.HttpURLConnection.HTTP_MOVED_PERM; +import static java.net.HttpURLConnection.HTTP_MOVED_TEMP; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_SEE_OTHER; + +class HttpClient { + + byte[] getBytes(String url) throws IOException { + return get(url).readAllBytes(); + } + + InputStream get(String urlToGet) throws IOException { + return doPrivileged(() -> { + String url = urlToGet; + HttpURLConnection conn = createConnection(url); + + int redirectsCount = 0; + while (true) { + switch (conn.getResponseCode()) { + case HTTP_OK: + return new BufferedInputStream(getInputStream(conn)); + case HTTP_MOVED_PERM: + case HTTP_MOVED_TEMP: + case HTTP_SEE_OTHER: + if (redirectsCount++ > 50) { + throw new IllegalStateException("too many redirects connection to [" + urlToGet + "]"); + } + String location = conn.getHeaderField("Location"); + URL base = new URL(url); + URL next = new URL(base, location); // Deal with relative URLs + url = next.toExternalForm(); + conn = createConnection(url); + break; + case HTTP_NOT_FOUND: + throw new ResourceNotFoundException("{} not found", urlToGet); + default: + int responseCode = conn.getResponseCode(); + throw new ElasticsearchStatusException("error during downloading {}", RestStatus.fromCode(responseCode), urlToGet); + } + } + }); + } + + @SuppressForbidden(reason = "we need socket connection to download data from internet") + private InputStream getInputStream(HttpURLConnection conn) throws IOException { + return conn.getInputStream(); + } + + private HttpURLConnection createConnection(String url) throws IOException { + HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + conn.setDoOutput(false); + conn.setInstanceFollowRedirects(false); + return conn; + } + + private static R doPrivileged(CheckedSupplier supplier) throws IOException { + SpecialPermission.check(); + try { + return AccessController.doPrivileged((PrivilegedExceptionAction) supplier::get); + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 20c3568ed672b..31074cb505ca2 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -9,17 +9,29 @@ package org.elasticsearch.ingest.geoip; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -28,13 +40,21 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Supplier; -public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_V2_FEATURE_FLAG_ENABLED; + +public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemIndexPlugin, Closeable, PersistentTaskPlugin { public static final Setting CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope); @@ -44,7 +64,13 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable @Override public List> getSettings() { - return List.of(CACHE_SIZE); + List> settings = new ArrayList<>(Arrays.asList(CACHE_SIZE, + GeoIpDownloader.ENDPOINT_SETTING, + GeoIpDownloader.POLL_INTERVAL_SETTING)); + if (GEOIP_V2_FEATURE_FLAG_ENABLED) { + settings.add(GeoIpDownloaderTaskExecutor.ENABLED_SETTING); + } + return settings; } @Override @@ -72,7 +98,7 @@ public Collection createComponents(Client client, localDatabases.get().initialize(resourceWatcherService); } catch (IOException e) { throw new UncheckedIOException(e); - } + } return List.of(); } @@ -81,4 +107,83 @@ public void close() throws IOException { localDatabases.get().close(); } + @Override + public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, + Client client, SettingsModule settingsModule, + IndexNameExpressionResolver expressionResolver) { + if (GEOIP_V2_FEATURE_FLAG_ENABLED) { + Settings settings = settingsModule.getSettings(); + return List.of(new GeoIpDownloaderTaskExecutor(client, new HttpClient(), clusterService, threadPool, settings)); + } else { + return List.of(); + } + } + + @Override + public List getNamedXContent() { + return List.of(new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(GEOIP_DOWNLOADER), + GeoIpTaskParams::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskState::fromXContent)); + } + + @Override + public List getNamedWriteables() { + return List.of(new NamedWriteableRegistry.Entry(PersistentTaskState.class, GEOIP_DOWNLOADER, GeoIpTaskState::new), + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, GEOIP_DOWNLOADER, GeoIpTaskParams::new)); + } + + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + SystemIndexDescriptor geoipDatabasesIndex = SystemIndexDescriptor.builder() + .setIndexPattern(DATABASES_INDEX) + .setDescription("GeoIP databases") + .setMappings(mappings()) + .setSettings(Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .build()) + .setOrigin("geoip") + .setVersionMetaKey("version") + .setPrimaryIndex(DATABASES_INDEX) + .build(); + return Collections.singleton(geoipDatabasesIndex); + } + + @Override + public String getFeatureName() { + return "geoip"; + } + + @Override + public String getFeatureDescription() { + return "Manages data related to GeoIP database downloader"; + } + + private static XContentBuilder mappings() { + try { + return jsonBuilder() + .startObject() + .startObject(SINGLE_MAPPING_NAME) + .startObject("_meta") + .field("version", Version.CURRENT) + .endObject() + .field("dynamic", "strict") + .startObject("properties") + .startObject("name") + .field("type", "keyword") + .endObject() + .startObject("chunk") + .field("type", "integer") + .endObject() + .startObject("data") + .field("type", "binary") + .endObject() + .endObject() + .endObject() + .endObject(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to build mappings for " + DATABASES_INDEX, e); + } + } } diff --git a/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy b/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy index c961d7248a2bf..2f1e80e8e5578 100644 --- a/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy +++ b/modules/ingest-geoip/src/main/plugin-metadata/plugin-security.policy @@ -15,4 +15,5 @@ grant { permission java.lang.RuntimePermission "accessDeclaredMembers"; // Also needed because of jackson-databind: permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + permission java.net.SocketPermission "*", "connect"; }; diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java new file mode 100644 index 0000000000000..467d708586e8c --- /dev/null +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -0,0 +1,368 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.node.Node; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.ENDPOINT_SETTING; +import static org.elasticsearch.ingest.geoip.GeoIpDownloader.MAX_CHUNK_SIZE; +import static org.elasticsearch.tasks.TaskId.EMPTY_TASK_ID; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GeoIpDownloaderTests extends ESTestCase { + + private HttpClient httpClient; + private ClusterService clusterService; + private ThreadPool threadPool; + private MockClient client; + private GeoIpDownloader geoIpDownloader; + + @Before + public void setup() { + httpClient = mock(HttpClient.class); + clusterService = mock(ClusterService.class); + threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build()); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, + Set.of(GeoIpDownloader.ENDPOINT_SETTING, GeoIpDownloader.POLL_INTERVAL_SETTING, GeoIpDownloaderTaskExecutor.ENABLED_SETTING))); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build(); + when(clusterService.state()).thenReturn(state); + client = new MockClient(threadPool); + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testGetChunkEndOfStream() throws IOException { + byte[] chunk = geoIpDownloader.getChunk(new InputStream() { + @Override + public int read() { + return -1; + } + }); + assertArrayEquals(new byte[0], chunk); + chunk = geoIpDownloader.getChunk(new ByteArrayInputStream(new byte[0])); + assertArrayEquals(new byte[0], chunk); + } + + public void testGetChunkLessThanChunkSize() throws IOException { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{1, 2, 3, 4}); + byte[] chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[]{1, 2, 3, 4}, chunk); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[0], chunk); + + } + + public void testGetChunkExactlyChunkSize() throws IOException { + byte[] bigArray = new byte[MAX_CHUNK_SIZE]; + for (int i = 0; i < MAX_CHUNK_SIZE; i++) { + bigArray[i] = (byte) i; + } + ByteArrayInputStream is = new ByteArrayInputStream(bigArray); + byte[] chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(bigArray, chunk); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[0], chunk); + } + + public void testGetChunkMoreThanChunkSize() throws IOException { + byte[] bigArray = new byte[MAX_CHUNK_SIZE * 2]; + for (int i = 0; i < MAX_CHUNK_SIZE * 2; i++) { + bigArray[i] = (byte) i; + } + byte[] smallArray = new byte[MAX_CHUNK_SIZE]; + System.arraycopy(bigArray, 0, smallArray, 0, MAX_CHUNK_SIZE); + ByteArrayInputStream is = new ByteArrayInputStream(bigArray); + byte[] chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(smallArray, chunk); + System.arraycopy(bigArray, MAX_CHUNK_SIZE, smallArray, 0, MAX_CHUNK_SIZE); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(smallArray, chunk); + chunk = geoIpDownloader.getChunk(is); + assertArrayEquals(new byte[0], chunk); + } + + public void testGetChunkRethrowsIOException() { + expectThrows(IOException.class, () -> geoIpDownloader.getChunk(new InputStream() { + @Override + public int read() throws IOException { + throw new IOException(); + } + })); + } + + public void testIndexChunksNoData() throws IOException { + assertEquals(0, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(new byte[0]), 0, "d41d8cd98f00b204e9800998ecf8427e")); + } + + public void testIndexChunksMd5Mismatch() { + IOException exception = expectThrows(IOException.class, () -> geoIpDownloader.indexChunks("test", + new ByteArrayInputStream(new byte[0]), 0, "123123")); + assertEquals("md5 checksum mismatch, expected [123123], actual [d41d8cd98f00b204e9800998ecf8427e]", exception.getMessage()); + } + + public void testIndexChunks() throws IOException { + byte[] bigArray = new byte[MAX_CHUNK_SIZE + 20]; + for (int i = 0; i < MAX_CHUNK_SIZE + 20; i++) { + bigArray[i] = (byte) i; + } + byte[][] chunksData = new byte[2][]; + chunksData[0] = new byte[MAX_CHUNK_SIZE]; + System.arraycopy(bigArray, 0, chunksData[0], 0, MAX_CHUNK_SIZE); + chunksData[1] = new byte[20]; + System.arraycopy(bigArray, MAX_CHUNK_SIZE, chunksData[1], 0, 20); + + AtomicInteger chunkIndex = new AtomicInteger(); + + client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener listener) -> { + int chunk = chunkIndex.getAndIncrement(); + assertEquals("test_" + (chunk + 15), request.id()); + assertEquals(XContentType.SMILE, request.getContentType()); + Map source = request.sourceAsMap(); + assertEquals("test", source.get("name")); + assertArrayEquals(chunksData[chunk], (byte[]) source.get("data")); + assertEquals(chunk + 15, source.get("chunk")); + listener.onResponse(mock(IndexResponse.class)); + }); + + assertEquals(17, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(bigArray), 15, "a67563dfa8f3cba8b8cff61eb989a749")); + + assertEquals(2, chunkIndex.get()); + } + + public void testProcessDatabaseNew() throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); + when(httpClient.get("a.b/t1")).thenReturn(bais); + + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void updateTaskState() { + assertEquals(0, state.get("test").getFirstChunk()); + assertEquals(10, state.get("test").getLastChunk()); + } + + @Override + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + assertSame(bais, is); + assertEquals(0, chunk); + return 11; + } + + @Override + protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) { + fail(); + } + + @Override + void deleteOldChunks(String name, int firstChunk) { + assertEquals("test", name); + assertEquals(0, firstChunk); + } + }; + + geoIpDownloader.setState(GeoIpTaskState.EMPTY); + geoIpDownloader.processDatabase(Map.of("name", "test.gz", "url", "a.b/t1", "md5_hash", "1")); + } + + public void testProcessDatabaseUpdate() throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); + when(httpClient.get("a.b/t1")).thenReturn(bais); + + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void updateTaskState() { + assertEquals(9, state.get("test").getFirstChunk()); + assertEquals(10, state.get("test").getLastChunk()); + } + + @Override + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + assertSame(bais, is); + assertEquals(9, chunk); + return 11; + } + + @Override + protected void updateTimestamp(String name, GeoIpTaskState.Metadata metadata) { + fail(); + } + + @Override + void deleteOldChunks(String name, int firstChunk) { + assertEquals("test", name); + assertEquals(9, firstChunk); + } + }; + + geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test", new GeoIpTaskState.Metadata(0, 5, 8, "0"))); + geoIpDownloader.processDatabase(Map.of("name", "test.gz", "url", "a.b/t1", "md5_hash", "1")); + } + + + public void testProcessDatabaseSame() throws IOException { + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1"); + GeoIpTaskState taskState = GeoIpTaskState.EMPTY.put("test", metadata); + ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]); + when(httpClient.get("a.b/t1")).thenReturn(bais); + + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void updateTaskState() { + fail(); + } + + @Override + int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + fail(); + return 0; + } + + @Override + protected void updateTimestamp(String name, GeoIpTaskState.Metadata newMetadata) { + assertEquals(metadata, newMetadata); + assertEquals("test", name); + } + + @Override + void deleteOldChunks(String name, int firstChunk) { + fail(); + } + }; + geoIpDownloader.setState(taskState); + geoIpDownloader.processDatabase(Map.of("name", "test.gz", "url", "a.b/t1", "md5_hash", "1")); + } + + @SuppressWarnings("unchecked") + public void testUpdateTaskState() { + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState state, ActionListener> listener) { + assertSame(GeoIpTaskState.EMPTY, state); + PersistentTask task = mock(PersistentTask.class); + when(task.getState()).thenReturn(GeoIpTaskState.EMPTY); + listener.onResponse(task); + } + }; + geoIpDownloader.setState(GeoIpTaskState.EMPTY); + geoIpDownloader.updateTaskState(); + } + + @SuppressWarnings("unchecked") + public void testUpdateTaskStateError() { + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, Settings.EMPTY, + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState state, ActionListener> listener) { + assertSame(GeoIpTaskState.EMPTY, state); + PersistentTask task = mock(PersistentTask.class); + when(task.getState()).thenReturn(GeoIpTaskState.EMPTY); + listener.onFailure(new IllegalStateException("test failure")); + } + }; + geoIpDownloader.setState(GeoIpTaskState.EMPTY); + IllegalStateException exception = expectThrows(IllegalStateException.class, geoIpDownloader::updateTaskState); + assertEquals("test failure", exception.getMessage()); + } + + public void testUpdateDatabases() throws IOException { + List> maps = List.of(Map.of("a", 1), Map.of("a", 2)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), baos); + builder.startArray(); + builder.map(Map.of("a", 1)); + builder.map(Map.of("a", 2)); + builder.endArray(); + builder.close(); + when(httpClient.getBytes("a.b?key=11111111-1111-1111-1111-111111111111")).thenReturn(baos.toByteArray()); + Iterator> it = maps.iterator(); + geoIpDownloader = new GeoIpDownloader(client, httpClient, clusterService, threadPool, + Settings.builder().put(ENDPOINT_SETTING.getKey(), "a.b").build(), + 1, "", "", "", EMPTY_TASK_ID, Collections.emptyMap()) { + @Override + void processDatabase(Map databaseInfo) { + assertEquals(it.next(), databaseInfo); + } + }; + geoIpDownloader.updateDatabases(); + assertFalse(it.hasNext()); + } + + private static class MockClient extends NoOpClient { + + private final Map, BiConsumer>> handlers = new HashMap<>(); + + private MockClient(ThreadPool threadPool) { + super(threadPool); + } + + public void addHandler(ActionType action, + BiConsumer> listener) { + handlers.put(action, listener); + } + + @SuppressWarnings("unchecked") + @Override + protected void doExecute(ActionType action, + Request request, + ActionListener listener) { + if (handlers.containsKey(action)) { + BiConsumer> biConsumer = + (BiConsumer>) handlers.get(action); + biConsumer.accept(request, listener); + } else { + throw new IllegalStateException("unexpected action called [" + action.name() + "]"); + } + } + } +} diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java new file mode 100644 index 0000000000000..dd5faa9d8fa33 --- /dev/null +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class GeoIpTaskStateSerializationTests extends AbstractSerializingTestCase { + @Override + protected GeoIpTaskState doParseInstance(XContentParser parser) throws IOException { + return GeoIpTaskState.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return GeoIpTaskState::new; + } + + @Override + protected GeoIpTaskState createTestInstance() { + GeoIpTaskState state = GeoIpTaskState.EMPTY; + int databaseCount = randomInt(20); + for (int i = 0; i < databaseCount; i++) { + GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(), randomAlphaOfLength(32)); + state = state.put(randomAlphaOfLengthBetween(5, 10), metadata); + } + return state; + } +} diff --git a/settings.gradle b/settings.gradle index a3b7a412ab167..900014fe4d02d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -80,6 +80,7 @@ List projects = [ 'test:fixtures:minio-fixture', 'test:fixtures:old-elasticsearch', 'test:fixtures:s3-fixture', + 'test:fixtures:geoip-fixture', 'test:logger-usage' ] diff --git a/test/fixtures/geoip-fixture/Dockerfile b/test/fixtures/geoip-fixture/Dockerfile new file mode 100644 index 0000000000000..bcd8013408818 --- /dev/null +++ b/test/fixtures/geoip-fixture/Dockerfile @@ -0,0 +1,15 @@ +FROM ubuntu:18.04 + +RUN apt-get update -qqy +RUN apt-get install -qqy openjdk-11-jre-headless + +ARG fixtureClass +ARG port + +ENV GEOIP_FIXTURE_CLASS=${fixtureClass} +ENV GEOIP_FIXTURE_PORT=${port} + +ENTRYPOINT exec java -classpath "/fixture/shared/*" \ + $GEOIP_FIXTURE_CLASS 0.0.0.0 "$GEOIP_FIXTURE_PORT" + +EXPOSE $port diff --git a/test/fixtures/geoip-fixture/build.gradle b/test/fixtures/geoip-fixture/build.gradle new file mode 100644 index 0000000000000..d8f85759cf7f8 --- /dev/null +++ b/test/fixtures/geoip-fixture/build.gradle @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +apply plugin: 'elasticsearch.java' +apply plugin: 'elasticsearch.test.fixtures' + +description = 'Fixture for GeoIPv2 service' +tasks.named("test").configure { enabled = false } + +dependencies { + api project(':server') +} + +tasks.named("preProcessFixture").configure { + dependsOn "jar", configurations.runtimeClasspath + doLast { + file("${testFixturesDir}/shared").mkdirs() + project.copy { + from jar + from configurations.runtimeClasspath + into "${testFixturesDir}/shared" + } + } +} diff --git a/test/fixtures/geoip-fixture/docker-compose.yml b/test/fixtures/geoip-fixture/docker-compose.yml new file mode 100644 index 0000000000000..60883da5ae612 --- /dev/null +++ b/test/fixtures/geoip-fixture/docker-compose.yml @@ -0,0 +1,13 @@ +version: '3' +services: + geoip-fixture: + build: + context: . + args: + fixtureClass: fixture.geoip.GeoIpHttpFixture + port: 80 + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "80" diff --git a/test/fixtures/geoip-fixture/src/main/java/fixture/geoip/GeoIpHttpFixture.java b/test/fixtures/geoip-fixture/src/main/java/fixture/geoip/GeoIpHttpFixture.java new file mode 100644 index 0000000000000..4593b054e09e2 --- /dev/null +++ b/test/fixtures/geoip-fixture/src/main/java/fixture/geoip/GeoIpHttpFixture.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package fixture.geoip; + +import com.sun.net.httpserver.HttpServer; + +import java.io.BufferedWriter; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.charset.StandardCharsets; + +public class GeoIpHttpFixture { + + private final HttpServer server; + + GeoIpHttpFixture(final String[] args) throws Exception { + String rawData = new String(GeoIpHttpFixture.class.getResourceAsStream("/data.json").readAllBytes(), StandardCharsets.UTF_8); + this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(args[0]), Integer.parseInt(args[1])), 0); + this.server.createContext("/", exchange -> { + String data = rawData.replace("endpoint", "http://" + exchange.getRequestHeaders().getFirst("Host")); + exchange.sendResponseHeaders(200, data.length()); + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(exchange.getResponseBody()))) { + writer.write(data); + } + }); + this.server.createContext("/db.mmdb.gz", exchange -> { + exchange.sendResponseHeaders(200, 0); + try (OutputStream outputStream = exchange.getResponseBody()) { + GeoIpHttpFixture.class.getResourceAsStream("/GeoIP2-City-Test.mmdb.gz").transferTo(outputStream); + } + }); + } + + final void start() throws Exception { + try { + server.start(); + // wait to be killed + Thread.sleep(Long.MAX_VALUE); + } finally { + server.stop(0); + } + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length < 2) { + throw new IllegalArgumentException("GeoIpHttpFixture expects 2 arguments [address, port]"); + } + final GeoIpHttpFixture fixture = new GeoIpHttpFixture(args); + fixture.start(); + } +} diff --git a/test/fixtures/geoip-fixture/src/main/resources/GeoIP2-City-Test.mmdb.gz b/test/fixtures/geoip-fixture/src/main/resources/GeoIP2-City-Test.mmdb.gz new file mode 100644 index 0000000000000..38a8c7fc9c03e Binary files /dev/null and b/test/fixtures/geoip-fixture/src/main/resources/GeoIP2-City-Test.mmdb.gz differ diff --git a/test/fixtures/geoip-fixture/src/main/resources/data.json b/test/fixtures/geoip-fixture/src/main/resources/data.json new file mode 100644 index 0000000000000..209d296150139 --- /dev/null +++ b/test/fixtures/geoip-fixture/src/main/resources/data.json @@ -0,0 +1,20 @@ +[ + { + "md5_hash": "dd3ac893b3b858d8f45674345dfe6fe9", + "name": "GeoLite2-City.mmdb.gz", + "url": "endpoint/db.mmdb.gz", + "provider": "maxmind" + }, + { + "md5_hash": "dd3ac893b3b858d8f45674345dfe6fe9", + "name": "GeoLite2-ASN.mmdb.gz", + "url": "endpoint/db.mmdb.gz", + "provider": "maxmind" + }, + { + "md5_hash": "dd3ac893b3b858d8f45674345dfe6fe9", + "name": "GeoLite2-Country.mmdb.gz", + "url": "endpoint/db.mmdb.gz", + "provider": "maxmind" + } +]