From fe7348f052b93561f6698758d23a2a944f9b0459 Mon Sep 17 00:00:00 2001 From: Emillio Mariscal Date: Tue, 27 Feb 2024 14:37:24 -0300 Subject: [PATCH] Refactor, working on relations --- setup/bootstrap.sh | 2 +- setup/db/underpass.sql | 17 +++++++ src/bootstrap/bootstrap.cc | 68 ++++++++++++++------------ src/bootstrap/bootstrap.hh | 29 +++++++++--- src/raw/queryraw.cc | 18 ++++--- src/raw/queryraw.hh | 5 +- src/replicator/threads.cc | 46 +++++++++++------- src/underpass.cc | 5 +- src/underpassconfig.hh | 4 ++ src/validate/queryvalidate.cc | 89 +++++++++++++++++++++++++++++++++++ src/validate/queryvalidate.hh | 7 +++ 11 files changed, 224 insertions(+), 66 deletions(-) diff --git a/setup/bootstrap.sh b/setup/bootstrap.sh index 4c4a6b0e..db0cd30d 100644 --- a/setup/bootstrap.sh +++ b/setup/bootstrap.sh @@ -107,7 +107,7 @@ then PGPASSWORD=$PASS psql --host $HOST --user $USER --port $PORT $DB < db/indexes.sql echo "Configuring Underpass ..." - python3 poly2geojson.py $COUNTRY.poly + python3 ../utils/poly2geojson.py $COUNTRY.poly if "$use_docker"; then docker cp $COUNTRY.geojson underpass:/etc/underpass/priority.geojson diff --git a/setup/db/underpass.sql b/setup/db/underpass.sql index bae39f2f..5e8e2781 100644 --- a/setup/db/underpass.sql +++ b/setup/db/underpass.sql @@ -94,11 +94,28 @@ CREATE TABLE IF NOT EXISTS public.nodes ( uid int8 ); +CREATE TABLE IF NOT EXISTS public.rels ( + osm_id int8, + changeset int8, + geom public.geometry(Geometry,4326), + tags JSONB, + members int8[], + timestamp timestamp with time zone, + version int, + "user" text, + uid int8 +); + CREATE TABLE IF NOT EXISTS public.way_refs ( way_id int8, node_id int8 ); +CREATE TABLE IF NOT EXISTS public.rel_members ( + rel_id int8, + way_id int8 +); + CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC); CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC); CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC); diff --git a/src/bootstrap/bootstrap.cc 
b/src/bootstrap/bootstrap.cc index 13b4c6b1..474d2483 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -45,10 +45,10 @@ using namespace logger; namespace bootstrap { -const int PAGE_SIZE = 100; +Bootstrap::Bootstrap(void) {} std::string -allTasksQueries(std::shared_ptr> tasks) { +Bootstrap::allTasksQueries(std::shared_ptr> tasks) { std::string queries = ""; for (auto it = tasks->begin(); it != tasks->end(); ++it) { queries += it->query ; @@ -56,10 +56,10 @@ allTasksQueries(std::shared_ptr> tasks) { return queries; } -void startProcessingWays(const underpassconfig::UnderpassConfig &config) { - +void +Bootstrap::start(const underpassconfig::UnderpassConfig &config) { std::cout << "Connecting to the database ..." << std::endl; - auto db = std::make_shared(); + db = std::make_shared(); if (!db->connect(config.underpass_db_url)) { std::cout << "Could not connect to Underpass DB, aborting bootstrapping thread!" << std::endl; return; @@ -81,10 +81,23 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) { log_debug("Couldn't load plugin! %1%", e.what()); exit(0); } - auto validator = creator(); - auto queryvalidate = std::make_shared(db); - auto queryraw = std::make_shared(db); + validator = creator(); + queryvalidate = std::make_shared(db); + queryraw = std::make_shared(db); + page_size = config.bootstrap_page_size; + concurrency = config.concurrency; + norefs = config.norefs; + + processWays(); + // processNodes(); + // processRels(); + +} + +void +Bootstrap::processWays() { + std::vector tables = { QueryRaw::polyTable, QueryRaw::lineTable @@ -94,27 +107,25 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) { std::cout << std::endl << "Counting geometries ... 
" << std::endl; long int total = queryraw->getWaysCount(*table_it); long int count = 0; - int num_chunks = total / PAGE_SIZE; + int num_chunks = total / page_size; std::cout << "Total: " << total << std::endl; - std::cout << "Threads: " << config.concurrency << std::endl; - std::cout << "Page size: " << PAGE_SIZE << std::endl; + std::cout << "Threads: " << concurrency << std::endl; + std::cout << "Page size: " << page_size << std::endl; long lastid = 0; - int concurrentTasks = config.concurrency; + int concurrentTasks = concurrency; int taskIndex = 0; - std::chrono::steady_clock::time_point begin; - std::chrono::steady_clock::time_point end; for (int chunkIndex = 1; chunkIndex <= (num_chunks/concurrentTasks); chunkIndex++) { int percentage = (count * 100) / total; auto ways = std::make_shared>(); - if (!config.norefs) { - ways = queryraw->getWaysFromDB(lastid, config.concurrency * PAGE_SIZE, *table_it); + if (!norefs) { + ways = queryraw->getWaysFromDB(lastid, concurrency * page_size, *table_it); } else { - ways = queryraw->getWaysFromDBWithoutRefs(lastid, config.concurrency * PAGE_SIZE, *table_it); + ways = queryraw->getWaysFromDBWithoutRefs(lastid, concurrency * page_size, *table_it); } auto tasks = std::make_shared>(concurrentTasks); @@ -122,15 +133,13 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) { for (int taskIndex = 0; taskIndex < concurrentTasks; taskIndex++) { auto taskWays = std::make_shared>(); WayTask wayTask { - std::ref(validator), - std::ref(queryvalidate), - config, taskIndex, std::ref(tasks), std::ref(ways), }; std::cout << "\r" << "Processing " << *table_it << ": " << count << "/" << total << " (" << percentage << "%)"; - boost::asio::post(pool, boost::bind(threadBootstrapTask, wayTask)); + + boost::asio::post(pool, boost::bind(&Bootstrap::threadBootstrapTask, this, wayTask)); } pool.join(); @@ -148,14 +157,11 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) { // This thread get started for 
every page of way void -threadBootstrapTask(WayTask wayTask) +Bootstrap::threadBootstrapTask(WayTask wayTask) { #ifdef TIMING_DEBUG boost::timer::auto_cpu_timer timer("bootstrap::threadBootstrapTask(wayTask): took %w seconds\n"); #endif - auto plugin = wayTask.plugin; - auto queryvalidate = wayTask.queryvalidate; - auto config = wayTask.config; auto taskIndex = wayTask.taskIndex; auto tasks = wayTask.tasks; auto ways = wayTask.ways; @@ -163,16 +169,15 @@ threadBootstrapTask(WayTask wayTask) BootstrapTask task; int processed = 0; + auto wayval = std::make_shared>>(); + // Proccesing ways - for (int i = 0; i < PAGE_SIZE; ++i) { + for (int i = 0; i < page_size; ++i) { if (i * taskIndex < ways->size()) { auto way = ways->at(i * (taskIndex + 1)); - auto status = plugin->checkWay(way, "building"); - for (auto status_it = status->status.begin(); status_it != status->status.end(); ++status_it) { - task.query += queryvalidate->applyChange(*status, *status_it); - } + wayval->push_back(validator->checkWay(way, "building")); // Fill the way_refs table - if (!config.norefs) { + if (!norefs) { for (auto ref = way.refs.begin(); ref != way.refs.end(); ++ref) { task.query += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way.id) + "," + std::to_string(*ref) + "); "; } @@ -180,6 +185,7 @@ threadBootstrapTask(WayTask wayTask) ++processed; } } + queryvalidate->ways(wayval, task.query); task.processed = processed; const std::lock_guard lock(tasks_change_mutex); (*tasks)[taskIndex] = task; diff --git a/src/bootstrap/bootstrap.hh b/src/bootstrap/bootstrap.hh index 8210ad99..3758ea7c 100644 --- a/src/bootstrap/bootstrap.hh +++ b/src/bootstrap/bootstrap.hh @@ -21,6 +21,7 @@ #include "raw/queryraw.hh" #include "underpassconfig.hh" #include "validate/validate.hh" +#include using namespace queryvalidate; using namespace queryraw; @@ -37,18 +38,34 @@ struct BootstrapTask { }; struct WayTask { - std::shared_ptr plugin; - std::shared_ptr queryvalidate; - 
underpassconfig::UnderpassConfig config; int taskIndex; std::shared_ptr> tasks; std::shared_ptr> ways; }; -void startProcessingWays(const underpassconfig::UnderpassConfig &config); +class Bootstrap { + public: + Bootstrap(void); + ~Bootstrap(void){}; + + static const std::string polyTable; + static const std::string lineTable; + + void start(const underpassconfig::UnderpassConfig &config); + void processWays(); -// This thread get started for every page of way -void threadBootstrapTask(WayTask wayTask); + // This thread gets started for every page of ways + void threadBootstrapTask(WayTask wayTask); + std::string allTasksQueries(std::shared_ptr> tasks); + + std::shared_ptr validator; + std::shared_ptr queryvalidate; + std::shared_ptr queryraw; + std::shared_ptr db; + bool norefs; + unsigned int concurrency; + unsigned int page_size; +}; static std::mutex tasks_change_mutex; diff --git a/src/raw/queryraw.cc b/src/raw/queryraw.cc index c75738ab..d1d03fd0 100644 --- a/src/raw/queryraw.cc +++ b/src/raw/queryraw.cc @@ -24,8 +24,6 @@ /// includes querying existing data in the database, as well as /// updating the database. 
-// TODO: add support for relations/multipolygon - // This is generated by autoconf #ifdef HAVE_CONFIG_H #include "unconfig.h" @@ -233,6 +231,12 @@ QueryRaw::applyChange(const OsmWay &way) const return query; } +std::string +QueryRaw::applyChange(const OsmRelation &relation) const +{ + +} + std::vector arrayStrToVector(std::string &refs_str) { refs_str.erase(0, 1); refs_str.erase(refs_str.size() - 1); @@ -261,10 +265,8 @@ void QueryRaw::getNodeCache(std::shared_ptr osmchanges, const mul if (way->action != osmobjects::remove) { // Save referenced nodes for later use for (auto rit = std::begin(way->refs); rit != std::end(way->refs); ++rit) { - if (way->action != osmobjects::remove) { - if (!osmchanges->nodecache.count(*rit)) { - referencedNodeIds += std::to_string(*rit) + ","; - } + if (!osmchanges->nodecache.count(*rit)) { + referencedNodeIds += std::to_string(*rit) + ","; } } } else { @@ -337,6 +339,10 @@ void QueryRaw::getNodeCache(std::shared_ptr osmchanges, const mul } } + // Build relation multipolygon geometries + // TODO + + } void diff --git a/src/raw/queryraw.hh b/src/raw/queryraw.hh index 20c99eda..4396bb71 100644 --- a/src/raw/queryraw.hh +++ b/src/raw/queryraw.hh @@ -64,14 +64,15 @@ class QueryRaw { std::string applyChange(const OsmNode &node) const; /// Build query for processed Way std::string applyChange(const OsmWay &way) const; - /// Build query for updating modified nodes in ways geometries - // std::string applyChange(const std::shared_ptr>> nodes) const; + /// Build query for processed Relation + std::string applyChange(const OsmRelation &relation) const; /// Get nodes for filling Node cache void getNodeCache(std::shared_ptr osmchanges, const multipolygon_t &poly); /// Get nodes for filling Node cache from ways refs void getNodeCacheFromWays(std::shared_ptr> ways, std::map &nodecache) const; // Get ways by refs std::list> getWaysByNodesRefs(std::string &nodeIds) const; + // DB connection std::shared_ptr dbconn; // Get ways count diff --git 
a/src/replicator/threads.cc b/src/replicator/threads.cc index 670d2565..9187ab98 100644 --- a/src/replicator/threads.cc +++ b/src/replicator/threads.cc @@ -479,6 +479,7 @@ threadOsmChange(OsmChangeTask osmChangeTask) auto removed_nodes = std::make_shared>(); auto removed_ways = std::make_shared>(); + auto removed_relations = std::make_shared>(); auto validation_removals = std::make_shared>(); // Raw data and validation @@ -524,6 +525,25 @@ threadOsmChange(OsmChangeTask osmChangeTask) } } + // Relations + // for (auto rit = std::begin(change->relations); rit != std::end(change->relations); ++rit) { + // osmobjects::OsmRelation *relation = rit->get(); + + // if (relation->action != osmobjects::remove && !relation->priority) { + // continue; + // } + + // // Remove deleted relations from validation table + // if (!config->disable_validation && relation->action == osmobjects::remove) { + // removed_relations->push_back(relation->id); + // } + + // // Update relations, ignore new ones outside priority area + // if (!config->disable_raw) { + // task.query += queryraw->applyChange(*relation); + // } + // } + } } @@ -532,30 +552,20 @@ threadOsmChange(OsmChangeTask osmChangeTask) // Validate ways auto wayval = osmchanges->validateWays(poly, plugin); - for (auto it = wayval->begin(); it != wayval->end(); ++it) { + queryvalidate->ways(wayval, task.query, validation_removals); - if (it->get()->status.size() > 0) { - for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { - task.query += queryvalidate->applyChange(*it->get(), *status_it); - } - if (!it->get()->hasStatus(overlapping)) { - task.query += queryvalidate->updateValidation(it->get()->osm_id, overlapping, "building"); - } - if (!it->get()->hasStatus(duplicate)) { - task.query += queryvalidate->updateValidation(it->get()->osm_id, duplicate, "building"); - } - if (!it->get()->hasStatus(badgeom)) { - task.query += queryvalidate->updateValidation(it->get()->osm_id, badgeom, 
"building"); - } - } else { - validation_removals->push_back(it->get()->osm_id); - } - } + // Validate nodes + auto nodeval = osmchanges->validateNodes(poly, plugin); + queryvalidate->nodes(nodeval, task.query, validation_removals); + + // Validate relations + // task.query += queryvalidate->rels(wayval, task.query, validation_removals); // Remove validation entries for removed objects task.query += queryvalidate->updateValidation(validation_removals); task.query += queryvalidate->updateValidation(removed_nodes); task.query += queryvalidate->updateValidation(removed_ways); + // task.query += queryvalidate->updateValidation(removed_relations); } diff --git a/src/underpass.cc b/src/underpass.cc index b16bec4c..723db754 100644 --- a/src/underpass.cc +++ b/src/underpass.cc @@ -329,8 +329,9 @@ main(int argc, char *argv[]) if (vm.count("bootstrap")){ std::thread bootstrapThread; - std::cout << "Starting bootstrapping proccess ..." << std::endl; - bootstrapThread = std::thread(bootstrap::startProcessingWays, std::ref(config)); + std::cout << "Starting bootstrapping process ..." 
<< std::endl; + auto boostrapper = bootstrap::Bootstrap(); + bootstrapThread = std::thread(&bootstrap::Bootstrap::start, &boostrapper, std::ref(config)); log_info("Waiting..."); if (bootstrapThread.joinable()) { bootstrapThread.join(); diff --git a/src/underpassconfig.hh b/src/underpassconfig.hh index 45446e14..85f7abf7 100644 --- a/src/underpassconfig.hh +++ b/src/underpassconfig.hh @@ -110,6 +110,9 @@ struct UnderpassConfig { if (yaml.contains_key("underpass_db_url")) { underpass_db_url = yamlConfig.get_value("underpass_db_url"); } + if (yaml.contains_key("bootstrap_page_size")) { + bootstrap_page_size = std::stoul(yamlConfig.get_value("bootstrap_page_size")); + } if (yaml.contains_key("planet_servers")) { std::vector planet_servers_config = yamlConfig.get_values("planet_servers"); for (auto it = planet_servers_config.begin(); it != planet_servers_config.end(); ++it) { @@ -155,6 +158,7 @@ struct UnderpassConfig { std::string datadir; std::vector planet_servers; unsigned int concurrency = 1; + unsigned int bootstrap_page_size = 100; frequency_t frequency = frequency_t::minutely; ptime start_time = not_a_date_time; ///< Starting time for changesets and OSM changes import diff --git a/src/validate/queryvalidate.cc b/src/validate/queryvalidate.cc index 499beae6..afdd11db 100644 --- a/src/validate/queryvalidate.cc +++ b/src/validate/queryvalidate.cc @@ -117,6 +117,17 @@ QueryValidate::updateValidation(long osm_id, const valerror_t &status, const std return query; } +std::string +QueryValidate::updateValidation(long osm_id, const valerror_t &status) const +{ + std::string format = "DELETE FROM validation WHERE osm_id = %d and status = '%s';"; + boost::format fmt(format); + fmt % osm_id; + fmt % status_list[status]; + std::string query = fmt.str(); + return query; +} + std::string QueryValidate::applyChange(const ValidateStatus &validation, const valerror_t &status) const { @@ -171,6 +182,84 @@ QueryValidate::applyChange(const ValidateStatus &validation, const 
valerror_t &s return query; } + +void +QueryValidate::ways( + std::shared_ptr>> wayval, + std::string &task_query +) { + for (auto it = wayval->begin(); it != wayval->end(); ++it) { + if (it->get()->status.size() > 0) { + for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { + task_query += applyChange(*it->get(), *status_it); + } + } + } +} + +void +QueryValidate::ways( + std::shared_ptr>> wayval, + std::string &task_query, + std::shared_ptr> validation_removals +) { + for (auto it = wayval->begin(); it != wayval->end(); ++it) { + if (it->get()->status.size() > 0) { + for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { + task_query += applyChange(*it->get(), *status_it); + } + if (!it->get()->hasStatus(overlapping)) { + task_query += updateValidation(it->get()->osm_id, overlapping, "building"); + } + if (!it->get()->hasStatus(duplicate)) { + task_query += updateValidation(it->get()->osm_id, duplicate, "building"); + } + if (!it->get()->hasStatus(badgeom)) { + task_query += updateValidation(it->get()->osm_id, badgeom, "building"); + } + if (!it->get()->hasStatus(badvalue)) { + task_query += updateValidation(it->get()->osm_id, badvalue); + } + } else { + validation_removals->push_back(it->get()->osm_id); + } + } +} + +void +QueryValidate::nodes( + std::shared_ptr>> nodeval, + std::string &task_query +) { + for (auto it = nodeval->begin(); it != nodeval->end(); ++it) { + if (it->get()->status.size() > 0) { + for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { + task_query += applyChange(*it->get(), *status_it); + } + } + } +} + +void +QueryValidate::nodes( + std::shared_ptr>> nodeval, + std::string &task_query, + std::shared_ptr> validation_removals +) { + for (auto it = nodeval->begin(); it != nodeval->end(); ++it) { + if (it->get()->status.size() > 0) { + for (auto status_it = it->get()->status.begin(); status_it != 
it->get()->status.end(); ++status_it) { + task_query += applyChange(*it->get(), *status_it); + } + if (!it->get()->hasStatus(badvalue)) { + task_query += updateValidation(it->get()->osm_id, badvalue); + } + } else { + validation_removals->push_back(it->get()->osm_id); + } + } +} + } // namespace queryvalidate // local Variables: diff --git a/src/validate/queryvalidate.hh b/src/validate/queryvalidate.hh index f3b16b5c..f3c3536f 100644 --- a/src/validate/queryvalidate.hh +++ b/src/validate/queryvalidate.hh @@ -78,6 +78,13 @@ class QueryValidate { /// Update the validation table, delete any feature that has been fixed. std::string updateValidation(std::shared_ptr> removals); std::string updateValidation(long osm_id, const valerror_t &status, const std::string &source) const; + std::string updateValidation(long osm_id, const valerror_t &status) const; + void ways(std::shared_ptr>> wayval, std::string &task_query); + void nodes(std::shared_ptr>> nodeval, std::string &task_query); + void rels(std::shared_ptr>> relval, std::string &task_query); + void ways(std::shared_ptr>> wayval, std::string &task_query, std::shared_ptr> validation_removals); + void nodes(std::shared_ptr>> nodeval, std::string &task_query, std::shared_ptr> validation_removals); + void rels(std::shared_ptr>> relval, std::string &task_query, std::shared_ptr> validation_removals); // Database connection, used for escape strings std::shared_ptr dbconn; };