Skip to content

Commit

Permalink
Change GeoIP downloader policy after 30 days of no updates (#74099)
Browse files Browse the repository at this point in the history
This PR changes the way GeoIpDownloader and GeoIpProcessor handle the situation when we are unable to update databases for 30 days. In that case:

GeoIpDownloader will delete all chunks from .geoip_databases index
DatabaseRegistry will delete all files on ingest nodes
GeoIpProcessor will tag documents with a tags: ["_geoip_expired_database"] field (the same way Logstash does)
This change also fixes a bug that breaks DatabaseRegistry when it tries to download databases after only the timestamp was updated (GeoIpDownloader checks whether there are new databases and updates just the timestamp because the local databases are already up to date)
  • Loading branch information
probakowski authored Jun 18, 2021
1 parent c9ad768 commit 331a44b
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.StreamsUtils;
Expand All @@ -21,7 +22,6 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public abstract class AbstractGeoIpIT extends ESIntegTestCase {
Expand Down Expand Up @@ -58,7 +58,9 @@ public static class IngestGeoIpSettingsPlugin extends Plugin {

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
return List.of(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope),
Setting.timeSetting("ingest.geoip.database_validity", TimeValue.timeValueDays(3), Setting.Property.NodeScope,
Setting.Property.Dynamic));
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -41,6 +43,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory.
public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
Expand All @@ -57,13 +60,15 @@ public void test() throws Exception {
Path geoIpConfigDir = createTempDir();
Path geoIpTmpDir = createTempDir();
DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseRegistry);

final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
Expand Down Expand Up @@ -116,13 +121,13 @@ public void test() throws Exception {
} else {
Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
}
DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
"/GeoLite2-City-Test.mmdb");
Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));

DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.Closeable;
Expand All @@ -36,7 +35,6 @@
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -55,7 +53,6 @@ class DatabaseReaderLazyLoader implements Closeable {
private final GeoIpCache cache;
private final Path databasePath;
private final CheckedSupplier<DatabaseReader, IOException> loader;
private volatile long lastUpdate;
final SetOnce<DatabaseReader> databaseReader;

// cache the database type so that we do not re-read it on every pipeline execution
Expand Down Expand Up @@ -197,16 +194,6 @@ private <T extends AbstractResponse> T getResponse(InetAddress ipAddress,
}

DatabaseReader get() throws IOException {
//only downloaded databases will have lastUpdate != 0, we never update it for default databases or databases from config dir
if (lastUpdate != 0) {
Path fileName = databasePath.getFileName();
if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(30).toMillis()) {
throw new IllegalStateException("database [" + fileName + "] was not updated for 30 days and is disabled");
} else if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(25).toMillis()) {
HeaderWarning.addWarning(
"database [{}] was not updated for over 25 days, ingestion will fail if there is no update for 30 days", fileName);
}
}
if (databaseReader.get() == null) {
synchronized (databaseReader) {
if (databaseReader.get() == null) {
Expand Down Expand Up @@ -261,7 +248,4 @@ private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
return new DatabaseReader.Builder(databasePath.toFile());
}

void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -202,25 +203,31 @@ void checkDatabases(ClusterState state) {
// Empty state will purge stale entries in databases map.
GeoIpTaskState taskState = task == null || task.getState() == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) task.getState();

taskState.getDatabases().forEach((name, metadata) -> {
DatabaseReaderLazyLoader reference = databases.get(name);
String remoteMd5 = metadata.getMd5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
reference.setLastUpdate(metadata.getLastUpdate());
LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
return;
}
taskState.getDatabases().entrySet().stream()
.filter(e -> e.getValue().isValid(state.getMetadata().settings()))
.forEach(e -> {
String name = e.getKey();
GeoIpTaskState.Metadata metadata = e.getValue();
DatabaseReaderLazyLoader reference = databases.get(name);
String remoteMd5 = metadata.getMd5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
return;
}

try {
retrieveAndUpdateDatabase(name, metadata);
} catch (Exception e) {
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), e);
}
});
try {
retrieveAndUpdateDatabase(name, metadata);
} catch (Exception ex) {
LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), ex);
}
});

