Skip to content
This repository has been archived by the owner on Oct 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #475 from hotosm/feature/refactor-relations
Browse files Browse the repository at this point in the history
Feature/refactor relations
  • Loading branch information
emi420 authored Feb 28, 2024
2 parents 6cd727e + ef79e72 commit 6011231
Show file tree
Hide file tree
Showing 19 changed files with 653 additions and 117 deletions.
4 changes: 2 additions & 2 deletions config/validate/building.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ config:
- badgeom_maxangle:
- 91
- badvalue:
- yes
- no
- incomplete:
- yes
- no


2 changes: 1 addition & 1 deletion setup/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ then
PGPASSWORD=$PASS psql --host $HOST --user $USER --port $PORT $DB < db/indexes.sql

echo "Configuring Underpass ..."
python3 poly2geojson.py $COUNTRY.poly
python3 ../utils/poly2geojson.py $COUNTRY.poly
if "$use_docker";
then
docker cp $COUNTRY.geojson underpass:/etc/underpass/priority.geojson
Expand Down
17 changes: 17 additions & 0 deletions setup/db/underpass.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,28 @@ CREATE TABLE IF NOT EXISTS public.nodes (
uid int8
);

-- Raw relations table: geometry, tags and edit metadata per row.
-- NOTE(review): presumably one row per OSM relation, with refs holding the
-- ids of its members -- confirm against the importer.
CREATE TABLE IF NOT EXISTS public.relations (
    osm_id    BIGINT,
    changeset BIGINT,
    geom      public.geometry(Geometry, 4326),
    tags      JSONB,
    refs      BIGINT[],
    timestamp TIMESTAMPTZ,
    version   INTEGER,
    "user"    TEXT,
    uid       BIGINT
);

-- Way-to-node references: one (way_id, node_id) row per node referenced by a way.
CREATE TABLE IF NOT EXISTS public.way_refs (
    way_id  BIGINT,
    node_id BIGINT
);

-- Relation-to-way references, stored as (rel_id, way_id) pairs.
CREATE TABLE IF NOT EXISTS public.rel_refs (
    rel_id BIGINT,
    way_id BIGINT
);

CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC);
Expand Down
254 changes: 215 additions & 39 deletions src/bootstrap/bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,25 @@ using namespace queryraw;
using namespace underpassconfig;
using namespace logger;

typedef boost::geometry::model::d2::point_xy<double> point_t;

