diff --git a/setup/db/indexes.sql b/setup/db/indexes.sql index d4646450..ee5c5d64 100644 --- a/setup/db/indexes.sql +++ b/setup/db/indexes.sql @@ -2,10 +2,6 @@ CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC); CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC); CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC); CREATE UNIQUE INDEX relations_id_idx ON public.relations(osm_id DESC); -CREATE INDEX way_refs_node_id_idx ON public.way_refs (node_id); -CREATE INDEX way_refs_way_id_idx ON public.way_refs (way_id); -CREATE INDEX rel_refs_rel_id_idx ON public.rel_refs (rel_id); -CREATE INDEX rel_refs_way_id_idx ON public.rel_refs (way_id); CREATE INDEX nodes_version_idx ON public.nodes (version); CREATE INDEX ways_poly_version_idx ON public.ways_poly (version); diff --git a/setup/db/underpass.sql b/setup/db/underpass.sql index 3d30dfd4..8ba98ac2 100644 --- a/setup/db/underpass.sql +++ b/setup/db/underpass.sql @@ -109,25 +109,10 @@ CREATE TABLE IF NOT EXISTS public.relations ( ALTER TABLE ONLY public.relations ADD CONSTRAINT relations_pkey PRIMARY KEY (osm_id); -CREATE TABLE IF NOT EXISTS public.way_refs ( - way_id int8, - node_id int8 -); - -CREATE TABLE IF NOT EXISTS public.rel_refs ( - rel_id int8, - way_id int8 -); - CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC); CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC); CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC); CREATE UNIQUE INDEX relations_id_idx ON public.relations(osm_id DESC); -CREATE INDEX way_refs_node_id_idx ON public.way_refs (node_id); -CREATE INDEX way_refs_way_id_idx ON public.way_refs (way_id); - -CREATE INDEX rel_refs_rel_id_idx ON public.rel_refs (rel_id); -CREATE INDEX rel_refs_way_id_idx ON public.rel_refs (way_id); CREATE INDEX nodes_version_idx ON public.nodes (version); CREATE INDEX ways_poly_version_idx ON public.ways_poly (version); diff --git a/src/bootstrap/bootstrap.cc b/src/bootstrap/bootstrap.cc index 805d6d13..5b484a65 100644 --- a/src/bootstrap/bootstrap.cc +++ b/src/bootstrap/bootstrap.cc @@ -53,8 +53,12 @@ BootstrapQueries Bootstrap::allTasksQueries(std::shared_ptr> tasks) { BootstrapQueries queries; for (auto it = tasks->begin(); it != tasks->end(); ++it) { - queries.underpass += it->query ; - queries.osm += it->osmquery ; + for (auto itt = it->query.begin(); itt != it->query.end(); ++itt) { + queries.underpass.push_back(*itt); + } + for (auto itt = it->osmquery.begin(); itt != it->osmquery.end(); ++itt) { + queries.osm.push_back(*itt); + } } return queries; } @@ -154,8 +158,12 @@ Bootstrap::processWays() { auto queries = allTasksQueries(tasks); - db->query(queries.underpass); - osmdb->query(queries.osm); + for (auto it = queries.underpass.begin(); it != queries.underpass.end(); ++it) { + db->query(*it); + } + for (auto it = queries.osm.begin(); it != queries.osm.end(); ++it) { + osmdb->query(*it); + } lastid = ways->back().id; for (auto it = tasks->begin(); it != tasks->end(); ++it) { @@ -206,8 +214,12 @@ Bootstrap::processNodes() { pool.join(); auto queries = allTasksQueries(tasks); - db->query(queries.underpass); - osmdb->query(queries.osm); + for (auto it = queries.underpass.begin(); it != queries.underpass.end(); ++it) { + db->query(*it); + } + for (auto it = queries.osm.begin(); it != queries.osm.end(); ++it) { + osmdb->query(*it); + } lastid = nodes->back().id; for (auto it = tasks->begin(); it != tasks->end(); ++it) { count += it->processed; @@ -256,9 +268,12 @@ Bootstrap::processRelations() { pool.join(); auto queries = allTasksQueries(tasks); - db->query(queries.underpass); - osmdb->query(queries.osm); - + for (auto it = queries.underpass.begin(); it != queries.underpass.end(); ++it) { + db->query(*it); + } + for (auto it = queries.osm.begin(); it != queries.osm.end(); ++it) { + osmdb->query(*it); + } lastid = relations->back().id; for (auto it = tasks->begin(); it != tasks->end(); ++it) { count += it->processed; @@ -292,16 +307,15 @@ Bootstrap::threadBootstrapWayTask(WayTask wayTask) if (i < ways->size()) { auto way = ways->at(i); wayval->push_back(validator->checkWay(way, "building")); - // Fill the way_refs table - if (!norefs) { - for (auto ref = way.refs.begin(); ref != way.refs.end(); ++ref) { - task.osmquery += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way.id) + "," + std::to_string(*ref) + "); "; - } - } ++processed; } } - queryvalidate->ways(wayval, task.query); + + auto result = queryvalidate->ways(wayval); + for (auto it = result->begin(); it != result->end(); ++it) { + task.query.push_back(*it); + } + task.processed = processed; const std::lock_guard lock(tasks_change_mutex); (*tasks)[taskIndex] = task; @@ -337,7 +351,12 @@ Bootstrap::threadBootstrapNodeTask(NodeTask nodeTask) ++processed; } } - queryvalidate->nodes(nodeval, task.query); + + auto result = queryvalidate->nodes(nodeval); + for (auto it = result->begin(); it != result->end(); ++it) { + task.query.push_back(*it); + } + task.processed = processed; const std::lock_guard lock(tasks_change_mutex); (*tasks)[taskIndex] = task; @@ -367,7 +386,7 @@ Bootstrap::threadBootstrapRelationTask(RelationTask relationTask) // relationval->push_back(validator->checkRelation(way, "building")); // Fill the rel_refs table for (auto mit = relation.members.begin(); mit != relation.members.end(); ++mit) { - task.osmquery += "INSERT INTO rel_refs (rel_id, way_id) VALUES (" + std::to_string(relation.id) + "," + std::to_string(mit->ref) + "); "; + task.osmquery.push_back("INSERT INTO rel_refs (rel_id, way_id) VALUES (" + std::to_string(relation.id) + "," + std::to_string(mit->ref) + "); "); } ++processed; } diff --git a/src/bootstrap/bootstrap.hh b/src/bootstrap/bootstrap.hh index 1b58fa41..bbe859c2 100644 --- a/src/bootstrap/bootstrap.hh +++ b/src/bootstrap/bootstrap.hh @@ -33,19 +33,18 @@ namespace bootstrap { /// \struct BootstrapTask /// \brief Represents a bootstrap task struct BootstrapTask { - std::string query = ""; - std::string osmquery = ""; + std::vector query; + std::vector osmquery; int processed = 0; }; /// \struct BootstrapQueries /// \brief Represents a bootstrap queries list struct BootstrapQueries { - std::string underpass = ""; - std::string osm = ""; + std::vector underpass; + std::vector osm; }; - struct WayTask { int taskIndex; std::shared_ptr> tasks; diff --git a/src/data/pq.cc b/src/data/pq.cc index 05c2222c..ea7af0ad 100644 --- a/src/data/pq.cc +++ b/src/data/pq.cc @@ -152,8 +152,15 @@ Pq::query(const std::string &query) { std::scoped_lock write_lock{pqxx_mutex}; pqxx::work worker(*sdb); - auto result = worker.exec(query); - worker.commit(); + pqxx::result result; + try { + result = worker.exec(query); + worker.commit(); + } catch (std::exception &e) { + log_error("ERROR executing query %1%", e.what()); + // Return an empty result so higher level code can handle the error + return result; + } return result; } diff --git a/src/osm/osmchange.cc b/src/osm/osmchange.cc index 4418cdd0..e5f5a4b9 100644 --- a/src/osm/osmchange.cc +++ b/src/osm/osmchange.cc @@ -697,7 +697,7 @@ OsmChange::dump(void) // rel->dump(); // } // } - std::cerr << "Final timestamp: " << to_simple_string(final_entry) << std::endl; + // std::cerr << "Final timestamp: " << to_simple_string(final_entry) << std::endl; } diff --git a/src/raw/queryraw.cc b/src/raw/queryraw.cc index 4e27b94a..d7e30a4e 100644 --- a/src/raw/queryraw.cc +++ b/src/raw/queryraw.cc @@ -170,15 +170,17 @@ std::vector> parseJSONArrayStr(std::string in return arr; } + // Apply the change for a Node. It will return a string of a query for // insert, update or delete the Node in the database. -std::string +std::shared_ptr> QueryRaw::applyChange(const OsmNode &node) const { - std::string query; + auto queries = std::make_shared>(); + // If create or modify, then insert or update if (node.action == osmobjects::create || node.action == osmobjects::modify) { - query = "INSERT INTO nodes as r (osm_id, geom, tags, timestamp, version, \"user\", uid, changeset) VALUES("; + std::string query = "INSERT INTO nodes as r (osm_id, geom, tags, timestamp, version, \"user\", uid, changeset) VALUES("; std::string format = "%d, ST_GeomFromText(\'%s\', 4326), %s, \'%s\', %d, \'%s\', %d, %d \ ) ON CONFLICT (osm_id) DO UPDATE SET geom = ST_GeomFromText(\'%s\', \ 4326), tags = %s, timestamp = \'%s\', version = %d, \"user\" = \'%s\', uid = %d, changeset = %d WHERE r.version < %d;"; @@ -218,14 +220,15 @@ QueryRaw::applyChange(const OsmNode &node) const fmt % node.changeset; fmt % node.version; - query += fmt.str(); + query.append(fmt.str()); + queries->push_back(query); // If remove, then delete the object } else if (node.action == osmobjects::remove) { - query = "DELETE from nodes where osm_id = " + std::to_string(node.id) + ";"; + queries->push_back("DELETE from nodes where osm_id = " + std::to_string(node.id) + ";"); } - return query; + return queries; } const std::string QueryRaw::polyTable = "ways_poly"; @@ -233,10 +236,11 @@ const std::string QueryRaw::lineTable = "ways_line"; // Apply the change for a Way. It will return a string of a query for // insert, update or delete the Way in the database. -std::string +std::shared_ptr> QueryRaw::applyChange(const OsmWay &way) const { - std::string query = ""; + auto queries = std::make_shared>(); + std::string query; const std::string* tableName; // Get a Polygon or LineString geometry string depending on the Way @@ -314,12 +318,7 @@ QueryRaw::applyChange(const OsmWay &way) const fmt % way.version; query += fmt.str(); - - // Refresh all refs stored into the way_refs table - query += "DELETE FROM way_refs WHERE way_id=" + std::to_string(way.id) + ";"; - for (auto ref = way.refs.begin(); ref != way.refs.end(); ++ref) { - query += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way.id) + "," + std::to_string(*ref) + ");"; - } + queries->push_back(query); } else { @@ -344,6 +343,7 @@ QueryRaw::applyChange(const OsmWay &way) const fmt % way.id; query += fmt.str(); + queries->push_back(query); } // If the Way's geometry is a LineString, remove all Polygons from the Polygons table. @@ -358,27 +358,27 @@ QueryRaw::applyChange(const OsmWay &way) const delquery_fmt % QueryRaw::polyTable; } delquery_fmt % way.id; - query += delquery_fmt.str(); + queries->push_back(delquery_fmt.str()); } } else if (way.action == osmobjects::remove) { // Delete a Way geometry and its references. - query += "DELETE FROM way_refs WHERE way_id=" + std::to_string(way.id) + ";"; if (tableName == &QueryRaw::polyTable) { - query += "DELETE FROM " + QueryRaw::polyTable + " where osm_id = " + std::to_string(way.id) + ";"; + queries->push_back("DELETE FROM " + QueryRaw::polyTable + " where osm_id = " + std::to_string(way.id) + ";"); } else { - query += "DELETE FROM " + QueryRaw::lineTable + " where osm_id = " + std::to_string(way.id) + ";"; + queries->push_back("DELETE FROM " + QueryRaw::lineTable + " where osm_id = " + std::to_string(way.id) + ";"); } } - return query; + return queries; } // Apply the change for a Relation. It will return a string of a query for // insert, update or delete the Relation in the database. -std::string +std::shared_ptr> QueryRaw::applyChange(const OsmRelation &relation) const { - std::string query = ""; + auto queries = std::make_shared>(); + std::string query; // Create, modify or modify the geometry of a Relation if (relation.action == osmobjects::create || relation.action == osmobjects::modify || relation.action == osmobjects::modify_geom) { @@ -444,13 +444,8 @@ QueryRaw::applyChange(const OsmRelation &relation) const fmt % relation.changeset; fmt % relation.version; - query += fmt.str(); - - // Refresh all refs stored into the rel_refs table - query += "DELETE FROM rel_refs WHERE rel_id=" + std::to_string(relation.id) + ";"; - for (auto mit = relation.members.begin(); mit != relation.members.end(); ++mit) { - query += "INSERT INTO rel_refs (rel_id, way_id) VALUES (" + std::to_string(relation.id) + "," + std::to_string(mit->ref) + ");"; - } + query.append(fmt.str()); + queries->push_back(query); } else { @@ -474,15 +469,16 @@ QueryRaw::applyChange(const OsmRelation &relation) const // osm_id fmt % relation.id; - query += fmt.str(); + query.append(fmt.str()); + queries->push_back(query); } } } else if (relation.action == osmobjects::remove) { // Delete a Relation geometry and its references. - query += "DELETE FROM relations where osm_id = " + std::to_string(relation.id) + ";"; + queries->push_back("DELETE FROM relations where osm_id = " + std::to_string(relation.id) + ";"); } - return query; + return queries; } // Receives a string of comma separated values and @@ -515,7 +511,7 @@ QueryRaw::getRelationsByWaysRefs(std::string &wayIds) const std::list> rels; // Query for getting Relations - std::string relsQuery = "SELECT distinct(osm_id), refs, version, tags, uid, changeset from rel_refs join relations r on r.osm_id = rel_id where way_id = any(ARRAY[" + wayIds + "])"; + std::string relsQuery = "SELECT distinct(osm_id), refs, version, tags, uid, changeset FROM relations WHERE EXISTS (SELECT 1 FROM jsonb_array_elements(refs) AS ref WHERE (ref->>'ref')::bigint IN (" + wayIds + "));"; auto rels_result = dbconn->query(relsQuery); // Fill vector with OsmRelation objects @@ -568,6 +564,10 @@ QueryRaw::getWaysByIds(std::string &waysIds, std::mapquery(waysQuery); + if (ways_result.size() == 0) { + log_debug("No results returned!"); + return; + } std::string resultIds = ""; @@ -726,6 +726,10 @@ void QueryRaw::buildGeometries(std::shared_ptr osmchanges, const // Get Nodes geoemtries from DB std::string nodesQuery = "SELECT osm_id, st_x(geom) as lat, st_y(geom) as lon FROM nodes where osm_id in (" + referencedNodeIds + ");"; auto result = dbconn->query(nodesQuery); + if (result.size() == 0) { + log_debug("No results returned!"); + return; + } // Fill nodecache for (auto node_it = result.begin(); node_it != result.end(); ++node_it) { auto node_id = (*node_it)[0].as(); @@ -829,6 +833,10 @@ QueryRaw::getNodeCacheFromWays(std::shared_ptr> ways, std::m // Get Nodes geometries from the DB std::string nodesQuery = "SELECT osm_id, st_x(geom) as lat, st_y(geom) as lon FROM nodes where osm_id in (" + nodeIds + ") and st_x(geom) is not null and st_y(geom) is not null;"; auto result = dbconn->query(nodesQuery); + if (result.size() == 0) { + log_debug("No results returned!"); + return; + } // Fill nodecache with Nodes geometries (Points) for (auto node_it = result.begin(); node_it != result.end(); ++node_it) { @@ -841,7 +849,7 @@ QueryRaw::getNodeCacheFromWays(std::shared_ptr> ways, std::m } } -// Recive a string of comma separated values of Nodes ids +// Recieve a string of comma separated values of Nodes ids // and return a vector of Ways std::list> QueryRaw::getWaysByNodesRefs(std::string &nodeIds) const @@ -850,38 +858,49 @@ QueryRaw::getWaysByNodesRefs(std::string &nodeIds) const boost::timer::auto_cpu_timer timer("getWaysByNodesRefs(nodeIds): took %w seconds\n"); #endif std::list> ways; + std::vector queries; // Get all Ways that have references to Nodes from the DB, including Polygons and LineString geometries - std::string waysQuery = "SELECT distinct(osm_id), refs, version, tags, uid, changeset from way_refs join ways_poly wp on wp.osm_id = way_id where node_id = any(ARRAY[" + nodeIds + "])"; - waysQuery += " UNION SELECT distinct(osm_id), refs, version, tags, uid, changeset from way_refs join ways_line wl on wl.osm_id = way_id where node_id = any(ARRAY[" + nodeIds + "]);"; - auto ways_result = dbconn->query(waysQuery); + // std::string waysQuery = "SELECT distinct(osm_id), refs, version, tags, uid, changeset from way_refs join ways_poly wp on wp.osm_id = way_id where node_id = any(ARRAY[" + nodeIds + "])"; + queries.push_back("SELECT distinct(osm_id), refs, version, tags, uid, changeset from ways_poly where refs @> '{" + nodeIds + "}'"); - // Create Ways objects and fill the vector - for (auto way_it = ways_result.begin(); way_it != ways_result.end(); ++way_it) { - auto way = std::make_shared(); - way->id = (*way_it)[0].as(); - std::string refs_str = (*way_it)[1].as(); - if (refs_str.size() > 1) { - way->refs = arrayStrToVector(refs_str); + queries.push_back("SELECT distinct(osm_id), refs, version, tags, uid, changeset from ways_line where refs @> '{" + nodeIds + "}'"); + // waysQuery += " UNION SELECT distinct(osm_id), refs, version, tags, uid, changeset from way_refs join ways_line wl on wl.osm_id = way_id where node_id = any(ARRAY[" + nodeIds + "]);"; + + for (auto it = queries.begin(); it != queries.end(); ++it) { + + auto ways_result = dbconn->query(*it); + if (ways_result.size() == 0) { + log_debug("No results returned!"); + return ways; } - way->version = (*way_it)[2].as(); - auto tags = (*way_it)[3]; - if (!tags.is_null()) { - auto tags = parseJSONObjectStr((*way_it)[3].as()); - for (auto const& [key, val] : tags) - { - way->addTag(key, val); + + // Create Ways objects and fill the vector + for (auto way_it = ways_result.begin(); way_it != ways_result.end(); ++way_it) { + auto way = std::make_shared(); + way->id = (*way_it)[0].as(); + std::string refs_str = (*way_it)[1].as(); + if (refs_str.size() > 1) { + way->refs = arrayStrToVector(refs_str); } + way->version = (*way_it)[2].as(); + auto tags = (*way_it)[3]; + if (!tags.is_null()) { + auto tags = parseJSONObjectStr((*way_it)[3].as()); + for (auto const& [key, val] : tags) { + way->addTag(key, val); + } + } + auto uid = (*way_it)[4]; + if (!uid.is_null()) { + way->uid = (*way_it)[4].as(); + } + auto changeset = (*way_it)[5]; + if (!changeset.is_null()) { + way->changeset = (*way_it)[5].as(); + } + ways.push_back(way); } - auto uid = (*way_it)[4]; - if (!uid.is_null()) { - way->uid = (*way_it)[4].as(); - } - auto changeset = (*way_it)[5]; - if (!changeset.is_null()) { - way->changeset = (*way_it)[5].as(); - } - ways.push_back(way); } return ways; } @@ -905,9 +924,14 @@ QueryRaw::getNodesFromDB(long lastid, int pageSize) { } else { nodesQuery += ", version, tags FROM nodes order by osm_id desc limit " + std::to_string(pageSize) + ";"; } + auto nodes = std::make_shared>(); auto nodes_result = dbconn->query(nodesQuery); + if (nodes_result.size() == 0) { + log_debug("No results returned!"); + return nodes; + } + // Fill vector of OsmNode objects - auto nodes = std::make_shared>(); for (auto node_it = nodes_result.begin(); node_it != nodes_result.end(); ++node_it) { OsmNode node; node.id = (*node_it)[0].as(); @@ -949,9 +973,14 @@ QueryRaw::getWaysFromDB(long lastid, int pageSize, const std::string &tableName) waysQuery += ", version, tags FROM " + tableName + " order by osm_id desc limit " + std::to_string(pageSize) + ";"; } + auto ways = std::make_shared>(); auto ways_result = dbconn->query(waysQuery); + if (ways_result.size() == 0) { + log_debug("No results returned!"); + return ways; + } + // Fill vector of OsmWay objects - auto ways = std::make_shared>(); for (auto way_it = ways_result.begin(); way_it != ways_result.end(); ++way_it) { OsmWay way; way.id = (*way_it)[0].as(); @@ -999,9 +1028,14 @@ QueryRaw::getWaysFromDBWithoutRefs(long lastid, int pageSize, const std::string waysQuery += ", tags FROM " + tableName + " order by osm_id desc limit " + std::to_string(pageSize) + ";"; } + auto ways = std::make_shared>(); auto ways_result = dbconn->query(waysQuery); + if (ways_result.size() == 0) { + log_debug("No results returned!"); + return ways; + } + // Fill vector of OsmWay objects - auto ways = std::make_shared>(); for (auto way_it = ways_result.begin(); way_it != ways_result.end(); ++way_it) { OsmWay way; way.id = (*way_it)[0].as(); @@ -1040,9 +1074,13 @@ QueryRaw::getRelationsFromDB(long lastid, int pageSize) { relationsQuery += ", version, tags FROM relations order by osm_id desc limit " + std::to_string(pageSize) + ";"; } + auto relations = std::make_shared>(); auto relations_result = dbconn->query(relationsQuery); + if (relations_result.size() == 0) { + log_debug("No results returned!"); + return relations; + } // Fill vector of OsmRelation objects - auto relations = std::make_shared>(); for (auto rel_it = relations_result.begin(); rel_it != relations_result.end(); ++rel_it) { OsmRelation relation; relation.id = (*rel_it)[0].as(); @@ -1084,7 +1122,6 @@ QueryRaw::getRelationsFromDB(long lastid, int pageSize) { return relations; } - } // namespace queryraw // local Variables: diff --git a/src/raw/queryraw.hh b/src/raw/queryraw.hh index 037c0030..d9a5bb61 100644 --- a/src/raw/queryraw.hh +++ b/src/raw/queryraw.hh @@ -63,11 +63,11 @@ class QueryRaw { static const std::string lineTable; /// Build query for processed Node - std::string applyChange(const OsmNode &node) const; + std::shared_ptr> applyChange(const OsmNode &node) const; /// Build query for processed Way - std::string applyChange(const OsmWay &way) const; + std::shared_ptr> applyChange(const OsmWay &way) const; /// Build query for processed Relation - std::string applyChange(const OsmRelation &relation) const; + std::shared_ptr> applyChange(const OsmRelation &relation) const; /// Build all geometries for a OsmChange file void buildGeometries(std::shared_ptr osmchanges, const multipolygon_t &poly); /// Get nodes for filling Node cache from refs on ways diff --git a/src/replicator/threads.cc b/src/replicator/threads.cc index 791af11d..49860fcf 100644 --- a/src/replicator/threads.cc +++ b/src/replicator/threads.cc @@ -98,12 +98,26 @@ using namespace underpassconfig; namespace replicatorthreads { -std::string +std::shared_ptr> allTasksQueries(std::shared_ptr> tasks) { - std::string queries = ""; + auto queries = std::make_shared>(); + std::string osmsql; + std::string unsql; for (auto it = tasks->begin(); it != tasks->end(); ++it) { - queries += it->query; + for (auto itt = it->query.begin(); itt != it->query.end(); ++itt) { + if (itt->rfind(';') != itt->size() - 1) { + log_debug("HAS SEMI-COLON: %1%", *itt); + log_debug("HAS SEMI-COLON: %1% %2%", itt->size() - 1, itt->rfind(';')); + } + if (itt->find(" nodes ") != std::string::npos || itt->find(" ways_poly ") != std::string::npos || itt->find(" ways_line ") != std::string::npos || itt->find(" relations ") != std::string::npos) { + unsql.append(*itt); + } else { + osmsql.append(*itt); + } + } } + queries->push_back(osmsql); + queries->push_back(unsql); return queries; } @@ -139,14 +153,22 @@ startMonitorChangesets(std::shared_ptr &remote, assert(remote->frequency == frequency_t::changeset); auto db = std::make_shared(); - if (!db->connect(config.underpass_db_url)) { + if (!db->connect(config.underpass_osm_db_url)) { log_error("Could not connect to Underpass DB, aborting monitoring thread!"); return; } else { - log_debug("Connected to database: %1%", config.underpass_db_url); + log_debug("Connected to database: %1%", config.underpass_osm_db_url); } auto querystats = std::make_shared(db); + auto osmdb = std::make_shared(); + if (!osmdb->connect(config.underpass_osm_db_url)) { + log_error("Could not connect to Underpass DB, aborting monitoring thread!"); + return; + } else { + log_debug("Connected to database: %1%", config.underpass_osm_db_url); + } + int cores = config.concurrency; // Support multiple OSM planet servers @@ -200,7 +222,13 @@ startMonitorChangesets(std::shared_ptr &remote, remote->updateDomain(planets.front()->domain); } pool.join(); - db->query(allTasksQueries(tasks)); + auto result = allTasksQueries(tasks); + if (result->at(0).size() > 0) { + db->query(result->at(0)); + } + if (result->at(1).size() > 0) { + osmdb->query(result->at(1)); + } ptime now = boost::posix_time::second_clock::universal_time(); last_task = getClosest(tasks, now); @@ -275,7 +303,16 @@ startMonitorChanges(std::shared_ptr &remote, } auto querystats = std::make_shared(db); auto queryvalidate = std::make_shared(db); - auto queryraw = std::make_shared(db); + + // Connect to the raw OSM database, which is separate + auto osmdb = std::make_shared(); + if (!osmdb->connect(config.underpass_osm_db_url)) { + log_error("Could not connect to raw OSM DB, aborting monitoring thread!"); + return; + } else { + log_debug("Connected to database: %1%", config.underpass_osm_db_url); + } + auto queryraw = std::make_shared(osmdb); int cores = config.concurrency; @@ -339,7 +376,13 @@ startMonitorChanges(std::shared_ptr &remote, boost::asio::post(pool, task); } while (--i); pool.join(); - db->query(allTasksQueries(tasks)); + auto result = allTasksQueries(tasks); + if (result->at(0).size() > 0) { + db->query(result->at(0)); + } + if (result->at(1).size() > 0) { + osmdb->query(result->at(1)); + } ptime now = boost::posix_time::second_clock::universal_time(); last_task = getClosest(tasks, now); @@ -401,7 +444,7 @@ threadChangeSet(std::shared_ptr &remote, log_debug("ChangeSet last_closed_at: %1%", task.timestamp); changeset->areaFilter(poly); for (auto cit = std::begin(changeset->changes); cit != std::end(changeset->changes); ++cit) { - task.query += querystats->applyChange(*cit->get()); + task.query.push_back(querystats->applyChange(*cit->get())); } } const std::lock_guard lock(tasks_changeset_mutex); @@ -412,7 +455,9 @@ threadChangeSet(std::shared_ptr &remote, void threadOsmChange(OsmChangeTask osmChangeTask) { - +#ifdef TIMING_DEBUG + boost::timer::auto_cpu_timer timer("threadOsmChange: took %w seconds\n"); +#endif auto remote = osmChangeTask.remote; auto planet = osmChangeTask.planet; auto poly = osmChangeTask.poly; @@ -425,9 +470,6 @@ threadOsmChange(OsmChangeTask osmChangeTask) auto taskIndex = osmChangeTask.taskIndex; auto osmchanges = std::make_shared(); -#ifdef TIMING_DEBUG - boost::timer::auto_cpu_timer timer("threadOsmChange: took %w seconds\n"); -#endif log_debug("Processing OsmChange: %1%", remote->filespec); ReplicationTask task; task.url = remote->subpath; @@ -454,7 +496,7 @@ threadOsmChange(OsmChangeTask osmChangeTask) osmchanges->readXML(changes_xml); if (osmchanges->changes.size() > 0) { task.timestamp = osmchanges->changes.back()->final_entry; - log_debug("OsmChange final_entry: %1%", task.timestamp); + // log_debug("OsmChange final_entry: %1%", task.timestamp); } } catch (std::exception &e) { log_error("Couldn't parse: %1%", remote->filespec); @@ -488,7 +530,7 @@ threadOsmChange(OsmChangeTask osmChangeTask) if (it->second->added.size() == 0 && it->second->modified.size() == 0) { continue; } - task.query += querystats->applyChange(*it->second); + task.query.push_back(querystats->applyChange(*it->second)); } } @@ -517,7 +559,10 @@ threadOsmChange(OsmChangeTask osmChangeTask) // Update nodes, ignore new ones outside priority area if (!config->disable_raw) { - task.query += queryraw->applyChange(*node); + auto changes = queryraw->applyChange(*node); + for (auto it = changes->begin(); it != changes->end(); ++it) { + task.query.push_back(*it); + } } } @@ -536,7 +581,10 @@ threadOsmChange(OsmChangeTask osmChangeTask) // Update ways, ignore new ones outside priority area if (!config->disable_raw) { - task.query += queryraw->applyChange(*way); + auto changes = queryraw->applyChange(*way); + for (auto it = changes->begin(); it != changes->end(); ++it) { + task.query.push_back(*it); + } } } @@ -557,10 +605,12 @@ threadOsmChange(OsmChangeTask osmChangeTask) // Update relations, ignore new ones outside priority area if (!config->disable_raw) { - task.query += queryraw->applyChange(*relation); + auto changes = queryraw->applyChange(*relation); + for (auto it = changes->begin(); it != changes->end(); ++it) { + task.query.push_back(*it); + } } } - } } @@ -569,24 +619,33 @@ threadOsmChange(OsmChangeTask osmChangeTask) // Validate ways auto wayval = osmchanges->validateWays(poly, plugin); - queryvalidate->ways(wayval, task.query, validation_removals); + for (auto it = task.query.begin(); it != task.query.end(); ++it) { + auto result = queryvalidate->ways(wayval, validation_removals); + for (auto itt = result->begin(); itt != result->end(); ++itt) { + task.query.push_back(*itt); + } + } // Validate nodes auto nodeval = osmchanges->validateNodes(poly, plugin); - queryvalidate->nodes(nodeval, task.query, validation_removals); + for (auto it = task.query.begin(); it != task.query.end(); ++it) { + auto result = queryvalidate->nodes(nodeval, validation_removals); + for (auto itt = result->begin(); itt != result->end(); ++itt) { + task.query.push_back(*itt); + } + } // Validate relations // relval = osmchanges->validateRelations(poly, plugin); // queryvalidate->relations(relval, task.query, validation_removals); // Remove validation entries for removed objects - task.query += queryvalidate->updateValidation(validation_removals); - task.query += queryvalidate->updateValidation(removed_nodes); - task.query += queryvalidate->updateValidation(removed_ways); + task.query.push_back(*queryvalidate->updateValidation(validation_removals)); + task.query.push_back(*queryvalidate->updateValidation(removed_nodes)); + task.query.push_back(*queryvalidate->updateValidation(removed_ways)); // task.query += queryvalidate->updateValidation(removed_relations); } - const std::lock_guard lock(tasks_change_mutex); (*tasks)[taskIndex] = task; diff --git a/src/replicator/threads.hh b/src/replicator/threads.hh index dca7727a..fbd269ab 100644 --- a/src/replicator/threads.hh +++ b/src/replicator/threads.hh @@ -96,7 +96,7 @@ struct ReplicationTask { std::string url; ptime timestamp = not_a_date_time; replication::reqfile_t status = replication::reqfile_t::none; - std::string query = ""; + std::vector query; }; /// This monitors the planet server for new changesets files. diff --git a/src/testsuite/libunderpass.all/raw-test.cc b/src/testsuite/libunderpass.all/raw-test.cc index ff758d2a..4890b841 100644 --- a/src/testsuite/libunderpass.all/raw-test.cc +++ b/src/testsuite/libunderpass.all/raw-test.cc @@ -83,27 +83,39 @@ bool processFile(const std::string &filename, std::shared_ptr &db) { osmchanges->readChanges(destdir_base + "/testsuite/testdata/raw/" + filename); queryraw->buildGeometries(osmchanges, poly); osmchanges->areaFilter(poly); - std::string rawquery; + auto rawquery = std::make_shared>(); for (auto it = std::begin(osmchanges->changes); it != std::end(osmchanges->changes); ++it) { osmchange::OsmChange *change = it->get(); // Nodes for (auto nit = std::begin(change->nodes); nit != std::end(change->nodes); ++nit) { osmobjects::OsmNode *node = nit->get(); - rawquery += queryraw->applyChange(*node); + auto changes = *queryraw->applyChange(*node); + for (auto it = changes.begin(); it != changes.end(); ++it) { + rawquery->push_back(*it); + } } // Ways for (auto wit = std::begin(change->ways); wit != std::end(change->ways); ++wit) { osmobjects::OsmWay *way = wit->get(); - rawquery += queryraw->applyChange(*way); + auto changes = *queryraw->applyChange(*way); + for (auto it = changes.begin(); it != changes.end(); ++it) { + rawquery->push_back(*it); + } } // Relations for (auto rit = std::begin(change->relations); rit != std::end(change->relations); ++rit) { osmobjects::OsmRelation *relation = rit->get(); - rawquery += queryraw->applyChange(*relation); + auto changes = *queryraw->applyChange(*relation); + for (auto it = changes.begin(); it != changes.end(); ++it) { + rawquery->push_back(*it); + } } } - db->query(rawquery); + + for (auto rit = std::begin(*rawquery); rit != std::end(*rawquery); ++rit) { + db->query(*rit); + } } const std::vector expectedGeometries = { diff --git a/src/underpass.cc b/src/underpass.cc index c9d24b7e..34575ecb 100644 --- a/src/underpass.cc +++ b/src/underpass.cc @@ -123,8 +123,8 @@ main(int argc, char *argv[]) ("disable-raw", "Disable raw OSM data") ("norefs", "Disable refs (useful for non OSM data)") ("bootstrap", "Bootstrap data tables") - ("silent", "Silent"); - ("rawdb", opts::value(), "Database URI for raw OSM data"); + ("silent", "Silent") + ("rawdb", opts::value(), "Database URI for raw OSM data"); // clang-format on opts::store(opts::command_line_parser(argc, argv).options(desc).positional(p).run(), vm); @@ -306,8 +306,13 @@ main(int argc, char *argv[]) if (!config.silent) { osmchange->dump(); } +#ifdef SINGLE_THREAD // debugging hack + replicatorthreads::startMonitorChanges(std::ref(osmchange), + std::ref(*osmboundary), config); +#else osmChangeThread = std::thread(replicatorthreads::startMonitorChanges, std::ref(osmchange), std::ref(*osmboundary), config); +#endif } // Changesets @@ -326,18 +331,23 @@ main(int argc, char *argv[]) if (!config.silent) { changeset->dump(); } +#ifdef SINGLE_THREAD // debugging hack + replicatorthreads::startMonitorChangesets(std::ref(changeset), std::ref(*oscboundary), config); +#else changesetThread = std::thread(replicatorthreads::startMonitorChangesets, std::ref(changeset), std::ref(*oscboundary), config); +#endif } // Start processing - +#ifndef SINGLE_THREAD if (changesetThread.joinable()) { changesetThread.join(); } if (osmChangeThread.joinable()) { osmChangeThread.join(); } +#endif exit(0); diff --git a/src/validate/queryvalidate.cc b/src/validate/queryvalidate.cc index dc1c20f6..b12fac85 100644 --- a/src/validate/queryvalidate.cc +++ b/src/validate/queryvalidate.cc @@ -87,48 +87,52 @@ QueryValidate::QueryValidate(std::shared_ptr db) { dbconn = db; } -std::string +std::shared_ptr QueryValidate::updateValidation(std::shared_ptr> removals) { #ifdef TIMING_DEBUG_X boost::timer::auto_cpu_timer timer("updateValidation: took %w seconds\n"); #endif - std::string query = ""; + auto query = std::make_shared(); if (removals->size() > 0) { - query = "DELETE FROM validation WHERE osm_id IN("; + *query = "DELETE FROM validation WHERE osm_id IN("; for (const auto &osm_id : *removals) { - query += std::to_string(osm_id) + ","; + query->append(std::to_string(osm_id) + ","); }; - query.erase(query.size() - 1); - query += ");"; + query->erase(query->size() - 1); + query->append(");"); } return query; } -std::string -QueryValidate::updateValidation(long osm_id, const valerror_t &status, const std::string &source) const +std::shared_ptr +QueryValidate::updateValidation(long osm_id, + const valerror_t &status, + const std::string &source) const { + auto query = std::make_shared(); std::string format = "DELETE FROM validation WHERE osm_id = %d and source = '%s' and status = '%s';"; boost::format fmt(format); fmt % osm_id; fmt % source; fmt % status_list[status]; - std::string query = fmt.str(); + *query = fmt.str(); return query; } -std::string +std::shared_ptr QueryValidate::updateValidation(long osm_id, const valerror_t &status) const { + auto query = std::make_shared(); std::string format = "DELETE FROM validation WHERE osm_id = %d and status = '%s';"; boost::format fmt(format); fmt % osm_id; fmt % status_list[status]; - std::string query = fmt.str(); + *query = fmt.str(); return query; } -std::string +std::shared_ptr QueryValidate::applyChange(const ValidateStatus &validation, const valerror_t &status) const { #ifdef TIMING_DEBUG_X @@ -137,13 +141,13 @@ QueryValidate::applyChange(const ValidateStatus &validation, const valerror_t &s // log_debug("Applying Validation data"); std::string format; - std::string query; + auto query = std::make_shared(); if (validation.values.size() > 0) { - query = "INSERT INTO validation as v (osm_id, changeset, uid, type, status, values, timestamp, location, source, version) VALUES("; + *query = "INSERT INTO validation as v (osm_id, changeset, uid, type, status, values, timestamp, location, source, version) VALUES("; format = "%d, %d, %g, \'%s\', \'%s\', ARRAY[%s], \'%s\', ST_GeomFromText(\'%s\', 4326), \'%s\', %s) "; } else { - query = "INSERT INTO validation as v (osm_id, changeset, uid, type, status, timestamp, location, source, version) VALUES("; + *query = "INSERT INTO validation as v (osm_id, changeset, uid, type, status, timestamp, location, source, version) VALUES("; format = "%d, %d, %g, \'%s\', \'%s\', \'%s\', ST_GeomFromText(\'%s\', 4326), \'%s\', %s) "; } format += "ON CONFLICT (osm_id, status, source) DO UPDATE SET version = %d, timestamp = \'%s\' WHERE v.version < %d;"; @@ -177,87 +181,86 @@ QueryValidate::applyChange(const ValidateStatus &validation, const valerror_t &s fmt % validation.version; fmt % to_simple_string(validation.timestamp); fmt % validation.version; - query += fmt.str(); + query->append(fmt.str()); return query; } - -void -QueryValidate::ways( - std::shared_ptr>> wayval, - std::string &task_query -) { +std::shared_ptr> +QueryValidate::ways(std::shared_ptr>> wayval) { + auto query = std::make_shared>(); for (auto it = wayval->begin(); it != wayval->end(); ++it) { if (it->get()->status.size() > 0) { for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { - task_query += applyChange(*it->get(), *status_it); + query->push_back(*applyChange(*it->get(), *status_it)); } } } + return query; } -void +std::shared_ptr> QueryValidate::ways( std::shared_ptr>> wayval, - std::string &task_query, std::shared_ptr> validation_removals ) { + auto query = std::make_shared>(); for (auto it = wayval->begin(); it != wayval->end(); ++it) { if (it->get()->status.size() > 0) { for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { - task_query += applyChange(*it->get(), *status_it); + query->push_back(*applyChange(*it->get(), *status_it)); } if (!it->get()->hasStatus(overlapping)) { - task_query += updateValidation(it->get()->osm_id, overlapping, "building"); + query->push_back(*updateValidation(it->get()->osm_id, overlapping, "building")); } if (!it->get()->hasStatus(duplicate)) { - task_query += updateValidation(it->get()->osm_id, duplicate, "building"); + query->push_back(*updateValidation(it->get()->osm_id, duplicate, "building")); } if (!it->get()->hasStatus(badgeom)) { - task_query += updateValidation(it->get()->osm_id, badgeom, "building"); + query->push_back(*updateValidation(it->get()->osm_id, badgeom, "building")); } if (!it->get()->hasStatus(badvalue)) { - task_query += updateValidation(it->get()->osm_id, badvalue); + query->push_back(*updateValidation(it->get()->osm_id, badvalue)); } } else { validation_removals->push_back(it->get()->osm_id); } } + return query; } -void -QueryValidate::nodes( - std::shared_ptr>> nodeval, - std::string &task_query -) { +std::shared_ptr> +QueryValidate::nodes(std::shared_ptr>> nodeval) { + auto query = std::make_shared>(); for (auto it = nodeval->begin(); it != nodeval->end(); ++it) { if (it->get()->status.size() > 0) { for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { - task_query += applyChange(*it->get(), *status_it); + query->push_back(*applyChange(*it->get(), *status_it)); } } } + return query; } -void +std::shared_ptr> QueryValidate::nodes( std::shared_ptr>> nodeval, - std::string &task_query, std::shared_ptr> validation_removals ) { + auto query = std::make_shared>(); for (auto it = nodeval->begin(); it != nodeval->end(); ++it) { if (it->get()->status.size() > 0) { for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) { - task_query += applyChange(*it->get(), *status_it); + query->push_back(*applyChange(*it->get(), *status_it)); } if (!it->get()->hasStatus(badvalue)) { - task_query += updateValidation(it->get()->osm_id, badvalue); + query->push_back(*updateValidation(it->get()->osm_id, badvalue)); } } else { validation_removals->push_back(it->get()->osm_id); } } + return query; } } // namespace queryvalidate diff --git a/src/validate/queryvalidate.hh b/src/validate/queryvalidate.hh index f3c3536f..01b81acf 100644 --- a/src/validate/queryvalidate.hh +++ b/src/validate/queryvalidate.hh @@ -74,17 +74,30 @@ class QueryValidate { QueryValidate(std::shared_ptr db); /// Apply data validation to the database - std::string applyChange(const ValidateStatus &validation, const valerror_t &status) const; + std::shared_ptr applyChange(const ValidateStatus &validation, + const valerror_t &status) const; /// Update the validation table, delete any feature that has been fixed. - std::string updateValidation(std::shared_ptr> removals); - std::string updateValidation(long osm_id, const valerror_t &status, const std::string &source) const; - std::string updateValidation(long osm_id, const valerror_t &status) const; - void ways(std::shared_ptr>> wayval, std::string &task_query); - void nodes(std::shared_ptr>> nodeval, std::string &task_query); - void rels(std::shared_ptr>> relval, std::string &task_query); - void ways(std::shared_ptr>> wayval, std::string &task_query, std::shared_ptr> validation_removals); - void nodes(std::shared_ptr>> nodeval, std::string &task_query, std::shared_ptr> validation_removals); - void rels(std::shared_ptr>> relval, std::string &task_query, std::shared_ptr> validation_removals); + std::shared_ptr updateValidation( + std::shared_ptr> removals); + std::shared_ptr updateValidation( + long osm_id, const valerror_t &status, const std::string &source) const; + std::shared_ptr updateValidation( + long osm_id, const valerror_t &status) const; + std::shared_ptr> ways( + std::shared_ptr>> wayval); + std::shared_ptr> ways( + std::shared_ptr>> wayval, + std::shared_ptr> validation_removals); + std::shared_ptr> nodes( + std::shared_ptr>> nodeval); + std::shared_ptr> nodes( + std::shared_ptr>> nodeval, + std::shared_ptr> validation_removals); + std::shared_ptr> rels( + std::shared_ptr>> relval); + std::shared_ptr> rels( + std::shared_ptr>> relval, + std::shared_ptr> validation_removals); // Database connection, used for escape strings std::shared_ptr dbconn; };