Skip to content

Commit

Permalink
Make it possible to partition osm2pgsql tables
Browse files Browse the repository at this point in the history
  • Loading branch information
apmon committed May 22, 2013
1 parent d32bad6 commit 0570fdc
Showing 1 changed file with 123 additions and 35 deletions.
158 changes: 123 additions & 35 deletions output-pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ static struct s_table {
};
#define NUM_TABLES ((signed)(sizeof(tables) / sizeof(tables[0])))

static struct p_table {
const char *name;
enum table_id parent;
const char * table_constraint;
const char * trigger_constraint;
} partitions [] = {
{ .name = "buildings", t_poly, "building is not null", "NEW.building is not null"},
{ .name = "nonbuildings", t_poly, "building is null", "NEW.building is null"},
{ .name = "highways", t_line, "highway is not null", "NEW.highway is not null"},
{ .name = "nonhighways", t_line, "highway is null", "NEW.highway is null"}
};

#define NUM_PARTITIONS ((signed)(sizeof(partitions) / sizeof(partitions[0])))


static struct flagsname {
char *name;
Expand Down Expand Up @@ -832,11 +846,13 @@ static int pgsql_out_connect(const struct output_options *options, int startTran
static int pgsql_out_start(const struct output_options *options)
{
char *sql, tmp[256];
char *partition_sql;
PGresult *res;
int i,j;
int i,j,p;
unsigned int sql_len;
int their_srid;
int i_hstore_column;
int has_partitions;
enum OsmType type;
int numTags;
struct taginfo *exportTags;
Expand All @@ -847,6 +863,7 @@ static int pgsql_out_start(const struct output_options *options)

sql_len = 2048;
sql = malloc(sql_len);
partition_sql = malloc(sql_len);
assert(sql);

for (i=0; i<NUM_TABLES; i++) {
Expand All @@ -870,7 +887,7 @@ static int pgsql_out_start(const struct output_options *options)
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");

if (!options->append) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s", tables[i].name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s CASCADE", tables[i].name);
}
else
{
Expand All @@ -891,7 +908,7 @@ static int pgsql_out_start(const struct output_options *options)
}

/* These _tmp tables can be left behind if we run out of disk space */
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s_tmp", tables[i].name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s_tmp CASCADE", tables[i].name);

pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");

Expand Down Expand Up @@ -940,6 +957,35 @@ static int pgsql_out_start(const struct output_options *options)
}
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", sql);
}

sprintf(partition_sql, "CREATE OR REPLACE FUNCTION %s_insert_trigger() RETURNS TRIGGER AS $$ BEGIN", tables[i].name);

