Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bool query for array form of IPs and wrapping with try/catch #335

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -44,8 +43,6 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.MultiSearchRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand All @@ -63,6 +60,7 @@
import org.opensearch.geospatial.constants.IndexSetting;
import org.opensearch.geospatial.shared.Constants;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;

/**
Expand Down Expand Up @@ -255,15 +253,19 @@ public void getGeoIpData(final String indexName, final String ip, final ActionLi
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
try {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
}
} catch (Exception e) {
actionListener.onFailure(e);
}
}

Expand All @@ -276,73 +278,56 @@ public void onFailure(final Exception e) {
}

/**
* Query a given index using a given ip address iterator to get geoip data
* Query a given index using a given list of ip addresses to get geoip data
*
* This method calls itself recursively until it processes all ip addresses in bulk of {@code bulkSize}.
*
* @param indexName the index name
* @param ipIterator the iterator of ip addresses
* @param geoIpData collected geo data
* @param actionListener the action listener
* @param indexName index
* @param ips list of ip addresses
* @param actionListener action listener
*/
public void getGeoIpData(
final String indexName,
final Iterator<String> ipIterator,
final Map<String, Map<String, Object>> geoIpData,
final ActionListener<Map<String, Map<String, Object>>> actionListener
final List<String> ips,
final ActionListener<List<Map<String, Object>>> actionListener
) {
MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch();

List<String> ipsToSearch = new ArrayList<>(BUNDLE_SIZE);
while (ipIterator.hasNext() && ipsToSearch.size() < BUNDLE_SIZE) {
String ip = ipIterator.next();
if (geoIpData.get(ip) == null) {
mRequestBuilder.add(
client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
);
ipsToSearch.add(ip);
}
}

if (ipsToSearch.isEmpty()) {
actionListener.onResponse(geoIpData);
return;
}

StashedThreadContext.run(client, () -> mRequestBuilder.execute(new ActionListener<>() {
@Override
public void onResponse(final MultiSearchResponse items) {
for (int i = 0; i < ipsToSearch.size(); i++) {
if (items.getResponses()[i].isFailure()) {
actionListener.onFailure(items.getResponses()[i].getFailure());
return;
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
ips.stream().forEach(ip -> boolQueryBuilder.should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)));
StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(ips.size())
.setQuery(boolQueryBuilder)
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
try {
actionListener.onResponse(toGeoIpDataList(searchResponse));
} catch (Exception e) {
actionListener.onFailure(e);
}
}

if (items.getResponses()[i].getResponse().getHits().getHits().length == 0) {
geoIpData.put(ipsToSearch.get(i), Collections.emptyMap());
continue;
@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
);
}

Map<String, Object> data = (Map<String, Object>) XContentHelper.convertToMap(
items.getResponses()[i].getResponse().getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);

geoIpData.put(ipsToSearch.get(i), data);
}
getGeoIpData(indexName, ipIterator, geoIpData, actionListener);
}
private List<Map<String, Object>> toGeoIpDataList(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
return Collections.emptyList();
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
}));
return Arrays.stream(searchResponse.getHits().getHits())
.map(
data -> (Map<String, Object>) XContentHelper.convertToMap(data.getSourceRef(), false, XContentType.JSON)
.v2()
.get(DATA_FIELD_NAME)
)
.collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
import static org.opensearch.ingest.ConfigurationUtils.readStringProperty;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
Expand All @@ -41,8 +40,6 @@
@Log4j2
public final class Ip2GeoProcessor extends AbstractProcessor {
private static final Map<String, Object> DATA_EXPIRED = Map.of("error", "ip2geo_data_expired");
private static final String PROPERTY_IP = "ip";

public static final String CONFIG_FIELD = "field";
public static final String CONFIG_TARGET_FIELD = "target_field";
public static final String CONFIG_DATASOURCE = "datasource";
Expand Down Expand Up @@ -111,19 +108,28 @@ public Ip2GeoProcessor(
*/
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
try {
Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);

if (ip == null) {
handler.accept(ingestDocument, null);
return;
}
if (ip == null) {
handler.accept(ingestDocument, null);
return;
}

if (ip instanceof String) {
executeInternal(ingestDocument, handler, (String) ip);
} else if (ip instanceof List) {
executeInternal(ingestDocument, handler, ((List<?>) ip));
} else {
throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings");
if (ip instanceof String) {
executeInternal(ingestDocument, handler, (String) ip);
} else if (ip instanceof List) {
executeInternal(ingestDocument, handler, ((List<?>) ip));
} else {
handler.accept(
null,
new IllegalArgumentException(
String.format(Locale.ROOT, "field [%s] should contain only string or array of strings", field)
)
);
}
} catch (Exception e) {
handler.accept(null, e);
}
}

Expand Down Expand Up @@ -159,17 +165,11 @@ public void onResponse(final Datasource datasource) {
return;
}

geoIpDataFacade.getGeoIpData(indexName, ip, new ActionListener<>() {
@Override
public void onResponse(final Map<String, Object> ipToGeoData) {
handleSingleIp(ip, ipToGeoData, ingestDocument, handler);
}

@Override
public void onFailure(final Exception e) {
handler.accept(null, e);
}
});
try {
geoIpDataFacade.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
Expand All @@ -180,32 +180,44 @@ public void onFailure(final Exception e) {
}

@VisibleForTesting
protected void handleSingleIp(
final String ip,
final Map<String, Object> ipToGeoData,
protected ActionListener<Map<String, Object>> getSingleGeoIpDataListener(
final IngestDocument ingestDocument,
final BiConsumer<IngestDocument, Exception> handler
) {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData, ip));
}
handler.accept(ingestDocument, null);
return new ActionListener<>() {
@Override
public void onResponse(final Map<String, Object> ipToGeoData) {
try {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData));
}
handler.accept(ingestDocument, null);
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
public void onFailure(final Exception e) {
handler.accept(null, e);
}
};
}

