Skip to content

Commit

Permalink
Support IPinfo database configurations (elastic#114548)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo authored Oct 11, 2024
1 parent 3b75d56 commit 693fb95
Show file tree
Hide file tree
Showing 17 changed files with 452 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
Expand Down Expand Up @@ -236,7 +237,7 @@ boolean processDatabase(String id, DatabaseConfiguration database) throws IOExce
logger.debug("Processing database [{}] for configuration [{}]", name, database.id());

try (ProviderDownload downloader = downloaderFor(database)) {
if (downloader.validCredentials()) {
if (downloader != null && downloader.validCredentials()) {
// the name that comes from the enterprise downloader cluster state doesn't include the .mmdb extension,
// but the downloading and indexing of database code expects it to be there, so we add it on here before continuing
final String fileName = name + ".mmdb";
Expand Down Expand Up @@ -443,10 +444,17 @@ private void scheduleNextRun(TimeValue time) {
}
}

@Nullable
private ProviderDownload downloaderFor(DatabaseConfiguration database) {
assert database.provider() instanceof DatabaseConfiguration.Maxmind
: "Attempt to use maxmind downloader with a provider of type" + database.provider().getClass();
return new MaxmindDownload(database.name(), (DatabaseConfiguration.Maxmind) database.provider());
if (database.provider() instanceof DatabaseConfiguration.Maxmind) {
return new MaxmindDownload(database.name(), (DatabaseConfiguration.Maxmind) database.provider());
} else if (database.provider() instanceof DatabaseConfiguration.Ipinfo) {
// as a temporary implementation detail, null here means 'not actually supported *just yet*'
return null;
} else {
assert false : "Attempted to use database downloader with unsupported provider type [" + database.provider().getClass() + "]";
return null;
}
}

class MaxmindDownload implements ProviderDownload {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation;
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.GEOIP_TYPE;
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.IP_LOCATION_TYPE;

/**
* Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node.
Expand Down Expand Up @@ -297,9 +298,18 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
return false;
}

final Map<String, Object> processorConfig = (Map<String, Object>) processor.get(GEOIP_TYPE);
if (processorConfig != null) {
return downloadDatabaseOnPipelineCreation(GEOIP_TYPE, processorConfig, null) == downloadDatabaseOnPipelineCreation;
{
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get(GEOIP_TYPE);
if (processorConfig != null) {
return downloadDatabaseOnPipelineCreation(GEOIP_TYPE, processorConfig, null) == downloadDatabaseOnPipelineCreation;
}
}

{
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get(IP_LOCATION_TYPE);
if (processorConfig != null) {
return downloadDatabaseOnPipelineCreation(IP_LOCATION_TYPE, processorConfig, null) == downloadDatabaseOnPipelineCreation;
}
}

return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
+ "in a future version of Elasticsearch"; // TODO add a message about migration?

public static final String GEOIP_TYPE = "geoip";
public static final String IP_LOCATION_TYPE = "ip_location";

private final String type;
private final String field;
Expand Down Expand Up @@ -225,7 +226,7 @@ public Processor create(
final Map<String, Object> config
) throws IOException {
String ipField = readStringProperty(type, processorTag, config, "field");
String targetField = readStringProperty(type, processorTag, config, "target_field", "geoip");
String targetField = readStringProperty(type, processorTag, config, "target_field", type);
String databaseFile = readStringProperty(type, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> propertyNames = readOptionalList(type, processorTag, config, "properties");
boolean ignoreMissing = readBooleanProperty(type, processorTag, config, "ignore_missing", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.Map.entry;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN;
Expand Down Expand Up @@ -129,7 +130,10 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
parameters.ingestService.getClusterService()
);
databaseRegistry.set(registry);
return Map.of(GeoIpProcessor.GEOIP_TYPE, new GeoIpProcessor.Factory(GeoIpProcessor.GEOIP_TYPE, registry));
return Map.ofEntries(
entry(GeoIpProcessor.GEOIP_TYPE, new GeoIpProcessor.Factory(GeoIpProcessor.GEOIP_TYPE, registry)),
entry(GeoIpProcessor.IP_LOCATION_TYPE, new GeoIpProcessor.Factory(GeoIpProcessor.IP_LOCATION_TYPE, registry))
);
}

@Override
Expand Down Expand Up @@ -239,6 +243,11 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
DatabaseConfiguration.Maxmind.NAME,
DatabaseConfiguration.Maxmind::new
),
new NamedWriteableRegistry.Entry(
DatabaseConfiguration.Provider.class,
DatabaseConfiguration.Ipinfo.NAME,
DatabaseConfiguration.Ipinfo::new
),
new NamedWriteableRegistry.Entry(
DatabaseConfiguration.Provider.class,
DatabaseConfiguration.Local.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -78,8 +79,19 @@ public record DatabaseConfiguration(String id, String name, Provider provider) i
// "GeoLite2-Country"
);

public static final Set<String> IPINFO_NAMES = Set.of(
// these file names are from https://ipinfo.io/developers/database-filename-reference
"asn", // "Free IP to ASN"
"country", // "Free IP to Country"
// "country_asn" // "Free IP to Country + IP to ASN", not supported at present
"standard_asn", // commercial "ASN"
"standard_location", // commercial "IP Geolocation"
"standard_privacy" // commercial "Privacy Detection" (sometimes "Anonymous IP")
);

private static final ParseField NAME = new ParseField("name");
private static final ParseField MAXMIND = new ParseField(Maxmind.NAME);
private static final ParseField IPINFO = new ParseField(Ipinfo.NAME);
private static final ParseField WEB = new ParseField(Web.NAME);
private static final ParseField LOCAL = new ParseField(Local.NAME);

Expand All @@ -89,12 +101,21 @@ public record DatabaseConfiguration(String id, String name, Provider provider) i
(a, id) -> {
String name = (String) a[0];
Provider provider;

// one and only one provider object must be present
final long numNonNulls = Arrays.stream(a, 1, a.length).filter(Objects::nonNull).count();
if (numNonNulls != 1) {
throw new IllegalArgumentException("Exactly one provider object must be specified, but [" + numNonNulls + "] were found");
}

if (a[1] != null) {
provider = (Maxmind) a[1];
} else if (a[2] != null) {
provider = (Web) a[2];
provider = (Ipinfo) a[2];
} else if (a[3] != null) {
provider = (Web) a[3];
} else {
provider = (Local) a[3];
provider = (Local) a[4];
}
return new DatabaseConfiguration(id, name, provider);
}
Expand All @@ -107,6 +128,7 @@ public record DatabaseConfiguration(String id, String name, Provider provider) i
(parser, id) -> Maxmind.PARSER.apply(parser, null),
MAXMIND
);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (parser, id) -> Ipinfo.PARSER.apply(parser, null), IPINFO);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (parser, id) -> Web.PARSER.apply(parser, null), WEB);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (parser, id) -> Local.PARSER.apply(parser, null), LOCAL);
}
Expand Down Expand Up @@ -194,8 +216,16 @@ public ActionRequestValidationException validate() {
err.addValidationError("invalid name [" + name + "]: cannot be empty");
}

if (MAXMIND_NAMES.contains(name) == false) {
err.addValidationError("invalid name [" + name + "]: must be a supported name ([" + MAXMIND_NAMES + "])");
// provider-specific name validation
if (provider instanceof Maxmind) {
if (MAXMIND_NAMES.contains(name) == false) {
err.addValidationError("invalid name [" + name + "]: must be a supported name ([" + MAXMIND_NAMES + "])");
}
}
if (provider instanceof Ipinfo) {
if (IPINFO_NAMES.contains(name) == false) {
err.addValidationError("invalid name [" + name + "]: must be a supported name ([" + IPINFO_NAMES + "])");
}
}

// important: the name must be unique across all configurations of this same type,
Expand Down Expand Up @@ -234,7 +264,7 @@ public String getWriteableName() {

private static final ParseField ACCOUNT_ID = new ParseField("account_id");

private static final ConstructingObjectParser<Maxmind, Void> PARSER = new ConstructingObjectParser<>("database", false, (a, id) -> {
private static final ConstructingObjectParser<Maxmind, Void> PARSER = new ConstructingObjectParser<>("maxmind", false, (a, id) -> {
String accountId = (String) a[0];
return new Maxmind(accountId);
});
Expand All @@ -247,10 +277,6 @@ public Maxmind(StreamInput in) throws IOException {
this(in.readString());
}

public static Maxmind parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(accountId);
Expand All @@ -270,6 +296,37 @@ public boolean isReadOnly() {
}
}

public record Ipinfo() implements Provider {
public static final String NAME = "ipinfo";

// this'll become a ConstructingObjectParser once we accept the token (securely) in the json definition
private static final ObjectParser<Ipinfo, Void> PARSER = new ObjectParser<>("ipinfo", Ipinfo::new);

public Ipinfo(StreamInput in) throws IOException {
this();
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public boolean isReadOnly() {
return false;
}
}

public record Local(String type) implements Provider {
public static final String NAME = "local";

Expand All @@ -288,10 +345,6 @@ public Local(StreamInput in) throws IOException {
this(in.readString());
}

public static Local parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
Expand Down Expand Up @@ -325,10 +378,6 @@ public Web(StreamInput in) throws IOException {
this();
}

public static Web parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class RestDeleteDatabaseConfigurationAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return List.of(new Route(DELETE, "/_ingest/geoip/database/{id}"));
return List.of(new Route(DELETE, "/_ingest/ip_location/database/{id}"), new Route(DELETE, "/_ingest/geoip/database/{id}"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ public class RestGetDatabaseConfigurationAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/_ingest/geoip/database"), new Route(GET, "/_ingest/geoip/database/{id}"));
return List.of(
new Route(GET, "/_ingest/ip_location/database"),
new Route(GET, "/_ingest/ip_location/database/{id}"),
new Route(GET, "/_ingest/geoip/database"),
new Route(GET, "/_ingest/geoip/database/{id}")
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class RestPutDatabaseConfigurationAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return List.of(new Route(PUT, "/_ingest/geoip/database/{id}"));
return List.of(new Route(PUT, "/_ingest/ip_location/database/{id}"), new Route(PUT, "/_ingest/geoip/database/{id}"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.ingest.geoip.IngestGeoIpMetadata;
import org.elasticsearch.ingest.geoip.direct.PutDatabaseConfigurationAction.Request;
import org.elasticsearch.injection.guice.Inject;
Expand All @@ -41,6 +42,8 @@
import java.util.Map;
import java.util.Optional;

import static org.elasticsearch.ingest.IngestGeoIpFeatures.PUT_DATABASE_CONFIGURATION_ACTION_IPINFO;

public class TransportPutDatabaseConfigurationAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(TransportPutDatabaseConfigurationAction.class);
Expand All @@ -58,6 +61,7 @@ public void taskSucceeded(UpdateDatabaseConfigurationTask task, Void unused) {
}
};

private final FeatureService featureService;
private final MasterServiceTaskQueue<UpdateDatabaseConfigurationTask> updateDatabaseConfigurationTaskQueue;

@Inject
Expand All @@ -66,7 +70,8 @@ public TransportPutDatabaseConfigurationAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
FeatureService featureService
) {
super(
PutDatabaseConfigurationAction.NAME,
Expand All @@ -79,6 +84,7 @@ public TransportPutDatabaseConfigurationAction(
AcknowledgedResponse::readFrom,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.featureService = featureService;
this.updateDatabaseConfigurationTaskQueue = clusterService.createTaskQueue(
"update-geoip-database-configuration-state-update",
Priority.NORMAL,
Expand All @@ -89,6 +95,19 @@ public TransportPutDatabaseConfigurationAction(
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
final String id = request.getDatabase().id();

// if this is an ipinfo configuration, then make sure the whole cluster supports that feature
if (request.getDatabase().provider() instanceof DatabaseConfiguration.Ipinfo
&& featureService.clusterHasFeature(clusterService.state(), PUT_DATABASE_CONFIGURATION_ACTION_IPINFO) == false) {
listener.onFailure(
new IllegalArgumentException(
"Unable to use ipinfo database configurations in mixed-clusters with nodes that do not support feature "
+ PUT_DATABASE_CONFIGURATION_ACTION_IPINFO.id()
)
);
return;
}

updateDatabaseConfigurationTaskQueue.submitTask(
Strings.format("update-geoip-database-configuration-[%s]", id),
new UpdateDatabaseConfigurationTask(listener, request.getDatabase()),
Expand Down
Loading

0 comments on commit 693fb95

Please sign in to comment.