diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java index dc3e431e..6b3d71d7 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java @@ -16,7 +16,6 @@ import java.nio.charset.StandardCharsets; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -44,8 +43,6 @@ import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.search.MultiSearchRequestBuilder; -import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.master.AcknowledgedResponse; @@ -63,6 +60,7 @@ import org.opensearch.geospatial.constants.IndexSetting; import org.opensearch.geospatial.shared.Constants; import org.opensearch.geospatial.shared.StashedThreadContext; +import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilders; /** @@ -255,15 +253,19 @@ public void getGeoIpData(final String indexName, final String ip, final ActionLi .execute(new ActionListener<>() { @Override public void onResponse(final SearchResponse searchResponse) { - if (searchResponse.getHits().getHits().length == 0) { - actionListener.onResponse(Collections.emptyMap()); - } else { - Map geoIpData = (Map) XContentHelper.convertToMap( - searchResponse.getHits().getAt(0).getSourceRef(), - false, - XContentType.JSON - ).v2().get(DATA_FIELD_NAME); - actionListener.onResponse(geoIpData); + try { + if (searchResponse.getHits().getHits().length == 0) { + actionListener.onResponse(Collections.emptyMap()); + } else { + Map geoIpData = (Map) XContentHelper.convertToMap( + searchResponse.getHits().getAt(0).getSourceRef(), + false, + XContentType.JSON + ).v2().get(DATA_FIELD_NAME); + actionListener.onResponse(geoIpData); + } + } catch (Exception e) { + actionListener.onFailure(e); } } @@ -276,73 +278,56 @@ public void onFailure(final Exception e) { } /** - * Query a given index using a given ip address iterator to get geoip data + * Query a given index using a given list of ip addresses to get geoip data * - * This method calls itself recursively until it processes all ip addresses in bulk of {@code bulkSize}. - * - * @param indexName the index name - * @param ipIterator the iterator of ip addresses - * @param geoIpData collected geo data - * @param actionListener the action listener + * @param indexName index + * @param ips list of ip addresses + * @param actionListener action listener */ public void getGeoIpData( final String indexName, - final Iterator ipIterator, - final Map> geoIpData, - final ActionListener>> actionListener + final List ips, + final ActionListener>> actionListener ) { - MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch(); - - List ipsToSearch = new ArrayList<>(BUNDLE_SIZE); - while (ipIterator.hasNext() && ipsToSearch.size() < BUNDLE_SIZE) { - String ip = ipIterator.next(); - if (geoIpData.get(ip) == null) { - mRequestBuilder.add( - client.prepareSearch(indexName) - .setSize(1) - .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) - .setPreference("_local") - .setRequestCache(true) - ); - ipsToSearch.add(ip); - } - } - - if (ipsToSearch.isEmpty()) { - actionListener.onResponse(geoIpData); - return; - } - - StashedThreadContext.run(client, () -> mRequestBuilder.execute(new ActionListener<>() { - @Override - public void onResponse(final MultiSearchResponse items) { - for (int i = 0; i < ipsToSearch.size(); i++) { - if (items.getResponses()[i].isFailure()) { - actionListener.onFailure(items.getResponses()[i].getFailure()); - return; + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + ips.stream().forEach(ip -> boolQueryBuilder.should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))); + StashedThreadContext.run( + client, + () -> client.prepareSearch(indexName) + .setSize(ips.size()) + .setQuery(boolQueryBuilder) + .setPreference("_local") + .setRequestCache(true) + .execute(new ActionListener<>() { + @Override + public void onResponse(final SearchResponse searchResponse) { + try { + actionListener.onResponse(toGeoIpDataList(searchResponse)); + } catch (Exception e) { + actionListener.onFailure(e); + } } - if (items.getResponses()[i].getResponse().getHits().getHits().length == 0) { - geoIpData.put(ipsToSearch.get(i), Collections.emptyMap()); - continue; + @Override + public void onFailure(final Exception e) { + actionListener.onFailure(e); } + }) + ); + } - Map data = (Map) XContentHelper.convertToMap( - items.getResponses()[i].getResponse().getHits().getAt(0).getSourceRef(), - false, - XContentType.JSON - ).v2().get(DATA_FIELD_NAME); - - geoIpData.put(ipsToSearch.get(i), data); - } - getGeoIpData(indexName, ipIterator, geoIpData, actionListener); - } + private List> toGeoIpDataList(final SearchResponse searchResponse) { + if (searchResponse.getHits().getHits().length == 0) { + return Collections.emptyList(); + } - @Override - public void onFailure(final Exception e) { - actionListener.onFailure(e); - } - })); + return Arrays.stream(searchResponse.getHits().getHits()) + .map( + data -> (Map) XContentHelper.convertToMap(data.getSourceRef(), false, XContentType.JSON) + .v2() + .get(DATA_FIELD_NAME) + ) + .collect(Collectors.toList()); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 5ce80a9a..18bb3d69 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -11,10 +11,9 @@ import static org.opensearch.ingest.ConfigurationUtils.readStringProperty; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; @@ -41,8 +40,6 @@ @Log4j2 public final class Ip2GeoProcessor extends AbstractProcessor { private static final Map DATA_EXPIRED = Map.of("error", "ip2geo_data_expired"); - private static final String PROPERTY_IP = "ip"; - public static final String CONFIG_FIELD = "field"; public static final String CONFIG_TARGET_FIELD = "target_field"; public static final String CONFIG_DATASOURCE = "datasource"; @@ -111,19 +108,23 @@ public Ip2GeoProcessor( */ @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { - Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); + try { + Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); - if (ip == null) { - handler.accept(ingestDocument, null); - return; - } + if (ip == null) { + handler.accept(ingestDocument, null); + return; + } - if (ip instanceof String) { - executeInternal(ingestDocument, handler, (String) ip); - } else if (ip instanceof List) { - executeInternal(ingestDocument, handler, ((List) ip)); - } else { - throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings"); + if (ip instanceof String) { + executeInternal(ingestDocument, handler, (String) ip); + } else if (ip instanceof List) { + executeInternal(ingestDocument, handler, ((List) ip)); + } else { + handler.accept(null, new IllegalArgumentException(String.format(Locale.ROOT, "field [%s] should contain only string or array of strings", field))); + } + } catch (Exception e) { + handler.accept(null, e); } } @@ -159,17 +160,11 @@ public void onResponse(final Datasource datasource) { return; } - geoIpDataFacade.getGeoIpData(indexName, ip, new ActionListener<>() { - @Override - public void onResponse(final Map ipToGeoData) { - handleSingleIp(ip, ipToGeoData, ingestDocument, handler); - } - - @Override - public void onFailure(final Exception e) { - handler.accept(null, e); - } - }); + try { + geoIpDataFacade.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler)); + } catch (Exception e) { + handler.accept(null, e); + } } @Override @@ -180,32 +175,44 @@ public void onFailure(final Exception e) { } @VisibleForTesting - protected void handleSingleIp( - final String ip, - final Map ipToGeoData, + protected ActionListener> getSingleGeoIpDataListener( final IngestDocument ingestDocument, final BiConsumer handler ) { - if (ipToGeoData.isEmpty() == false) { - ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData, ip)); - } - handler.accept(ingestDocument, null); + return new ActionListener<>() { + @Override + public void onResponse(final Map ipToGeoData) { + try { + if (ipToGeoData.isEmpty() == false) { + ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData)); + } + handler.accept(ingestDocument, null); + } catch (Exception e) { + handler.accept(null, e); + } + } + + @Override + public void onFailure(final Exception e) { + handler.accept(null, e); + } + }; } - private Map filteredGeoData(final Map geoData, final String ip) { - Map filteredGeoData; + private Map filteredGeoData(final Map geoData) { if (properties == null) { return geoData; } - filteredGeoData = properties.stream() - .filter(p -> p.equals(PROPERTY_IP) == false) - .filter(p -> geoData.containsKey(p)) - .collect(Collectors.toMap(p -> p, p -> geoData.get(p))); - if (properties.contains(PROPERTY_IP)) { - filteredGeoData.put(PROPERTY_IP, ip); + return properties.stream().filter(p -> geoData.containsKey(p)).collect(Collectors.toMap(p -> p, p -> geoData.get(p))); + } + + private List> filteredGeoData(final List> geoData) { + if (properties == null) { + return geoData; } - return filteredGeoData; + + return geoData.stream().map(this::filteredGeoData).collect(Collectors.toList()); } /** @@ -221,13 +228,11 @@ protected void executeInternal( final BiConsumer handler, final List ips ) { - Map> data = new HashMap<>(); for (Object ip : ips) { if (ip instanceof String == false) { throw new IllegalArgumentException("array in field [" + field + "] should only contain strings"); } } - List ipList = (List) ips; datasourceFacade.getDatasource(datasourceName, new ActionListener<>() { @Override public void onResponse(final Datasource datasource) { @@ -243,12 +248,11 @@ public void onResponse(final Datasource datasource) { return; } - geoIpDataFacade.getGeoIpData( - indexName, - ipList.iterator(), - data, - listenerToAppendDataToDocument(data, ipList, ingestDocument, handler) - ); + try { + geoIpDataFacade.getGeoIpData(indexName, (List) ips, getMultiGeoIpDataListener(ingestDocument, handler)); + } catch (Exception e) { + handler.accept(null, e); + } } @Override @@ -259,31 +263,21 @@ public void onFailure(final Exception e) { } @VisibleForTesting - protected ActionListener>> listenerToAppendDataToDocument( - final Map> data, - final List ipList, + protected ActionListener>> getMultiGeoIpDataListener( final IngestDocument ingestDocument, final BiConsumer handler ) { return new ActionListener<>() { @Override - public void onResponse(final Map> response) { - boolean match = false; - List> geoDataList = new ArrayList<>(ipList.size()); - for (String ipAddr : ipList) { - Map geoData = data.get(ipAddr); - // GeoData for ipAddr won't be null - geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr)); - if (geoData.isEmpty() == false) { - match = true; + public void onResponse(final List> ipToGeoData) { + try { + if (ipToGeoData.isEmpty() == false) { + ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData)); } - } - if (match) { - ingestDocument.setFieldValue(targetField, geoDataList); handler.accept(ingestDocument, null); - return; + } catch (Exception e) { + handler.accept(null, e); } - handler.accept(ingestDocument, null); } @Override @@ -384,7 +378,6 @@ private void validate(final String processorTag, final String datasourceName, fi // Validate properties are valid. If not add all available properties. final Set availableProperties = new HashSet<>(datasource.getDatabase().getFields()); - availableProperties.add(PROPERTY_IP); for (String fieldName : propertyNames) { if (availableProperties.contains(fieldName) == false) { throw newConfigurationException( diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java index 6ddac0a5..9e51f23b 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java @@ -11,7 +11,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade.BUNDLE_SIZE; import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX; import java.io.File; @@ -20,9 +19,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; @@ -38,8 +35,6 @@ import org.mockito.ArgumentCaptor; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionRequest; -import org.opensearch.action.ActionType; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -47,12 +42,9 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.search.MultiSearchRequest; -import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.common.Randomness; import org.opensearch.common.Strings; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.bytes.BytesReference; @@ -229,7 +221,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() { } } - public void testGetSingleGeoIpData() { + public void testGetGeoIpData_whenSingleIp_thenSucceed() { String indexName = GeospatialTestHelper.randomLowerCaseString(); String ip = randomIpAddress(); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { @@ -261,80 +253,35 @@ public void testGetSingleGeoIpData() { assertEquals("seattle", captor.getValue().get("city")); } - public void testGetGeoIpData_whenAllDataIsGathered_thenNoMoreSearch() { + public void testGetGeoIpData_whenMultiIps_thenSucceed() { String indexName = GeospatialTestHelper.randomLowerCaseString(); - String ip1 = randomIpAddress(); - String ip2 = randomIpAddress(); - Iterator ipIterator = Arrays.asList(ip1, ip2).iterator(); - Map> geoData = new HashMap<>(); - geoData.put(ip1, Map.of("city", "Seattle")); - geoData.put(ip2, Map.of("city", "Hawaii")); - ActionListener>> actionListener = mock(ActionListener.class); - - // Run - verifyingGeoIpDataFacade.getGeoIpData(indexName, ipIterator, geoData, actionListener); - - // Verify - verify(actionListener).onResponse(geoData); - } - - public void testGetGeoIpData_whenCalled_thenGetGeoIpData() { - String indexName = GeospatialTestHelper.randomLowerCaseString(); - int dataSize = Randomness.get().nextInt(10) + 1; - List ips = new ArrayList<>(); - for (int i = 0; i < dataSize; i++) { - ips.add(randomIpAddress()); - } - Map> geoData = new HashMap<>(); - ActionListener>> actionListener = mock(ActionListener.class); - - List cities = new ArrayList<>(); + String ip = randomIpAddress(); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { - assert actionRequest instanceof MultiSearchRequest; - MultiSearchRequest request = (MultiSearchRequest) actionRequest; - for (SearchRequest searchRequest : request.requests()) { - assertEquals("_local", searchRequest.preference()); - assertEquals(1, searchRequest.source().size()); - } + assert actionRequest instanceof SearchRequest; + SearchRequest request = (SearchRequest) actionRequest; + assertEquals("_local", request.preference()); + assertEquals(1, request.source().size()); + assertEquals(QueryBuilders.boolQuery().should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)), request.source().query()); - MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[request.requests().size()]; - for (int i = 0; i < request.requests().size(); i++) { - String city = GeospatialTestHelper.randomLowerCaseString(); - cities.add(city); - String data = String.format( - Locale.ROOT, - "{\"%s\":\"1.0.0.1/16\",\"%s\":{\"city\":\"%s\"}}", - IP_RANGE_FIELD_NAME, - DATA_FIELD_NAME, - city - ); - SearchHit searchHit = new SearchHit(1); - searchHit.sourceRef(BytesReference.fromByteBuffer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))); - SearchHit[] searchHitArray = { searchHit }; - SearchHits searchHits = new SearchHits(searchHitArray, new TotalHits(1l, TotalHits.Relation.EQUAL_TO), 1); - SearchResponse searchResponse = mock(SearchResponse.class); - when(searchResponse.getHits()).thenReturn(searchHits); - MultiSearchResponse.Item item = mock(MultiSearchResponse.Item.class); - when(item.isFailure()).thenReturn(false); - when(item.getResponse()).thenReturn(searchResponse); - items[i] = item; - } - MultiSearchResponse response = mock(MultiSearchResponse.class); - when(response.getResponses()).thenReturn(items); + String data = String.format( + Locale.ROOT, + "{\"%s\":\"1.0.0.1/16\",\"%s\":{\"city\":\"seattle\"}}", + IP_RANGE_FIELD_NAME, + DATA_FIELD_NAME + ); + SearchHit searchHit = new SearchHit(1); + searchHit.sourceRef(BytesReference.fromByteBuffer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))); + SearchHit[] searchHitArray = { searchHit }; + SearchHits searchHits = new SearchHits(searchHitArray, new TotalHits(1l, TotalHits.Relation.EQUAL_TO), 1); + + SearchResponse response = mock(SearchResponse.class); + when(response.getHits()).thenReturn(searchHits); return response; }); - - // Run - verifyingGeoIpDataFacade.getGeoIpData(indexName, ips.iterator(), geoData, actionListener); - - // Verify - verify(verifyingClient, times((dataSize + BUNDLE_SIZE - 1) / BUNDLE_SIZE)).execute( - any(ActionType.class), - any(ActionRequest.class), - any(ActionListener.class) - ); - for (int i = 0; i < dataSize; i++) { - assertEquals(cities.get(i), geoData.get(ips.get(i)).get("city")); - } + ActionListener>> listener = mock(ActionListener.class); + verifyingGeoIpDataFacade.getGeoIpData(indexName, Arrays.asList(ip), listener); + ArgumentCaptor>> captor = ArgumentCaptor.forClass(List.class); + verify(listener).onResponse(captor.capture()); + assertEquals("seattle", captor.getValue().get(0).get("city")); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java index bf447a5a..aeface41 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java @@ -86,7 +86,7 @@ public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() { Ip2GeoProcessor.CONFIG_TARGET_FIELD, targetField, Ip2GeoProcessor.CONFIG_PROPERTIES, - Arrays.asList(IP, CITY) + Arrays.asList(CITY) ); // Create ip2geo processor @@ -104,10 +104,10 @@ public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() { // Verify data added to document List> sources = convertToListOfSources(response, targetField); - sources.stream().forEach(source -> { - assertFalse(source.containsKey(COUNTRY)); - assertEquals(sampleData.get(source.get(IP)).get(CITY), source.get(CITY)); - }); + sources.stream().allMatch(source -> source.size() == 1); + List cities = sources.stream().map(value -> value.get(CITY)).collect(Collectors.toList()); + List expectedCities = sampleData.values().stream().map(value -> value.get(CITY)).collect(Collectors.toList()); + assertEquals(expectedCities, cities); // Delete datasource fails when there is a process using it ResponseException deleteException = expectThrows(ResponseException.class, () -> deleteDatasource(datasourceName)); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java index 2a7ce296..c3dbf765 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -6,18 +6,17 @@ package org.opensearch.geospatial.ip2geo.processor; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -71,9 +70,9 @@ public void testCreateWithInvalidDatasourceState() { assertTrue(exception.getDetailedMessage().contains("available state")); } - public void testCreateWithInvalidProperties() { + public void testCreateIp2GeoProcessor_whenInvalidProperties_thenException() { Map config = new HashMap<>(); - config.put("properties", Arrays.asList("ip", "invalid_property")); + config.put("properties", Arrays.asList(SUPPORTED_FIELDS.get(0), "invalid_property")); OpenSearchException exception = expectThrows( OpenSearchException.class, () -> createProcessor(GeospatialTestHelper.randomLowerCaseString(), config) @@ -94,25 +93,33 @@ public void testExecuteWithNoIpAndIgnoreMissing() throws Exception { processor.execute(document, handler); } - public void testExecuteWithNoIp() throws Exception { + public void testExecute_whenNoIp_thenException() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Map config = new HashMap<>(); Ip2GeoProcessor processor = createProcessor(datasourceName, config); IngestDocument document = new IngestDocument(new HashMap<>(), new HashMap<>()); - BiConsumer handler = (doc, e) -> {}; - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(document, handler)); - assertTrue(exception.getMessage().contains("not present")); + BiConsumer handler = mock(BiConsumer.class); + + // Run + processor.execute(document, handler); + + // Verify + verify(handler).accept(isNull(), any(IllegalArgumentException.class)); } - public void testExecuteWithNonStringValue() throws Exception { + public void testExecute_whenNonStringValue_thenException() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); Map source = new HashMap<>(); source.put("ip", Randomness.get().nextInt()); IngestDocument document = new IngestDocument(source, new HashMap<>()); - BiConsumer handler = (doc, e) -> {}; - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(document, handler)); - assertTrue(exception.getMessage().contains("string")); + BiConsumer handler = mock(BiConsumer.class); + + // Run + processor.execute(document, handler); + + // Verify + verify(handler).accept(isNull(), any(IllegalArgumentException.class)); } public void testExecuteWithNullDatasource() throws Exception { @@ -133,33 +140,6 @@ public void testExecuteWithExpiredDatasource() throws Exception { getActionListener(Collections.emptyMap(), handler).onResponse(datasource); } - public void testExecute() throws Exception { - Map ip2geoData = new HashMap<>(); - for (String field : SUPPORTED_FIELDS) { - ip2geoData.put(field, GeospatialTestHelper.randomLowerCaseString()); - } - - Datasource datasource = mock(Datasource.class); - when(datasource.isExpired()).thenReturn(false); - when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); - BiConsumer handler = (doc, e) -> { - assertEquals( - ip2geoData.get(SUPPORTED_FIELDS.get(0)), - doc.getFieldValue(DEFAULT_TARGET_FIELD + "." + SUPPORTED_FIELDS.get(0), String.class) - ); - for (int i = 1; i < SUPPORTED_FIELDS.size(); i++) { - assertNull(doc.getFieldValue(DEFAULT_TARGET_FIELD + "." + SUPPORTED_FIELDS.get(i), String.class, true)); - } - assertNull(e); - }; - Map config = Map.of("properties", Arrays.asList(SUPPORTED_FIELDS.get(0))); - getActionListener(config, handler).onResponse(datasource); - - ArgumentCaptor>> captor = ArgumentCaptor.forClass(ActionListener.class); - verify(geoIpDataFacade).getGeoIpData(anyString(), anyString(), captor.capture()); - captor.getValue().onResponse(ip2geoData); - } - private ActionListener getActionListener( final Map config, final BiConsumer handler @@ -178,107 +158,155 @@ private ActionListener getActionListener( return captor.getValue(); } - public void testExecuteNotImplemented() throws Exception { + @SneakyThrows + public void testExecuteInternal_whenSingleIp_thenGetDatasourceIsCalled() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - IngestDocument document = new IngestDocument(Collections.emptyMap(), Collections.emptyMap()); - Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(document)); - assertTrue(e.getMessage().contains("Not implemented")); - } + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); - public void testGenerateDataToAppendWithNoData() throws Exception { - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - List ips = new ArrayList<>(); - Map> data = new HashMap<>(); - for (int i = 0; i < 3; i++) { - String ip = randomIpAddress(); - ips.add(ip); - data.put(ip, Collections.emptyMap()); - } - IngestDocument document = new IngestDocument(new HashMap<>(), new HashMap<>()); BiConsumer handler = mock(BiConsumer.class); - processor.listenerToAppendDataToDocument(data, ips, document, handler).onResponse(data); - verify(handler).accept(document, null); - Exception e = expectThrows(IllegalArgumentException.class, () -> document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class)); - assertTrue(e.getMessage().contains("not present")); + processor.executeInternal(document, handler, ip); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).getDatasource(eq(datasourceName), captor.capture()); + Datasource datasource = mock(Datasource.class); + when(datasource.isExpired()).thenReturn(false); + when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + + captor.getValue().onResponse(datasource); + verify(geoIpDataFacade).getGeoIpData(anyString(), anyString(), any(ActionListener.class)); } - public void testExecuteInternalNonStringIp() throws Exception { + @SneakyThrows + public void testGetSingleGeoIpDataListener_whenNoPropertySet_thenAddAllProperties() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - List ips = Arrays.asList(randomIpAddress(), 1); Map source = new HashMap<>(); String ip = randomIpAddress(); source.put("ip", ip); IngestDocument document = new IngestDocument(source, new HashMap<>()); - BiConsumer handler = mock(BiConsumer.class); - Exception e = expectThrows(IllegalArgumentException.class, () -> processor.executeInternal(document, handler, ips)); - assertTrue(e.getMessage().contains("should only contain strings")); + + Map geoIpData = Map.of("city", "Seattle", "country", "USA"); + // Run + processor.getSingleGeoIpDataListener(document, handler).onResponse(geoIpData); + + // Verify + assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".city", String.class)); + assertEquals("USA", document.getFieldValue(DEFAULT_TARGET_FIELD + ".country", String.class)); + verify(handler).accept(document, null); } @SneakyThrows - public void testExecuteInternal_whenCalled_thenGetDatasourceIsCalled() { + public void testGetSingleGeoIpDataListener_whenPropertySet_thenAddOnlyTheProperties() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); + Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("properties", Arrays.asList("city"))); Map source = new HashMap<>(); String ip = randomIpAddress(); source.put("ip", ip); IngestDocument document = new IngestDocument(source, new HashMap<>()); - BiConsumer handler = mock(BiConsumer.class); - processor.executeInternal(document, handler, ips); - ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); - verify(datasourceFacade).getDatasource(eq(datasourceName), captor.capture()); - Datasource datasource = mock(Datasource.class); - when(datasource.isExpired()).thenReturn(false); - when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); - captor.getValue().onResponse(datasource); - verify(geoIpDataFacade).getGeoIpData(anyString(), any(Iterator.class), anyMap(), any(ActionListener.class)); + Map geoIpData = Map.of("city", "Seattle", "country", "USA"); + // Run + processor.getSingleGeoIpDataListener(document, handler).onResponse(geoIpData); + + // Verify + assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".city", String.class)); + assertFalse(document.hasField(DEFAULT_TARGET_FIELD + ".country")); + verify(handler).accept(document, null); } @SneakyThrows - public void testHandleSingleIp_whenEmptyGeoData_thenTargetFieldShouldNotSet() { + public void testGetMultiGeoIpDataListener_whenNoPropertySet_thenAddAllProperties() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); Map source = new HashMap<>(); String ip = randomIpAddress(); source.put("ip", ip); - Map ipToGeoData = Collections.emptyMap(); IngestDocument document = new IngestDocument(source, new HashMap<>()); BiConsumer handler = mock(BiConsumer.class); + Map geoIpData = Map.of("city", "Seattle", "country", "USA"); // Run - processor.handleSingleIp(ip, ipToGeoData, document, handler); + processor.getMultiGeoIpDataListener(document, handler).onResponse(Arrays.asList(geoIpData)); // Verify - assertFalse(document.hasField(DEFAULT_TARGET_FIELD)); + assertEquals(1, document.getFieldValue(DEFAULT_TARGET_FIELD, List.class).size()); + assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".0.city", String.class)); + assertEquals("USA", document.getFieldValue(DEFAULT_TARGET_FIELD + ".0.country", String.class)); verify(handler).accept(document, null); } @SneakyThrows - public void testHandleSingleIp_whenNoValueForGivenProperty_thenDoNotAdd() { + public void testGetMultiGeoIpDataListener_whenPropertySet_thenAddOnlyTheProperties() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("properties", Arrays.asList("city", "country"))); + Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("properties", Arrays.asList("city"))); Map source = new HashMap<>(); String ip = randomIpAddress(); source.put("ip", ip); - Map ipToGeoData = Map.of("country", "USA"); IngestDocument document = new IngestDocument(source, new HashMap<>()); BiConsumer handler = mock(BiConsumer.class); + Map geoIpData = Map.of("city", "Seattle", "country", "USA"); // Run - processor.handleSingleIp(ip, ipToGeoData, document, handler); + processor.getMultiGeoIpDataListener(document, handler).onResponse(Arrays.asList(geoIpData)); // Verify - assertEquals("USA", document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class).get("country")); - assertNull(document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class).get("city")); + assertEquals(1, document.getFieldValue(DEFAULT_TARGET_FIELD, List.class).size()); + assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".0.city", String.class)); + assertFalse(document.hasField(DEFAULT_TARGET_FIELD + ".0.country")); verify(handler).accept(document, null); } + public void testExecuteNotImplemented() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + IngestDocument document = new IngestDocument(Collections.emptyMap(), Collections.emptyMap()); + Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(document)); + assertTrue(e.getMessage().contains("Not implemented")); + } + + public void testExecuteInternalNonStringIp() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + List ips = Arrays.asList(randomIpAddress(), 1); + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + + BiConsumer handler = mock(BiConsumer.class); + Exception e = expectThrows(IllegalArgumentException.class, () -> processor.executeInternal(document, handler, ips)); + assertTrue(e.getMessage().contains("should only contain strings")); + } + + @SneakyThrows + public void testExecuteInternal_whenMultiIps_thenGetDatasourceIsCalled() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + + BiConsumer handler = mock(BiConsumer.class); + processor.executeInternal(document, handler, ips); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).getDatasource(eq(datasourceName), captor.capture()); + Datasource datasource = mock(Datasource.class); + when(datasource.isExpired()).thenReturn(false); + when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + + captor.getValue().onResponse(datasource); + verify(geoIpDataFacade).getGeoIpData(anyString(), anyList(), any(ActionListener.class)); + } + private Ip2GeoProcessor createProcessor(final String datasourceName, final Map config) throws Exception { Datasource datasource = new Datasource(); datasource.setName(datasourceName);