Skip to content
This repository has been archived by the owner on Oct 5, 2024. It is now read-only.

Commit

Permalink
Refactor, working on relations
Browse files Browse the repository at this point in the history
  • Loading branch information
emi420 committed Feb 27, 2024
1 parent 6cd727e commit fe7348f
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 66 deletions.
2 changes: 1 addition & 1 deletion setup/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ then
PGPASSWORD=$PASS psql --host $HOST --user $USER --port $PORT $DB < db/indexes.sql

echo "Configuring Underpass ..."
python3 poly2geojson.py $COUNTRY.poly
python3 ../utils/poly2geojson.py $COUNTRY.poly
if "$use_docker";
then
docker cp $COUNTRY.geojson underpass:/etc/underpass/priority.geojson
Expand Down
17 changes: 17 additions & 0 deletions setup/db/underpass.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,28 @@ CREATE TABLE IF NOT EXISTS public.nodes (
uid int8
);

CREATE TABLE IF NOT EXISTS public.rels (
osm_id int8,
changeset int8,
geom public.geometry(Geometry,4326),
tags JSONB,
members int8[],
timestamp timestamp with time zone,
version int,
"user" text,
uid int8
);

CREATE TABLE IF NOT EXISTS public.way_refs (
way_id int8,
node_id int8
);

CREATE TABLE IF NOT EXISTS public.rel_members (
rel_id int8,
way_id int8
);

CREATE UNIQUE INDEX nodes_id_idx ON public.nodes (osm_id DESC);
CREATE UNIQUE INDEX ways_poly_id_idx ON public.ways_poly (osm_id DESC);
CREATE UNIQUE INDEX ways_line_id_idx ON public.ways_line(osm_id DESC);
Expand Down
68 changes: 37 additions & 31 deletions src/bootstrap/bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ using namespace logger;

