Skip to content

Commit

Permalink
Fail geoip processor if database is older than 30 days (#72367) (#72424)
Browse files Browse the repository at this point in the history
As required by MaxMind license we can't use databases that are older than 30 days as we could miss "don't sell" request.
This check was missing before and this change fixes that.
  • Loading branch information
probakowski authored Apr 28, 2021
1 parent 49e818c commit 99f1092
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public void test() throws Exception {
geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
lazyLoadReaders(databaseRegistry);

final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
Expand Down Expand Up @@ -116,13 +116,13 @@ public void test() throws Exception {
} else {
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
}
DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
"/GeoLite2-City-Test.mmdb");
Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);

DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.Closeable;
Expand All @@ -35,6 +36,7 @@
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -53,6 +55,7 @@ class DatabaseReaderLazyLoader implements Closeable {
private final GeoIpCache cache;
private final Path databasePath;
private final CheckedSupplier<DatabaseReader, IOException> loader;
private volatile long lastUpdate;
final SetOnce<DatabaseReader> databaseReader;

// cache the database type so that we do not re-read it on every pipeline execution
Expand Down Expand Up @@ -194,6 +197,16 @@ private <T extends AbstractResponse> T getResponse(InetAddress ipAddress,
}

DatabaseReader get() throws IOException {
//only downloaded databases will have lastUpdate != 0, we never update it for default databases or databases from config dir
if (lastUpdate != 0) {
Path fileName = databasePath.getFileName();
if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(30).toMillis()) {
throw new IllegalStateException("database [" + fileName + "] was not updated for 30 days and is disabled");
} else if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(25).toMillis()) {
HeaderWarning.addWarning(
"database [{}] was not updated for over 25 days, ingestion will fail if there is no update for 30 days", fileName);
}
}
if (databaseReader.get() == null) {
synchronized (databaseReader) {
if (databaseReader.get() == null) {
Expand Down Expand Up @@ -248,4 +261,7 @@ private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
return new DatabaseReader.Builder(databasePath.toFile());
}

void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ void checkDatabases(ClusterState state) {
String remoteMd5 = metadata.getMd5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
reference.setLastUpdate(metadata.getLastUpdate());
LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
return;
}
Expand Down Expand Up @@ -283,7 +284,7 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta

LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
updateDatabase(databaseName, recordedMd5, databaseFile);
updateDatabase(databaseName, recordedMd5, databaseFile, metadata.getLastUpdate());
Files.delete(databaseTmpGzFile);
},
failure -> {
Expand All @@ -298,10 +299,11 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
});
}

void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
void updateDatabase(String databaseFileName, String recordedMd5, Path file, long lastUpdate) {
try {
LOGGER.info("database file changed [{}], reload database...", file);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
loader.setLastUpdate(lastUpdate);
DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
if (existing != null) {
existing.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.ingest.geoip;

import com.maxmind.db.InvalidDatabaseException;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -127,7 +128,7 @@ public void testCheckDatabases() throws Exception {
String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14);
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 5, 14, md5))));
task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(10L, 5, 14, md5))));
PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));

ClusterState state = ClusterState.builder(new ClusterName("name"))
Expand All @@ -140,11 +141,30 @@ public void testCheckDatabases() throws Exception {

assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
databaseRegistry.checkDatabases(state);
assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), notNullValue());
DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false);
assertThat(database, notNullValue());
verify(client, times(10)).search(any());
try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
assertThat(files.collect(Collectors.toList()), hasSize(1));
}
IllegalStateException e = expectThrows(IllegalStateException.class, database::get);
assertEquals("database [GeoIP2-City.mmdb] was not updated for 30 days and is disabled", e.getMessage());

task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb",
new GeoIpTaskState.Metadata(System.currentTimeMillis(), 5, 14, md5))));
tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));

state = ClusterState.builder(new ClusterName("name"))
.metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).build())
.nodes(new DiscoveryNodes.Builder()
.add(new DiscoveryNode("_id1", buildNewFakeTransportAddress(), Version.CURRENT))
.localNodeId("_id1"))
.routingTable(createIndexRoutingTable())
.build();
databaseRegistry.checkDatabases(state);
database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false);
//30 days check passed but we mocked mmdb data so parsing will fail
expectThrows(InvalidDatabaseException.class, database::get);
}

public void testCheckDatabases_dontCheckDatabaseOnNonIngestNode() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception {
Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
assertThat(geoData.get("city_name"), equalTo("Tumba"));

databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
Expand Down

0 comments on commit 99f1092

Please sign in to comment.