Skip to content

Commit

Permalink
Changed how geoip cache is integrated with geoip processor.
Browse files Browse the repository at this point in the history
Backport elastic#68581 of to 7.x branch.

This change helps facilitate allowing maxmind databases to be updated at runtime.
This will make is easier to purge the cache if a database changes.

Made the following changes:
* Changed how geoip processor integrates with the cache. The cache is moved from the geoip processor to DatabaseReaderLazyLoader class.
* Changed the cache key from ip + response class to ip + database_path.
* Moved GeoIpCache from IngestGeoIpPlugin class to be a top level class.
  • Loading branch information
martijnvg committed Feb 11, 2021
1 parent 5c4492e commit ae392cb
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,28 @@
package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.AbstractResponse;
import com.maxmind.geoip2.model.AsnResponse;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Objects;

/**
Expand All @@ -31,14 +41,16 @@ class DatabaseReaderLazyLoader implements Closeable {

private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class);

private final GeoIpCache cache;
private final Path databasePath;
private final CheckedSupplier<DatabaseReader, IOException> loader;
final SetOnce<DatabaseReader> databaseReader;

// cache the database type so that we do not re-read it on every pipeline execution
final SetOnce<String> databaseType;

DatabaseReaderLazyLoader(final Path databasePath, final CheckedSupplier<DatabaseReader, IOException> loader) {
DatabaseReaderLazyLoader(final GeoIpCache cache, final Path databasePath, final CheckedSupplier<DatabaseReader, IOException> loader) {
this.cache = cache;
this.databasePath = Objects.requireNonNull(databasePath);
this.loader = Objects.requireNonNull(loader);
this.databaseReader = new SetOnce<>();
Expand Down Expand Up @@ -123,7 +135,34 @@ InputStream databaseInputStream() throws IOException {
return Files.newInputStream(databasePath);
}

DatabaseReader get() throws IOException {
CityResponse getCity(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::city);
}

CountryResponse getCountry(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::country);
}

AsnResponse getAsn(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::asn);
}

private <T extends AbstractResponse> T getResponse(InetAddress ipAddress,
CheckedBiFunction<DatabaseReader, InetAddress, T, Exception> responseProvider) {
SpecialPermission.check();
return AccessController.doPrivileged((PrivilegedAction<T>) () ->
cache.putIfAbsent(ipAddress, databasePath.toString(), ip -> {
try {
return responseProvider.apply(get(), ipAddress);
} catch (AddressNotFoundException e) {
throw new GeoIpProcessor.AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}

private DatabaseReader get() throws IOException {
if (databaseReader.get() == null) {
synchronized (databaseReader) {
if (databaseReader.get() == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.ingest.geoip;

import com.maxmind.db.NodeCache;
import com.maxmind.geoip2.model.AbstractResponse;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;

import java.net.InetAddress;
import java.util.Objects;
import java.util.function.Function;

/**
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
* cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant
* reduction of CPU usage.
*/
final class GeoIpCache {
private final Cache<CacheKey, AbstractResponse> cache;

//package private for testing
GeoIpCache(long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
}
this.cache = CacheBuilder.<CacheKey, AbstractResponse>builder().setMaximumWeight(maxSize).build();
}

@SuppressWarnings("unchecked")
<T extends AbstractResponse> T putIfAbsent(InetAddress ip,
String databasePath,
Function<InetAddress, AbstractResponse> retrieveFunction) {

//can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey cacheKey = new CacheKey(ip, databasePath);
//intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
AbstractResponse response = cache.get(cacheKey);
if (response == null) {
response = retrieveFunction.apply(ip);
cache.put(cacheKey, response);
}
return (T) response;
}

//only useful for testing
AbstractResponse get(InetAddress ip, String databasePath) {
CacheKey cacheKey = new CacheKey(ip, databasePath);
return cache.get(cacheKey);
}

/**
* The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the database
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
* IP may be in both with different values and we need to cache both.
*/
private static class CacheKey {

private final InetAddress ip;
private final String databasePath;

private CacheKey(InetAddress ip, String databasePath) {
this.ip = ip;
this.databasePath = databasePath;
}

//generated
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return Objects.equals(ip, cacheKey.ip) &&
Objects.equals(databasePath, cacheKey.databasePath);
}

//generated
@Override
public int hashCode() {
return Objects.hash(ip, databasePath);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.ingest.geoip;

import com.maxmind.db.Network;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.AsnResponse;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;
Expand All @@ -19,18 +18,14 @@
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Subdivision;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;

import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -58,7 +53,6 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final DatabaseReaderLazyLoader lazyLoader;
private final Set<Property> properties;
private final boolean ignoreMissing;
private final GeoIpCache cache;
private final boolean firstOnly;

/**
Expand All @@ -70,7 +64,6 @@ public final class GeoIpProcessor extends AbstractProcessor {
* @param targetField the target field
* @param properties the properties; ideally this is lazily-loaded once on first use
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param cache a geo-IP cache
* @param firstOnly true if only first result should be returned in case of array
*/
GeoIpProcessor(
Expand All @@ -80,15 +73,13 @@ public final class GeoIpProcessor extends AbstractProcessor {
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final GeoIpCache cache,
boolean firstOnly) {
super(tag, description);
this.field = field;
this.targetField = targetField;
this.lazyLoader = lazyLoader;
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.cache = cache;
this.firstOnly = firstOnly;
}

Expand Down Expand Up @@ -190,18 +181,7 @@ Set<Property> getProperties() {
}

private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
SpecialPermission.check();
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () ->
cache.putIfAbsent(ipAddress, CityResponse.class, ip -> {
try {
return lazyLoader.get().city(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));

CityResponse response = lazyLoader.getCity(ipAddress);
Country country = response.getCountry();
City city = response.getCity();
Location location = response.getLocation();
Expand Down Expand Up @@ -276,18 +256,7 @@ private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
}

private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
SpecialPermission.check();
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () ->
cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> {
try {
return lazyLoader.get().country(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));

CountryResponse response = lazyLoader.getCountry(ipAddress);
Country country = response.getCountry();
Continent continent = response.getContinent();

Expand Down Expand Up @@ -321,18 +290,7 @@ private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
}

private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
SpecialPermission.check();
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () ->
cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> {
try {
return lazyLoader.get().asn(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));

AsnResponse response = lazyLoader.getAsn(ipAddress);
Integer asn = response.getAutonomousSystemNumber();
String organization_name = response.getAutonomousSystemOrganization();
Network network = response.getNetwork();
Expand Down Expand Up @@ -381,11 +339,8 @@ Map<String, DatabaseReaderLazyLoader> databaseReaders() {
return Collections.unmodifiableMap(databaseReaders);
}

private final GeoIpCache cache;

public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache cache) {
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
this.databaseReaders = databaseReaders;
this.cache = cache;
}

@Override
Expand Down Expand Up @@ -432,8 +387,7 @@ public GeoIpProcessor create(
}
}

return new GeoIpProcessor(processorTag, description, ipField, lazyLoader, targetField, properties, ignoreMissing, cache,
firstOnly);
return new GeoIpProcessor(processorTag, description, ipField, lazyLoader, targetField, properties, ignoreMissing, firstOnly);
}
}

Expand Down
Loading

0 comments on commit ae392cb

Please sign in to comment.