From 99f1092783cca9b00b89bf903b911f7fdb7f58fa Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Wed, 28 Apr 2021 23:24:27 +0200 Subject: [PATCH] Fail geoip processor if database is older than 30 days (#72367) (#72424) As required by MaxMind license we can't use databases that are older than 30 days as we could miss "don't sell" request. This check was missing before and this change fixes that. --- ...gDatabasesWhilePerformingGeoLookupsIT.java | 8 +++---- .../geoip/DatabaseReaderLazyLoader.java | 16 +++++++++++++ .../ingest/geoip/DatabaseRegistry.java | 6 +++-- .../ingest/geoip/DatabaseRegistryTests.java | 24 +++++++++++++++++-- .../geoip/GeoIpProcessorFactoryTests.java | 2 +- 5 files changed, 47 insertions(+), 9 deletions(-) diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java index 393ec50990281..63f86a2982b02 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java @@ -62,8 +62,8 @@ public void test() throws Exception { geoIpTmpDir.resolve("GeoLite2-City.mmdb")); Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); - databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); - databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0); + databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0); lazyLoadReaders(databaseRegistry); final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field"))); @@ -116,13 +116,13 @@ public void test() throws Exception { } else { Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING); - databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0); } DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb"); InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" : "/GeoLite2-City-Test.mmdb"); Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING); - databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0); DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb"); DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb"); diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index af2b1d1be566c..c41f930ebf22b 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.core.internal.io.IOUtils; import java.io.Closeable; @@ -35,6 +36,7 @@ import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +55,7 @@ class DatabaseReaderLazyLoader implements Closeable { private final GeoIpCache cache; private final Path databasePath; private final CheckedSupplier loader; + private volatile long lastUpdate; final SetOnce databaseReader; // cache the database type so that we do not re-read it on every pipeline execution @@ -194,6 +197,16 @@ private T getResponse(InetAddress ipAddress, } DatabaseReader get() throws IOException { + //only downloaded databases will have lastUpdate != 0, we never update it for default databases or databases from config dir + if (lastUpdate != 0) { + Path fileName = databasePath.getFileName(); + if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(30).toMillis()) { + throw new IllegalStateException("database [" + fileName + "] was not updated for 30 days and is disabled"); + } else if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(25).toMillis()) { + HeaderWarning.addWarning( + "database [{}] was not updated for over 25 days, ingestion will fail if there is no update for 30 days", fileName); + } + } if (databaseReader.get() == null) { synchronized (databaseReader) { if (databaseReader.get() == null) { @@ -248,4 +261,7 @@ private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) { return new DatabaseReader.Builder(databasePath.toFile()); } + void setLastUpdate(long lastUpdate) { + this.lastUpdate = lastUpdate; + } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java index 937f014b0666c..d831aa9cc6c06 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java @@ -207,6 +207,7 @@ void checkDatabases(ClusterState state) { String remoteMd5 = metadata.getMd5(); String localMd5 = reference != null ? reference.getMd5() : null; if (Objects.equals(localMd5, remoteMd5)) { + reference.setLastUpdate(metadata.getLastUpdate()); LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5); return; } @@ -283,7 +284,7 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile); Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); - updateDatabase(databaseName, recordedMd5, databaseFile); + updateDatabase(databaseName, recordedMd5, databaseFile, metadata.getLastUpdate()); Files.delete(databaseTmpGzFile); }, failure -> { @@ -298,10 +299,11 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta }); } - void updateDatabase(String databaseFileName, String recordedMd5, Path file) { + void updateDatabase(String databaseFileName, String recordedMd5, Path file, long lastUpdate) { try { LOGGER.info("database file changed [{}], reload database...", file); DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5); + loader.setLastUpdate(lastUpdate); DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader); if (existing != null) { existing.close(); diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java index f64ec3f93e5a8..acfb58ac8db93 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.ingest.geoip; +import com.maxmind.db.InvalidDatabaseException; import org.apache.lucene.search.TotalHits; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; @@ -127,7 +128,7 @@ public void testCheckDatabases() throws Exception { String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14); String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; PersistentTask task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null); - task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 5, 14, md5)))); + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(10L, 5, 14, md5)))); PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); ClusterState state = ClusterState.builder(new ClusterName("name")) @@ -140,11 +141,30 @@ public void testCheckDatabases() throws Exception { assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue()); databaseRegistry.checkDatabases(state); - assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), notNullValue()); + DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false); + assertThat(database, notNullValue()); verify(client, times(10)).search(any()); try (Stream files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) { assertThat(files.collect(Collectors.toList()), hasSize(1)); } + IllegalStateException e = expectThrows(IllegalStateException.class, database::get); + assertEquals("database [GeoIP2-City.mmdb] was not updated for 30 days and is disabled", e.getMessage()); + + task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", + new GeoIpTaskState.Metadata(System.currentTimeMillis(), 5, 14, md5)))); + tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task)); + + state = ClusterState.builder(new ClusterName("name")) + .metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build()) + .nodes(new DiscoveryNodes.Builder() + .add(new DiscoveryNode("_id1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_id1")) + .routingTable(createIndexRoutingTable()) + .build(); + databaseRegistry.checkDatabases(state); + database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false); + //30 days check passed but we mocked mmdb data so parsing will fail + expectThrows(InvalidDatabaseException.class, database::get); } public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Exception { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 7c24a6c38fe42..e76fd78f0e2ca 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -379,7 +379,7 @@ public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception { Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip"); assertThat(geoData.get("city_name"), equalTo("Tumba")); - databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb")); + databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0); ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); processor.execute(ingestDocument); geoData = (Map) ingestDocument.getSourceAndMetadata().get("geoip");