List<String> staleEntries = new ArrayList<>(databases.keySet());
staleEntries.removeAll(taskState.getDatabases().keySet());
staleEntries.removeAll(taskState.getDatabases().entrySet().stream()
.filter(e->e.getValue().isValid(state.getMetadata().settings()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet()));
removeStaleEntries(staleEntries);
}

Expand Down Expand Up @@ -284,7 +291,7 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta

LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
updateDatabase(databaseName, recordedMd5, databaseFile, metadata.getLastUpdate());
updateDatabase(databaseName, recordedMd5, databaseFile);
Files.delete(databaseTmpGzFile);
},
failure -> {
Expand All @@ -299,11 +306,10 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
});
}

void updateDatabase(String databaseFileName, String recordedMd5, Path file, long lastUpdate) {
void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
try {
LOGGER.info("database file changed [{}], reload database...", file);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
loader.setLastUpdate(lastUpdate);
DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
if (existing != null) {
existing.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
Expand Down Expand Up @@ -70,6 +70,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {

private final Client client;
private final HttpClient httpClient;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final String endpoint;

Expand All @@ -84,6 +85,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
super(id, type, action, description, parentTask, headers);
this.httpClient = httpClient;
this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN);
this.clusterService = clusterService;
this.threadPool = threadPool;
endpoint = ENDPOINT_SETTING.get(settings);
pollInterval = POLL_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -139,7 +141,7 @@ void processDatabase(Map<String, Object> databaseInfo) {
int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0;
int lastChunk = indexChunks(name, is, firstChunk, md5, start);
if (lastChunk > firstChunk) {
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5));
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
updateTaskState();
stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size());
logger.info("updated geoip database [" + name + "]");
Expand All @@ -166,7 +168,8 @@ void deleteOldChunks(String name, int firstChunk) {
//visible for testing
protected void updateTimestamp(String name, Metadata old) {
logger.info("geoip database [" + name + "] is up to date, updated timestamp");
state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5()));
state = state.put(name, new Metadata(old.getLastUpdate(), old.getFirstChunk(), old.getLastChunk(), old.getMd5(),
System.currentTimeMillis()));
stats = stats.skippedDownload();
updateTaskState();
}
Expand Down Expand Up @@ -235,9 +238,28 @@ void runDownloader() {
} catch (Exception e) {
logger.error("exception during geoip databases update", e);
}
try {
cleanDatabases();
} catch (Exception e) {
logger.error("exception during geoip databases cleanup", e);
}
scheduleNextRun(pollInterval);
}

// Expires databases that are no longer valid (not updated within the configured validity window):
// deletes their chunks from the .geoip_databases index and records the expiry in the task state/stats.
private void cleanDatabases() {
long expiredDatabases = state.getDatabases().entrySet().stream()
// keep only entries that fail the validity check against the cluster-wide settings
.filter(e -> e.getValue().isValid(clusterService.state().metadata().settings()) == false)
.peek(e -> {
String name = e.getKey();
Metadata meta = e.getValue();
// passing lastChunk + 1 as the lower bound deletes every stored chunk for this database
deleteOldChunks(name, meta.getLastChunk() + 1);
// NOTE(review): lastCheck is nudged back by 1ms when rewriting the metadata — presumably to keep
// the entry marked stale so it is re-fetched on the next successful download; confirm intent
state = state.put(name, new Metadata(meta.getLastUpdate(), meta.getFirstChunk(), meta.getLastChunk(), meta.getMd5(),
meta.getLastCheck() - 1));
updateTaskState();
}).count();
// count() is the terminal operation that forces the peek() side effects to run; its result feeds the stats
stats = stats.expiredDatabases((int) expiredDatabases);
}

@Override
protected void onCancelled() {
if (scheduled != null) {
Expand All @@ -251,6 +273,8 @@ public GeoIpDownloaderStats getStatus() {
}

private void scheduleNextRun(TimeValue time) {
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
if (threadPool.scheduler().isShutdown() == false) {
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
}
}
}
Loading

0 comments on commit 331a44b

Please sign in to comment.