diff --git a/docs/user/postgis/index_config.rst b/docs/user/postgis/index_config.rst index 4fe6de747512..ad753034fdc2 100644 --- a/docs/user/postgis/index_config.rst +++ b/docs/user/postgis/index_config.rst @@ -156,3 +156,14 @@ for each query, moving data out of it faster may improve performance. After the schema has been created, changes to the schedule can be made through the :ref:`postgis_cli_update_schema` command. + +Configuring WAL logging +----------------------- + +PostgreSQL uses a write-ahead log (WAL) to ensure data consistency and durability. By default, the WAL is written +for all changes to the database, including the partitioned tables. Disabling the WAL for the partitioned tables +can significantly improve write performance, but at the cost of data durability. If increased performance is desired, +the WAL can be disabled for the partitioned tables by setting the key ``pg.wal.enabled`` to ``false``. + +See the PostgreSQL `documentation <https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED>`_ + for more information on the implications of disabling the WAL. 
\ No newline at end of file diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala index 449056db13e7..631c2f83f878 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala @@ -418,6 +418,9 @@ object PartitionedPostgisDialect { val WriteAheadPartitionsTableSpace = "pg.partitions.tablespace.wa-partitions" val MainTableSpace = "pg.partitions.tablespace.main" + // set postgres table wal logging + val WalLogEnabled = "pg.wal.enabled" + implicit class ConfigConversions(val sft: SimpleFeatureType) extends AnyVal { def getIntervalHours: Int = Option(sft.getUserData.get(IntervalHours)).map(int).getOrElse(6) def getMaxPartitions: Option[Int] = Option(sft.getUserData.get(MaxPartitions)).map(int) diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala index 1f850d990052..3fc47d155300 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala @@ -158,7 +158,13 @@ package object dialect { tables: Tables, cols: Columns, partitions: PartitionInfo, - userData: Map[String, String] = Map.empty) + userData: Map[String, String] = Map.empty) { + val walLogSQL: String = if (userData.getOrElse(Config.WalLogEnabled, "true").toBoolean) { + "" + } 
else { + " UNLOGGED " + } + } object TypeInfo { diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/CompactPartitions.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/CompactPartitions.scala index a1b73870dc22..6d953a43e7bf 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/CompactPartitions.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/CompactPartitions.scala @@ -75,7 +75,7 @@ object CompactPartitions extends SqlProcedure { | -- lock the child table to prevent any inserts that would be lost | EXECUTE 'LOCK TABLE ${info.schema.quoted}.' || quote_ident(spill_partition) || | ' IN SHARE ROW EXCLUSIVE MODE'; - | EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') || + | EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') || | partition_tablespace || ' AS SELECT * FROM' || | ' (SELECT * FROM ${info.schema.quoted}.' || quote_ident(partition_name) || | ' UNION ALL SELECT * FROM ${info.schema.quoted}.' || quote_ident(spill_partition) || @@ -83,7 +83,7 @@ object CompactPartitions extends SqlProcedure { | ' ORDER BY _st_sortablehash($geomCol)'; | GET DIAGNOSTICS unsorted_count := ROW_COUNT; | ELSE - | EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') || + | EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' 
|| quote_ident(partition_name || '_tmp_sort') || | partition_tablespace || ' AS SELECT * FROM ' || quote_ident(partition_name) || | ' ORDER BY _st_sortablehash($geomCol)'; | GET DIAGNOSTICS unsorted_count := ROW_COUNT; diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/MergeWriteAheadPartitions.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/MergeWriteAheadPartitions.scala index dafeca4f689a..dbacebaece6d 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/MergeWriteAheadPartitions.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/MergeWriteAheadPartitions.scala @@ -121,7 +121,7 @@ object MergeWriteAheadPartitions extends SqlProcedure { | -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping | -- we need a "select distinct" to avoid primary key conflicts - this should be fairly cheap since | -- we're already sorting and there should be few or no conflicts - | EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) || + | EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' 
|| quote_ident(partition_name) || | partition_tablespace || ' AS SELECT DISTINCT ON' || | ' (_st_sortablehash($geomCol), fid, ${info.cols.dtg.quoted}) * FROM ' || | quote_ident(partition_name || '_tmp_migrate') || ' ORDER BY _st_sortablehash($geomCol)'; diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/PartitionWriteAheadLog.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/PartitionWriteAheadLog.scala index 22e3768a641d..62334f157b02 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/PartitionWriteAheadLog.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/PartitionWriteAheadLog.scala @@ -157,7 +157,7 @@ object PartitionWriteAheadLog extends SqlProcedure { | RAISE INFO '% Creating partition with insert % (unattached)', timeofday()::timestamp, partition_name; | -- upper bounds are exclusive | -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping - | EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) || + | EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' 
|| quote_ident(partition_name) || | partition_tablespace || ' AS SELECT * FROM ' || quote_ident(write_ahead.name) || | ' WHERE $dtgCol >= ' || quote_literal(partition_start) || | ' AND $dtgCol < ' || quote_literal(partition_end) || diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/RollWriteAheadLog.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/RollWriteAheadLog.scala index 029e7104e9c7..09ed365b9d07 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/RollWriteAheadLog.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/procedures/RollWriteAheadLog.scala @@ -77,7 +77,7 @@ object RollWriteAheadLog extends SqlProcedure with CronSchedule { | END IF; | | -- requires SHARE UPDATE EXCLUSIVE - | EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(next_partition) || '(' || + | EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' 
|| quote_ident(next_partition) || '(' || | 'CONSTRAINT ' || quote_ident(next_partition || '_pkey') || | ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})' || index_space || ')' || | ' INHERITS (${table.name.qualified})${table.storage.opts}' || partition_tablespace; diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/AnalyzeQueueTable.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/AnalyzeQueueTable.scala index 32db57b0b7cc..2c0cf30392ae 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/AnalyzeQueueTable.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/AnalyzeQueueTable.scala @@ -16,7 +16,7 @@ object AnalyzeQueueTable extends SqlStatements { override protected def createStatements(info: TypeInfo): Seq[String] = { val create = - s"""CREATE TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} ( + s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} ( | partition_name text, | enqueued timestamp without time zone |);""".stripMargin diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTables.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTables.scala index 27b1ee22a54e..87c0c28d8067 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTables.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTables.scala @@ -32,7 +32,7 @@ object PartitionTables extends SqlStatements { case Some(ts) => (s" TABLESPACE ${ts.quoted}", s" USING 
INDEX TABLESPACE ${ts.quoted}") } val create = - s"""CREATE TABLE IF NOT EXISTS ${table.name.qualified} ( + s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.name.qualified} ( | LIKE ${info.tables.writeAhead.name.qualified} INCLUDING DEFAULTS INCLUDING CONSTRAINTS, | CONSTRAINT ${escape(table.name.raw, "pkey")} PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs |) PARTITION BY RANGE(${info.cols.dtg.quoted})$tableTs;""".stripMargin diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/SortQueueTable.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/SortQueueTable.scala index 88eb29ededeb..f8bc80b57293 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/SortQueueTable.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/SortQueueTable.scala @@ -16,7 +16,7 @@ object SortQueueTable extends SqlStatements { override protected def createStatements(info: TypeInfo): Seq[String] = { val create = - s"""CREATE TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} ( + s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} ( | partition_name text, | unsorted_count bigint, | enqueued timestamp without time zone diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/WriteAheadTable.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/WriteAheadTable.scala index f8c6cded38cf..f0e3623021a3 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/WriteAheadTable.scala +++ 
b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/WriteAheadTable.scala @@ -35,7 +35,7 @@ object WriteAheadTable extends SqlStatements { | WHERE type_name = ${literal(info.typeName)} INTO seq_val; | partition := ${literal(table.name.raw + "_")} || lpad(seq_val::text, 3, '0'); | - | EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' || + | EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' || | 'CONSTRAINT ' || quote_ident(partition || '_pkey') || | ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs ' || | ') INHERITS (${table.name.qualified})${table.storage.opts}$tableTs'; diff --git a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala index bee8bdc20d8f..b39013455f5e 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala @@ -21,6 +21,7 @@ import org.geotools.jdbc.JDBCDataStore import org.geotools.referencing.CRS import org.junit.runner.RunWith import org.locationtech.geomesa.filter.FilterHelper +import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisDialect.Config.WalLogEnabled import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAgedOffPartitions, PartitionMaintenance, RollWriteAheadLog} import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable} import 
org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo} @@ -88,6 +89,27 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll "preparedStatements" -> "true" ) + def isTableLoggedQuery(tableName: String, schemaName: String): String = + s""" + |SELECT + | n.nspname AS schema_name, + | c.relname AS table_name, + | CASE c.relpersistence + | WHEN 'u' THEN 'unlogged' + | WHEN 'p' THEN 'permanent' + | WHEN 't' THEN 'temporary' + | ELSE 'unknown' + | END AS table_type + |FROM + | pg_class c + |JOIN + | pg_namespace n ON n.oid = c.relnamespace + |WHERE + | c.relname = '$tableName' + | AND n.nspname = '$schemaName'; + | + |""".stripMargin + var container: GenericContainer[_] = _ lazy val host = Option(container).map(_.getHost).getOrElse("localhost") @@ -133,6 +155,77 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll ok } + "create logged tables" in { + val ds = DataStoreFinder.getDataStore(params.asJava) + ds must not(beNull) + + try { + val sft = SimpleFeatureTypes.renameSft(this.sft, "logged_test") + + ds.createSchema(sft) + + val typeInfo: TypeInfo = TypeInfo(this.schema, sft) + + Seq( + typeInfo.tables.mainPartitions.name.raw, + typeInfo.tables.writeAheadPartitions.name.raw, + typeInfo.tables.spillPartitions.name.raw, + typeInfo.tables.analyzeQueue.name.raw, + typeInfo.tables.sortQueue.name.raw).forall { tableName => + val sql = isTableLoggedQuery(tableName, "public") + // verify that the table is logged + WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx => + WithClose(cx.createStatement()) { st => + WithClose(st.executeQuery(sql)) { rs => + rs.next() must beTrue + logger.info(s"Table ${rs.getString("table_name")} is ${rs.getString("table_type")}") + rs.getString("table_type") mustEqual "permanent" + } + } + } + } + } finally { + ds.dispose() + } + } + + "create unlogged tables" in { + val ds = 
DataStoreFinder.getDataStore(params.asJava) + ds must not(beNull) + + try { + val sft = SimpleFeatureTypes.renameSft(this.sft, "unlogged_test") + sft.getUserData.put(WalLogEnabled, "false") + + ds.createSchema(sft) + + val typeInfo: TypeInfo = TypeInfo(this.schema, sft) + + Seq( + typeInfo.tables.mainPartitions.name.raw, + typeInfo.tables.writeAheadPartitions.name.raw, + // typeInfo.tables.writeAhead.name.raw, write ahead table is created with PartitionedPostgisDialect#encodePostCreateTable + // which doesn't have access to the user data, should be ok because the write ahead main table doesn't have any data + typeInfo.tables.spillPartitions.name.raw, + typeInfo.tables.analyzeQueue.name.raw, + typeInfo.tables.sortQueue.name.raw).forall { tableName => + val sql = isTableLoggedQuery(tableName, "public") + // verify that the table is unlogged + WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx => + WithClose(cx.createStatement()) { st => + WithClose(st.executeQuery(sql)) { rs => + rs.next() must beTrue + logger.info(s"Table ${rs.getString("table_name")} is ${rs.getString("table_type")}") + rs.getString("table_type") mustEqual "unlogged" + } + } + } + } + } finally { + ds.dispose() + } + } + "work" in { val ds = DataStoreFinder.getDataStore(params.asJava) ds must not(beNull)