From 46efa6ad04e2ab98044f9d99cca39f7f8ba36ffe Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Tue, 13 Apr 2021 17:10:45 +0200 Subject: [PATCH] Fix problems in GeoIPv2 code (#71598) This change fixes number of problems in GeoIPv2 code: - closes streams from Files.list in GeoIpCli, which should fix tests on Windows - makes sure that total download time in GeoIP stats is non-negative (we serialize it as vInt which can cause problems with negative numbers and it can happen when clock was changed during operation) - fixes handling of failed/simultaneous downloads, #69951 was meant as a way to prevent 2 persistent tasks to index chunks but it would prevent any update if single download failed mid indexing, this change uses timestamp (lastUpdate) as sort of UUID. This should still prevent 2 tasks to step on each other toes (overwriting chunks) but in the end still only single task should be able to update task state (this is handled by persistent tasks framework) Closes #71145 --- .../org/elasticsearch/geoip/GeoIpCli.java | 57 +++++++++---------- .../elasticsearch/geoip/GeoIpCliTests.java | 1 - .../ingest/geoip/DatabaseRegistry.java | 2 +- .../ingest/geoip/GeoIpDownloader.java | 11 ++-- .../geoip/stats/GeoIpDownloaderStats.java | 4 +- .../ingest/geoip/DatabaseRegistryTests.java | 20 ++++--- .../ingest/geoip/GeoIpDownloaderTests.java | 17 +++--- 7 files changed, 58 insertions(+), 54 deletions(-) diff --git a/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java b/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java index 5667e0379210f..d04e9baf0fb81 100644 --- a/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java +++ b/distribution/tools/geoip-cli/src/main/java/org/elasticsearch/geoip/GeoIpCli.java @@ -30,9 +30,9 @@ import java.security.DigestInputStream; import java.security.MessageDigest; import java.util.Arrays; -import java.util.List; import java.util.Locale; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.zip.GZIPOutputStream; import static java.nio.file.StandardOpenOption.CREATE; @@ -42,9 +42,8 @@ public class GeoIpCli extends Command { private static final byte[] EMPTY_BUF = new byte[512]; - // visible for testing - final OptionSpec sourceDirectory; - final OptionSpec targetDirectory; + private final OptionSpec sourceDirectory; + private final OptionSpec targetDirectory; public GeoIpCli() { super("A CLI tool to prepare local GeoIp database service", () -> {}); @@ -58,7 +57,7 @@ protected void execute(Terminal terminal, OptionSet options) throws Exception { Path source = getPath(options.valueOf(sourceDirectory)); String targetString = options.valueOf(targetDirectory); Path target = targetString != null ? getPath(targetString) : source; - copyTgzToTarget(terminal, source, target); + copyTgzToTarget(source, target); packDatabasesToTgz(terminal, source, target); createOverviewJson(terminal, target); } @@ -68,49 +67,49 @@ private Path getPath(String file) { return PathUtils.get(file); } - private void copyTgzToTarget(Terminal terminal, Path source, Path target) throws IOException { + private void copyTgzToTarget(Path source, Path target) throws IOException { if (source.equals(target)) { return; } - List toCopy = Files.list(source).filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList()); - for (Path path : toCopy) { - Files.copy(path, target.resolve(path.getFileName()), StandardCopyOption.REPLACE_EXISTING); + try (Stream files = Files.list(source)) { + for (Path path : files.filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList())) { + Files.copy(path, target.resolve(path.getFileName()), StandardCopyOption.REPLACE_EXISTING); + } } } private void packDatabasesToTgz(Terminal terminal, Path source, Path target) throws IOException { - List toPack = Files.list(source).filter(p -> p.getFileName().toString().endsWith(".mmdb")).collect(Collectors.toList()); - for (Path path : toPack) { - String fileName = path.getFileName().toString(); - Path compressedPath = target.resolve(fileName.replaceAll("mmdb$", "") + "tgz"); - terminal.println("Found " + fileName + ", will compress it to " + compressedPath.getFileName()); - try ( - OutputStream fos = Files.newOutputStream(compressedPath, TRUNCATE_EXISTING, CREATE); - OutputStream gos = new GZIPOutputStream(new BufferedOutputStream(fos)) - ) { - long size = Files.size(path); - gos.write(createTarHeader(fileName, size)); - Files.copy(path, gos); - if (size % 512 != 0) { - gos.write(EMPTY_BUF, 0, (int) (512 - (size % 512))); + try (Stream files = Files.list(source)) { + for (Path path : files.filter(p -> p.getFileName().toString().endsWith(".mmdb")).collect(Collectors.toList())) { + String fileName = path.getFileName().toString(); + Path compressedPath = target.resolve(fileName.replaceAll("mmdb$", "") + "tgz"); + terminal.println("Found " + fileName + ", will compress it to " + compressedPath.getFileName()); + try ( + OutputStream fos = Files.newOutputStream(compressedPath, TRUNCATE_EXISTING, CREATE); + OutputStream gos = new GZIPOutputStream(new BufferedOutputStream(fos)) + ) { + long size = Files.size(path); + gos.write(createTarHeader(fileName, size)); + Files.copy(path, gos); + if (size % 512 != 0) { + gos.write(EMPTY_BUF, 0, (int) (512 - (size % 512))); + } + gos.write(EMPTY_BUF); + gos.write(EMPTY_BUF); } - gos.write(EMPTY_BUF); - gos.write(EMPTY_BUF); } } } private void createOverviewJson(Terminal terminal, Path directory) throws IOException { - List databasesPaths = Files.list(directory) - .filter(p -> p.getFileName().toString().endsWith(".tgz")) - .collect(Collectors.toList()); Path overview = directory.resolve("overview.json"); try ( + Stream files = Files.list(directory); OutputStream os = new BufferedOutputStream(Files.newOutputStream(overview, TRUNCATE_EXISTING, CREATE)); XContentGenerator generator = XContentType.JSON.xContent().createGenerator(os) ) { generator.writeStartArray(); - for (Path db : databasesPaths) { + for (Path db : files.filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList())) { terminal.println("Adding " + db.getFileName() + " to overview.json"); MessageDigest md5 = MessageDigests.md5(); try (InputStream dis = new DigestInputStream(new BufferedInputStream(Files.newInputStream(db)), md5)) { diff --git a/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java b/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java index e66a07f82e143..29b824561b529 100644 --- a/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java +++ b/distribution/tools/geoip-cli/src/test/java/org/elasticsearch/geoip/GeoIpCliTests.java @@ -34,7 +34,6 @@ import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/71145") @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory. public class GeoIpCliTests extends LuceneTestCase { diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java index 2684179df0548..7d3aa747a47d3 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java @@ -341,7 +341,7 @@ void retrieveDatabase(String databaseName, // (the chance that the documents change is rare, given the low frequency of the updates for these databases) for (int chunk = firstChunk; chunk <= lastChunk; chunk++) { SearchRequest searchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX); - String id = String.format(Locale.ROOT, "%s_%d", databaseName, chunk); + String id = String.format(Locale.ROOT, "%s_%d_%d", databaseName, chunk, metadata.getLastUpdate()); searchRequest.source().query(new TermQueryBuilder("_id", id)); // At most once a day a few searches may be executed to fetch the new files, diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java index 96b0b69720d4b..c9bc2baed8382 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java @@ -81,8 +81,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask { private volatile GeoIpDownloaderStats stats = GeoIpDownloaderStats.EMPTY; GeoIpDownloader(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, Settings settings, - long id, String type, String action, String description, TaskId parentTask, - Map headers) { + long id, String type, String action, String description, TaskId parentTask, Map headers) { super(id, type, action, description, parentTask, headers); this.httpClient = httpClient; this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN); @@ -139,9 +138,9 @@ void processDatabase(Map databaseInfo) { long start = System.currentTimeMillis(); try (InputStream is = httpClient.get(url)) { int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0; - int lastChunk = indexChunks(name, is, firstChunk, md5); + int lastChunk = indexChunks(name, is, firstChunk, md5, start); if (lastChunk > firstChunk) { - state = state.put(name, new Metadata(System.currentTimeMillis(), firstChunk, lastChunk - 1, md5)); + state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5)); updateTaskState(); stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size()); logger.info("updated geoip database [" + name + "]"); @@ -180,11 +179,11 @@ void updateTaskState() { } //visible for testing - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) throws IOException { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long timestamp) throws IOException { MessageDigest md = MessageDigests.md5(); for (byte[] buf = getChunk(is); buf.length != 0; buf = getChunk(is)) { md.update(buf); - client.prepareIndex(DATABASES_INDEX).setId(name + "_" + chunk) + client.prepareIndex(DATABASES_INDEX).setId(name + "_" + chunk + "_" + timestamp) .setCreate(true) .setSource(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java index 730405ec3a97a..3deb7b00bb465 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java @@ -90,8 +90,8 @@ public GeoIpDownloaderStats skippedDownload() { } public GeoIpDownloaderStats successfulDownload(long downloadTime) { - return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + downloadTime, databasesCount, - skippedDownloads); + return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + Math.max(downloadTime, 0), + databasesCount, skippedDownloads); } public GeoIpDownloaderStats failedDownload() { diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java index d7745d76e8172..bc28b5a77bb81 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java @@ -62,6 +62,7 @@ import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -82,7 +83,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -122,7 +122,7 @@ public void cleanup() { resourceWatcherService.close(); threadPool.shutdownNow(); } - + public void testCheckDatabases() throws Exception { String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14); String taskId = GeoIpDownloader.GEOIP_DOWNLOADER; @@ -258,6 +258,7 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk) List data = gzip(databaseName, dummyContent, lastChunk - firstChunk + 1); assertThat(gunzip(data), equalTo(dummyContent)); + Map> requestMap = new HashMap<>(); for (int i = firstChunk; i <= lastChunk; i++) { byte[] chunk = data.get(i - firstChunk); SearchHit hit = new SearchHit(i); @@ -270,17 +271,20 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk) throw new UncheckedIOException(ex); } - SearchHits hits = new SearchHits(new SearchHit[] {hit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f); + SearchHits hits = new SearchHits(new SearchHit[]{hit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f); SearchResponse searchResponse = new SearchResponse(new SearchResponseSections(hits, null, null, false, null, null, 0), null, 1, 1, 0, 1L, null, null); @SuppressWarnings("unchecked") ActionFuture actionFuture = mock(ActionFuture.class); when(actionFuture.actionGet()).thenReturn(searchResponse); - SearchRequest expectedSearchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX); - String id = String.format(Locale.ROOT, "%s_%d", databaseName, i); - expectedSearchRequest.source().query(new TermQueryBuilder("_id", id)); - when(client.search(eq(expectedSearchRequest))).thenReturn(actionFuture); + requestMap.put(databaseName + "_" + i, actionFuture); } + when(client.search(any())).thenAnswer(invocationOnMock -> { + SearchRequest req = (SearchRequest) invocationOnMock.getArguments()[0]; + TermQueryBuilder term = (TermQueryBuilder) req.source().query(); + String id = (String) term.value(); + return requestMap.get(id.substring(0, id.lastIndexOf('_'))); + }); MessageDigest md = MessageDigests.md5(); data.forEach(md::update); @@ -322,7 +326,7 @@ private static List gzip(String name, String content, int chunks) throws int chunkSize = all.length / chunks; List data = new ArrayList<>(); - for (int from = 0; from < all.length;) { + for (int from = 0; from < all.length; ) { int to = from + chunkSize; if (to > all.length) { to = all.length; diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java index bf22e5556880b..19d9416222b4f 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -139,12 +140,13 @@ public int read() throws IOException { } public void testIndexChunksNoData() throws IOException { - assertEquals(0, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(new byte[0]), 0, "d41d8cd98f00b204e9800998ecf8427e")); + InputStream empty = new ByteArrayInputStream(new byte[0]); + assertEquals(0, geoIpDownloader.indexChunks("test", empty, 0, "d41d8cd98f00b204e9800998ecf8427e", 0)); } public void testIndexChunksMd5Mismatch() { IOException exception = expectThrows(IOException.class, () -> geoIpDownloader.indexChunks("test", - new ByteArrayInputStream(new byte[0]), 0, "123123")); + new ByteArrayInputStream(new byte[0]), 0, "123123", 0)); assertEquals("md5 checksum mismatch, expected [123123], actual [d41d8cd98f00b204e9800998ecf8427e]", exception.getMessage()); } @@ -164,7 +166,7 @@ public void testIndexChunks() throws IOException { client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener listener) -> { int chunk = chunkIndex.getAndIncrement(); assertEquals(OpType.CREATE, request.opType()); - assertEquals("test_" + (chunk + 15), request.id()); + assertThat(request.id(), Matchers.startsWith("test_" + (chunk + 15) + "_")); assertEquals(XContentType.SMILE, request.getContentType()); Map source = request.sourceAsMap(); assertEquals("test", source.get("name")); @@ -173,7 +175,8 @@ public void testIndexChunks() throws IOException { listener.onResponse(mock(IndexResponse.class)); }); - assertEquals(17, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(bigArray), 15, "a67563dfa8f3cba8b8cff61eb989a749")); + InputStream big = new ByteArrayInputStream(bigArray); + assertEquals(17, geoIpDownloader.indexChunks("test", big, 15, "a67563dfa8f3cba8b8cff61eb989a749", 0)); assertEquals(2, chunkIndex.get()); } @@ -191,7 +194,7 @@ void updateTaskState() { } @Override - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) { assertSame(bais, is); assertEquals(0, chunk); return 11; @@ -226,7 +229,7 @@ void updateTaskState() { } @Override - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) { assertSame(bais, is); assertEquals(9, chunk); return 11; @@ -263,7 +266,7 @@ void updateTaskState() { } @Override - int indexChunks(String name, InputStream is, int chunk, String expectedMd5) { + int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) { fail(); return 0; }