diff --git a/lib/postgres.dart b/lib/postgres.dart index fdde21da..1ffd4de0 100644 --- a/lib/postgres.dart +++ b/lib/postgres.dart @@ -3,5 +3,6 @@ library postgres; export 'src/connection.dart'; export 'src/execution_context.dart'; export 'src/models.dart'; +export 'src/replication.dart' show ReplicationMode; export 'src/substituter.dart'; export 'src/types.dart'; diff --git a/lib/src/logical_replication_messages.dart b/lib/src/logical_replication_messages.dart index 90c13770..b27cc952 100644 --- a/lib/src/logical_replication_messages.dart +++ b/lib/src/logical_replication_messages.dart @@ -15,63 +15,93 @@ import 'types.dart'; abstract class LogicalReplicationMessage implements ReplicationMessage, ServerMessage {} -class XLogDataLogicalMessage extends XLogDataMessage { - late final LogicalReplicationMessage _message; +class XLogDataLogicalMessage implements XLogDataMessage { + @override + final Uint8List bytes; @override - LogicalReplicationMessage get data => _message; + final DateTime time; - XLogDataLogicalMessage(Uint8List bytes) : super(bytes) { - _message = parseLogicalReplicationMessage(this.bytes); - } + @override + final LSN walEnd; + + @override + final LSN walStart; + + @override + LSN get walDataLength => LSN(bytes.length); + + late final LogicalReplicationMessage message; + + @override + LogicalReplicationMessage get data => message; + + XLogDataLogicalMessage({ + required this.message, + required this.bytes, + required this.time, + required this.walEnd, + required this.walStart, + }); @override String toString() => super.toString(); } -LogicalReplicationMessage parseLogicalReplicationMessage(Uint8List bytesList) { +/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so, +/// [LogicalReplicationMessage] is returned, otherwise `null` is returned. +LogicalReplicationMessage? tryParseLogicalReplicationMessage( + Uint8List bytesList) { // the first byte is the msg type final firstByte = bytesList.first; final msgType = LogicalReplicationMessageTypes.fromByte(firstByte); - // remaining bytes are the data + // remaining bytes are the data final bytes = bytesList.sublist(1); - switch (msgType) { case LogicalReplicationMessageTypes.Begin: return BeginMessage(bytes); + case LogicalReplicationMessageTypes.Commit: return CommitMessage(bytes); + case LogicalReplicationMessageTypes.Origin: return OriginMessage(bytes); + case LogicalReplicationMessageTypes.Relation: return RelationMessage(bytes); + case LogicalReplicationMessageTypes.Type: return TypeMessage(bytes); + case LogicalReplicationMessageTypes.Insert: return InsertMessage(bytes); + case LogicalReplicationMessageTypes.Update: return UpdateMessage(bytes); + case LogicalReplicationMessageTypes.Delete: return DeleteMessage(bytes); + case LogicalReplicationMessageTypes.Truncate: return TruncateMessage(bytes); + case LogicalReplicationMessageTypes.Unsupported: default: - return _parseJsonMessageOrReturnUnknownMessage(bytes); + // note this needs the full set of bytes unlike other cases + return _tryParseJsonMessage(bytesList); } } -LogicalReplicationMessage _parseJsonMessageOrReturnUnknownMessage( - Uint8List bytes) { +LogicalReplicationMessage? _tryParseJsonMessage(Uint8List bytes) { // wal2json messages starts with `{` as the first byte if (bytes.first == '{'.codeUnits.first) { try { return JsonMessage(utf8.decode(bytes)); } catch (e) { - return UnknownLogicalReplicationMessage(bytes); + return null; } } - return UnknownLogicalReplicationMessage(bytes); + return null; } enum LogicalReplicationMessageTypes { diff --git a/lib/src/message_window.dart b/lib/src/message_window.dart index 91c47d63..68c76d17 100644 --- a/lib/src/message_window.dart +++ b/lib/src/message_window.dart @@ -4,6 +4,7 @@ import 'dart:typed_data'; import 'package:buffer/buffer.dart'; import 'server_messages.dart'; +import 'shared_messages.dart'; const int _headerByteSize = 5; final _emptyData = Uint8List(0); @@ -21,7 +22,9 @@ Map _messageTypeMap = { 82: (d) => AuthenticationMessage(d), 83: (d) => ParameterStatusMessage(d), 84: (d) => RowDescriptionMessage(d), + 87: (d) => CopyBothResponseMessage(d), 90: (d) => ReadyForQueryMessage(d), + 100: (d) => CopyDataMessage(d), 110: (d) => NoDataMessage(), 116: (d) => ParameterDescriptionMessage(d), }; @@ -51,20 +54,54 @@ class MessageFramer { _expectedLength = _reader.readUint32() - 4; } - if (_hasReadHeader && _isComplete) { + // special case + if (_type == SharedMessages.copyDoneIdentifier) { + // unlike other messages, CopyDoneMessage only takes the length as an + // argument (must be the full length including the length bytes) + final msg = CopyDoneMessage(_expectedLength + 4); + _addMsg(msg); + evaluateNextMessage = true; + } else if (_hasReadHeader && _isComplete) { final data = _expectedLength == 0 ? _emptyData : _reader.read(_expectedLength); final msgMaker = _messageTypeMap[_type]; - final msg = + var msg = msgMaker == null ? UnknownMessage(_type, data) : msgMaker(data); - messageQueue.add(msg); - _type = null; - _expectedLength = 0; + + // Copy Data message is a wrapper around data stream messages + // such as replication messages. + if (msg is CopyDataMessage) { + // checks if it's a replication message, otherwise returns given msg + msg = _extractReplicationMessageIfAny(msg); + } + + _addMsg(msg); evaluateNextMessage = true; } } } + void _addMsg(ServerMessage msg) { + messageQueue.add(msg); + _type = null; + _expectedLength = 0; + } + + /// Returns a [ReplicationMessage] if the [CopyDataMessage] contains such message. + /// Otherwise, it'll just return the provided [copyData]. + ServerMessage _extractReplicationMessageIfAny(CopyDataMessage copyData) { + final bytes = copyData.bytes; + final code = bytes.first; + final data = bytes.sublist(1); + if (code == ReplicationMessage.primaryKeepAliveIdentifier) { + return PrimaryKeepAliveMessage(data); + } else if (code == ReplicationMessage.xLogDataIdentifier) { + return XLogDataMessage.parse(data); + } else { + return copyData; + } + } + bool get hasMessage => messageQueue.isNotEmpty; ServerMessage popMessage() { diff --git a/lib/src/server_messages.dart b/lib/src/server_messages.dart index 82ed9934..5fae3864 100644 --- a/lib/src/server_messages.dart +++ b/lib/src/server_messages.dart @@ -2,6 +2,7 @@ import 'dart:convert'; import 'dart:typed_data'; import 'package:buffer/buffer.dart'; +import 'package:postgres/messages.dart'; import 'connection.dart'; import 'query.dart'; @@ -278,10 +279,10 @@ class PrimaryKeepAliveMessage implements ReplicationMessage, ServerMessage { } class XLogDataMessage implements ReplicationMessage, ServerMessage { - late final LSN walStart; - late final LSN walEnd; - late final DateTime time; - late final Uint8List bytes; + final LSN walStart; + final LSN walEnd; + final DateTime time; + final Uint8List bytes; // this is used for standby msg LSN get walDataLength => LSN(bytes.length); @@ -289,13 +290,44 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage { /// For logical replication, see [XLogDataLogicalMessage] Object get data => bytes; - XLogDataMessage(Uint8List bytes) { + XLogDataMessage({ + required this.walStart, + required this.walEnd, + required this.time, + required this.bytes, + }); + + /// Parses the XLogDataMessage + /// + /// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method + /// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll + /// return [XLogDataMessage] with raw data. + static XLogDataMessage parse(Uint8List bytes) { final reader = ByteDataReader()..add(bytes); - walStart = LSN(reader.readUint64()); - walEnd = LSN(reader.readUint64()); - time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64()); - this.bytes = reader.read(reader.remainingLength); + final walStart = LSN(reader.readUint64()); + final walEnd = LSN(reader.readUint64()); + final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64()); + final data = reader.read(reader.remainingLength); + + final message = tryParseLogicalReplicationMessage(data); + if (message != null) { + return XLogDataLogicalMessage( + message: message, + bytes: bytes, + time: time, + walEnd: walEnd, + walStart: walStart, + ); + } else { + return XLogDataMessage( + bytes: bytes, + time: time, + walEnd: walEnd, + walStart: walStart, + ); + } } + @override String toString() => 'XLogDataMessage(walStart: $walStart, walEnd: $walEnd, time: $time, data: $data)'; diff --git a/test/framer_test.dart b/test/framer_test.dart index d0bf0459..19cc0001 100644 --- a/test/framer_test.dart +++ b/test/framer_test.dart @@ -1,8 +1,10 @@ import 'dart:typed_data'; import 'package:buffer/buffer.dart'; +import 'package:postgres/src/logical_replication_messages.dart'; import 'package:postgres/src/message_window.dart'; import 'package:postgres/src/server_messages.dart'; +import 'package:postgres/src/shared_messages.dart'; import 'package:test/test.dart'; void main() { @@ -173,6 +175,139 @@ void main() { final messages = framer.messageQueue.toList(); expect(messages, [UnknownMessage(10, Uint8List(0))]); }); + + test('Identify CopyDoneMessage with length equals size length (min)', () { + // min length + final length = [0, 0, 0, 4]; // min length (4 bytes) as 32-bit + final bytes = Uint8List.fromList([ + SharedMessages.copyDoneIdentifier, + ...length, + ]); + framer.addBytes(bytes); + + final message = framer.messageQueue.toList().first; + expect(message, isA()); + expect((message as CopyDoneMessage).length, 4); + }); + + test('Identify CopyDoneMessage when length larger than size length', () { + final length = (ByteData(4)..setUint32(0, 42)).buffer.asUint8List(); + final bytes = Uint8List.fromList([ + SharedMessages.copyDoneIdentifier, + ...length, + ]); + framer.addBytes(bytes); + + final message = framer.messageQueue.toList().first; + expect(message, isA()); + expect((message as CopyDoneMessage).length, 42); + }); + + test('Adds XLogDataMessage to queue', () { + final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); + // random data bytes + final dataBytes = [1, 2, 3, 4, 5, 6, 7, 8]; + + /// This represent a raw [XLogDataMessage] + final xlogDataMessage = [ + ReplicationMessage.xLogDataIdentifier, + ...bits64, // walStart (64bit) + ...bits64, // walEnd (64bit) + ...bits64, // time (64bit) + ...dataBytes // bytes (any) + ]; + final length = ByteData(4)..setUint32(0, xlogDataMessage.length + 4); + + // this represents the [CopyDataMessage] which is a wrapper for [XLogDataMessage] + // and such + final copyDataBytes = [ + SharedMessages.copyDataIdentifier, + ...length.buffer.asUint8List(), + ...xlogDataMessage, + ]; + + framer.addBytes(Uint8List.fromList(copyDataBytes)); + final message = framer.messageQueue.toList().first; + expect(message, isA()); + expect(message, isNot(isA())); + }); + + test('Adds XLogDataLogicalMessage with JsonMessage to queue', () { + final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); + + /// represent an empty json object so we should get a XLogDataLogicalMessage + /// with JsonMessage as its message. + final dataBytes = '{}'.codeUnits; + + /// This represent a raw [XLogDataMessage] + final xlogDataMessage = [ + ReplicationMessage.xLogDataIdentifier, + ...bits64, // walStart (64bit) + ...bits64, // walEnd (64bit) + ...bits64, // time (64bit) + ...dataBytes, // bytes (any) + ]; + + final length = ByteData(4)..setUint32(0, xlogDataMessage.length + 4); + + /// this represents the [CopyDataMessage] in which [XLogDataMessage] + /// is delivered per protocol + final copyDataMessage = [ + SharedMessages.copyDataIdentifier, + ...length.buffer.asUint8List(), + ...xlogDataMessage, + ]; + + framer.addBytes(Uint8List.fromList(copyDataMessage)); + final message = framer.messageQueue.toList().first; + expect(message, isA()); + expect((message as XLogDataLogicalMessage).message, isA()); + }); + + test('Adds PrimaryKeepAliveMessage to queue', () { + final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); + + /// This represent a raw [PrimaryKeepAliveMessage] + final xlogDataMessage = [ + ReplicationMessage.primaryKeepAliveIdentifier, + ...bits64, // walEnd (64bits) + ...bits64, // time (64bits) + 0, // mustReply (1bit) + ]; + final length = ByteData(4)..setUint32(0, xlogDataMessage.length + 4); + + /// This represents the [CopyDataMessage] in which [PrimaryKeepAliveMessage] + /// is delivered per protocol + final copyDataMessage = [ + SharedMessages.copyDataIdentifier, + ...length.buffer.asUint8List(), + ...xlogDataMessage, + ]; + + framer.addBytes(Uint8List.fromList(copyDataMessage)); + final message = framer.messageQueue.toList().first; + expect(message, isA()); + }); + + test('Adds raw CopyDataMessage for unknown stream message', () { + final xlogDataBytes = [ + -1, // unknown id + ]; + + final length = ByteData(4)..setUint32(0, xlogDataBytes.length + 4); + + /// This represents the [CopyDataMessage] in which data is delivered per protocol + /// typically contains [XLogData] and such but this tests unknown content + final copyDataMessage = [ + SharedMessages.copyDataIdentifier, + ...length.buffer.asUint8List(), + ...xlogDataBytes, + ]; + + framer.addBytes(Uint8List.fromList(copyDataMessage)); + final message = framer.messageQueue.toList().first; + expect(message, isA()); + }); } List messageWithBytes(List bytes, int messageID) { diff --git a/test/logical_replication_test.dart b/test/logical_replication_test.dart new file mode 100644 index 00000000..140e158c --- /dev/null +++ b/test/logical_replication_test.dart @@ -0,0 +1,291 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:postgres/messages.dart'; +import 'package:postgres/postgres.dart'; +import 'package:test/expect.dart'; +import 'package:test/scaffolding.dart'; + +import 'docker.dart'; + +void main() { + // Running these tests on the CI will fail since the the `SetUpAll` function + // alter systems configuration (i.e. wal_level, max_replication_slots, max_wal_senders) + // which requires reloading the database changes. The only known possible way to do that + // now is to restart the docker container which apparently won't work on the CI + // + // TODO: enable replication configuration before spinning up the container + // i.e. pre-set wal_level, max_replication_slots, max_wal_senders in + // `postgresql.conf` file + if (Platform.environment.containsKey('GITHUB_ACTION')) { + test('NO LOGICAL REPLICATION TESTS ARE RUNNING.', () { + // no-op + }); + return; + } + + usePostgresDocker(); + + // NOTES: + // - Two PostgreSQL connections are needed for testing replication. + // - One for listening to streaming replications (this connection will be locked). + // - The other one to modify the database (e.g. insert, delete, update, truncate) + group('test logical replication with pgoutput for decoding', () { + final _host = 'localhost'; + final _port = 5432; + final _username = 'dart'; + final _password = 'dart'; + final _database = 'dart_test'; + + final logicalDecodingPlugin = 'pgoutput'; + final replicationMode = ReplicationMode.logical; + // use this for listening to messages + var replicationConn = PostgreSQLConnection( + _host, + _port, + _database, + username: _username, + password: _password, + replicationMode: replicationMode, + ); + + // use this for sending queries + final changesConn = PostgreSQLConnection( + _host, + _port, + _database, + username: _username, + password: _password, + ); + + setUpAll(() async { + await replicationConn.open(); + + // Setup the database for replication + await replicationConn.execute('ALTER SYSTEM SET wal_level = logical;'); + await replicationConn + .execute('ALTER SYSTEM SET max_replication_slots = 5;'); + await replicationConn.execute('ALTER SYSTEM SET max_wal_senders=5;'); + + /// 'ALTER SYSTEM' statement requires restarting the database + /// An easy way is to restart the docker container + await replicationConn.close(); + + // This is a temp work around until a better way is found + // Adding this to `docker.dart` can be problmatic since it should not be + // used in tests that run on the CI. + // TODO: remove this once an alternative method is found + await Process.run('docker', ['restart', 'postgres-dart-test']); + + // it is necessary re-construct the object as calling `conn.open()` won't work + replicationConn = PostgreSQLConnection( + _host, + _port, + _database, + username: _username, + password: _password, + replicationMode: replicationMode, + ); + + // wait for a second then open the connections. + await Future.delayed(Duration(seconds: 1)); + await replicationConn.open(); + await changesConn.open(); + + // create a temp table for testing + await replicationConn.execute('create table if not exists temp' + '(id int GENERATED ALWAYS AS IDENTITY, value text,' + 'PRIMARY KEY (id));'); + + // create publication + final publicationName = 'test_publication'; + await replicationConn + .execute('DROP PUBLICATION IF EXISTS $publicationName;'); + await replicationConn + .execute('CREATE PUBLICATION $publicationName FOR ALL TABLES;'); + + final sysInfoRes = await replicationConn.query('IDENTIFY_SYSTEM;', + useSimpleQueryProtocol: true); + + final xlogpos = sysInfoRes.first.toColumnMap()['xlogpos'] as String; + + // create replication slot + final slotName = 'a_test_slot'; + + // `TEMPORARY` will remove the slot after the connection is closed/dropped + await replicationConn.execute( + 'CREATE_REPLICATION_SLOT $slotName TEMPORARY LOGICAL' + '$logicalDecodingPlugin NOEXPORT_SNAPSHOT', + ); + + // start replication process + final statement = 'START_REPLICATION SLOT $slotName LOGICAL $xlogpos ' + "(proto_version '1', publication_names '$publicationName')"; + + // This future will not complete until the replication process stops + // by closing the connection, an error or timing out. + // ignore: unawaited_futures + replicationConn.execute(statement, timeoutInSeconds: 120).catchError((e) { + // this query will be cancelled once the connection is closed. + // no need to handle the error + return 0; + }); + + await Future.delayed(Duration(seconds: 1)); + }); + + tearDownAll(() async { + // this will stop the streaming and delete the replication slot + await replicationConn.close(); + await changesConn.close(); + }); + + // BeginMessage -> InsertMessage -> CommitMessage + test('- Receive InsertMessage after insert statement', () async { + final stream = replicationConn.messages + .where((event) => event is XLogDataMessage) + .map((event) => (event as XLogDataMessage).data) + // RelationMessage isn't always present (appears conditionally) so + // it's skipped when present + .where((event) => event is! RelationMessage) + .take(3); + + late final StreamController controller; + controller = StreamController( + onListen: () async { + // don't await here otherwise what's after won't be executed. + final future = controller.addStream(stream); + await changesConn + .execute("insert into temp (value) values ('test');"); + await future; + await controller.close(); + }, + ); + + final matchers = [ + isA(), + isA(), + isA(), + ]; + + expect(controller.stream, emitsInAnyOrder(matchers)); + }); + + // BeginMessage -> UpdateMessage -> CommitMessage + test('- Receive UpdateMessage after update statement', () async { + // insert data to be updated + await changesConn + .execute("insert into temp (value) values ('update_test');"); + // wait to avoid capturing INSERT + await Future.delayed(Duration(seconds: 3)); + final stream = replicationConn.messages + .where((event) => event is XLogDataMessage) + .map((event) => (event as XLogDataMessage).data) + // RelationMessage isn't always present (appears conditionally) so + // it's skipped when present + .where((event) => event is! RelationMessage) + .take(3); + + late final StreamController controller; + controller = StreamController( + onListen: () async { + // don't await here otherwise what's after won't be executed. + final future = controller.addStream(stream); + await changesConn.execute( + "update temp set value = 'updated_test_value'" + "where value = 'update_test';", + ); + await future; + await controller.close(); + }, + ); + + final matchers = [ + isA(), + isA(), + isA(), + ]; + + expect(controller.stream, emitsInAnyOrder(matchers)); + }); + // BeginMessage -> DeleteMessage -> CommitMessage + test('- Receive DeleteMessage after delete statement', () async { + // insert data to be delete + await changesConn + .execute("insert into temp (value) values ('update_test');"); + // wait to avoid capturing INSERT + await Future.delayed(Duration(seconds: 3)); + final stream = replicationConn.messages + .where((event) => event is XLogDataMessage) + .map((event) => (event as XLogDataMessage).data) + // RelationMessage isn't always present (appears conditionally) so + // it's skipped when present + .where((event) => event is! RelationMessage) + .take(3); + + late final StreamController controller; + controller = StreamController( + onListen: () async { + // don't await here otherwise what's after won't be executed. + final future = controller.addStream(stream); + await changesConn.execute( + "delete from temp where value = 'update_test';", + ); + await future; + await controller.close(); + }, + ); + + final matchers = [ + isA(), + isA(), + isA(), + ]; + + expect(controller.stream, emitsInAnyOrder(matchers)); + }); + + // BeginMessage -> TruncateMessage -> CommitMessage + test('- Receive TruncateMessage after delete statement', () async { + final tableName = 'temp_truncate'; + // create table to be truncated + await changesConn.execute(''' +create table if not exists $tableName ( + id int GENERATED ALWAYS AS IDENTITY, + value text, + PRIMARY KEY (id) + ); +'''); + // wait to for a second + await Future.delayed(Duration(seconds: 1)); + final stream = replicationConn.messages + .where((event) => event is XLogDataMessage) + .map((event) => (event as XLogDataMessage).data) + // RelationMessage isn't always present (appears conditionally) so + // it's skipped when present + .where((event) => event is! RelationMessage) + .take(3); + + late final StreamController controller; + controller = StreamController( + onListen: () async { + // don't await here otherwise what's after won't be executed. + final future = controller.addStream(stream); + await changesConn.execute( + 'truncate table $tableName;', + ); + await future; + await controller.close(); + }, + ); + + final matchers = [ + isA(), + isA(), + isA(), + ]; + + expect(controller.stream, emitsInOrder(matchers)); + }); + }); +}