namespace bootstrap {

const int PAGE_SIZE = 100;
Bootstrap::Bootstrap(void) {}

std::string
allTasksQueries(std::shared_ptr<std::vector<BootstrapTask>> tasks) {
Bootstrap::allTasksQueries(std::shared_ptr<std::vector<BootstrapTask>> tasks) {
std::string queries = "";
for (auto it = tasks->begin(); it != tasks->end(); ++it) {
queries += it->query ;
}
return queries;
}

void startProcessingWays(const underpassconfig::UnderpassConfig &config) {

void
Bootstrap::start(const underpassconfig::UnderpassConfig &config) {
std::cout << "Connecting to the database ..." << std::endl;
auto db = std::make_shared<Pq>();
db = std::make_shared<Pq>();
if (!db->connect(config.underpass_db_url)) {
std::cout << "Could not connect to Underpass DB, aborting bootstrapping thread!" << std::endl;
return;
Expand All @@ -81,10 +81,23 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) {
log_debug("Couldn't load plugin! %1%", e.what());
exit(0);
}
auto validator = creator();

auto queryvalidate = std::make_shared<QueryValidate>(db);
auto queryraw = std::make_shared<QueryRaw>(db);
validator = creator();
queryvalidate = std::make_shared<QueryValidate>(db);
queryraw = std::make_shared<QueryRaw>(db);
page_size = config.bootstrap_page_size;
concurrency = config.concurrency;
norefs = config.norefs;

processWays();
// processNodes();
// processRels();

}

void
Bootstrap::processWays() {

std::vector<std::string> tables = {
QueryRaw::polyTable,
QueryRaw::lineTable
Expand All @@ -94,43 +107,39 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) {
std::cout << std::endl << "Counting geometries ... " << std::endl;
long int total = queryraw->getWaysCount(*table_it);
long int count = 0;
int num_chunks = total / PAGE_SIZE;
int num_chunks = total / page_size;

std::cout << "Total: " << total << std::endl;
std::cout << "Threads: " << config.concurrency << std::endl;
std::cout << "Page size: " << PAGE_SIZE << std::endl;
std::cout << "Threads: " << concurrency << std::endl;
std::cout << "Page size: " << page_size << std::endl;
long lastid = 0;

int concurrentTasks = config.concurrency;
int concurrentTasks = concurrency;
int taskIndex = 0;
std::chrono::steady_clock::time_point begin;
std::chrono::steady_clock::time_point end;

for (int chunkIndex = 1; chunkIndex <= (num_chunks/concurrentTasks); chunkIndex++) {

int percentage = (count * 100) / total;

auto ways = std::make_shared<std::vector<OsmWay>>();
if (!config.norefs) {
ways = queryraw->getWaysFromDB(lastid, config.concurrency * PAGE_SIZE, *table_it);
if (!norefs) {
ways = queryraw->getWaysFromDB(lastid, concurrency * page_size, *table_it);
} else {
ways = queryraw->getWaysFromDBWithoutRefs(lastid, config.concurrency * PAGE_SIZE, *table_it);
ways = queryraw->getWaysFromDBWithoutRefs(lastid, concurrency * page_size, *table_it);
}

auto tasks = std::make_shared<std::vector<BootstrapTask>>(concurrentTasks);
boost::asio::thread_pool pool(concurrentTasks);
for (int taskIndex = 0; taskIndex < concurrentTasks; taskIndex++) {
auto taskWays = std::make_shared<std::vector<OsmWay>>();
WayTask wayTask {
std::ref(validator),
std::ref(queryvalidate),
config,
taskIndex,
std::ref(tasks),
std::ref(ways),
};
std::cout << "\r" << "Processing " << *table_it << ": " << count << "/" << total << " (" << percentage << "%)";
boost::asio::post(pool, boost::bind(threadBootstrapTask, wayTask));

boost::asio::post(pool, boost::bind(&Bootstrap::threadBootstrapTask, this, wayTask));
}

pool.join();
Expand All @@ -148,38 +157,35 @@ void startProcessingWays(const underpassconfig::UnderpassConfig &config) {

// This thread get started for every page of way
void
threadBootstrapTask(WayTask wayTask)
Bootstrap::threadBootstrapTask(WayTask wayTask)
{
#ifdef TIMING_DEBUG
boost::timer::auto_cpu_timer timer("bootstrap::threadBootstrapTask(wayTask): took %w seconds\n");
#endif
auto plugin = wayTask.plugin;
auto queryvalidate = wayTask.queryvalidate;
auto config = wayTask.config;
auto taskIndex = wayTask.taskIndex;
auto tasks = wayTask.tasks;
auto ways = wayTask.ways;

BootstrapTask task;
int processed = 0;

auto wayval = std::make_shared<std::vector<std::shared_ptr<ValidateStatus>>>();

// Proccesing ways
for (int i = 0; i < PAGE_SIZE; ++i) {
for (int i = 0; i < page_size; ++i) {
if (i * taskIndex < ways->size()) {
auto way = ways->at(i * (taskIndex + 1));
auto status = plugin->checkWay(way, "building");
for (auto status_it = status->status.begin(); status_it != status->status.end(); ++status_it) {
task.query += queryvalidate->applyChange(*status, *status_it);
}
wayval->push_back(validator->checkWay(way, "building"));
// Fill the way_refs table
if (!config.norefs) {
if (!norefs) {
for (auto ref = way.refs.begin(); ref != way.refs.end(); ++ref) {
task.query += "INSERT INTO way_refs (way_id, node_id) VALUES (" + std::to_string(way.id) + "," + std::to_string(*ref) + "); ";
}
}
++processed;
}
}
queryvalidate->ways(wayval, task.query);
task.processed = processed;
const std::lock_guard<std::mutex> lock(tasks_change_mutex);
(*tasks)[taskIndex] = task;
Expand Down
29 changes: 23 additions & 6 deletions src/bootstrap/bootstrap.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "raw/queryraw.hh"
#include "underpassconfig.hh"
#include "validate/validate.hh"
#include <mutex>

using namespace queryvalidate;
using namespace queryraw;
Expand All @@ -37,18 +38,34 @@ struct BootstrapTask {
};

struct WayTask {
std::shared_ptr<Validate> plugin;
std::shared_ptr<QueryValidate> queryvalidate;
underpassconfig::UnderpassConfig config;
int taskIndex;
std::shared_ptr<std::vector<BootstrapTask>> tasks;
std::shared_ptr<std::vector<OsmWay>> ways;
};

void startProcessingWays(const underpassconfig::UnderpassConfig &config);
class Bootstrap {
public:
Bootstrap(void);
~Bootstrap(void){};

static const std::string polyTable;
static const std::string lineTable;

void start(const underpassconfig::UnderpassConfig &config);
void processWays();

// This thread get started for every page of way
void threadBootstrapTask(WayTask wayTask);
// This thread get started for every page of way
void threadBootstrapTask(WayTask wayTask);
std::string allTasksQueries(std::shared_ptr<std::vector<BootstrapTask>> tasks);

std::shared_ptr<Validate> validator;
std::shared_ptr<QueryValidate> queryvalidate;
std::shared_ptr<QueryRaw> queryraw;
std::shared_ptr<Pq> db;
bool norefs;
unsigned int concurrency;
unsigned int page_size;
};

static std::mutex tasks_change_mutex;

Expand Down
18 changes: 12 additions & 6 deletions src/raw/queryraw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
/// includes querying existing data in the database, as well as
/// updating the database.

// TODO: add support for relations/multipolygon

// This is generated by autoconf
#ifdef HAVE_CONFIG_H
#include "unconfig.h"
Expand Down Expand Up @@ -233,6 +231,12 @@ QueryRaw::applyChange(const OsmWay &way) const
return query;
}

std::string
QueryRaw::applyChange(const OsmRelation &relation) const
{

}

std::vector<long> arrayStrToVector(std::string &refs_str) {
refs_str.erase(0, 1);
refs_str.erase(refs_str.size() - 1);
Expand Down Expand Up @@ -261,10 +265,8 @@ void QueryRaw::getNodeCache(std::shared_ptr<OsmChangeFile> osmchanges, const mul
if (way->action != osmobjects::remove) {
// Save referenced nodes for later use
for (auto rit = std::begin(way->refs); rit != std::end(way->refs); ++rit) {
if (way->action != osmobjects::remove) {
if (!osmchanges->nodecache.count(*rit)) {
referencedNodeIds += std::to_string(*rit) + ",";
}
if (!osmchanges->nodecache.count(*rit)) {
referencedNodeIds += std::to_string(*rit) + ",";
}
}
} else {
Expand Down Expand Up @@ -337,6 +339,10 @@ void QueryRaw::getNodeCache(std::shared_ptr<OsmChangeFile> osmchanges, const mul
}
}

// Build relation multipolyon geometries
// TODO


}

void
Expand Down
5 changes: 3 additions & 2 deletions src/raw/queryraw.hh
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ class QueryRaw {
std::string applyChange(const OsmNode &node) const;
/// Build query for processed Way
std::string applyChange(const OsmWay &way) const;
/// Build query for updating modified nodes in ways geometries
// std::string applyChange(const std::shared_ptr<std::map<long, std::pair<double, double>>> nodes) const;
/// Build query for processed Relation
std::string applyChange(const OsmRelation &relation) const;
/// Get nodes for filling Node cache
void getNodeCache(std::shared_ptr<OsmChangeFile> osmchanges, const multipolygon_t &poly);
/// Get nodes for filling Node cache from ways refs
void getNodeCacheFromWays(std::shared_ptr<std::vector<OsmWay>> ways, std::map<double, point_t> &nodecache) const;
// Get ways by refs
std::list<std::shared_ptr<OsmWay>> getWaysByNodesRefs(std::string &nodeIds) const;

// DB connection
std::shared_ptr<Pq> dbconn;
// Get ways count
Expand Down
46 changes: 28 additions & 18 deletions src/replicator/threads.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ threadOsmChange(OsmChangeTask osmChangeTask)

auto removed_nodes = std::make_shared<std::vector<long>>();
auto removed_ways = std::make_shared<std::vector<long>>();
auto removed_relations = std::make_shared<std::vector<long>>();
auto validation_removals = std::make_shared<std::vector<long>>();

// Raw data and validation
Expand Down Expand Up @@ -524,6 +525,25 @@ threadOsmChange(OsmChangeTask osmChangeTask)
}
}

// Relations
// for (auto rit = std::begin(change->relations); rit != std::end(change->relations); ++rit) {
// osmobjects::OsmRelation *relation = rit->get();

// if (relation->action != osmobjects::remove && !relation->priority) {
// continue;
// }

// // Remove deleted relations from validation table
// if (!config->disable_validation && relation->action == osmobjects::remove) {
// removed_relations->push_back(relation->id);
// }

// // Update relations, ignore new ones outside priority area
// if (!config->disable_raw) {
// task.query += queryraw->applyChange(*relation);
// }
// }

}
}

Expand All @@ -532,30 +552,20 @@ threadOsmChange(OsmChangeTask osmChangeTask)

// Validate ways
auto wayval = osmchanges->validateWays(poly, plugin);
for (auto it = wayval->begin(); it != wayval->end(); ++it) {
queryvalidate->ways(wayval, task.query, validation_removals);

if (it->get()->status.size() > 0) {
for (auto status_it = it->get()->status.begin(); status_it != it->get()->status.end(); ++status_it) {
task.query += queryvalidate->applyChange(*it->get(), *status_it);
}
if (!it->get()->hasStatus(overlapping)) {
task.query += queryvalidate->updateValidation(it->get()->osm_id, overlapping, "building");
}
if (!it->get()->hasStatus(duplicate)) {
task.query += queryvalidate->updateValidation(it->get()->osm_id, duplicate, "building");
}
if (!it->get()->hasStatus(badgeom)) {
task.query += queryvalidate->updateValidation(it->get()->osm_id, badgeom, "building");
}
} else {
validation_removals->push_back(it->get()->osm_id);
}
}
// Validate nodes
auto nodeval = osmchanges->validateNodes(poly, plugin);
queryvalidate->nodes(nodeval, task.query, validation_removals);

// Validate relations
// task.query += queryvalidate->rels(wayval, task.query, validation_removals);

// Remove validation entries for removed objects
task.query += queryvalidate->updateValidation(validation_removals);
task.query += queryvalidate->updateValidation(removed_nodes);
task.query += queryvalidate->updateValidation(removed_ways);
// task.query += queryvalidate->updateValidation(removed_relations);

}

Expand Down
5 changes: 3 additions & 2 deletions src/underpass.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,9 @@ main(int argc, char *argv[])

if (vm.count("bootstrap")){
std::thread bootstrapThread;
std::cout << "Starting bootstrapping proccess ..." << std::endl;
bootstrapThread = std::thread(bootstrap::startProcessingWays, std::ref(config));
std::cout << "Starting bootstrapping process ..." << std::endl;
auto boostrapper = bootstrap::Bootstrap();
bootstrapThread = std::thread(&bootstrap::Bootstrap::start, &boostrapper, std::ref(config));
log_info("Waiting...");
if (bootstrapThread.joinable()) {
bootstrapThread.join();
Expand Down
Loading

0 comments on commit fe7348f

Please sign in to comment.