diff --git a/lib/src/client_messages.dart b/lib/src/client_messages.dart index 9bbcf5c5..89443d9d 100644 --- a/lib/src/client_messages.dart +++ b/lib/src/client_messages.dart @@ -4,6 +4,7 @@ import 'package:buffer/buffer.dart'; import 'constants.dart'; import 'query.dart'; +import 'replication.dart'; import 'utf8_backed_string.dart'; abstract class ClientMessage { @@ -39,11 +40,14 @@ class StartupMessage extends ClientMessage { final UTF8BackedString? _username; final UTF8BackedString _databaseName; final UTF8BackedString _timeZone; + final UTF8BackedString _replication; - StartupMessage(String databaseName, String timeZone, {String? username}) + StartupMessage(String databaseName, String timeZone, + {String? username, ReplicationMode replication = ReplicationMode.none}) : _databaseName = UTF8BackedString(databaseName), _timeZone = UTF8BackedString(timeZone), - _username = username == null ? null : UTF8BackedString(username); + _username = username == null ? null : UTF8BackedString(username), + _replication = UTF8BackedString(replication.value); @override void applyToBuffer(ByteDataWriter buffer) { @@ -55,6 +59,11 @@ class StartupMessage extends ClientMessage { variableLength += _username!.utf8Length + 1; } + if (_replication.string != ReplicationMode.none.value) { + fixedLength += UTF8ByteConstants.replication.length; + variableLength += _replication.utf8Length + 1; + } + buffer.writeInt32(fixedLength + variableLength); buffer.writeInt32(ClientMessage.ProtocolVersion); @@ -63,6 +72,11 @@ class StartupMessage extends ClientMessage { _username!.applyToBuffer(buffer); } + if (_replication.string != ReplicationMode.none.value) { + buffer.write(UTF8ByteConstants.replication); + _replication.applyToBuffer(buffer); + } + buffer.write(UTF8ByteConstants.database); _databaseName.applyToBuffer(buffer); @@ -76,6 +90,7 @@ class StartupMessage extends ClientMessage { } } + class QueryMessage extends ClientMessage { final UTF8BackedString _queryString; diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 3701060c..555ab5e0 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -14,6 +14,7 @@ import 'message_window.dart'; import 'query.dart'; import 'query_cache.dart'; import 'query_queue.dart'; +import 'replication.dart'; import 'server_messages.dart'; part 'connection_fsm.dart'; @@ -52,6 +53,7 @@ class PostgreSQLConnection extends Object this.useSSL = false, this.isUnixSocket = false, this.allowClearTextPassword = false, + this.replicationMode = ReplicationMode.none, }) { _connectionState = _PostgreSQLConnectionStateClosed(); _connectionState.connection = this; @@ -99,6 +101,21 @@ class PostgreSQLConnection extends Object /// If true, allows password in clear text for authentication. final bool allowClearTextPassword; + /// The replication mode for connecting in streaming replication mode. + /// + /// When the value is set to either [ReplicationMode.physical] or [ReplicationMode.logical], + /// the query protocol will no longer work as the connection will be switched to a replication + /// connection. In other words, using the default [query] or [mappedResultsQuery] will cause + /// the database to throw an error and drop the connection. + /// + /// Use [query] `useSimpleQueryProtocol` set to `true` or [execute] for executing statements + /// while in replication mode. + /// + /// For more info, see [Streaming Replication Protocol] + /// + /// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html + final ReplicationMode replicationMode; + /// Stream of notification from the database. /// /// Listen to this [Stream] to receive events from PostgreSQL NOTIFY commands. @@ -565,7 +582,7 @@ abstract class _PostgreSQLExecutionContextMixin StackTrace.current, useSendSimple: true, // TODO: this could be removed from Query since useSendSimple covers the - // functionality. + // functionality. onlyReturnAffectedRowCount: onlyReturnAffectedRows, ); diff --git a/lib/src/connection_fsm.dart b/lib/src/connection_fsm.dart index 3ae086a9..5472f20e 100644 --- a/lib/src/connection_fsm.dart +++ b/lib/src/connection_fsm.dart @@ -49,7 +49,8 @@ class _PostgreSQLConnectionStateSocketConnected _PostgreSQLConnectionState onEnter() { final startupMessage = StartupMessage( connection!.databaseName, connection!.timeZone, - username: connection!.username); + username: connection!.username, + replication: connection!.replicationMode); connection!._socket!.add(startupMessage.asBytes()); diff --git a/lib/src/constants.dart b/lib/src/constants.dart index 2e1c0aeb..1cfd7cf6 100644 --- a/lib/src/constants.dart +++ b/lib/src/constants.dart @@ -21,4 +21,18 @@ class UTF8ByteConstants { ]; static const utf8 = [85, 84, 70, 56, 0]; static const timeZone = [84, 105, 109, 101, 90, 111, 110, 101, 0]; + static const replication = [ + 114, + 101, + 112, + 108, + 105, + 99, + 97, + 116, + 105, + 111, + 110, + 0 + ]; } diff --git a/lib/src/replication.dart b/lib/src/replication.dart new file mode 100644 index 00000000..cdc170ba --- /dev/null +++ b/lib/src/replication.dart @@ -0,0 +1,19 @@ +// TODO: these types could move to a common "connection_config.dart" file + +/// Streaming Replication Protocol Options +/// +/// [physical] or [logical] are used to start the connection a streaming +/// replication mode. +/// +/// See [Protocol Replication][] for more details. +/// +/// [Protocol Replication]: https://www.postgresql.org/docs/current/protocol-replication.html +enum ReplicationMode { + physical('true'), + logical('database'), + none('false'); + + final String value; + + const ReplicationMode(this.value); +} \ No newline at end of file diff --git a/test/connection_test.dart b/test/connection_test.dart index 183e14e4..b08f5201 100644 --- a/test/connection_test.dart +++ b/test/connection_test.dart @@ -5,6 +5,7 @@ import 'dart:io'; import 'dart:mirrors'; import 'package:postgres/postgres.dart'; +import 'package:postgres/src/replication.dart'; import 'package:test/test.dart'; import 'docker.dart'; @@ -79,6 +80,72 @@ void main() { 'Attempting to reopen a closed connection. Create a instance instead.'), ))); }); + + test('Connecting with ReplicationMode.none uses Extended Query Protocol', + () async { + final conn = PostgreSQLConnection( + 'localhost', + 5432, + 'dart_test', + username: 'dart', + password: 'dart', + replicationMode: ReplicationMode.none, + ); + + await conn.open(); + // This would throw for ReplicationMode.logical or ReplicationMode.physical + final result = await conn.query('select 1'); + expect(result.affectedRowCount, equals(1)); + }); + + test('Connect with logical ReplicationMode.logical', () async { + final conn = PostgreSQLConnection( + 'localhost', + 5432, + 'dart_test', + username: 'dart', + password: 'dart', + replicationMode: ReplicationMode.logical, + ); + + await conn.open(); + + expect(await conn.execute('select 1'), equals(1)); + }); + + test('IDENTIFY_SYSTEM returns system information', () async { + final conn = PostgreSQLConnection( + 'localhost', + 5432, + 'dart_test', + username: 'dart', + password: 'dart', + replicationMode: ReplicationMode.logical, + ); + + await conn.open(); + + // This query can only be executed in Streaming Replication Protocol + // In addition, it can only be executed using Simple Query Protocol: + // "In either physical replication or logical replication walsender mode, + // only the simple query protocol can be used." + // source and more info: + // https://www.postgresql.org/docs/current/protocol-replication.html + final result = await conn.query( + 'IDENTIFY_SYSTEM;', + useSimpleQueryProtocol: true, + ); + + expect(result.length, 1); + expect(result.columnDescriptions.length, 4); + expect(result.columnDescriptions[0].columnName, 'systemid'); + expect(result.columnDescriptions[1].columnName, 'timeline'); + expect(result.columnDescriptions[2].columnName, 'xlogpos'); + expect(result.columnDescriptions[3].columnName, 'dbname'); + }); + + // TODO: add test for ReplicationMode.physical which requires tuning some + // settings in the pg_hba.conf }); // These tests are disabled, as we'd need to setup ci/pg_hba.conf into the CI