private Map<String, Object> filteredGeoData(final Map<String, Object> geoData, final String ip) {
Map<String, Object> filteredGeoData;
private Map<String, Object> filteredGeoData(final Map<String, Object> geoData) {
if (properties == null) {
return geoData;
}

filteredGeoData = properties.stream()
.filter(p -> p.equals(PROPERTY_IP) == false)
.filter(p -> geoData.containsKey(p))
.collect(Collectors.toMap(p -> p, p -> geoData.get(p)));
if (properties.contains(PROPERTY_IP)) {
filteredGeoData.put(PROPERTY_IP, ip);
return properties.stream().filter(p -> geoData.containsKey(p)).collect(Collectors.toMap(p -> p, p -> geoData.get(p)));
}

private List<Map<String, Object>> filteredGeoData(final List<Map<String, Object>> geoData) {
if (properties == null) {
return geoData;
}
return filteredGeoData;

return geoData.stream().map(this::filteredGeoData).collect(Collectors.toList());
}

/**
Expand All @@ -221,13 +233,11 @@ protected void executeInternal(
final BiConsumer<IngestDocument, Exception> handler,
final List<?> ips
) {
Map<String, Map<String, Object>> data = new HashMap<>();
for (Object ip : ips) {
if (ip instanceof String == false) {
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
}
}
List<String> ipList = (List<String>) ips;
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
@Override
public void onResponse(final Datasource datasource) {
Expand All @@ -243,12 +253,11 @@ public void onResponse(final Datasource datasource) {
return;
}

geoIpDataFacade.getGeoIpData(
indexName,
ipList.iterator(),
data,
listenerToAppendDataToDocument(data, ipList, ingestDocument, handler)
);
try {
geoIpDataFacade.getGeoIpData(indexName, (List<String>) ips, getMultiGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
Expand All @@ -259,31 +268,21 @@ public void onFailure(final Exception e) {
}

@VisibleForTesting
protected ActionListener<Map<String, Map<String, Object>>> listenerToAppendDataToDocument(
final Map<String, Map<String, Object>> data,
final List<String> ipList,
protected ActionListener<List<Map<String, Object>>> getMultiGeoIpDataListener(
final IngestDocument ingestDocument,
final BiConsumer<IngestDocument, Exception> handler
) {
return new ActionListener<>() {
@Override
public void onResponse(final Map<String, Map<String, Object>> response) {
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
if (geoData.isEmpty() == false) {
match = true;
public void onResponse(final List<Map<String, Object>> ipToGeoData) {
try {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData));
}
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
handler.accept(ingestDocument, null);
return;
} catch (Exception e) {
handler.accept(null, e);
}
handler.accept(ingestDocument, null);
}

@Override
Expand Down Expand Up @@ -384,7 +383,6 @@ private void validate(final String processorTag, final String datasourceName, fi

// Validate properties are valid. If not add all available properties.
final Set<String> availableProperties = new HashSet<>(datasource.getDatabase().getFields());
availableProperties.add(PROPERTY_IP);
for (String fieldName : propertyNames) {
if (availableProperties.contains(fieldName) == false) {
throw newConfigurationException(
Expand Down
Loading