has_partitions = 0;
for (p = 0; p < NUM_PARTITIONS; p++) {
if (partitions[p].parent == i) {
has_partitions = 1;
sprintf(sql, "CREATE TABLE %s_%s ( CHECK (%s) ) INHERITS (%s);",tables[i].name, partitions[p].name, partitions[p].table_constraint, tables[i].name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", sql);
if (Options->slim && !Options->droptemp) {
sprintf(sql,
"CREATE INDEX %s_%s_pkey ON %s_%s USING BTREE (osm_id)",
tables[i].name, partitions[p].name, tables[i].name, partitions[p].name);
if (Options->tblsmain_index) {
sprintf(sql + strlen(sql), " TABLESPACE %s\n",
Options->tblsmain_index);
}
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", sql);
}
sprintf(partition_sql, "%s IF ( %s ) THEN INSERT INTO %s_%s VALUES (NEW.*); END IF;",
partition_sql, partitions[p].trigger_constraint, tables[i].name, partitions[p].name);
}
}

sprintf(partition_sql, "%s RETURN NULL; END; $$ LANGUAGE plpgsql; "
"CREATE TRIGGER insert_%s_insert_trigger BEFORE "
"INSERT ON %s FOR EACH ROW EXECUTE PROCEDURE %s_insert_trigger();",
partition_sql, tables[i].name, tables[i].name, tables[i].name);
if (has_partitions) pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", partition_sql);
} else {
/* Add any new columns referenced in the default.style */
PGresult *res;
Expand Down Expand Up @@ -1066,9 +1112,13 @@ static void pgsql_out_commit(void) {

static void *pgsql_out_stop_one(void *arg)
{
int i_column;
int i_column, p;
struct s_table *table = arg;
PGconn *sql_conn = table->sql_conn;
char * table_name;
int has_partitions = 0;

table_name = malloc(2048);

if( table->buflen != 0 )
{
Expand All @@ -1084,7 +1134,7 @@ static void *pgsql_out_stop_one(void *arg)
fprintf(stderr, "Sorting data and creating indexes for %s\n", table->name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ANALYZE %s;\n", table->name);
fprintf(stderr, "Analyzing %s finished\n", table->name);
if (Options->tblsmain_data) {
/*if (Options->tblsmain_data) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE TABLE %s_tmp "
"TABLESPACE %s AS SELECT * FROM %s ORDER BY way;\n",
table->name, Options->tblsmain_data, table->name);
Expand All @@ -1094,44 +1144,82 @@ static void *pgsql_out_stop_one(void *arg)
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE %s;\n", table->name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ALTER TABLE %s_tmp RENAME TO %s;\n", table->name, table->name);
fprintf(stderr, "Copying %s to cluster by geometry finished\n", table->name);
fprintf(stderr, "Creating geometry index on %s\n", table->name);
if (Options->tblsmain_index) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_index ON %s USING GIST (way) TABLESPACE %s;\n", table->name, table->name, Options->tblsmain_index);
} else {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_index ON %s USING GIST (way);\n", table->name, table->name);
}
*/

/* slim mode needs this to be able to apply diffs */
if (Options->slim && !Options->droptemp)
{
fprintf(stderr, "Creating osm_id index on %s\n", table->name);
if (Options->tblsmain_index) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_pkey ON %s USING BTREE (osm_id) TABLESPACE %s;\n", table->name, table->name, Options->tblsmain_index);
for (p = 0; p <= NUM_PARTITIONS ; p++) {
if (p == NUM_PARTITIONS && (!has_partitions)) {
sprintf(table_name, "%s", table->name);
} else {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_pkey ON %s USING BTREE (osm_id);\n", table->name, table->name);
if (&(tables[partitions[p].parent]) != table) continue;
has_partitions = 1;
sprintf(table_name, "%s_%s", table->name, partitions[p].name);
}
}
/* Create hstore index if selected */
if (Options->enable_hstore_index) {
fprintf(stderr, "Creating hstore indexes on %s\n", table->name);

fprintf(stderr, "Creating geometry index on %s\n", table_name);
if (Options->tblsmain_index) {
if (HSTORE_NONE != (Options->enable_hstore))
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_tags_index ON %s USING GIN (tags) TABLESPACE %s;\n", table->name, table->name, Options->tblsmain_index);
for(i_column = 0; i_column < Options->n_hstore_columns; i_column++) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_hstore_%i_index ON %s USING GIN (\"%s\") TABLESPACE %s;\n",
table->name, i_column,table->name, Options->hstore_columns[i_column], Options->tblsmain_index);
}
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_index ON %s USING GIST (way) TABLESPACE %s;\n",
table_name, table_name, Options->tblsmain_index);
} else {
if (HSTORE_NONE != (Options->enable_hstore))
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_tags_index ON %s USING GIN (tags);\n", table->name, table->name);
for(i_column = 0; i_column < Options->n_hstore_columns; i_column++) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_hstore_%i_index ON %s USING GIN (\"%s\");\n", table->name, i_column,table->name, Options->hstore_columns[i_column]);
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_index ON %s USING GIST (way);\n",
table_name, table_name);
}

pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CLUSTER %s USING %s_index;\n", table_name, table_name);

/* slim mode needs this to be able to apply diffs */
if (Options->slim && !Options->droptemp) {
fprintf(stderr, "Creating osm_id index on %s\n", table->name);
if (Options->tblsmain_index) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_pkey ON %s USING BTREE (osm_id) TABLESPACE %s;\n",
table_name, table_name, Options->tblsmain_index);
} else {
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_pkey ON %s USING BTREE (osm_id);\n",
table_name, table_name);
}
}
/* Create hstore index if selected */
if (Options->enable_hstore_index) {
fprintf(stderr, "Creating hstore indexes on %s\n",
table_name);
if (Options->tblsmain_index) {
if (HSTORE_NONE != (Options->enable_hstore))
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_tags_index ON %s USING GIN (tags) TABLESPACE %s;\n",
table_name, table_name,
Options->tblsmain_index);
for (i_column = 0; i_column < Options->n_hstore_columns;
i_column++) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_hstore_%i_index ON %s USING GIN (\"%s\") TABLESPACE %s;\n",
table_name, i_column, table_name,
Options->hstore_columns[i_column],
Options->tblsmain_index);
}
} else {
if (HSTORE_NONE != (Options->enable_hstore))
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_tags_index ON %s USING GIN (tags);\n",
table_name, table_name);
for (i_column = 0; i_column < Options->n_hstore_columns;
i_column++) {
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"CREATE INDEX %s_hstore_%i_index ON %s USING GIN (\"%s\");\n",
table_name, i_column, table_name,
Options->hstore_columns[i_column]);
}
}
}
fprintf(stderr, "Creating indexes on %s finished\n", table_name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK,
"GRANT SELECT ON %s TO PUBLIC;\n", table_name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ANALYZE %s;\n",
table_name);
}
fprintf(stderr, "Creating indexes on %s finished\n", table->name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "GRANT SELECT ON %s TO PUBLIC;\n", table->name);
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ANALYZE %s;\n", table->name);

time(&end);
fprintf(stderr, "All indexes on %s created in %ds\n", table->name, (int)(end - start));
}
Expand Down

0 comments on commit 0570fdc

Please sign in to comment.