diff --git a/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java b/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java index 5667e0379210f..d04e9baf0fb81 100644 --- a/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java +++ b/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java @@ -30,9 +30,9 @@ import java.security.DigestInputStream; import java.security.MessageDigest; import java.util.Arrays; -import java.util.List; import java.util.Locale; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.zip.GZIPOutputStream; import static java.nio.file.StandardOpenOption.CREATE; @@ -42,9 +42,8 @@ public class GeoIpCli extends Command { private static final byte[] EMPTY_BUF = new byte[512]; - // visible for testing - final OptionSpec sourceDirectory; - final OptionSpec targetDirectory; + private final OptionSpec sourceDirectory; + private final OptionSpec targetDirectory; public GeoIpCli() { super("A CLI tool to prepare local GeoIp database service", () -> {}); @@ -58,7 +57,7 @@ protected void execute(Terminal terminal, OptionSet options) throws Exception { Path source = getPath(options.valueOf(sourceDirectory)); String targetString = options.valueOf(targetDirectory); Path target = targetString != null ? getPath(targetString) : source; - copyTgzToTarget(terminal, source, target); + copyTgzToTarget(source, target); packDatabasesToTgz(terminal, source, target); createOverviewJson(terminal, target); } @@ -68,49 +67,49 @@ private Path getPath(String file) { return PathUtils.get(file); } - private void copyTgzToTarget(Terminal terminal, Path source, Path target) throws IOException { + private void copyTgzToTarget(Path source, Path target) throws IOException { if (source.equals(target)) { return; } - List toCopy = Files.list(source).filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList()); - for (Path path : toCopy) { - Files.copy(path, target.resolve(path.getFileName()), StandardCopyOption.REPLACE_EXISTING); + try (Stream files = Files.list(source)) { + for (Path path : files.filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList())) { + Files.copy(path, target.resolve(path.getFileName()), StandardCopyOption.REPLACE_EXISTING); + } } } private void packDatabasesToTgz(Terminal terminal, Path source, Path target) throws IOException { - List toPack = Files.list(source).filter(p -> p.getFileName().toString().endsWith(".mmdb")).collect(Collectors.toList()); - for (Path path : toPack) { - String fileName = path.getFileName().toString(); - Path compressedPath = target.resolve(fileName.replaceAll("mmdb$", "") + "tgz"); - terminal.println("Found " + fileName + ", will compress it to " + compressedPath.getFileName()); - try ( - OutputStream fos = Files.newOutputStream(compressedPath, TRUNCATE_EXISTING, CREATE); - OutputStream gos = new GZIPOutputStream(new BufferedOutputStream(fos)) - ) { - long size = Files.size(path); - gos.write(createTarHeader(fileName, size)); - Files.copy(path, gos); - if (size % 512 != 0) { - gos.write(EMPTY_BUF, 0, (int) (512 - (size % 512))); + try (Stream files = Files.list(source)) { + for (Path path : files.filter(p -> p.getFileName().toString().endsWith(".mmdb")).collect(Collectors.toList())) { + String fileName = path.getFileName().toString(); + Path compressedPath = target.resolve(fileName.replaceAll("mmdb$", "") + "tgz"); + terminal.println("Found " + fileName + ", will compress it to " + compressedPath.getFileName()); + try ( + OutputStream fos = Files.newOutputStream(compressedPath, TRUNCATE_EXISTING, CREATE); + OutputStream gos = new GZIPOutputStream(new BufferedOutputStream(fos)) + ) { + long size = Files.size(path); + gos.write(createTarHeader(fileName, size)); + Files.copy(path, gos); + if (size % 512 != 0) { + gos.write(EMPTY_BUF, 0, (int) (512 - (size % 512))); + } + gos.write(EMPTY_BUF); + gos.write(EMPTY_BUF); } - gos.write(EMPTY_BUF); - gos.write(EMPTY_BUF); } } } private void createOverviewJson(Terminal terminal, Path directory) throws IOException { - List databasesPaths = Files.list(directory) - .filter(p -> p.getFileName().toString().endsWith(".tgz")) - .collect(Collectors.toList()); Path overview = directory.resolve("overview.json"); try ( + Stream files = Files.list(directory); OutputStream os = new BufferedOutputStream(Files.newOutputStream(overview, TRUNCATE_EXISTING, CREATE)); XContentGenerator generator = XContentType.JSON.xContent().createGenerator(os) ) { generator.writeStartArray(); - for (Path db : databasesPaths) { + for (Path db : files.filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList())) { terminal.println("Adding " + db.getFileName() + " to overview.json"); MessageDigest md5 = MessageDigests.md5(); try (InputStream dis = new DigestInputStream(new BufferedInputStream(Files.newInputStream(db)), md5)) { diff --git a/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java b/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java index e66a07f82e143..29b824561b529 100644 --- a/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java +++ b/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java @@ -34,7 +34,6 @@ import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/71145") @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory. public class GeoIpCliTests extends LuceneTestCase { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java index 2684179df0548..7d3aa747a47d3 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java @@ -341,7 +341,7 @@ void retrieveDatabase(String databaseName, // (the chance that the documents change is rare, given the low frequency of the updates for these databases) for (int chunk = firstChunk; chunk <= lastChunk; chunk++) { SearchRequest searchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX); - String id = String.format(Locale.ROOT, "%s_%d", databaseName, chunk); + String id = String.format(Locale.ROOT, "%s_%d_%d", databaseName, chunk, metadata.getLastUpdate()); searchRequest.source().query(new TermQueryBuilder("_id", id)); // At most once a day a few searches may be executed to fetch the new files, diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 96b0b69720d4b..c9bc2baed8382 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -81,8 +81,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { private volatile GeoIpDownloaderStats stats = GeoIpDownloaderStats.EMPTY; GeoIpDownloader(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, Settings settings, - long id, String type, String action, String description, TaskId parentTask, - Map headers) { + long id, String type, String action, String description, TaskId parentTask, Map headers) { super(id, type, action, description, parentTask, headers); this.httpClient = httpClient; this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN); @@ -139,9 +138,9 @@ void processDatabase(Map databaseInfo) { long start = System.currentTimeMillis(); try (InputStream is = httpClient.get(url)) { int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0; - int lastChunk = indexChunks(name, is, firstChunk, md5); + int lastChunk = indexChunks(name, is, firstChunk, md5, start); if (lastChunk > firstChunk) { - state = state.put(name, new Metadata(System.currentTimeMillis(), firstChunk, lastChunk - 1, md5)); + state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5)); updateTaskState(); stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size()); logger.info("updated geoip database [" + name + "]"); @@ -180,11 +179,11 @@ void updateTaskState() { } //visible for testing - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) throws IOException { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long timestamp) throws IOException { MessageDigest md = MessageDigests.md5(); for (byte[] buf = getChunk(is); buf.length != 0; buf = getChunk(is)) { md.update(buf); - client.prepareIndex(DATABASES_INDEX).setId(name + "_" + chunk) + client.prepareIndex(DATABASES_INDEX).setId(name + "_" + chunk + "_" + timestamp) .setCreate(true) .setSource(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java index 730405ec3a97a..3deb7b00bb465 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java @@ -90,8 +90,8 @@ public GeoIpDownloaderStats skippedDownload() { } public GeoIpDownloaderStats successfulDownload(long downloadTime) { - return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + downloadTime, databasesCount, - skippedDownloads); + return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + Math.max(downloadTime, 0), + databasesCount, skippedDownloads); } public GeoIpDownloaderStats failedDownload() { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java index d7745d76e8172..bc28b5a77bb81 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java @@ -62,6 +62,7 @@ import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -82,7 +83,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -122,7 +122,7 @@ public void cleanup() { resourceWatcherService.close(); threadPool.shutdownNow(); } - + public void testCheckDatabases() throws Exception { String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14); String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; @@ -258,6 +258,7 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk) List data = gzip(databaseName, dummyContent, lastChunk - firstChunk + 1); assertThat(gunzip(data), equalTo(dummyContent)); + Map> requestMap = new HashMap<>(); for (int i = firstChunk; i <= lastChunk; i++) { byte[] chunk = data.get(i - firstChunk); SearchHit hit = new SearchHit(i); @@ -270,17 +271,20 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk) throw new UncheckedIOException(ex); } - SearchHits hits = new SearchHits(new SearchHit[] {hit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f); + SearchHits hits = new SearchHits(new SearchHit[]{hit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f); SearchResponse searchResponse = new SearchResponse(new SearchResponseSections(hits, null, null, false, null, null, 0), null, 1, 1, 0, 1L, null, null); @SuppressWarnings("unchecked") ActionFuture actionFuture = mock(ActionFuture.class); when(actionFuture.actionGet()).thenReturn(searchResponse); - SearchRequest expectedSearchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX); - String id = String.format(Locale.ROOT, "%s_%d", databaseName, i); - expectedSearchRequest.source().query(new TermQueryBuilder("_id", id)); - when(client.search(eq(expectedSearchRequest))).thenReturn(actionFuture); + requestMap.put(databaseName + "_" + i, actionFuture); } + when(client.search(any())).thenAnswer(invocationOnMock -> { + SearchRequest req = (SearchRequest) invocationOnMock.getArguments()[0]; + TermQueryBuilder term = (TermQueryBuilder) req.source().query(); + String id = (String) term.value(); + return requestMap.get(id.substring(0, id.lastIndexOf('_'))); + }); MessageDigest md = MessageDigests.md5(); data.forEach(md::update); @@ -322,7 +326,7 @@ private static List gzip(String name, String content, int chunks) throws int chunkSize = all.length / chunks; List data = new ArrayList<>(); - for (int from = 0; from < all.length;) { + for (int from = 0; from < all.length; ) { int to = from + chunkSize; if (to > all.length) { to = all.length; diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java index bf22e5556880b..19d9416222b4f 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -139,12 +140,13 @@ public int read() throws IOException { } public void testIndexChunksNoData() throws IOException { - assertEquals(0, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(new byte[0]), 0, "d41d8cd98f00b204e9800998ecf8427e")); + InputStream empty = new ByteArrayInputStream(new byte[0]); + assertEquals(0, geoIpDownloader.indexChunks("test", empty, 0, "d41d8cd98f00b204e9800998ecf8427e", 0)); } public void testIndexChunksMd5Mismatch() { IOException exception = expectThrows(IOException.class, () -> geoIpDownloader.indexChunks("test", - new ByteArrayInputStream(new byte[0]), 0, "123123")); + new ByteArrayInputStream(new byte[0]), 0, "123123", 0)); assertEquals("md5 checksum mismatch, expected [123123], actual [d41d8cd98f00b204e9800998ecf8427e]", exception.getMessage()); } @@ -164,7 +166,7 @@ public void testIndexChunks() throws IOException { client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener listener) -> { int chunk = chunkIndex.getAndIncrement(); assertEquals(OpType.CREATE, request.opType()); - assertEquals("test_" + (chunk + 15), request.id()); + assertThat(request.id(), Matchers.startsWith("test_" + (chunk + 15) + "_")); assertEquals(XContentType.SMILE, request.getContentType()); Map source = request.sourceAsMap(); assertEquals("test", source.get("name")); @@ -173,7 +175,8 @@ public void testIndexChunks() throws IOException { listener.onResponse(mock(IndexResponse.class)); }); - assertEquals(17, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(bigArray), 15, "a67563dfa8f3cba8b8cff61eb989a749")); + InputStream big = new ByteArrayInputStream(bigArray); + assertEquals(17, geoIpDownloader.indexChunks("test", big, 15, "a67563dfa8f3cba8b8cff61eb989a749", 0)); assertEquals(2, chunkIndex.get()); } @@ -191,7 +194,7 @@ void updateTaskState() { } @Override - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) { assertSame(bais, is); assertEquals(0, chunk); return 11; @@ -226,7 +229,7 @@ void updateTaskState() { } @Override - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) { assertSame(bais, is); assertEquals(9, chunk); return 11; @@ -263,7 +266,7 @@ void updateTaskState() { } @Override - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) { fail(); return 0; }