Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix problems in GeoIPv2 code #71598

Merged
merged 6 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;

import static java.nio.file.StandardOpenOption.CREATE;
Expand All @@ -42,9 +42,8 @@ public class GeoIpCli extends Command {

private static final byte[] EMPTY_BUF = new byte[512];

// visible for testing
final OptionSpec<String> sourceDirectory;
final OptionSpec<String> targetDirectory;
private final OptionSpec<String> sourceDirectory;
private final OptionSpec<String> targetDirectory;

public GeoIpCli() {
super("A CLI tool to prepare local GeoIp database service", () -> {});
Expand All @@ -58,7 +57,7 @@ protected void execute(Terminal terminal, OptionSet options) throws Exception {
Path source = getPath(options.valueOf(sourceDirectory));
String targetString = options.valueOf(targetDirectory);
Path target = targetString != null ? getPath(targetString) : source;
copyTgzToTarget(terminal, source, target);
copyTgzToTarget(source, target);
packDatabasesToTgz(terminal, source, target);
createOverviewJson(terminal, target);
}
Expand All @@ -68,49 +67,49 @@ private Path getPath(String file) {
return PathUtils.get(file);
}

private void copyTgzToTarget(Terminal terminal, Path source, Path target) throws IOException {
private void copyTgzToTarget(Path source, Path target) throws IOException {
if (source.equals(target)) {
return;
}
List<Path> toCopy = Files.list(source).filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList());
for (Path path : toCopy) {
Files.copy(path, target.resolve(path.getFileName()), StandardCopyOption.REPLACE_EXISTING);
try (Stream<Path> files = Files.list(source)) {
for (Path path : files.filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList())) {
Files.copy(path, target.resolve(path.getFileName()), StandardCopyOption.REPLACE_EXISTING);
}
}
}

private void packDatabasesToTgz(Terminal terminal, Path source, Path target) throws IOException {
List<Path> toPack = Files.list(source).filter(p -> p.getFileName().toString().endsWith(".mmdb")).collect(Collectors.toList());
for (Path path : toPack) {
String fileName = path.getFileName().toString();
Path compressedPath = target.resolve(fileName.replaceAll("mmdb$", "") + "tgz");
terminal.println("Found " + fileName + ", will compress it to " + compressedPath.getFileName());
try (
OutputStream fos = Files.newOutputStream(compressedPath, TRUNCATE_EXISTING, CREATE);
OutputStream gos = new GZIPOutputStream(new BufferedOutputStream(fos))
) {
long size = Files.size(path);
gos.write(createTarHeader(fileName, size));
Files.copy(path, gos);
if (size % 512 != 0) {
gos.write(EMPTY_BUF, 0, (int) (512 - (size % 512)));
try (Stream<Path> files = Files.list(source)) {
for (Path path : files.filter(p -> p.getFileName().toString().endsWith(".mmdb")).collect(Collectors.toList())) {
String fileName = path.getFileName().toString();
Path compressedPath = target.resolve(fileName.replaceAll("mmdb$", "") + "tgz");
terminal.println("Found " + fileName + ", will compress it to " + compressedPath.getFileName());
try (
OutputStream fos = Files.newOutputStream(compressedPath, TRUNCATE_EXISTING, CREATE);
OutputStream gos = new GZIPOutputStream(new BufferedOutputStream(fos))
) {
long size = Files.size(path);
gos.write(createTarHeader(fileName, size));
Files.copy(path, gos);
if (size % 512 != 0) {
gos.write(EMPTY_BUF, 0, (int) (512 - (size % 512)));
}
gos.write(EMPTY_BUF);
gos.write(EMPTY_BUF);
}
gos.write(EMPTY_BUF);
gos.write(EMPTY_BUF);
}
}
}

private void createOverviewJson(Terminal terminal, Path directory) throws IOException {
List<Path> databasesPaths = Files.list(directory)
.filter(p -> p.getFileName().toString().endsWith(".tgz"))
.collect(Collectors.toList());
Path overview = directory.resolve("overview.json");
try (
Stream<Path> files = Files.list(directory);
OutputStream os = new BufferedOutputStream(Files.newOutputStream(overview, TRUNCATE_EXISTING, CREATE));
XContentGenerator generator = XContentType.JSON.xContent().createGenerator(os)
) {
generator.writeStartArray();
for (Path db : databasesPaths) {
for (Path db : files.filter(p -> p.getFileName().toString().endsWith(".tgz")).collect(Collectors.toList())) {
terminal.println("Adding " + db.getFileName() + " to overview.json");
MessageDigest md5 = MessageDigests.md5();
try (InputStream dis = new DigestInputStream(new BufferedInputStream(Files.newInputStream(db)), md5)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/71145")
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory.
public class GeoIpCliTests extends LuceneTestCase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand All @@ -47,7 +48,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -335,14 +335,17 @@ void retrieveDatabase(String databaseName,
MessageDigest md = MessageDigests.md5();
int firstChunk = metadata.getFirstChunk();
int lastChunk = metadata.getLastChunk();
long timestamp = metadata.getLastUpdate();
try {
// TODO: invoke open point in time api when this api is moved from xpack core to server module.
// (so that we have a consistent view of the chunk documents while doing the lookups)
// (the chance that the documents change is rare, given the low frequency of the updates for these databases)
for (int chunk = firstChunk; chunk <= lastChunk; chunk++) {
SearchRequest searchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX);
String id = String.format(Locale.ROOT, "%s_%d", databaseName, chunk);
searchRequest.source().query(new TermQueryBuilder("_id", id));
searchRequest.source().query(new BoolQueryBuilder()
.filter(new TermQueryBuilder("name", databaseName))
.filter(new TermQueryBuilder("chunk", chunk))
.filter(new TermQueryBuilder("timestamp", timestamp)));

// At most once a day a few searches may be executed to fetch the new files,
// so it is ok if this happens in a blocking manner on a thread from generic thread pool.
Expand All @@ -351,7 +354,8 @@ void retrieveDatabase(String databaseName,
SearchHit[] hits = searchResponse.getHits().getHits();
assert hits.length == 1 : "expected 1 hit, but instead got [" + hits.length + "]";
if (searchResponse.getHits().getHits().length == 0) {
failureHandler.accept(new ResourceNotFoundException("chunk document with id [" + id + "] not found"));
failureHandler.accept(new ResourceNotFoundException("chunk number [" + chunk + "] for database [" + databaseName +
"] and last update time [" + timestamp + "] not found"));
return;
}
byte[] data = (byte[]) hits[0].getSourceAsMap().get("data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
private volatile GeoIpDownloaderStats stats = GeoIpDownloaderStats.EMPTY;

GeoIpDownloader(Client client, HttpClient httpClient, ClusterService clusterService, ThreadPool threadPool, Settings settings,
long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
this.httpClient = httpClient;
this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN);
Expand Down Expand Up @@ -139,13 +138,13 @@ void processDatabase(Map<String, Object> databaseInfo) {
long start = System.currentTimeMillis();
try (InputStream is = httpClient.get(url)) {
int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0;
int lastChunk = indexChunks(name, is, firstChunk, md5);
int lastChunk = indexChunks(name, is, firstChunk, md5, start);
if (lastChunk > firstChunk) {
state = state.put(name, new Metadata(System.currentTimeMillis(), firstChunk, lastChunk - 1, md5));
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5));
updateTaskState();
stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size());
logger.info("updated geoip database [" + name + "]");
deleteOldChunks(name, firstChunk);
deleteOldChunks(name, state.getDatabases().get(name).getFirstChunk());
}
} catch (Exception e) {
stats = stats.failedDownload();
Expand Down Expand Up @@ -180,13 +179,13 @@ void updateTaskState() {
}

//visible for testing
int indexChunks(String name, InputStream is, int chunk, String expectedMd5) throws IOException {
int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long timestamp) throws IOException {
MessageDigest md = MessageDigests.md5();
for (byte[] buf = getChunk(is); buf.length != 0; buf = getChunk(is)) {
md.update(buf);
client.prepareIndex(DATABASES_INDEX).setId(name + "_" + chunk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe keep the _id but with timestamp? That way the _id has meaning and if due to some issue we index a document with the same _id then we fail with an error (b/c create=true).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched back to using _id with added timestamp as you suggested

client.prepareIndex(DATABASES_INDEX)
.setCreate(true)
.setSource(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf)
.setSource(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf, "timestamp", timestamp)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setWaitForActiveShards(ActiveShardCount.ALL)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ private static XContentBuilder mappings() {
.startObject("chunk")
.field("type", "integer")
.endObject()
.startObject("timestamp")
.field("type", "long")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use type date? It still accepts time in ms since epoch and treats the values as dates.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the timestamp is part of the id now, it doesn't need to be indexed separately. I've removed it from the mapping.

.endObject()
.startObject("data")
.field("type", "binary")
.endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public GeoIpDownloaderStats skippedDownload() {
}

public GeoIpDownloaderStats successfulDownload(long downloadTime) {
return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + downloadTime, databasesCount,
skippedDownloads);
return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + Math.max(downloadTime, 0),
databasesCount, skippedDownloads);
}

public GeoIpDownloaderStats failedDownload() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ingest.IngestService;
Expand All @@ -62,6 +64,7 @@
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -122,7 +125,7 @@ public void cleanup() {
resourceWatcherService.close();
threadPool.shutdownNow();
}

public void testCheckDatabases() throws Exception {
String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14);
String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
Expand Down Expand Up @@ -258,6 +261,7 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk)
List<byte[]> data = gzip(databaseName, dummyContent, lastChunk - firstChunk + 1);
assertThat(gunzip(data), equalTo(dummyContent));

Map<SearchRequest, ActionFuture<SearchResponse>> requestMap = new HashMap<>();
for (int i = firstChunk; i <= lastChunk; i++) {
byte[] chunk = data.get(i - firstChunk);
SearchHit hit = new SearchHit(i);
Expand All @@ -270,17 +274,31 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk)
throw new UncheckedIOException(ex);
}

SearchHits hits = new SearchHits(new SearchHit[] {hit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f);
SearchHits hits = new SearchHits(new SearchHit[]{hit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1f);
SearchResponse searchResponse =
new SearchResponse(new SearchResponseSections(hits, null, null, false, null, null, 0), null, 1, 1, 0, 1L, null, null);
@SuppressWarnings("unchecked")
ActionFuture<SearchResponse> actionFuture = mock(ActionFuture.class);
when(actionFuture.actionGet()).thenReturn(searchResponse);
SearchRequest expectedSearchRequest = new SearchRequest(GeoIpDownloader.DATABASES_INDEX);
String id = String.format(Locale.ROOT, "%s_%d", databaseName, i);
expectedSearchRequest.source().query(new TermQueryBuilder("_id", id));
expectedSearchRequest.source().query(new BoolQueryBuilder()
.filter(new TermQueryBuilder("name", databaseName))
.filter(new TermQueryBuilder("chunk", i)));
requestMap.put(expectedSearchRequest, actionFuture);
when(client.search(eq(expectedSearchRequest))).thenReturn(actionFuture);
}
when(client.search(any())).thenAnswer(invocationOnMock -> {
SearchRequest req = (SearchRequest) invocationOnMock.getArguments()[0];
BoolQueryBuilder query = (BoolQueryBuilder) req.source().query();
List<QueryBuilder> filters = query.filter();
//remove timestamp filter, no way to get correct value upfront
filters.remove(2);
query = new BoolQueryBuilder();
filters.forEach(query::filter);
req = new SearchRequest(GeoIpDownloader.DATABASES_INDEX);
req.source().query(query);
return requestMap.get(req);
});

MessageDigest md = MessageDigests.md5();
data.forEach(md::update);
Expand Down Expand Up @@ -322,7 +340,7 @@ private static List<byte[]> gzip(String name, String content, int chunks) throws
int chunkSize = all.length / chunks;
List<byte[]> data = new ArrayList<>();

for (int from = 0; from < all.length;) {
for (int from = 0; from < all.length; ) {
int to = from + chunkSize;
if (to > all.length) {
to = all.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ public int read() throws IOException {
}

public void testIndexChunksNoData() throws IOException {
assertEquals(0, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(new byte[0]), 0, "d41d8cd98f00b204e9800998ecf8427e"));
InputStream empty = new ByteArrayInputStream(new byte[0]);
assertEquals(0, geoIpDownloader.indexChunks("test", empty, 0, "d41d8cd98f00b204e9800998ecf8427e", 0));
}

public void testIndexChunksMd5Mismatch() {
IOException exception = expectThrows(IOException.class, () -> geoIpDownloader.indexChunks("test",
new ByteArrayInputStream(new byte[0]), 0, "123123"));
new ByteArrayInputStream(new byte[0]), 0, "123123", 0));
assertEquals("md5 checksum mismatch, expected [123123], actual [d41d8cd98f00b204e9800998ecf8427e]", exception.getMessage());
}

Expand All @@ -164,7 +165,6 @@ public void testIndexChunks() throws IOException {
client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener<IndexResponse> listener) -> {
int chunk = chunkIndex.getAndIncrement();
assertEquals(OpType.CREATE, request.opType());
assertEquals("test_" + (chunk + 15), request.id());
assertEquals(XContentType.SMILE, request.getContentType());
Map<String, Object> source = request.sourceAsMap();
assertEquals("test", source.get("name"));
Expand All @@ -173,7 +173,8 @@ public void testIndexChunks() throws IOException {
listener.onResponse(mock(IndexResponse.class));
});

assertEquals(17, geoIpDownloader.indexChunks("test", new ByteArrayInputStream(bigArray), 15, "a67563dfa8f3cba8b8cff61eb989a749"));
InputStream big = new ByteArrayInputStream(bigArray);
assertEquals(17, geoIpDownloader.indexChunks("test", big, 15, "a67563dfa8f3cba8b8cff61eb989a749", 0));

assertEquals(2, chunkIndex.get());
}
Expand All @@ -191,7 +192,7 @@ void updateTaskState() {
}

@Override
int indexChunks(String name, InputStream is, int chunk, String expectedMd5) {
int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) {
assertSame(bais, is);
assertEquals(0, chunk);
return 11;
Expand Down Expand Up @@ -226,7 +227,7 @@ void updateTaskState() {
}

@Override
int indexChunks(String name, InputStream is, int chunk, String expectedMd5) {
int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) {
assertSame(bais, is);
assertEquals(9, chunk);
return 11;
Expand Down Expand Up @@ -263,7 +264,7 @@ void updateTaskState() {
}

@Override
int indexChunks(String name, InputStream is, int chunk, String expectedMd5) {
int indexChunks(String name, InputStream is, int chunk, String expectedMd5, long start) {
fail();
return 0;
}
Expand Down