From 3d5281c09e0d82546f89ef043610e00c5a075a7e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 11 Feb 2021 11:26:14 +0100 Subject: [PATCH] Changed how geoip cache is integrated with geoip processor. (#68890) Backport #68581 of to 7.x branch. This change helps facilitate allowing maxmind databases to be updated at runtime. This will make is easier to purge the cache if a database changes. Made the following changes: * Changed how geoip processor integrates with the cache. The cache is moved from the geoip processor to DatabaseReaderLazyLoader class. * Changed the cache key from ip + response class to ip + database_path. * Moved GeoIpCache from IngestGeoIpPlugin class to be a top level class. --- .../geoip/DatabaseReaderLazyLoader.java | 43 ++++++- .../ingest/geoip/GeoIpCache.java | 89 +++++++++++++ .../ingest/geoip/GeoIpProcessor.java | 56 +------- .../ingest/geoip/IngestGeoIpPlugin.java | 120 ++++-------------- ...pPluginTests.java => GeoIpCacheTests.java} | 29 +++-- .../geoip/GeoIpProcessorFactoryTests.java | 43 ++++--- .../ingest/geoip/GeoIpProcessorTests.java | 54 +++----- 7 files changed, 216 insertions(+), 218 deletions(-) create mode 100644 modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java rename modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/{IngestGeoIpPluginTests.java => GeoIpCacheTests.java} (65%) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java index 459e708989c5a..a2bf5272b7b88 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -9,18 +9,28 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; +import com.maxmind.geoip2.exception.AddressNotFoundException; +import com.maxmind.geoip2.model.AbstractResponse; +import com.maxmind.geoip2.model.AsnResponse; +import com.maxmind.geoip2.model.CityResponse; +import com.maxmind.geoip2.model.CountryResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.internal.io.IOUtils; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Objects; /** @@ -31,6 +41,7 @@ class DatabaseReaderLazyLoader implements Closeable { private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class); + private final GeoIpCache cache; private final Path databasePath; private final CheckedSupplier loader; final SetOnce databaseReader; @@ -38,7 +49,8 @@ class DatabaseReaderLazyLoader implements Closeable { // cache the database type so that we do not re-read it on every pipeline execution final SetOnce databaseType; - DatabaseReaderLazyLoader(final Path databasePath, final CheckedSupplier loader) { + DatabaseReaderLazyLoader(final GeoIpCache cache, final Path databasePath, final CheckedSupplier loader) { + this.cache = cache; this.databasePath = Objects.requireNonNull(databasePath); this.loader = Objects.requireNonNull(loader); this.databaseReader = new SetOnce<>(); @@ -123,7 +135,34 @@ InputStream databaseInputStream() throws IOException { return Files.newInputStream(databasePath); } - DatabaseReader get() throws IOException { + CityResponse getCity(InetAddress ipAddress) { + return getResponse(ipAddress, DatabaseReader::city); + } + + CountryResponse getCountry(InetAddress ipAddress) { + return getResponse(ipAddress, DatabaseReader::country); + } + + AsnResponse getAsn(InetAddress ipAddress) { + return getResponse(ipAddress, DatabaseReader::asn); + } + + private T getResponse(InetAddress ipAddress, + CheckedBiFunction responseProvider) { + SpecialPermission.check(); + return AccessController.doPrivileged((PrivilegedAction) () -> + cache.putIfAbsent(ipAddress, databasePath.toString(), ip -> { + try { + return responseProvider.apply(get(), ipAddress); + } catch (AddressNotFoundException e) { + throw new GeoIpProcessor.AddressNotFoundRuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + } + + private DatabaseReader get() throws IOException { if (databaseReader.get() == null) { synchronized (databaseReader) { if (databaseReader.get() == null) { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java new file mode 100644 index 0000000000000..a86037c7423c7 --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.ingest.geoip; + +import com.maxmind.db.NodeCache; +import com.maxmind.geoip2.model.AbstractResponse; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; + +import java.net.InetAddress; +import java.util.Objects; +import java.util.function.Function; + +/** + * The in-memory cache for the geoip data. There should only be 1 instance of this class.. + * This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the + * cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant + * reduction of CPU usage. + */ +final class GeoIpCache { + private final Cache cache; + + //package private for testing + GeoIpCache(long maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("geoip max cache size must be 0 or greater"); + } + this.cache = CacheBuilder.builder().setMaximumWeight(maxSize).build(); + } + + @SuppressWarnings("unchecked") + T putIfAbsent(InetAddress ip, + String databasePath, + Function retrieveFunction) { + + //can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader) + CacheKey cacheKey = new CacheKey(ip, databasePath); + //intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. + AbstractResponse response = cache.get(cacheKey); + if (response == null) { + response = retrieveFunction.apply(ip); + cache.put(cacheKey, response); + } + return (T) response; + } + + //only useful for testing + AbstractResponse get(InetAddress ip, String databasePath) { + CacheKey cacheKey = new CacheKey(ip, databasePath); + return cache.get(cacheKey); + } + + /** + * The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the database + * path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same + * IP may be in both with different values and we need to cache both. + */ + private static class CacheKey { + + private final InetAddress ip; + private final String databasePath; + + private CacheKey(InetAddress ip, String databasePath) { + this.ip = ip; + this.databasePath = databasePath; + } + + //generated + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(ip, cacheKey.ip) && + Objects.equals(databasePath, cacheKey.databasePath); + } + + //generated + @Override + public int hashCode() { + return Objects.hash(ip, databasePath); + } + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index c7678b1bd8a99..0b397d18908d8 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -9,7 +9,6 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.db.Network; -import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.model.AsnResponse; import com.maxmind.geoip2.model.CityResponse; import com.maxmind.geoip2.model.CountryResponse; @@ -19,18 +18,14 @@ import com.maxmind.geoip2.record.Location; import com.maxmind.geoip2.record.Subdivision; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import java.io.IOException; import java.net.InetAddress; -import java.security.AccessController; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -58,7 +53,6 @@ public final class GeoIpProcessor extends AbstractProcessor { private final DatabaseReaderLazyLoader lazyLoader; private final Set properties; private final boolean ignoreMissing; - private final GeoIpCache cache; private final boolean firstOnly; /** @@ -70,7 +64,6 @@ public final class GeoIpProcessor extends AbstractProcessor { * @param targetField the target field * @param properties the properties; ideally this is lazily-loaded once on first use * @param ignoreMissing true if documents with a missing value for the field should be ignored - * @param cache a geo-IP cache * @param firstOnly true if only first result should be returned in case of array */ GeoIpProcessor( @@ -80,7 +73,6 @@ public final class GeoIpProcessor extends AbstractProcessor { final String targetField, final Set properties, final boolean ignoreMissing, - final GeoIpCache cache, boolean firstOnly) { super(tag, description); this.field = field; @@ -88,7 +80,6 @@ public final class GeoIpProcessor extends AbstractProcessor { this.lazyLoader = lazyLoader; this.properties = properties; this.ignoreMissing = ignoreMissing; - this.cache = cache; this.firstOnly = firstOnly; } @@ -190,18 +181,7 @@ Set getProperties() { } private Map retrieveCityGeoData(InetAddress ipAddress) { - SpecialPermission.check(); - CityResponse response = AccessController.doPrivileged((PrivilegedAction) () -> - cache.putIfAbsent(ipAddress, CityResponse.class, ip -> { - try { - return lazyLoader.get().city(ip); - } catch (AddressNotFoundException e) { - throw new AddressNotFoundRuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - })); - + CityResponse response = lazyLoader.getCity(ipAddress); Country country = response.getCountry(); City city = response.getCity(); Location location = response.getLocation(); @@ -276,18 +256,7 @@ private Map retrieveCityGeoData(InetAddress ipAddress) { } private Map retrieveCountryGeoData(InetAddress ipAddress) { - SpecialPermission.check(); - CountryResponse response = AccessController.doPrivileged((PrivilegedAction) () -> - cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> { - try { - return lazyLoader.get().country(ip); - } catch (AddressNotFoundException e) { - throw new AddressNotFoundRuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - })); - + CountryResponse response = lazyLoader.getCountry(ipAddress); Country country = response.getCountry(); Continent continent = response.getContinent(); @@ -321,18 +290,7 @@ private Map retrieveCountryGeoData(InetAddress ipAddress) { } private Map retrieveAsnGeoData(InetAddress ipAddress) { - SpecialPermission.check(); - AsnResponse response = AccessController.doPrivileged((PrivilegedAction) () -> - cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> { - try { - return lazyLoader.get().asn(ip); - } catch (AddressNotFoundException e) { - throw new AddressNotFoundRuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - })); - + AsnResponse response = lazyLoader.getAsn(ipAddress); Integer asn = response.getAutonomousSystemNumber(); String organization_name = response.getAutonomousSystemOrganization(); Network network = response.getNetwork(); @@ -381,11 +339,8 @@ Map databaseReaders() { return Collections.unmodifiableMap(databaseReaders); } - private final GeoIpCache cache; - - public Factory(Map databaseReaders, GeoIpCache cache) { + public Factory(Map databaseReaders) { this.databaseReaders = databaseReaders; - this.cache = cache; } @Override @@ -432,8 +387,7 @@ public GeoIpProcessor create( } } - return new GeoIpProcessor(processorTag, description, ipField, lazyLoader, targetField, properties, ignoreMissing, cache, - firstOnly); + return new GeoIpProcessor(processorTag, description, ipField, lazyLoader, targetField, properties, ignoreMissing, firstOnly); } } diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 5262691d9dac6..96060528a53b5 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -9,14 +9,11 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.db.NoCache; -import com.maxmind.db.NodeCache; import com.maxmind.db.Reader; import com.maxmind.geoip2.DatabaseReader; -import com.maxmind.geoip2.model.AbstractResponse; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.cache.Cache; -import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.internal.io.IOUtils; @@ -26,18 +23,14 @@ import java.io.Closeable; import java.io.IOException; -import java.net.InetAddress; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.PathMatcher; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.function.Function; import java.util.stream.Stream; public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { @@ -50,7 +43,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable @Override public List> getSettings() { - return Arrays.asList(CACHE_SIZE); + return Collections.singletonList(CACHE_SIZE); } @Override @@ -58,15 +51,16 @@ public Map getProcessors(Processor.Parameters paramet if (databaseReaders != null) { throw new IllegalStateException("getProcessors called twice for geoip plugin!!"); } + final long cacheSize = CACHE_SIZE.get(parameters.env.settings()); + final GeoIpCache cache = new GeoIpCache(cacheSize); final Path geoIpDirectory = getGeoIpDirectory(parameters); final Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip"); - long cacheSize = CACHE_SIZE.get(parameters.env.settings()); try { - databaseReaders = loadDatabaseReaders(geoIpDirectory, geoIpConfigDirectory); + databaseReaders = loadDatabaseReaders(cache, geoIpDirectory, geoIpConfigDirectory); } catch (IOException e) { throw new RuntimeException(e); } - return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize))); + return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders)); } /* @@ -86,7 +80,9 @@ private Path getGeoIpDirectory(Processor.Parameters parameters) { return geoIpDirectory; } - static Map loadDatabaseReaders(Path geoIpDirectory, Path geoIpConfigDirectory) throws IOException { + static Map loadDatabaseReaders(GeoIpCache cache, + Path geoIpDirectory, + Path geoIpConfigDirectory) throws IOException { assertDatabaseExistence(geoIpDirectory, true); assertDatabaseExistence(geoIpConfigDirectory, false); final boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false")); @@ -95,7 +91,7 @@ static Map loadDatabaseReaders(Path geoIpDirec // load the default databases for (final String databaseFilename : DEFAULT_DATABASE_FILENAMES) { final Path databasePath = geoIpDirectory.resolve(databaseFilename); - final DatabaseReaderLazyLoader loader = createLoader(databasePath, loadDatabaseOnHeap); + final DatabaseReaderLazyLoader loader = createLoader(cache, databasePath, loadDatabaseOnHeap); databaseReaders.put(databaseFilename, loader); } @@ -109,7 +105,7 @@ static Map loadDatabaseReaders(Path geoIpDirec Path databasePath = iterator.next(); if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { String databaseFileName = databasePath.getFileName().toString(); - final DatabaseReaderLazyLoader loader = createLoader(databasePath, loadDatabaseOnHeap); + final DatabaseReaderLazyLoader loader = createLoader(cache, databasePath, loadDatabaseOnHeap); databaseReaders.put(databaseFileName, loader); } } @@ -118,18 +114,17 @@ static Map loadDatabaseReaders(Path geoIpDirec return Collections.unmodifiableMap(databaseReaders); } - private static DatabaseReaderLazyLoader createLoader(Path databasePath, boolean loadDatabaseOnHeap) { - return new DatabaseReaderLazyLoader( - databasePath, - () -> { - DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance()); - if (loadDatabaseOnHeap) { - builder.fileMode(Reader.FileMode.MEMORY); - } else { - builder.fileMode(Reader.FileMode.MEMORY_MAPPED); - } - return builder.build(); - }); + private static DatabaseReaderLazyLoader createLoader(GeoIpCache cache, Path databasePath, boolean loadDatabaseOnHeap) { + CheckedSupplier loader = () -> { + DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance()); + if (loadDatabaseOnHeap) { + builder.fileMode(Reader.FileMode.MEMORY); + } else { + builder.fileMode(Reader.FileMode.MEMORY_MAPPED); + } + return builder.build(); + }; + return new DatabaseReaderLazyLoader(cache, databasePath, loader); } private static void assertDatabaseExistence(final Path path, final boolean exists) throws IOException { @@ -153,75 +148,4 @@ public void close() throws IOException { } } - /** - * The in-memory cache for the geoip data. There should only be 1 instance of this class.. - * This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the - * cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant - * reduction of CPU usage. - */ - static class GeoIpCache { - private final Cache, AbstractResponse> cache; - - //package private for testing - GeoIpCache(long maxSize) { - if (maxSize < 0) { - throw new IllegalArgumentException("geoip max cache size must be 0 or greater"); - } - this.cache = CacheBuilder., AbstractResponse>builder().setMaximumWeight(maxSize).build(); - } - - T putIfAbsent(InetAddress ip, Class responseType, - Function retrieveFunction) { - - //can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader) - CacheKey cacheKey = new CacheKey<>(ip, responseType); - //intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. - AbstractResponse response = cache.get(cacheKey); - if (response == null) { - response = retrieveFunction.apply(ip); - cache.put(cacheKey, response); - } - return responseType.cast(response); - } - - //only useful for testing - T get(InetAddress ip, Class responseType) { - CacheKey cacheKey = new CacheKey<>(ip, responseType); - return responseType.cast(cache.get(cacheKey)); - } - - /** - * The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the response - * type is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same - * IP may be in both with different values and we need to cache both. The response type scopes the IP to the correct database - * provides a means to safely cast the return objects. - * @param The AbstractResponse type used to scope the key and cast the result. - */ - private static class CacheKey { - - private final InetAddress ip; - private final Class responseType; - - private CacheKey(InetAddress ip, Class responseType) { - this.ip = ip; - this.responseType = responseType; - } - - //generated - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CacheKey cacheKey = (CacheKey) o; - return Objects.equals(ip, cacheKey.ip) && - Objects.equals(responseType, cacheKey.responseType); - } - - //generated - @Override - public int hashCode() { - return Objects.hash(ip, responseType); - } - } - } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IngestGeoIpPluginTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java similarity index 65% rename from modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IngestGeoIpPluginTests.java rename to modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java index edfef527af450..9826dcda88151 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/IngestGeoIpPluginTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java @@ -10,12 +10,11 @@ import com.maxmind.geoip2.model.AbstractResponse; import org.elasticsearch.common.network.InetAddresses; -import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; import static org.mockito.Mockito.mock; -public class IngestGeoIpPluginTests extends ESTestCase { +public class GeoIpCacheTests extends ESTestCase { public void testCachesAndEvictsResults() { GeoIpCache cache = new GeoIpCache(1); @@ -23,25 +22,35 @@ public void testCachesAndEvictsResults() { AbstractResponse response2 = mock(AbstractResponse.class); //add a key - AbstractResponse cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, ip -> response1); + AbstractResponse cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), "path/to/db", ip -> response1); assertSame(cachedResponse, response1); - assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, ip -> response1)); - assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.1"), AbstractResponse.class)); + assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), "path/to/db", ip -> response1)); + assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.1"), "path/to/db")); // evict old key by adding another value - cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), AbstractResponse.class, ip -> response2); + cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), "path/to/db", ip -> response2); assertSame(cachedResponse, response2); - assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), AbstractResponse.class, ip -> response2)); - assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.2"), AbstractResponse.class)); + assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), "path/to/db", ip -> response2)); + assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.2"), "path/to/db")); + assertNotSame(response1, cache.get(InetAddresses.forString("127.0.0.1"), "path/to/db")); + } + + public void testCacheKey() { + GeoIpCache cache = new GeoIpCache(2); + AbstractResponse response1 = mock(AbstractResponse.class); + AbstractResponse response2 = mock(AbstractResponse.class); - assertNotSame(response1, cache.get(InetAddresses.forString("127.0.0.1"), AbstractResponse.class)); + assertSame(response1, cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), "path/to/db1", ip -> response1)); + assertSame(response2, cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), "path/to/db2", ip -> response2)); + assertSame(response1, cache.get(InetAddresses.forString("127.0.0.1"), "path/to/db1")); + assertSame(response2, cache.get(InetAddresses.forString("127.0.0.1"), "path/to/db2")); } public void testThrowsFunctionsException() { GeoIpCache cache = new GeoIpCache(1); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, + () -> cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), "path/to/db", ip -> { throw new IllegalArgumentException("bad"); })); assertEquals("bad", ex.getMessage()); } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 42a721e57207b..756b24a1a0169 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.Randomness; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.junit.AfterClass; @@ -49,7 +48,7 @@ public static void loadDatabaseReaders() throws IOException { Files.createDirectories(geoIpConfigDir); copyDatabaseFiles(geoIpDir); - databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir); + databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir); } @AfterClass @@ -61,7 +60,7 @@ public static void closeDatabaseReaders() throws IOException { } public void testBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); @@ -77,7 +76,7 @@ public void testBuildDefaults() throws Exception { } public void testSetIgnoreMissing() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); @@ -94,7 +93,7 @@ public void testSetIgnoreMissing() throws Exception { } public void testCountryBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); @@ -112,7 +111,7 @@ public void testCountryBuildDefaults() throws Exception { } public void testAsnBuildDefaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); @@ -130,7 +129,7 @@ public void testAsnBuildDefaults() throws Exception { } public void testBuildTargetField() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); config.put("target_field", "_field"); @@ -141,7 +140,7 @@ public void testBuildTargetField() throws Exception { } public void testBuildDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -154,7 +153,7 @@ public void testBuildDbFile() throws Exception { } public void testBuildWithCountryDbAndAsnFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -168,7 +167,7 @@ public void testBuildWithCountryDbAndAsnFields() throws Exception { } public void testBuildWithAsnDbAndCityFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-ASN.mmdb"); @@ -182,7 +181,7 @@ public void testBuildWithAsnDbAndCityFields() throws Exception { } public void testBuildNonExistingDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("field", "_field"); @@ -192,7 +191,7 @@ public void testBuildNonExistingDbFile() throws Exception { } public void testBuildFields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Set properties = EnumSet.noneOf(GeoIpProcessor.Property.class); List fieldNames = new ArrayList<>(); @@ -216,7 +215,7 @@ public void testBuildFields() throws Exception { } public void testBuildIllegalFieldOption() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config1 = new HashMap<>(); config1.put("field", "_field"); @@ -242,8 +241,9 @@ public void testLazyLoading() throws Exception { // Loading another database reader instances, because otherwise we can't test lazy loading as the // database readers used at class level are reused between tests. (we want to keep that otherwise running this // test will take roughly 4 times more time) - Map databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir); - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + Map databaseReaders = + IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { assertNull(lazyLoader.databaseReader.get()); } @@ -299,8 +299,9 @@ public void testLoadingCustomDatabase() throws IOException { * Loading another database reader instances, because otherwise we can't test lazy loading as the database readers used at class * level are reused between tests. (we want to keep that otherwise running this test will take roughly 4 times more time). */ - final Map databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir); - final GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + final Map databaseReaders = + IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir); + final GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { assertNull(lazyLoader.databaseReader.get()); } @@ -330,8 +331,8 @@ public void testDatabaseNotExistsInDir() throws IOException { copyDatabaseFiles(geoIpDir); final String databaseFilename = randomFrom(IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES); Files.delete(geoIpDir.resolve(databaseFilename)); - final IOException e = - expectThrows(IOException.class, () -> IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir)); + final IOException e = expectThrows(IOException.class, + () -> IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir)); assertThat(e, hasToString(containsString("expected database [" + databaseFilename + "] to exist in [" + geoIpDir + "]"))); } @@ -343,8 +344,8 @@ public void testDatabaseExistsInConfigDir() throws IOException { copyDatabaseFiles(geoIpDir); final String databaseFilename = randomFrom(IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES); copyDatabaseFile(geoIpConfigDir, databaseFilename); - final IOException e = - expectThrows(IOException.class, () -> IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir)); + final IOException e = expectThrows(IOException.class, + () -> IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir)); assertThat(e, hasToString(containsString("expected database [" + databaseFilename + "] to not exist in [" + geoIpConfigDir + "]"))); } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index 2457cdd1296d9..1d8af704ff299 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -36,8 +35,7 @@ public class GeoIpProcessorTests extends ESTestCase { public void testCity() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "8.8.8.8"); @@ -61,8 +59,7 @@ public void testCity() throws Exception { public void testNullValueWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -72,8 +69,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); processor.execute(ingestDocument); @@ -82,8 +78,7 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testNullWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -93,8 +88,7 @@ public void testNullWithoutIgnoreMissing() throws Exception { public void testNonExistentWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument)); @@ -103,8 +97,7 @@ public void testNonExistentWithoutIgnoreMissing() throws Exception { public void testCity_withIpV6() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); String address = "2602:306:33d3:8000::3257:9652"; Map document = new HashMap<>(); @@ -132,8 +125,7 @@ public void testCity_withIpV6() throws Exception { public void testCityWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -149,8 +141,7 @@ public void testCityWithMissingLocation() throws Exception { public void testCountry() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); @@ -169,8 +160,7 @@ public void testCountry() throws Exception { public void testCountryWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -187,8 +177,7 @@ public void testCountryWithMissingLocation() throws Exception { public void testAsn() throws Exception { String ip = "82.171.64.0"; GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-ASN.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-ASN.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", ip); @@ -207,8 +196,7 @@ public void testAsn() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "127.0.0.1"); @@ -220,8 +208,7 @@ public void testAddressIsNotInTheDatabase() throws Exception { /** Don't silently do DNS lookups or anything trappy on bogus data */ public void testInvalid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", "www.google.com"); @@ -232,8 +219,7 @@ public void testInvalid() throws Exception { public void testListAllValid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("8.8.8.8", "82.171.64.0")); @@ -253,8 +239,7 @@ public void testListAllValid() throws Exception { public void testListPartiallyValid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); @@ -274,8 +259,7 @@ public void testListPartiallyValid() throws Exception { public void testListNoMatches() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), false); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.1")); @@ -287,8 +271,7 @@ public void testListNoMatches() throws Exception { public void testListFirstOnly() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), true); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); @@ -306,8 +289,7 @@ public void testListFirstOnly() throws Exception { public void testListFirstOnlyNoMatches() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", - loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000), true); + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true); Map document = new HashMap<>(); document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2")); @@ -321,7 +303,7 @@ private DatabaseReaderLazyLoader loader(final String path) { final Supplier databaseInputStreamSupplier = () -> GeoIpProcessor.class.getResourceAsStream(path); final CheckedSupplier loader = () -> new DatabaseReader.Builder(databaseInputStreamSupplier.get()).build(); - return new DatabaseReaderLazyLoader(PathUtils.get(path), loader) { + return new DatabaseReaderLazyLoader(new GeoIpCache(1000), PathUtils.get(path), loader) { @Override long databaseFileSize() throws IOException {