From 0ef69b9607f0c3a020b20b8ae15523653986eb4c Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Fri, 9 Sep 2022 22:27:54 +0300 Subject: [PATCH 1/6] add replication mode configuration --- lib/src/connection.dart | 17 +++++++++++++++++ lib/src/replication.dart | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 lib/src/replication.dart diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 3701060c..373550c6 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -14,6 +14,7 @@ import 'message_window.dart'; import 'query.dart'; import 'query_cache.dart'; import 'query_queue.dart'; +import 'replication.dart'; import 'server_messages.dart'; part 'connection_fsm.dart'; @@ -52,6 +53,8 @@ class PostgreSQLConnection extends Object this.useSSL = false, this.isUnixSocket = false, this.allowClearTextPassword = false, + this.replicationMode = ReplicationMode.none, + this.logicalDecodingPlugin = LogicalDecodingPlugin.pgoutput, }) { _connectionState = _PostgreSQLConnectionStateClosed(); _connectionState.connection = this; @@ -99,6 +102,20 @@ class PostgreSQLConnection extends Object /// If true, allows password in clear text for authentication. final bool allowClearTextPassword; + /// The replication mode for connecting in streaming replication mode. + /// + /// When the value is set to either [ReplicationMode.physical] or [ReplicationMode.logical], + /// the query protocol will no longer work as the connection will be switched to a replication + /// connection. In other words, using [query] or [mappedResultsQuery] will throw an error. Use + /// [execute] for executing statements while in replication mode. + final ReplicationMode replicationMode; + + /// The Logical Decoding Output for streaming replication mode + /// + /// The default value is [LogicalDecodingPlugin.pgoutput]. This value is only used + /// when [replicationMode] is not equal to [ReplicationMode.none]. + final LogicalDecodingPlugin logicalDecodingPlugin; + /// Stream of notification from the database. /// /// Listen to this [Stream] to receive events from PostgreSQL NOTIFY commands. diff --git a/lib/src/replication.dart b/lib/src/replication.dart new file mode 100644 index 00000000..40a6bbec --- /dev/null +++ b/lib/src/replication.dart @@ -0,0 +1,39 @@ +// TODO: these types could move to a common "connection_config.dart" file + +/// Streaming Replication Protocol Options +/// +/// [physical] or [logical] are used to start the connection a streaming +/// replication mode. +/// +/// See [Protocol Replication][] for more details. +/// +/// [Protocol Replication]: https://www.postgresql.org/docs/current/protocol-replication.html +enum ReplicationMode { + physical('true'), + logical('database'), + none('false'); + + final String value; + + const ReplicationMode(this.value); +} + +/// The Logical Decoding Output Plugins For Streaming Replication +/// +/// [pgoutput] is the standard logical decoding plugin that is built in +/// PostgreSQL since version 10. +/// +/// [wal2json] is a popular output plugin for logical decoding. The extension +/// must be available on the database when using this output option. When using +/// [wal2json] plugin, the following are some limitations: +/// - the plug-in does not emit events for tables without primary keys +/// - the plug-in does not support special values (NaN or infinity) for floating +/// point types +/// +/// For more info, see [wal2json repo][]. +/// +/// [wal2json repo]: https://github.com/eulerto/wal2json +enum LogicalDecodingPlugin { + pgoutput, + wal2json, +} \ No newline at end of file From 175f8adaddf8e9829e5588a051ea4f14f0370602 Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Sat, 10 Sep 2022 01:04:24 +0300 Subject: [PATCH 2/6] add replication mode option to StartUpMessage --- lib/src/client_messages.dart | 19 +++++++++++++++++-- lib/src/connection_fsm.dart | 3 ++- lib/src/constants.dart | 14 ++++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/lib/src/client_messages.dart b/lib/src/client_messages.dart index 9bbcf5c5..89443d9d 100644 --- a/lib/src/client_messages.dart +++ b/lib/src/client_messages.dart @@ -4,6 +4,7 @@ import 'package:buffer/buffer.dart'; import 'constants.dart'; import 'query.dart'; +import 'replication.dart'; import 'utf8_backed_string.dart'; abstract class ClientMessage { @@ -39,11 +40,14 @@ class StartupMessage extends ClientMessage { final UTF8BackedString? _username; final UTF8BackedString _databaseName; final UTF8BackedString _timeZone; + final UTF8BackedString _replication; - StartupMessage(String databaseName, String timeZone, {String? username}) + StartupMessage(String databaseName, String timeZone, + {String? username, ReplicationMode replication = ReplicationMode.none}) : _databaseName = UTF8BackedString(databaseName), _timeZone = UTF8BackedString(timeZone), - _username = username == null ? null : UTF8BackedString(username); + _username = username == null ? null : UTF8BackedString(username), + _replication = UTF8BackedString(replication.value); @override void applyToBuffer(ByteDataWriter buffer) { @@ -55,6 +59,11 @@ class StartupMessage extends ClientMessage { variableLength += _username!.utf8Length + 1; } + if (_replication.string != ReplicationMode.none.value) { + fixedLength += UTF8ByteConstants.replication.length; + variableLength += _replication.utf8Length + 1; + } + buffer.writeInt32(fixedLength + variableLength); buffer.writeInt32(ClientMessage.ProtocolVersion); @@ -63,6 +72,11 @@ class StartupMessage extends ClientMessage { _username!.applyToBuffer(buffer); } + if (_replication.string != ReplicationMode.none.value) { + buffer.write(UTF8ByteConstants.replication); + _replication.applyToBuffer(buffer); + } + buffer.write(UTF8ByteConstants.database); _databaseName.applyToBuffer(buffer); @@ -76,6 +90,7 @@ class StartupMessage extends ClientMessage { } } + class QueryMessage extends ClientMessage { final UTF8BackedString _queryString; diff --git a/lib/src/connection_fsm.dart b/lib/src/connection_fsm.dart index 3ae086a9..5472f20e 100644 --- a/lib/src/connection_fsm.dart +++ b/lib/src/connection_fsm.dart @@ -49,7 +49,8 @@ class _PostgreSQLConnectionStateSocketConnected _PostgreSQLConnectionState onEnter() { final startupMessage = StartupMessage( connection!.databaseName, connection!.timeZone, - username: connection!.username); + username: connection!.username, + replication: connection!.replicationMode); connection!._socket!.add(startupMessage.asBytes()); diff --git a/lib/src/constants.dart b/lib/src/constants.dart index 2e1c0aeb..1cfd7cf6 100644 --- a/lib/src/constants.dart +++ b/lib/src/constants.dart @@ -21,4 +21,18 @@ class UTF8ByteConstants { ]; static const utf8 = [85, 84, 70, 56, 0]; static const timeZone = [84, 105, 109, 101, 90, 111, 110, 101, 0]; + static const replication = [ + 114, + 101, + 112, + 108, + 105, + 99, + 97, + 116, + 105, + 111, + 110, + 0 + ]; } From bae26ffe0b49fbb923de7dee8f8e3e599d9f3d02 Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Sat, 10 Sep 2022 01:24:38 +0300 Subject: [PATCH 3/6] add streaming replication connection tests --- test/connection_test.dart | 65 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/connection_test.dart b/test/connection_test.dart index 183e14e4..62b82d39 100644 --- a/test/connection_test.dart +++ b/test/connection_test.dart @@ -5,6 +5,7 @@ import 'dart:io'; import 'dart:mirrors'; import 'package:postgres/postgres.dart'; +import 'package:postgres/src/replication.dart'; import 'package:test/test.dart'; import 'docker.dart'; @@ -79,6 +80,70 @@ void main() { 'Attempting to reopen a closed connection. Create a instance instead.'), ))); }); + + test('Connect with ReplicationMode.none', () async { + final conn = PostgreSQLConnection( + 'localhost', + 5432, + 'dart_test', + username: 'dart', + password: 'dart', + replicationMode: ReplicationMode.none, + ); + + await conn.open(); + + expect(await conn.execute('select 1'), equals(1)); + }); + + test('Connect with logical ReplicationMode.logical', () async { + final conn = PostgreSQLConnection( + 'localhost', + 5432, + 'dart_test', + username: 'dart', + password: 'dart', + replicationMode: ReplicationMode.logical, + ); + + await conn.open(); + + expect(await conn.execute('select 1'), equals(1)); + }); + + test('IDENTIFY_SYSTEM returns system information', () async { + final conn = PostgreSQLConnection( + 'localhost', + 5432, + 'dart_test', + username: 'dart', + password: 'dart', + replicationMode: ReplicationMode.logical, + ); + + await conn.open(); + + // This query can only be executed in Streaming Replication Protocol + // In addition, it can only be executed using Simple Query Protocol: + // "In either physical replication or logical replication walsender mode, + // only the simple query protocol can be used." + // source and more info: + // https://www.postgresql.org/docs/current/protocol-replication.html + final result = await conn.query( + 'IDENTIFY_SYSTEM;', + useSimpleQueryProtocol: true, + ); + + expect(result.length, 1); + expect(result.columnDescriptions.length, 4); + expect(result.columnDescriptions[0].columnName, 'systemid'); + expect(result.columnDescriptions[1].columnName, 'timeline'); + expect(result.columnDescriptions[2].columnName, 'xlogpos'); + expect(result.columnDescriptions[3].columnName, 'dbname'); + }); + + // TODO: add test for ReplicationMode.physical which requires tuning some + // settings in the pg_hba.conf }); // These tests are disabled, as we'd need to setup ci/pg_hba.conf into the CI From a3f90e578aba2a959b601ce81d5fc069ba4e684a Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Sat, 10 Sep 2022 01:37:15 +0300 Subject: [PATCH 4/6] improve docs for replicationMode and logicalDecodingPlugin --- lib/src/connection.dart | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 373550c6..b8fa11e2 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -106,14 +106,25 @@ class PostgreSQLConnection extends Object /// /// When the value is set to either [ReplicationMode.physical] or [ReplicationMode.logical], /// the query protocol will no longer work as the connection will be switched to a replication - /// connection. In other words, using [query] or [mappedResultsQuery] will throw an error. Use - /// [execute] for executing statements while in replication mode. + /// connection. In other words, using the default [query] or [mappedResultsQuery] will cause + /// the database to throw an error and drop the connection. + /// + /// Use [query] `useSimpleQueryProtocol` set to `true` or [execute] for executing statements + /// while in replication mode. + /// + /// For more info, see [Streaming Replication Protocol] + /// + /// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html final ReplicationMode replicationMode; /// The Logical Decoding Output for streaming replication mode /// - /// The default value is [LogicalDecodingPlugin.pgoutput]. This value is only used - /// when [replicationMode] is not equal to [ReplicationMode.none]. + /// The default value is [LogicalDecodingPlugin.pgoutput]. To use [LogicalDecodingPlugin.wal2json], + /// the [wal2json] plugin must be installed in the database. + /// + /// [logicalDecodingPlugin] is only used when [replicationMode] is not equal to [ReplicationMode.none]. + /// + /// [wal2json]: https://github.com/eulerto/wal2json final LogicalDecodingPlugin logicalDecodingPlugin; /// Stream of notification from the database. From 8ee3f007cc3554c2694eca0249dfaabfce82845d Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Sat, 10 Sep 2022 01:43:10 +0300 Subject: [PATCH 5/6] modify test for ReplicationMode.none to be meaningful --- test/connection_test.dart | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/connection_test.dart b/test/connection_test.dart index 62b82d39..b08f5201 100644 --- a/test/connection_test.dart +++ b/test/connection_test.dart @@ -81,7 +81,8 @@ void main() { ))); }); - test('Connect with ReplicationMode.none', () async { + test('Connecting with ReplicationMode.none uses Extended Query Protocol', + () async { final conn = PostgreSQLConnection( 'localhost', 5432, @@ -92,8 +93,9 @@ void main() { ); await conn.open(); - - expect(await conn.execute('select 1'), equals(1)); + // This would throw for ReplicationMode.logical or ReplicationMode.physical + final result = await conn.query('select 1'); + expect(result.affectedRowCount, equals(1)); }); test('Connect with logical ReplicationMode.logical', () async { From 9e80f6f13fcfe662b14eacb7bf002a1004c7aad7 Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Sat, 10 Sep 2022 02:01:43 +0300 Subject: [PATCH 6/6] remove unnecessary logical decoding enum --- lib/src/connection.dart | 21 +++++---------------- lib/src/replication.dart | 20 -------------------- 2 files changed, 5 insertions(+), 36 deletions(-) diff --git a/lib/src/connection.dart b/lib/src/connection.dart index b8fa11e2..555ab5e0 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -54,7 +54,6 @@ class PostgreSQLConnection extends Object this.isUnixSocket = false, this.allowClearTextPassword = false, this.replicationMode = ReplicationMode.none, - this.logicalDecodingPlugin = LogicalDecodingPlugin.pgoutput, }) { _connectionState = _PostgreSQLConnectionStateClosed(); _connectionState.connection = this; @@ -108,25 +107,15 @@ class PostgreSQLConnection extends Object /// the query protocol will no longer work as the connection will be switched to a replication /// connection. In other words, using the default [query] or [mappedResultsQuery] will cause /// the database to throw an error and drop the connection. - /// - /// Use [query] `useSimpleQueryProtocol` set to `true` or [execute] for executing statements + /// + /// Use [query] `useSimpleQueryProtocol` set to `true` or [execute] for executing statements /// while in replication mode. - /// + /// /// For more info, see [Streaming Replication Protocol] - /// + /// /// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html final ReplicationMode replicationMode; - /// The Logical Decoding Output for streaming replication mode - /// - /// The default value is [LogicalDecodingPlugin.pgoutput]. To use [LogicalDecodingPlugin.wal2json], - /// the [wal2json] plugin must be installed in the database. - /// - /// [logicalDecodingPlugin] is only used when [replicationMode] is not equal to [ReplicationMode.none]. - /// - /// [wal2json]: https://github.com/eulerto/wal2json - final LogicalDecodingPlugin logicalDecodingPlugin; - /// Stream of notification from the database. /// /// Listen to this [Stream] to receive events from PostgreSQL NOTIFY commands. @@ -593,7 +582,7 @@ abstract class _PostgreSQLExecutionContextMixin StackTrace.current, useSendSimple: true, // TODO: this could be removed from Query since useSendSimple covers the - // functionality. + // functionality. onlyReturnAffectedRowCount: onlyReturnAffectedRows, ); diff --git a/lib/src/replication.dart b/lib/src/replication.dart index 40a6bbec..cdc170ba 100644 --- a/lib/src/replication.dart +++ b/lib/src/replication.dart @@ -16,24 +16,4 @@ enum ReplicationMode { final String value; const ReplicationMode(this.value); -} - -/// The Logical Decoding Output Plugins For Streaming Replication -/// -/// [pgoutput] is the standard logical decoding plugin that is built in -/// PostgreSQL since version 10. -/// -/// [wal2json] is a popular output plugin for logical decoding. The extension -/// must be available on the database when using this output option. When using -/// [wal2json] plugin, the following are some limitations: -/// - the plug-in does not emit events for tables without primary keys -/// - the plug-in does not support special values (NaN or infinity) for floating -/// point types -/// -/// For more info, see [wal2json repo][]. -/// -/// [wal2json repo]: https://github.com/eulerto/wal2json -enum LogicalDecodingPlugin { - pgoutput, - wal2json, } \ No newline at end of file