Skip to content

Commit

Permalink
Download IPinfo ip location databases (elastic#114847)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo authored Oct 15, 2024
1 parent c401a71 commit 69054ac
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
Expand All @@ -40,18 +41,24 @@

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.IPINFO_TOKEN_SETTING;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_LICENSE_KEY_SETTING;
import static org.hamcrest.Matchers.equalTo;

public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {

private static final String DATABASE_TYPE = "GeoIP2-City";
private static final String MAXMIND_DATABASE_TYPE = "GeoIP2-City";
private static final String IPINFO_DATABASE_TYPE = "asn";

@ClassRule
public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture(DATABASE_TYPE);
public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture(
List.of(MAXMIND_DATABASE_TYPE),
List.of(IPINFO_DATABASE_TYPE)
);

protected String getEndpoint() {
return fixture.getAddress();
Expand All @@ -61,6 +68,7 @@ protected String getEndpoint() {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(MAXMIND_LICENSE_KEY_SETTING.getKey(), "license_key");
secureSettings.setString(IPINFO_TOKEN_SETTING.getKey(), "token");
Settings.Builder builder = Settings.builder();
builder.setSecureSettings(secureSettings)
.put(super.nodeSettings(nodeOrdinal, otherSettings))
Expand All @@ -87,29 +95,44 @@ public void testEnterpriseDownloaderTask() throws Exception {
* Note that the "enterprise database" is actually just a geolite database being loaded by the GeoIpHttpFixture.
*/
EnterpriseGeoIpDownloader.DEFAULT_MAXMIND_ENDPOINT = getEndpoint();
final String pipelineName = "enterprise_geoip_pipeline";
EnterpriseGeoIpDownloader.DEFAULT_IPINFO_ENDPOINT = getEndpoint();
final String indexName = "enterprise_geoip_test_index";
final String geoipPipelineName = "enterprise_geoip_pipeline";
final String iplocationPipelineName = "enterprise_iplocation_pipeline";
final String sourceField = "ip";
final String targetField = "ip-city";
final String targetField = "ip-result";

startEnterpriseGeoIpDownloaderTask();
configureDatabase(DATABASE_TYPE);
createGeoIpPipeline(pipelineName, DATABASE_TYPE, sourceField, targetField);
configureMaxmindDatabase(MAXMIND_DATABASE_TYPE);
configureIpinfoDatabase(IPINFO_DATABASE_TYPE);
waitAround();
createPipeline(geoipPipelineName, "geoip", MAXMIND_DATABASE_TYPE, sourceField, targetField);
createPipeline(iplocationPipelineName, "ip_location", IPINFO_DATABASE_TYPE, sourceField, targetField);

/*
* We know that the databases index has been populated (because we waited around, :wink:), but we don't know for sure that
* the databases have been pulled down and made available on all nodes. So we run these ingest-and-check steps in assertBusy blocks.
*/
assertBusy(() -> {
/*
* We know that the .geoip_databases index has been populated, but we don't know for sure that the database has been pulled
* down and made available on all nodes. So we run this ingest-and-check step in an assertBusy.
*/
logger.info("Ingesting a test document");
String documentId = ingestDocument(indexName, pipelineName, sourceField);
String documentId = ingestDocument(indexName, geoipPipelineName, sourceField, "89.160.20.128");
GetResponse getResponse = client().get(new GetRequest(indexName, documentId)).actionGet();
Map<String, Object> returnedSource = getResponse.getSource();
assertNotNull(returnedSource);
Object targetFieldValue = returnedSource.get(targetField);
assertNotNull(targetFieldValue);
assertThat(((Map<String, Object>) targetFieldValue).get("organization_name"), equalTo("Bredband2 AB"));
});
assertBusy(() -> {
logger.info("Ingesting another test document");
String documentId = ingestDocument(indexName, iplocationPipelineName, sourceField, "12.10.66.1");
GetResponse getResponse = client().get(new GetRequest(indexName, documentId)).actionGet();
Map<String, Object> returnedSource = getResponse.getSource();
assertNotNull(returnedSource);
Object targetFieldValue = returnedSource.get(targetField);
assertNotNull(targetFieldValue);
assertThat(((Map<String, Object>) targetFieldValue).get("organization_name"), equalTo("OAKLAWN JOCKEY CLUB, INC."));
});
}

private void startEnterpriseGeoIpDownloaderTask() {
Expand All @@ -128,36 +151,53 @@ private void startEnterpriseGeoIpDownloaderTask() {
);
}

/**
 * Submits a {@code PutDatabaseConfigurationAction} request carrying a Maxmind-backed
 * {@code DatabaseConfiguration} and blocks on {@code actionGet()} until the call returns.
 *
 * @param databaseType the value handed to the new {@code DatabaseConfiguration} alongside its id
 */
private void configureDatabase(String databaseType) throws Exception {
private void configureMaxmindDatabase(String databaseType) {
admin().cluster()
.execute(
PutDatabaseConfigurationAction.INSTANCE,
new PutDatabaseConfigurationAction.Request(
// NOTE(review): the two MAX_VALUE arguments look like request timeouts -- confirm against the Request constructor
TimeValue.MAX_VALUE,
TimeValue.MAX_VALUE,
new DatabaseConfiguration("test", databaseType, new DatabaseConfiguration.Maxmind("test_account"))
new DatabaseConfiguration("test-1", databaseType, new DatabaseConfiguration.Maxmind("test_account"))
)
)
.actionGet();
}

/**
 * Registers the ipinfo-backed database configuration used by this test ("test-2") via
 * {@code PutDatabaseConfigurationAction}, waiting for the request to come back before returning.
 *
 * @param databaseType the value handed to the new {@code DatabaseConfiguration} alongside its id
 */
private void configureIpinfoDatabase(String databaseType) {
    final DatabaseConfiguration ipinfoConfiguration = new DatabaseConfiguration(
        "test-2",
        databaseType,
        new DatabaseConfiguration.Ipinfo()
    );
    final PutDatabaseConfigurationAction.Request putRequest = new PutDatabaseConfigurationAction.Request(
        TimeValue.MAX_VALUE,
        TimeValue.MAX_VALUE,
        ipinfoConfiguration
    );
    // actionGet() blocks, so the configuration has been accepted once this method returns
    admin().cluster().execute(PutDatabaseConfigurationAction.INSTANCE, putRequest).actionGet();
}

/**
 * Blocks until the databases index is green and a search against it returns the expected
 * number of hits. The search-and-check step is retried through {@code assertBusy}.
 */
private void waitAround() throws Exception {
ensureGreen(GeoIpDownloader.DATABASES_INDEX);
assertBusy(() -> {
SearchResponse searchResponse = client().search(new SearchRequest(GeoIpDownloader.DATABASES_INDEX)).actionGet();
try {
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
assertThat(searchResponse.getHits().getHits().length, equalTo(2));
} finally {
// always release the response's reference, whether or not the assertion passed
searchResponse.decRef();
}
});
}

private void createGeoIpPipeline(String pipelineName, String databaseType, String sourceField, String targetField) throws IOException {
private void createPipeline(String pipelineName, String processorType, String databaseType, String sourceField, String targetField)
throws IOException {
putJsonPipeline(pipelineName, (builder, params) -> {
builder.field("description", "test");
builder.startArray("processors");
{
builder.startObject();
{
builder.startObject("geoip");
builder.startObject(processorType);
{
builder.field("field", sourceField);
builder.field("target_field", targetField);
Expand All @@ -171,11 +211,11 @@ private void createGeoIpPipeline(String pipelineName, String databaseType, Strin
});
}

private String ingestDocument(String indexName, String pipelineName, String sourceField) {
private String ingestDocument(String indexName, String pipelineName, String sourceField, String value) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(
new IndexRequest(indexName).source("{\"" + sourceField + "\": \"89.160.20.128\"}", XContentType.JSON).setPipeline(pipelineName)
);
bulkRequest.add(new IndexRequest(indexName).source(Strings.format("""
{ "%s": "%s"}
""", sourceField, value), XContentType.JSON).setPipeline(pipelineName));
BulkResponse response = client().bulk(bulkRequest).actionGet();
BulkItemResponse[] bulkItemResponses = response.getItems();
assertThat(bulkItemResponses.length, equalTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
Expand All @@ -39,6 +38,8 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.Closeable;
Expand All @@ -57,6 +58,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.IPINFO_SETTINGS_PREFIX;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_SETTINGS_PREFIX;

/**
Expand All @@ -72,6 +74,9 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
// a sha256 checksum followed by two spaces followed by an (ignored) file name
private static final Pattern SHA256_CHECKSUM_PATTERN = Pattern.compile("(\\w{64})\\s\\s(.*)");

// an md5 checksum
private static final Pattern MD5_CHECKSUM_PATTERN = Pattern.compile("(\\w{32})");

// for overriding in tests
static String DEFAULT_MAXMIND_ENDPOINT = System.getProperty(
MAXMIND_SETTINGS_PREFIX + "endpoint.default", //
Expand All @@ -80,6 +85,14 @@ public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
// n.b. a future enhancement might be to allow for a MAXMIND_ENDPOINT_SETTING, but
// at the moment this is an unsupported system property for use in tests (only)

// for overriding in tests
static String DEFAULT_IPINFO_ENDPOINT = System.getProperty(
IPINFO_SETTINGS_PREFIX + "endpoint.default", //
"https://ipinfo.io/data"
);
// n.b. a future enhancement might be to allow for an IPINFO_ENDPOINT_SETTING, but
// at the moment this is an unsupported system property for use in tests (only)

static final String DATABASES_INDEX = ".geoip_databases";
static final int MAX_CHUNK_SIZE = 1024 * 1024;

Expand Down Expand Up @@ -444,16 +457,15 @@ private void scheduleNextRun(TimeValue time) {
}
}

/**
 * Picks the {@code ProviderDownload} helper for a database configuration by dispatching on the
 * runtime type of {@code database.provider()}.
 *
 * @param database the configuration whose provider decides which download helper is built
 */
@Nullable
private ProviderDownload downloaderFor(DatabaseConfiguration database) {
if (database.provider() instanceof DatabaseConfiguration.Maxmind) {
return new MaxmindDownload(database.name(), (DatabaseConfiguration.Maxmind) database.provider());
} else if (database.provider() instanceof DatabaseConfiguration.Ipinfo) {
// as a temporary implementation detail, null here means 'not actually supported *just yet*'
return null;
if (database.provider() instanceof DatabaseConfiguration.Maxmind maxmind) {
return new MaxmindDownload(database.name(), maxmind);
} else if (database.provider() instanceof DatabaseConfiguration.Ipinfo ipinfo) {
return new IpinfoDownload(database.name(), ipinfo);
} else {
// neither known provider type matched
assert false : "Attempted to use database downloader with unsupported provider type [" + database.provider().getClass() + "]";
return null;
throw new IllegalArgumentException(
Strings.format("Unexpected provider [%s] for configuration [%s]", database.provider().getClass(), database.id())
);
}
}

Expand Down Expand Up @@ -488,7 +500,7 @@ public HttpClient.PasswordAuthenticationHolder buildCredentials() {

@Override
public boolean validCredentials() {
return auth.get() != null;
return auth != null && auth.get() != null;
}

@Override
Expand Down Expand Up @@ -529,7 +541,101 @@ public CheckedSupplier<InputStream, IOException> download() {

@Override
public void close() throws IOException {
auth.close();
if (auth != null) auth.close();
}
}

/**
 * {@link ProviderDownload} implementation for databases downloaded from ipinfo.
 * <p>
 * Credentials are built once, at construction time, from the token returned by
 * {@code tokenProvider.apply("ipinfo")}; when no token is available {@link #auth} is {@code null}
 * and {@link #validCredentials()} returns {@code false}.
 * <p>
 * NOTE(review): {@link #checksum()} and {@link #download()} dereference {@link #auth} without a
 * null check -- this assumes callers consult {@link #validCredentials()} first; confirm at the call site.
 */
class IpinfoDownload implements ProviderDownload {

    // the 'free' databases are in the sub-path 'free/' in terms of the download endpoint
    private static final Set<String> FREE_DATABASES = Set.of("asn", "country", "country_asn");

    final String name;
    final DatabaseConfiguration.Ipinfo ipinfo;
    HttpClient.PasswordAuthenticationHolder auth;

    IpinfoDownload(String name, DatabaseConfiguration.Ipinfo ipinfo) {
        this.name = name;
        this.ipinfo = ipinfo;
        this.auth = buildCredentials();
    }

    /**
     * Builds the basic-auth holder for ipinfo requests.
     *
     * @return the credentials holder, or {@code null} when the token is missing or empty
     */
    @Override
    public HttpClient.PasswordAuthenticationHolder buildCredentials() {
        final char[] tokenChars = tokenProvider.apply("ipinfo");

        // if the token is missing or empty, return null as 'no auth'
        if (tokenChars == null || tokenChars.length == 0) {
            return null;
        }

        // ipinfo uses the token as the username component of basic auth, see https://ipinfo.io/developers#authentication
        return new HttpClient.PasswordAuthenticationHolder(new String(tokenChars), new char[] {});
    }

    /**
     * @return {@code true} only when a credentials holder exists and still yields a non-null value
     */
    @Override
    public boolean validCredentials() {
        return auth != null && auth.get() != null;
    }

    /**
     * Builds the download url for this database: {@code <endpoint>/<name>.<suffix>}, where the name is
     * prefixed with {@code free/} for the free databases.
     *
     * @param suffix the part after the dot, e.g. {@code mmdb}
     * @return the formatted url
     * @throws IllegalArgumentException if the configured endpoint contains a {@code %}, since the
     *                                  endpoint is used as part of a format pattern
     */
    @Override
    public String url(String suffix) {
        final String internalName;
        if (FREE_DATABASES.contains(name)) {
            internalName = "free/" + name;
        } else {
            internalName = name;
        }

        // reminder, we're passing the ipinfo token as the username part of http basic auth,
        // see https://ipinfo.io/developers#authentication

        String endpointPattern = DEFAULT_IPINFO_ENDPOINT;
        if (endpointPattern.contains("%")) {
            throw new IllegalArgumentException("Invalid endpoint [" + endpointPattern + "]");
        }
        if (endpointPattern.endsWith("/") == false) {
            endpointPattern += "/";
        }
        endpointPattern += "%s.%s";

        // at this point the pattern looks like this (in the default case):
        // https://ipinfo.io/data/%s.%s
        // also see https://ipinfo.io/developers/database-download,
        // and https://ipinfo.io/developers/database-filename-reference for more

        return Strings.format(endpointPattern, internalName, suffix);
    }

    /**
     * Fetches the checksums JSON document for this database and extracts its md5 checksum.
     *
     * @return the md5 checksum reported by the endpoint
     * @throws IOException      if fetching or parsing the checksums document fails
     * @throws RuntimeException if the document has no {@code checksums.md5} string, or the value
     *                          is not a 32 character md5
     */
    @Override
    public Checksum checksum() throws IOException {
        final String checksumJsonUrl = this.url("mmdb/checksums"); // a minor abuse of the idea of a 'suffix', :shrug:
        byte[] data = httpClient.getBytes(auth.get(), checksumJsonUrl); // this throws if the auth is bad
        Map<String, Object> checksums;
        try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, data)) {
            checksums = parser.map();
        }

        // the body comes from a remote server, so check its shape rather than casting blindly: a missing
        // 'checksums' object or a missing/non-string 'md5' is reported as an unexpected response below
        // instead of surfacing as a bare NullPointerException or ClassCastException
        final Object checksumsNode = checksums.get("checksums");
        final Object md5Node = checksumsNode instanceof Map<?, ?> map ? map.get("md5") : null;
        logger.info("checksum was [{}]", md5Node);

        if (md5Node instanceof String md5 && MD5_CHECKSUM_PATTERN.matcher(md5).matches()) {
            return Checksum.md5(md5);
        }
        throw new RuntimeException("Unexpected md5 response from [" + checksumJsonUrl + "]");
    }

    /**
     * @return a supplier that opens the mmdb download stream when invoked; the url is resolved eagerly,
     *         the http request is made lazily
     */
    @Override
    public CheckedSupplier<InputStream, IOException> download() {
        final String mmdbUrl = this.url("mmdb");
        return () -> httpClient.get(auth.get(), mmdbUrl);
    }

    /**
     * Closes the credentials holder, if one was built.
     */
    @Override
    public void close() throws IOException {
        if (auth != null) {
            auth.close();
        }
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,15 @@ public class EnterpriseGeoIpDownloaderTaskExecutor extends PersistentTasksExecut

static final String MAXMIND_SETTINGS_PREFIX = "ingest.geoip.downloader.maxmind.";

static final String IPINFO_SETTINGS_PREFIX = "ingest.ip_location.downloader.ipinfo.";

public static final Setting<SecureString> MAXMIND_LICENSE_KEY_SETTING = SecureSetting.secureString(
MAXMIND_SETTINGS_PREFIX + "license_key",
null
);

public static final Setting<SecureString> IPINFO_TOKEN_SETTING = SecureSetting.secureString(IPINFO_SETTINGS_PREFIX + "token", null);

private final Client client;
private final HttpClient httpClient;
private final ClusterService clusterService;
Expand Down Expand Up @@ -106,6 +110,10 @@ private char[] getSecureToken(final String type) {
if (cachedSecureSettings.getSettingNames().contains(MAXMIND_LICENSE_KEY_SETTING.getKey())) {
token = cachedSecureSettings.getString(MAXMIND_LICENSE_KEY_SETTING.getKey()).getChars();
}
} else if (type.equals("ipinfo")) {
if (cachedSecureSettings.getSettingNames().contains(IPINFO_TOKEN_SETTING.getKey())) {
token = cachedSecureSettings.getString(IPINFO_TOKEN_SETTING.getKey()).getChars();
}
}
return token;
}
Expand Down Expand Up @@ -166,7 +174,7 @@ public synchronized void reload(Settings settings) {
// `SecureSettings` are available here! cache them as they will be needed
// whenever dynamic cluster settings change and we have to rebuild the accounts
try {
this.cachedSecureSettings = extractSecureSettings(settings, List.of(MAXMIND_LICENSE_KEY_SETTING));
this.cachedSecureSettings = extractSecureSettings(settings, List.of(MAXMIND_LICENSE_KEY_SETTING, IPINFO_TOKEN_SETTING));
} catch (GeneralSecurityException e) {
// rethrow as a runtime exception, there's logging higher up the call chain around ReloadablePlugin
throw new ElasticsearchException("Exception while reloading enterprise geoip download task executor", e);
Expand Down
Loading

0 comments on commit 69054ac

Please sign in to comment.