Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Replication 5th] add connection configuration for Streaming Replication Protocol #56

Merged
merged 6 commits into from
Sep 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions lib/src/client_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'package:buffer/buffer.dart';

import 'constants.dart';
import 'query.dart';
import 'replication.dart';
import 'utf8_backed_string.dart';

abstract class ClientMessage {
Expand Down Expand Up @@ -39,11 +40,14 @@ class StartupMessage extends ClientMessage {
final UTF8BackedString? _username;
final UTF8BackedString _databaseName;
final UTF8BackedString _timeZone;
final UTF8BackedString _replication;

StartupMessage(String databaseName, String timeZone, {String? username})
StartupMessage(String databaseName, String timeZone,
{String? username, ReplicationMode replication = ReplicationMode.none})
: _databaseName = UTF8BackedString(databaseName),
_timeZone = UTF8BackedString(timeZone),
_username = username == null ? null : UTF8BackedString(username);
_username = username == null ? null : UTF8BackedString(username),
_replication = UTF8BackedString(replication.value);

@override
void applyToBuffer(ByteDataWriter buffer) {
Expand All @@ -55,6 +59,11 @@ class StartupMessage extends ClientMessage {
variableLength += _username!.utf8Length + 1;
}

if (_replication.string != ReplicationMode.none.value) {
fixedLength += UTF8ByteConstants.replication.length;
variableLength += _replication.utf8Length + 1;
}

buffer.writeInt32(fixedLength + variableLength);
buffer.writeInt32(ClientMessage.ProtocolVersion);

Expand All @@ -63,6 +72,11 @@ class StartupMessage extends ClientMessage {
_username!.applyToBuffer(buffer);
}

if (_replication.string != ReplicationMode.none.value) {
buffer.write(UTF8ByteConstants.replication);
_replication.applyToBuffer(buffer);
}

buffer.write(UTF8ByteConstants.database);
_databaseName.applyToBuffer(buffer);

Expand All @@ -76,6 +90,7 @@ class StartupMessage extends ClientMessage {
}
}


class QueryMessage extends ClientMessage {
final UTF8BackedString _queryString;

Expand Down
19 changes: 18 additions & 1 deletion lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import 'message_window.dart';
import 'query.dart';
import 'query_cache.dart';
import 'query_queue.dart';
import 'replication.dart';
import 'server_messages.dart';

part 'connection_fsm.dart';
Expand Down Expand Up @@ -52,6 +53,7 @@ class PostgreSQLConnection extends Object
this.useSSL = false,
this.isUnixSocket = false,
this.allowClearTextPassword = false,
this.replicationMode = ReplicationMode.none,
}) {
_connectionState = _PostgreSQLConnectionStateClosed();
_connectionState.connection = this;
Expand Down Expand Up @@ -99,6 +101,21 @@ class PostgreSQLConnection extends Object
/// If true, allows password in clear text for authentication.
final bool allowClearTextPassword;

/// The replication mode for connecting in streaming replication mode.
///
/// When the value is set to either [ReplicationMode.physical] or [ReplicationMode.logical],
/// the query protocol will no longer work as the connection will be switched to a replication
/// connection. In other words, using the default [query] or [mappedResultsQuery] will cause
/// the database to throw an error and drop the connection.
///
/// Use [query] `useSimpleQueryProtocol` set to `true` or [execute] for executing statements
/// while in replication mode.
///
/// For more info, see [Streaming Replication Protocol]
///
/// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html
final ReplicationMode replicationMode;

/// Stream of notification from the database.
///
/// Listen to this [Stream] to receive events from PostgreSQL NOTIFY commands.
Expand Down Expand Up @@ -565,7 +582,7 @@ abstract class _PostgreSQLExecutionContextMixin
StackTrace.current,
useSendSimple: true,
// TODO: this could be removed from Query since useSendSimple covers the
// functionality.
// functionality.
onlyReturnAffectedRowCount: onlyReturnAffectedRows,
);

Expand Down
3 changes: 2 additions & 1 deletion lib/src/connection_fsm.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class _PostgreSQLConnectionStateSocketConnected
_PostgreSQLConnectionState onEnter() {
final startupMessage = StartupMessage(
connection!.databaseName, connection!.timeZone,
username: connection!.username);
username: connection!.username,
replication: connection!.replicationMode);

connection!._socket!.add(startupMessage.asBytes());

Expand Down
14 changes: 14 additions & 0 deletions lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,18 @@ class UTF8ByteConstants {
];
static const utf8 = <int>[85, 84, 70, 56, 0];
static const timeZone = <int>[84, 105, 109, 101, 90, 111, 110, 101, 0];
static const replication = <int>[
114,
101,
112,
108,
105,
99,
97,
116,
105,
111,
110,
0
];
}
19 changes: 19 additions & 0 deletions lib/src/replication.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// TODO: these types could move to a common "connection_config.dart" file

/// Streaming Replication Protocol Options
///
/// [physical] or [logical] are used to start the connection a streaming
/// replication mode.
///
/// See [Protocol Replication][] for more details.
///
/// [Protocol Replication]: https://www.postgresql.org/docs/current/protocol-replication.html
enum ReplicationMode {
physical('true'),
logical('database'),
none('false');

final String value;

const ReplicationMode(this.value);
}
67 changes: 67 additions & 0 deletions test/connection_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'dart:io';
import 'dart:mirrors';

import 'package:postgres/postgres.dart';
import 'package:postgres/src/replication.dart';
import 'package:test/test.dart';

import 'docker.dart';
Expand Down Expand Up @@ -79,6 +80,72 @@ void main() {
'Attempting to reopen a closed connection. Create a instance instead.'),
)));
});

test('Connecting with ReplicationMode.none uses Extended Query Protocol',
() async {
final conn = PostgreSQLConnection(
'localhost',
5432,
'dart_test',
username: 'dart',
password: 'dart',
replicationMode: ReplicationMode.none,
);

await conn.open();
// This would throw for ReplicationMode.logical or ReplicationMode.physical
final result = await conn.query('select 1');
expect(result.affectedRowCount, equals(1));
});

test('Connect with logical ReplicationMode.logical', () async {
final conn = PostgreSQLConnection(
'localhost',
5432,
'dart_test',
username: 'dart',
password: 'dart',
replicationMode: ReplicationMode.logical,
);

await conn.open();

expect(await conn.execute('select 1'), equals(1));
});

test('IDENTIFY_SYSTEM returns system information', () async {
final conn = PostgreSQLConnection(
'localhost',
5432,
'dart_test',
username: 'dart',
password: 'dart',
replicationMode: ReplicationMode.logical,
);

await conn.open();

// This query can only be executed in Streaming Replication Protocol
// In addition, it can only be executed using Simple Query Protocol:
// "In either physical replication or logical replication walsender mode,
// only the simple query protocol can be used."
// source and more info:
// https://www.postgresql.org/docs/current/protocol-replication.html
final result = await conn.query(
'IDENTIFY_SYSTEM;',
useSimpleQueryProtocol: true,
);

expect(result.length, 1);
expect(result.columnDescriptions.length, 4);
expect(result.columnDescriptions[0].columnName, 'systemid');
expect(result.columnDescriptions[1].columnName, 'timeline');
expect(result.columnDescriptions[2].columnName, 'xlogpos');
expect(result.columnDescriptions[3].columnName, 'dbname');
});

// TODO: add test for ReplicationMode.physical which requires tuning some
// settings in the pg_hba.conf
});

// These tests are disabled, as we'd need to setup ci/pg_hba.conf into the CI
Expand Down