namespace bootstrap {

const int PAGE_SIZE = 100;
Bootstrap::Bootstrap(void) {}

std::string
allTasksQueries(std::shared_ptr<std::vector<BootstrapTask>> tasks) {
Bootstrap::allTasksQueries(std::shared_ptr<std::vector<BootstrapTask>> tasks) {
std::string queries = "";
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
queries += it->query ;
}
return queries;
}

void startProcessingWays(const underpassconfig::UnderpassConfig &config) {

void
Bootstrap::start(const underpassconfig::UnderpassConfig &config) {
std::cout << "Connecting to the database ..." << std::endl;
auto db = std::make_shared<Pq>();
db = std::make_shared<Pq>();
if (!db->connect(config.underpass_db_url)) {
std::cout << "Could not connect to Underpass DB, aborting bootstrapping thread!" << std::endl;
return;
Expand All @@ -81,56 +83,63 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) {
log_debug("Couldn't load plugin! %1%", e.what());
exit(0);
}
auto validator = creator();

auto queryvalidate = std::make_shared<QueryValidate>(db);
auto queryraw = std::make_shared<QueryRaw>(db);
validator = creator();
queryvalidate = std::make_shared<QueryValidate>(db);
queryraw = std::make_shared<QueryRaw>(db);
page_size = config.bootstrap_page_size;
concurrency = config.concurrency;
norefs = config.norefs;

processWays();
processNodes();
processRelations();

}

void
Bootstrap::processWays() {

std::vector<std::string> tables = {
QueryRaw::polyTable,
QueryRaw::lineTable
};

for (auto table_it = tables.begin(); table_it != tables.end(); ++table_it) {
std::cout << std::endl << "Counting geometries ... " << std::endl;
long int total = queryraw->getWaysCount(*table_it);
std::cout << std::endl << "Processing ways ... ";
long int total = queryraw->getCount(*table_it);
long int count = 0;
int num_chunks = total / PAGE_SIZE;
int num_chunks = total / page_size;

std::cout << "Total: " << total << std::endl;
std::cout << "Threads: " << config.concurrency << std::endl;
std::cout << "Page size: " << PAGE_SIZE << std::endl;
long lastid = 0;

int concurrentTasks = config.concurrency;
int concurrentTasks = concurrency;
int taskIndex = 0;
std::chrono::steady_clock::time_point begin;
std::chrono::steady_clock::time_point end;
int percentage = 0;

for (int chunkIndex = 1; chunkIndex <= (num_chunks/concurrentTasks); chunkIndex++) {
for (int chunkIndex = 0; chunkIndex <= (num_chunks/concurrentTasks); chunkIndex++) {

int percentage = (count * 100) / total;
percentage = (count * 100) / total;

auto ways = std::make_shared<std::vector<OsmWay>>();
if (!config.norefs) {
ways = queryraw->getWaysFromDB(lastid, config.concurrency * PAGE_SIZE, *table_it);
if (!norefs) {
ways = queryraw->getWaysFromDB(lastid, concurrency * page_size, *table_it);
} else {
ways = queryraw->getWaysFromDBWithoutRefs(lastid, config.concurrency * PAGE_SIZE, *table_it);
ways = queryraw->getWaysFromDBWithoutRefs(lastid, concurrency * page_size, *table_it);
}

auto tasks = std::make_shared<std::vector<BootstrapTask>>(concurrentTasks);
boost::asio::thread_pool pool(concurrentTasks);
for (int taskIndex = 0; taskIndex < concurrentTasks; taskIndex++) {
auto taskWays = std::make_shared<std::vector<OsmWay>>();
WayTask wayTask {
std::ref(validator),
std::ref(queryvalidate),
config,
taskIndex,
std::ref(tasks),
std::ref(ways),
};
std::cout << "\r" << "Processing " << *table_it << ": " << count << "/" << total << " (" << percentage << "%)";
boost::asio::post(pool, boost::bind(threadBootstrapTask, wayTask));

boost::asio::post(pool, boost::bind(&Bootstrap::threadBootstrapWayTask, this, wayTask));
}

pool.join();
Expand All @@ -141,45 +150,212 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) {
count += it->processed;
}
}
percentage = (count * 100) / total;
std::cout << "\r" << "Processing " << *table_it << ": " << count << "/" << total << " (" << percentage << "%)";
}
std::cout << std::endl;

}

void
Bootstrap::processNodes() {

std::cout << "Processing nodes ... ";
long int total = queryraw->getCount("nodes");
long int count = 0;
int num_chunks = total / page_size;

long lastid = 0;

int concurrentTasks = concurrency;
int taskIndex = 0;
int percentage = 0;

for (int chunkIndex = 0; chunkIndex <= (num_chunks/concurrentTasks); chunkIndex++) {

percentage = (count * 100) / total;

auto nodes = std::make_shared<std::vector<OsmNode>>();
nodes = queryraw->getNodesFromDB(lastid, concurrency * page_size);

auto tasks = std::make_shared<std::vector<BootstrapTask>>(concurrentTasks);
boost::asio::thread_pool pool(concurrentTasks);
for (int taskIndex = 0; taskIndex < concurrentTasks; taskIndex++) {
auto taskNodes = std::make_shared<std::vector<OsmNode>>();
NodeTask nodeTask {
taskIndex,
std::ref(tasks),
std::ref(nodes),
};
std::cout << "\r" << "Processing nodes: " << count << "/" << total << " (" << percentage << "%)";
boost::asio::post(pool, boost::bind(&Bootstrap::threadBootstrapNodeTask, this, nodeTask));
}

pool.join();

db->query(allTasksQueries(tasks));
lastid = nodes->back().id;
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
count += it->processed;
}
}
percentage = (count * 100) / total;
std::cout << "\r" << "Processing nodes: " << count << "/" << total << " (" << percentage << "%)";
std::cout << std::endl;

}

void
Bootstrap::processRelations() {

std::cout << "Processing relations ... ";
long int total = queryraw->getCount("relations");
long int count = 0;
int num_chunks = total / page_size;

long lastid = 0;

int concurrentTasks = concurrency;
int taskIndex = 0;
int percentage = 0;

for (int chunkIndex = 0; chunkIndex <= (num_chunks/concurrentTasks); chunkIndex++) {

percentage = (count * 100) / total;

auto relations = std::make_shared<std::vector<OsmRelation>>();
relations = queryraw->getRelationsFromDB(lastid, concurrency * page_size);

auto tasks = std::make_shared<std::vector<BootstrapTask>>(concurrentTasks);
boost::asio::thread_pool pool(concurrentTasks);
for (int taskIndex = 0; taskIndex < concurrentTasks; taskIndex++) {
auto taskRelations = std::make_shared<std::vector<OsmRelation>>();
RelationTask relationTask {
taskIndex,
std::ref(tasks),
std::ref(relations),
};
std::cout << "\r" << "Processing relations: " << count << "/" << total << " (" << percentage << "%)";
boost::asio::post(pool, boost::bind(&Bootstrap::threadBootstrapRelationTask, this, relationTask));
}

pool.join();

db->query(allTasksQueries(tasks));
lastid = relations->back().id;
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
count += it->processed;
}
}
percentage = (count * 100) / total;
std::cout << "\r" << "Processing relations: " << count << "/" << total << " (" << percentage << "%)";

std::cout << std::endl;

}

// This thread gets started for every page of ways
void
threadBootstrapTask(WayTask wayTask)
Bootstrap::threadBootstrapWayTask(WayTask wayTask)
{
#ifdef TIMING_DEBUG
boost::timer::auto_cpu_timer timer("bootstrap::threadBootstrapTask(wayTask): took %w seconds\n");
boost::timer::auto_cpu_timer timer("bootstrap::threadBootstrapWayTask(wayTask): took %w seconds\n");
#endif
auto plugin = wayTask.plugin;
auto queryvalidate = wayTask.queryvalidate;
auto config = wayTask.config;
auto taskIndex = wayTask.taskIndex;
auto tasks = wayTask.tasks;
auto ways = wayTask.ways;

BootstrapTask task;
int processed = 0;

auto wayval = std::make_shared<std::vector<std::shared_ptr<ValidateStatus>>>();

// Processing ways
for (int i = 0; i < PAGE_SIZE; ++i) {
if (i * taskIndex < ways->size()) {
auto way = ways->at(i * (taskIndex + 1));
auto status = plugin->checkWay(way, "building");
for (auto status_it = status->status.begin(); status_it != status->status.end(); ++status_it) {
task.query += queryvalidate->applyChange(*status, *status_it);
}
for (int i = taskIndex * page_size; i < (taskIndex + 1) * page_size; ++i) {
if (i < ways->size()) {
auto way = ways->at(i);
wayval->push_back(validator->checkWay(way, "building"));
// Fill the way_refs table
if (!config.norefs) {
if (!norefs) {
for (auto ref = way.refs.begin(); ref != way.refs.end(); ++ref) {
task.query += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way.id) + "," + std::to_string(*ref) + "); ";
}
}
++processed;
}
}
queryvalidate->ways(wayval, task.query);
task.processed = processed;
const std::lock_guard<std::mutex> lock(tasks_change_mutex);
(*tasks)[taskIndex] = task;

}

// Thread-pool worker: one of these runs for every page of nodes.
// It validates the slice [taskIndex * page_size, (taskIndex + 1) * page_size)
// of the fetched nodes and stores the generated SQL plus the number of nodes
// handled in this task's slot of the shared task list.
void
Bootstrap::threadBootstrapNodeTask(NodeTask nodeTask)
{
#ifdef TIMING_DEBUG
    boost::timer::auto_cpu_timer timer("bootstrap::threadBootstrapNodeTask(nodeTask): took %w seconds\n");
#endif
    auto slot = nodeTask.taskIndex;
    auto taskList = nodeTask.tasks;
    auto nodeList = nodeTask.nodes;

    BootstrapTask result;
    int handled = 0;
    auto statuses = std::make_shared<std::vector<std::shared_ptr<ValidateStatus>>>();

    // Tag keys for which a node gets validated.
    std::vector<std::string> tagKeys = {"building", "natural", "place", "waterway"};

    // Processing nodes
    for (int idx = slot * page_size; idx < (slot + 1) * page_size; ++idx) {
        if (!(idx < nodeList->size())) {
            // This slice reaches past the end of the fetched nodes.
            continue;
        }
        auto current = nodeList->at(idx);
        for (auto &key : tagKeys) {
            if (current.containsKey(key)) {
                statuses->push_back(validator->checkNode(current, key));
            }
        }
        ++handled;
    }

    // Turn the collected validation results into SQL appended to result.query.
    queryvalidate->nodes(statuses, result.query);
    result.processed = handled;

    // Publish the result under the shared mutex.
    const std::lock_guard<std::mutex> lock(tasks_change_mutex);
    (*taskList)[slot] = result;

}

// This thread gets started for every page of relations
void
Bootstrap::threadBootstrapRelationTask(RelationTask relationTask)
{
#ifdef TIMING_DEBUG
boost::timer::auto_cpu_timer timer("bootstrap::threadBootstrapRelationTask(relationTask): took %w seconds\n");
#endif
auto taskIndex = relationTask.taskIndex;
auto tasks = relationTask.tasks;
auto relations = relationTask.relations;

BootstrapTask task;
int processed = 0;

auto relationval = std::make_shared<std::vector<std::shared_ptr<ValidateStatus>>>();

// Processing relations
for (int i = taskIndex * page_size; i < (taskIndex + 1) * page_size; ++i) {
if (i < relations->size()) {
auto relation = relations->at(i);
// relationval->push_back(validator->checkRelation(way, "building"));
// Fill the rel_refs table
// for (auto ref = relation.refs.begin(); ref != relation.refs.end(); ++ref) {
// task.query += "INSERT INTO rel_refs (rel_id, way_id) VALUES (" + std::to_string(rel.id) + "," + std::to_string(*ref) + "); ";
// }
++processed;
}
}
// queryvalidate->relations(relval, task.query);
task.processed = processed;
const std::lock_guard<std::mutex> lock(tasks_change_mutex);
(*tasks)[taskIndex] = task;
Expand Down
Loading

0 comments on commit 6011231

Please sign in to comment.