Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add caching of address information for Nominatim export #850

Merged
merged 17 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package de.komoot.photon.opensearch;

import de.komoot.photon.searcher.PhotonResult;
import jakarta.json.JsonArray;
import org.json.JSONObject;

import java.util.Map;
Expand Down
100 changes: 87 additions & 13 deletions src/main/java/de/komoot/photon/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import de.komoot.photon.nominatim.NominatimConnector;
import de.komoot.photon.nominatim.ImportThread;
import de.komoot.photon.nominatim.NominatimImporter;
import de.komoot.photon.nominatim.NominatimUpdater;
import de.komoot.photon.searcher.ReverseHandler;
import de.komoot.photon.searcher.SearchHandler;
Expand All @@ -14,7 +15,8 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Date;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

import static spark.Spark.*;

Expand Down Expand Up @@ -107,9 +109,8 @@ private static void startJsonDump(CommandLineArgs args) {
try {
final String filename = args.getJsonDump();
final JsonDumper jsonDumper = new JsonDumper(filename, args.getLanguages(), args.getExtraTags());
NominatimConnector nominatimConnector = new NominatimConnector(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
nominatimConnector.setImporter(jsonDumper);
nominatimConnector.readEntireDatabase(args.getCountryCodes());

importFromDatabase(args, jsonDumper);
LOGGER.info("Json dump was created: {}", filename);
} catch (FileNotFoundException e) {
throw new UsageException("Cannot create dump: " + e.getMessage());
Expand All @@ -121,22 +122,95 @@ private static void startJsonDump(CommandLineArgs args) {
* Read all data from a Nominatim database and import it into a Photon database.
*/
private static void startNominatimImport(CommandLineArgs args, Server esServer) {
    // Recreate the index first. initDatabase() hands back the language list
    // reported by the freshly created database properties; that list (not the
    // raw command line value) is what the importer is configured with.
    final var languages = initDatabase(args, esServer);
    final String languageList = String.join(",", languages);

    LOGGER.info("Starting import from nominatim to photon with languages: {}", languageList);
    // The actual reading of the Nominatim database (single- or multi-threaded)
    // is shared with the JSON dump and lives in importFromDatabase().
    importFromDatabase(args, esServer.createImporter(languages, args.getExtraTags()));

    LOGGER.info("Imported data from nominatim to photon with languages: {}", languageList);
}

/**
 * Recreate the Photon index and return the languages it was set up with.
 *
 * The last import date is read from the Nominatim database and passed on to
 * the newly created index.
 *
 * @param args     Command line arguments with the database connection
 *                 settings, the requested languages and the structured-query flag.
 * @param esServer Server whose index is recreated.
 * @return The languages reported by the database properties of the new index.
 * @throws UsageException if recreating the index fails with an IOException.
 */
private static String[] initDatabase(CommandLineArgs args, Server esServer) {
    final var nominatimConnector = new NominatimImporter(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
    final Date importDate = nominatimConnector.getLastImportDate();

    try {
        // Clear out previous data. The index must be recreated exactly once.
        var dbProperties = esServer.recreateIndex(args.getLanguages(), importDate, args.getSupportStructuredQueries());
        return dbProperties.getLanguages();
    } catch (IOException e) {
        // NOTE(review): the cause is not attached because only the
        // String constructor of UsageException is visible here.
        throw new UsageException("Cannot setup index, elastic search config files not readable");
    }
}

/**
 * Read places from the Nominatim database country by country and hand them
 * to the given importer through a single {@link ImportThread}.
 *
 * When no country codes are given on the command line, the list of countries
 * is taken from the database. With one thread the countries are read
 * sequentially on the calling thread; otherwise the countries are put into a
 * shared queue that is drained by the requested number of reader threads,
 * each with its own database connector.
 *
 * @param args     Command line arguments (connection settings, country codes, thread count).
 * @param importer Destination for the documents that are read.
 */
private static void importFromDatabase(CommandLineArgs args, Importer importer) {
    final var connector = new NominatimImporter(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
    connector.prepareDatabase();
    connector.loadCountryNames();

    String[] countries = args.getCountryCodes();

    if (countries == null || countries.length == 0) {
        countries = connector.getCountriesFromDatabase();
    } else {
        countries = Arrays.stream(countries).map(String::trim).filter(s -> !s.isBlank()).toArray(String[]::new);
    }

    final int numThreads = args.getThreads();
    ImportThread importThread = new ImportThread(importer);
    boolean interrupted = false;

    try {
        // Anything below two threads is handled sequentially. A count of 0
        // must not end up in the threaded branch, where it would start no
        // reader at all and silently import nothing.
        if (numThreads <= 1) {
            for (var country : countries) {
                connector.readCountry(country, importThread);
            }
        } else {
            final Queue<String> todolist = new ConcurrentLinkedQueue<>(List.of(countries));

            final List<Thread> readerThreads = new ArrayList<>(numThreads);

            for (int i = 0; i < numThreads; ++i) {
                // The first reader reuses the connector created above, every
                // further reader gets a connector of its own.
                final NominatimImporter threadConnector;
                if (i > 0) {
                    threadConnector = new NominatimImporter(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
                    threadConnector.loadCountryNames();
                } else {
                    threadConnector = connector;
                }
                final int threadno = i;
                Runnable runner = () -> {
                    // Pull countries off the shared queue until it is empty.
                    String nextCc = todolist.poll();
                    while (nextCc != null) {
                        LOGGER.info("Thread {}: reading country '{}'", threadno, nextCc);
                        threadConnector.readCountry(nextCc, importThread);
                        nextCc = todolist.poll();
                    }
                };
                Thread thread = new Thread(runner);
                thread.start();
                readerThreads.add(thread);
            }
            interrupted = joinReaderThreads(readerThreads);
        }
    } finally {
        importThread.finish();
        // Restore the interrupted state only after finish(), so that finish()
        // is not entered with the interrupt flag already set.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Wait until all given threads have terminated, even when the calling thread
 * gets interrupted in the meantime.
 *
 * @param threads Threads to wait for.
 * @return True if the calling thread was interrupted at least once while
 *         waiting. The caller is responsible for restoring the interrupt flag.
 */
private static boolean joinReaderThreads(List<Thread> threads) {
    boolean interrupted = false;
    for (Thread thread : threads) {
        while (true) {
            try {
                thread.join();
                break;
            } catch (InterruptedException e) {
                LOGGER.warn("Thread interrupted:", e);
                // Do not re-set the interrupt flag inside the loop: join()
                // throws immediately while the flag is set, which turns the
                // retry into a busy loop. Remember it and restore it later.
                interrupted = true;
            }
        }
    }
    return interrupted;
}


private static void startNominatimUpdateInit(CommandLineArgs args) {
NominatimUpdater nominatimUpdater = new NominatimUpdater(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
nominatimUpdater.initUpdates(args.getNominatimUpdateInit());
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/de/komoot/photon/CommandLineArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
@Parameters(parametersValidators = CorsMutuallyExclusiveValidator.class)
public class CommandLineArgs {

@Parameter(names = "-j", description = "Number of threads to use for import.")
private int threads = 1;

@Parameter(names = "-structured", description = "Enable support for structured queries.")
private boolean supportStructuredQueries = false;

Expand Down Expand Up @@ -107,6 +110,10 @@ public String[] getLanguages() {
return getLanguages(true);
}

/**
 * Return the number of threads to use for the import.
 *
 * The configured value is clamped to the range 1..10. The lower bound is 1
 * because an import needs at least one thread to read anything; zero or
 * negative input is treated as a request for a single thread.
 *
 * @return Thread count between 1 and 10 (inclusive).
 */
public int getThreads() {
    return Integer.min(10, Integer.max(1, threads));
}

/**
 * Return the value of the cluster setting held by this argument set.
 */
public String getCluster() {
    return cluster;
}
Expand Down
32 changes: 30 additions & 2 deletions src/main/java/de/komoot/photon/PhotonDoc.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.komoot.photon;

import de.komoot.photon.nominatim.model.AddressRow;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Point;
Expand Down Expand Up @@ -217,17 +218,27 @@ public boolean isUsefulForIndex() {
private void extractAddress(Map<String, String> address, AddressType addressType, String addressFieldName) {
String field = address.get(addressFieldName);

if (field != null) {
Map<String, String> map = addressParts.computeIfAbsent(addressType, k -> new HashMap<>());
if (field == null) {
return;
}

Map<String, String> map = addressParts.get(addressType);
if (map == null) {
map = new HashMap<>();
map.put("name", field);
addressParts.put(addressType, map);
} else {
String existingName = map.get("name");
if (!field.equals(existingName)) {
// Make a copy of the original name map because the map is reused for other addresses.
map = new HashMap<>(map);
LOGGER.debug("Replacing {} name '{}' with '{}' for osmId #{}", addressFieldName, existingName, field, osmId);
// we keep the former name in the context as it might be helpful when looking up typos
if (!Objects.isNull(existingName)) {
context.add(Collections.singletonMap("formerName", existingName));
}
map.put("name", field);
addressParts.put(addressType, map);
}
}
}
Expand All @@ -241,6 +252,23 @@ public boolean setAddressPartIfNew(AddressType addressType, Map<String, String>
return addressParts.computeIfAbsent(addressType, k -> names) == names;
}

/**
 * Fill in address parts and context from a list of address rows.
 *
 * Rows without an address type are ignored. A row of a different type than
 * the document itself is offered to {@code setAddressPartIfNew()}. If the row
 * has the document's own type, or the offer is rejected, the row's names are
 * added to the context, provided the row reports itself as useful for context.
 *
 * @param addresses Address rows to take the information from.
 */
public void completePlace(List<AddressRow> addresses) {
    final AddressType ownType = getAddressType();
    for (AddressRow row : addresses) {
        final AddressType rowType = row.getAddressType();
        if (rowType == null) {
            continue;
        }

        // Short-circuit on purpose: the address part is only touched when
        // the row is of a different type than the document itself.
        final boolean notTakenAsAddressPart =
                rowType == ownType || !setAddressPartIfNew(rowType, row.getName());

        if (notTakenAsAddressPart && row.isUsefulForContext()) {
            // no specifically handled item, but still useful for context
            getContext().add(row.getName());
        }
    }
}

/**
 * Store the given names as the country address part, replacing any
 * names that were registered for the country before.
 *
 * @param names Name map to register under {@code AddressType.COUNTRY}.
 */
public void setCountry(Map<String, String> names) {
    final AddressType countrySlot = AddressType.COUNTRY;
    addressParts.put(countrySlot, names);
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/de/komoot/photon/nominatim/DBDataAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public interface DBDataAdapter {
* Wrap a DELETE statement with a RETURNING clause.
*/
String deleteReturning(String deleteSQL, String columns);

/**
 * Wrap function to create a json array from a SELECT.
 *
 * NOTE(review): presumably valueSQL is the SQL expression that yields one
 * array element per row and fromSQL is the remainder of the SELECT it is
 * taken from; confirm against the implementing adapters.
 *
 * @param valueSQL SQL fragment for the values of the array (assumed, see note).
 * @param fromSQL SQL fragment describing where the values are selected from (assumed, see note).
 * @return SQL string combining both fragments into a json array expression.
 */
String jsonArrayFromSelect(String valueSQL, String fromSQL);
}
9 changes: 5 additions & 4 deletions src/main/java/de/komoot/photon/nominatim/ImportThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
/**
* Worker thread for bulk importing data from a Nominatim database.
*/
class ImportThread {
public class ImportThread {
private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ImportThread.class);

private static final int PROGRESS_INTERVAL = 50000;
private static final NominatimResult FINAL_DOCUMENT = new NominatimResult(new PhotonDoc(0, null, 0, null, null));
private final BlockingQueue<NominatimResult> documents = new LinkedBlockingDeque<>(20);
private static final NominatimResult FINAL_DOCUMENT = NominatimResult.fromAddress(new PhotonDoc(0, null, 0, null, null), null);
private final BlockingQueue<NominatimResult> documents = new LinkedBlockingDeque<>(100);
private final AtomicLong counter = new AtomicLong();
private final Importer importer;
private final Thread thread;
Expand Down Expand Up @@ -70,7 +70,8 @@ public void finish() {
Thread.currentThread().interrupt();
}
}
LOGGER.info("Finished import of {} photon documents.", counter.longValue());
LOGGER.info("Finished import of {} photon documents. (Total processing time: {}s)",
counter.longValue(), (System.currentTimeMillis() - startMillis)/1000);
}

private class ImportRunnable implements Runnable {
Expand Down
Loading