Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEOMESA-3423 - add unlogged user data option #3244

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ object PartitionedPostgisDialect {
val WriteAheadPartitionsTableSpace = "pg.partitions.tablespace.wa-partitions"
val MainTableSpace = "pg.partitions.tablespace.main"

// set postgres table wal logging
val WalLogEnabled = "pg.wal.log.enabled"
autodidacticon marked this conversation as resolved.
Show resolved Hide resolved

implicit class ConfigConversions(val sft: SimpleFeatureType) extends AnyVal {
def getIntervalHours: Int = Option(sft.getUserData.get(IntervalHours)).map(int).getOrElse(6)
def getMaxPartitions: Option[Int] = Option(sft.getUserData.get(MaxPartitions)).map(int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ package object dialect {
tables: Tables,
cols: Columns,
partitions: PartitionInfo,
userData: Map[String, String] = Map.empty)
userData: Map[String, String] = Map.empty) {
val walLogSQL: String = if (userData.getOrElse(Config.WalLogEnabled, "true").toBoolean) {
""
} else {
" UNLOGGED "
}
}

object TypeInfo {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ object CompactPartitions extends SqlProcedure {
| -- lock the child table to prevent any inserts that would be lost
| EXECUTE 'LOCK TABLE ${info.schema.quoted}.' || quote_ident(spill_partition) ||
| ' IN SHARE ROW EXCLUSIVE MODE';
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| partition_tablespace || ' AS SELECT * FROM' ||
| ' (SELECT * FROM ${info.schema.quoted}.' || quote_ident(partition_name) ||
| ' UNION ALL SELECT * FROM ${info.schema.quoted}.' || quote_ident(spill_partition) ||
| ') results' ||
| ' ORDER BY _st_sortablehash($geomCol)';
| GET DIAGNOSTICS unsorted_count := ROW_COUNT;
| ELSE
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| partition_tablespace || ' AS SELECT * FROM ' || quote_ident(partition_name) ||
| ' ORDER BY _st_sortablehash($geomCol)';
| GET DIAGNOSTICS unsorted_count := ROW_COUNT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object MergeWriteAheadPartitions extends SqlProcedure {
| -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping
| -- we need a "select distinct" to avoid primary key conflicts - this should be fairly cheap since
| -- we're already sorting and there should be few or no conflicts
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| partition_tablespace || ' AS SELECT DISTINCT ON' ||
| ' (_st_sortablehash($geomCol), fid, ${info.cols.dtg.quoted}) * FROM ' ||
| quote_ident(partition_name || '_tmp_migrate') || ' ORDER BY _st_sortablehash($geomCol)';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object PartitionWriteAheadLog extends SqlProcedure {
| RAISE INFO '% Creating partition with insert % (unattached)', timeofday()::timestamp, partition_name;
| -- upper bounds are exclusive
| -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| partition_tablespace || ' AS SELECT * FROM ' || quote_ident(write_ahead.name) ||
| ' WHERE $dtgCol >= ' || quote_literal(partition_start) ||
| ' AND $dtgCol < ' || quote_literal(partition_end) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object RollWriteAheadLog extends SqlProcedure with CronSchedule {
| END IF;
|
| -- requires SHARE UPDATE EXCLUSIVE
| EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(next_partition) || '(' ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(next_partition) || '(' ||
| 'CONSTRAINT ' || quote_ident(next_partition || '_pkey') ||
| ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})' || index_space || ')' ||
| ' INHERITS (${table.name.qualified})${table.storage.opts}' || partition_tablespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object AnalyzeQueueTable extends SqlStatements {

override protected def createStatements(info: TypeInfo): Seq[String] = {
val create =
s"""CREATE TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} (
| partition_name text,
| enqueued timestamp without time zone
|);""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object PartitionTables extends SqlStatements {
case Some(ts) => (s" TABLESPACE ${ts.quoted}", s" USING INDEX TABLESPACE ${ts.quoted}")
}
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.name.qualified} (
| LIKE ${info.tables.writeAhead.name.qualified} INCLUDING DEFAULTS INCLUDING CONSTRAINTS,
| CONSTRAINT ${escape(table.name.raw, "pkey")} PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs
|) PARTITION BY RANGE(${info.cols.dtg.quoted})$tableTs;""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PartitionTablespacesTable extends Sql {
val table = TableIdentifier(info.schema.raw, Name.raw)
val cName = TableName(Name.raw + "_pkey")
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.quoted} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.quoted} (
| type_name text not null,
| table_type text not null,
| table_space text
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PrimaryKeyTable extends Sql {
// we need to define the primary key separately since the main view can't have any primary key columns
val table = s"${info.schema.quoted}.${Name.quoted}"
val create =
s"""CREATE TABLE IF NOT EXISTS $table (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS $table (
| table_schema character varying,
| table_name character varying,
| pk_column_idx integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SequenceTable extends Sql {
override def create(info: TypeInfo)(implicit ex: ExecutionContext): Unit = {
val table = TableIdentifier(info.schema.raw, Name.raw)
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.qualified} (
| type_name text PRIMARY KEY,
| value smallint NOT NULL CHECK (value >= 0 AND value <= 999)
|);""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object SortQueueTable extends SqlStatements {

override protected def createStatements(info: TypeInfo): Seq[String] = {
val create =
s"""CREATE TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} (
| partition_name text,
| unsorted_count bigint,
| enqueued timestamp without time zone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class UserDataTable extends Sql {
val table = TableIdentifier(info.schema.raw, Name.raw)
val cName = TableName(Name.raw + "_pkey")
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.quoted} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.quoted} (
| type_name text not null,
| key text not null,
| value text not null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object WriteAheadTable extends SqlStatements {
| WHERE type_name = ${literal(info.typeName)} INTO seq_val;
| partition := ${literal(table.name.raw + "_")} || lpad(seq_val::text, 3, '0');
|
| EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' ||
| 'CONSTRAINT ' || quote_ident(partition || '_pkey') ||
| ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs ' ||
| ') INHERITS (${table.name.qualified})${table.storage.opts}$tableTs';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.geotools.jdbc.JDBCDataStore
import org.geotools.referencing.CRS
import org.junit.runner.RunWith
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisDialect.Config.WalLogEnabled
import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAgedOffPartitions, PartitionMaintenance, RollWriteAheadLog}
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable}
import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo}
Expand Down Expand Up @@ -88,6 +89,27 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
"preparedStatements" -> "true"
)

def isTableLoggedQuery(tableName: String, schemaName: String): String =
s"""
|SELECT
| n.nspname AS schema_name,
| c.relname AS table_name,
| CASE c.relpersistence
| WHEN 'u' THEN 'unlogged'
| WHEN 'p' THEN 'permanent'
| WHEN 't' THEN 'temporary'
| ELSE 'unknown'
| END AS table_type
|FROM
| pg_class c
|JOIN
| pg_namespace n ON n.oid = c.relnamespace
|WHERE
| c.relname = '$tableName'
| AND n.nspname = '$schemaName';
|
|""".stripMargin

var container: GenericContainer[_] = _

lazy val host = Option(container).map(_.getHost).getOrElse("localhost")
Expand Down Expand Up @@ -133,6 +155,43 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
ok
}

"create unlogged tables" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)

try {
val sft = SimpleFeatureTypes.renameSft(this.sft, "unlogged_test")
sft.getUserData.put(WalLogEnabled, "false")

ds.createSchema(sft)

val typeInfo: TypeInfo = TypeInfo(this.schema, sft)

Seq(
typeInfo.tables.mainPartitions.name.raw,
typeInfo.tables.writeAheadPartitions.name.raw,
// typeInfo.tables.writeAhead.name.raw, write ahead table is created with PartitionedPostgisDialect#encodePostCreateTable
// which doesnt have access to the user data, should be ok because the write ahead main table doesnt have any data
typeInfo.tables.spillPartitions.name.raw,
typeInfo.tables.analyzeQueue.name.raw,
typeInfo.tables.sortQueue.name.raw).forall { tableName =>
val sql = isTableLoggedQuery(tableName, "public")
// verify that the table is unlogged
WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
WithClose(cx.createStatement()) { st =>
WithClose(st.executeQuery(sql)) { rs =>
rs.next() must beTrue
logger.info(s"Table ${rs.getString("table_name")} is ${rs.getString("table_name")}")
rs.getString("table_type") mustEqual "unlogged"
}
}
}
}
} finally {
ds.dispose()
}
}

"work" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)
Expand Down