From c4bb00dbbadc30cd292277b1d8d5ecf09d76b68c Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Tue, 6 Sep 2022 22:20:10 +0300 Subject: [PATCH 1/6] Add shared messages between client and server --- lib/src/shared_messages.dart | 62 ++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 lib/src/shared_messages.dart diff --git a/lib/src/shared_messages.dart b/lib/src/shared_messages.dart new file mode 100644 index 00000000..935ca352 --- /dev/null +++ b/lib/src/shared_messages.dart @@ -0,0 +1,62 @@ +import 'dart:typed_data'; + +import 'package:buffer/buffer.dart'; + +import 'package:postgres/src/client_messages.dart'; +import 'package:postgres/src/server_messages.dart'; + +/// An abstraction for all client and server replication messages +/// +/// For more details, see [Streaming Replication Protocol][] +/// +/// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html +abstract class ReplicationMessage { + static const int primaryKeepAliveIdentifier = 107; // k + static const int xLogDataIdentifier = 119; // w + static const int hotStandbyFeedbackIdentifier = 104; // h + static const int standbyStatusUpdateIdentifier = 114; // r +} + +/// Messages that are shared between both the server and the client +/// +/// For more details, see [Message Formats][] +/// +/// [Message Formats]: https://www.postgresql.org/docs/current/protocol-message-formats.html +abstract class SharedMessages extends ClientMessage implements ServerMessage { + static const int copyDoneIdentifier = 99; // c + static const int copyDataIdentifier = 100; // d +} + +/// A COPY data message. +class CopyDataMessage extends SharedMessages { + /// Data that forms part of a COPY data stream. Messages sent from the backend + /// will always correspond to single data rows, but messages sent by frontends + /// might divide the data stream arbitrarily. + final Uint8List bytes; + + /// Length of message contents in bytes, including self (i.e. int32). + int get length => bytes.length + 4; + + CopyDataMessage(this.bytes); + + @override + void applyToBuffer(ByteDataWriter buffer) { + buffer.writeUint8(SharedMessages.copyDataIdentifier); + buffer.writeInt32(length); + buffer.write(bytes); + } +} + +/// A COPY-complete indicator. +class CopyDoneMessage extends SharedMessages { + /// Length of message contents in bytes, including self. + late final int length; + + CopyDoneMessage(this.length); + + @override + void applyToBuffer(ByteDataWriter buffer) { + buffer.writeUint8(SharedMessages.copyDoneIdentifier); + buffer.writeInt32(length); + } +} From e745237eb8cf2938de6bb9ae2278cee67bafb277 Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Sat, 10 Sep 2022 02:20:26 +0300 Subject: [PATCH 2/6] Add new client messages --- lib/src/client_messages.dart | 92 ++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/lib/src/client_messages.dart b/lib/src/client_messages.dart index 89443d9d..f0762cf7 100644 --- a/lib/src/client_messages.dart +++ b/lib/src/client_messages.dart @@ -1,10 +1,14 @@ import 'dart:typed_data'; import 'package:buffer/buffer.dart'; +import 'package:postgres/src/time_converters.dart'; import 'constants.dart'; import 'query.dart'; import 'replication.dart'; + +import 'shared_messages.dart'; +import 'types.dart'; import 'utf8_backed_string.dart'; abstract class ClientMessage { @@ -237,3 +241,91 @@ class SyncMessage extends ClientMessage { buffer.writeUint32(4); } } + +class StandbyStatusUpdateMessage extends ClientMessage + implements ReplicationMessage { + /// The WAL position that's been locally written + final LSN walWritePosition; + + /// The WAL position that's been locally flushed + late final LSN walFlushPosition; + + /// The WAL position that's been locally applied + late final LSN walApplyPosition; + + /// Client system clock time + late final DateTime clientTime; + + /// Request server to reply immediately. + final bool mustReply; + + /// StandbyStatusUpdate to the PostgreSQL server. + /// + /// The only required field is [walWritePosition]. If either [walFlushPosition] + /// or [walApplyPosition] are `null`, [walWritePosition] will be assigned to them. + /// If [clientTime], then the current time is used. + /// + /// When sending this message, it must be wrapped within [CopyDataMessage] + StandbyStatusUpdateMessage({ + required this.walWritePosition, + LSN? walFlushPosition, + LSN? walApplyPosition, + DateTime? clientTime, + this.mustReply = false, + }) { + this.walFlushPosition = walFlushPosition ?? walWritePosition; + this.walApplyPosition = walApplyPosition ?? walWritePosition; + this.clientTime = clientTime ?? DateTime.now().toUtc(); + } + + @override + void applyToBuffer(ByteDataWriter buffer) { + buffer.writeUint8(ReplicationMessage.standbyStatusUpdateIdentifier); + buffer.writeUint64(walWritePosition.value); + buffer.writeUint64(walFlushPosition.value); + buffer.writeUint64(walApplyPosition.value); + buffer.writeUint64(dateTimeToMicrosecondsSinceY2k(clientTime)); + buffer.writeUint8(mustReply ? 1 : 0); + } +} + +class HotStandbyFeedbackMessage extends ClientMessage + implements ReplicationMessage { + /// The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. + final DateTime clientTime; + + /// The standby's current global xmin, excluding the catalog_xmin from any + /// replication slots. If both this value and the following catalog_xmin are 0 + /// this is treated as a notification that Hot Standby feedback will no longer + /// be sent on this connection. Later non-zero messages may reinitiate the + /// feedback mechanism + final int currentGlobalXmin; + + /// The epoch of the global xmin xid on the standby. + final int epochGlobalXminXid; + + /// The lowest catalog_xmin of any replication slots on the standby. Set to 0 + /// if no catalog_xmin exists on the standby or if hot standby feedback is + /// being disabled. + final int lowestCatalogXmin; + + /// The epoch of the catalog_xmin xid on the standby. + final int epochCatalogXminXid; + + HotStandbyFeedbackMessage( + this.clientTime, + this.currentGlobalXmin, + this.epochGlobalXminXid, + this.lowestCatalogXmin, + this.epochCatalogXminXid); + + @override + void applyToBuffer(ByteDataWriter buffer) { + buffer.writeUint8(ReplicationMessage.hotStandbyFeedbackIdentifier); + buffer.writeUint64(dateTimeToMicrosecondsSinceY2k(clientTime)); + buffer.writeUint32(currentGlobalXmin); + buffer.writeUint32(epochGlobalXminXid); + buffer.writeUint32(lowestCatalogXmin); + buffer.writeUint32(epochCatalogXminXid); + } +} From bf6bc21e955220283861d0d06e01af4bfdb4de10 Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Tue, 6 Sep 2022 22:20:10 +0300 Subject: [PATCH 3/6] Add new server messages --- lib/src/server_messages.dart | 77 ++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/lib/src/server_messages.dart b/lib/src/server_messages.dart index a2ecce2d..82ed9934 100644 --- a/lib/src/server_messages.dart +++ b/lib/src/server_messages.dart @@ -5,6 +5,9 @@ import 'package:buffer/buffer.dart'; import 'connection.dart'; import 'query.dart'; +import 'shared_messages.dart'; +import 'time_converters.dart'; +import 'types.dart'; abstract class ServerMessage {} @@ -224,6 +227,80 @@ class NoDataMessage extends ServerMessage { String toString() => 'No Data Message'; } +/// Identifies the message as a Start Copy Both response. +/// This message is used only for Streaming Replication. +class CopyBothResponseMessage implements ServerMessage { + /// 0 indicates the overall COPY format is textual (rows separated by newlines, + /// columns separated by separator characters, etc). 1 indicates the overall copy + /// format is binary (similar to DataRow format). + late final int copyFormat; + + /// The format codes to be used for each column. Each must presently be zero (text) + /// or one (binary). All must be zero if the overall copy format is textual + final columnsFormatCode = []; + + CopyBothResponseMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + copyFormat = reader.readInt8(); + + final numberOfColumns = reader.readInt16(); + + for (var i = 0; i < numberOfColumns; i++) { + columnsFormatCode.add(reader.readInt16()); + } + } + + @override + String toString() { + final format = copyFormat == 0 ? 'textual' : 'binary'; + return 'CopyBothResponseMessage with $format COPY format for ${columnsFormatCode.length}-columns'; + } +} + +class PrimaryKeepAliveMessage implements ReplicationMessage, ServerMessage { + /// The current end of WAL on the server. + late final LSN walEnd; + late final DateTime time; + // If `true`, it means that the client should reply to this message as soon as possible, + // to avoid a timeout disconnect. + late final bool mustReply; + + PrimaryKeepAliveMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + walEnd = LSN(reader.readUint64()); + time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64()); + mustReply = reader.readUint8() != 0; + } + + @override + String toString() => + 'PrimaryKeepAliveMessage(walEnd: $walEnd, time: $time, mustReply: $mustReply)'; +} + +class XLogDataMessage implements ReplicationMessage, ServerMessage { + late final LSN walStart; + late final LSN walEnd; + late final DateTime time; + late final Uint8List bytes; + // this is used for standby msg + LSN get walDataLength => LSN(bytes.length); + + /// For physical replication, this is the raw [bytes] + /// For logical replication, see [XLogDataLogicalMessage] + Object get data => bytes; + + XLogDataMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + walStart = LSN(reader.readUint64()); + walEnd = LSN(reader.readUint64()); + time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64()); + this.bytes = reader.read(reader.remainingLength); + } + @override + String toString() => + 'XLogDataMessage(walStart: $walStart, walEnd: $walEnd, time: $time, data: $data)'; +} + class UnknownMessage extends ServerMessage { final int? code; final Uint8List? bytes; From a97dace1300cf3e6e2cd53157adcb7a8add5f5cb Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Tue, 6 Sep 2022 22:20:10 +0300 Subject: [PATCH 4/6] Add Logical Replication Messages Formats --- lib/src/logical_replication_messages.dart | 630 ++++++++++++++++++++++ 1 file changed, 630 insertions(+) create mode 100644 lib/src/logical_replication_messages.dart diff --git a/lib/src/logical_replication_messages.dart b/lib/src/logical_replication_messages.dart new file mode 100644 index 00000000..90c13770 --- /dev/null +++ b/lib/src/logical_replication_messages.dart @@ -0,0 +1,630 @@ +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:buffer/buffer.dart'; + +import 'binary_codec.dart'; +import 'server_messages.dart'; +import 'shared_messages.dart'; +import 'time_converters.dart'; +import 'types.dart'; + +/// A base class for all [Logical Replication Message Formats][] from the server +/// +/// [Logical Replication Message Formats]: https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html +abstract class LogicalReplicationMessage + implements ReplicationMessage, ServerMessage {} + +class XLogDataLogicalMessage extends XLogDataMessage { + late final LogicalReplicationMessage _message; + + @override + LogicalReplicationMessage get data => _message; + + XLogDataLogicalMessage(Uint8List bytes) : super(bytes) { + _message = parseLogicalReplicationMessage(this.bytes); + } + + @override + String toString() => super.toString(); +} + +LogicalReplicationMessage parseLogicalReplicationMessage(Uint8List bytesList) { + // the first byte is the msg type + final firstByte = bytesList.first; + final msgType = LogicalReplicationMessageTypes.fromByte(firstByte); + // remaining bytes are the data + final bytes = bytesList.sublist(1); + + switch (msgType) { + case LogicalReplicationMessageTypes.Begin: + return BeginMessage(bytes); + case LogicalReplicationMessageTypes.Commit: + return CommitMessage(bytes); + case LogicalReplicationMessageTypes.Origin: + return OriginMessage(bytes); + case LogicalReplicationMessageTypes.Relation: + return RelationMessage(bytes); + case LogicalReplicationMessageTypes.Type: + return TypeMessage(bytes); + case LogicalReplicationMessageTypes.Insert: + return InsertMessage(bytes); + case LogicalReplicationMessageTypes.Update: + return UpdateMessage(bytes); + case LogicalReplicationMessageTypes.Delete: + return DeleteMessage(bytes); + case LogicalReplicationMessageTypes.Truncate: + return TruncateMessage(bytes); + case LogicalReplicationMessageTypes.Unsupported: + default: + return _parseJsonMessageOrReturnUnknownMessage(bytes); + } +} + +LogicalReplicationMessage _parseJsonMessageOrReturnUnknownMessage( + Uint8List bytes) { + // wal2json messages starts with `{` as the first byte + if (bytes.first == '{'.codeUnits.first) { + try { + return JsonMessage(utf8.decode(bytes)); + } catch (e) { + return UnknownLogicalReplicationMessage(bytes); + } + } + return UnknownLogicalReplicationMessage(bytes); +} + +enum LogicalReplicationMessageTypes { + Begin('B'), + Commit('C'), + Origin('O'), + Relation('R'), + Type('Y'), + Insert('I'), + Update('U'), + Delete('D'), + Truncate('T'), + Unsupported(''); + + final String id; + const LogicalReplicationMessageTypes(this.id); + + static LogicalReplicationMessageTypes fromID(String id) { + return LogicalReplicationMessageTypes.values.firstWhere( + (element) => element.id == id, + orElse: () => LogicalReplicationMessageTypes.Unsupported, + ); + } + + static LogicalReplicationMessageTypes fromByte(int byte) { + return fromID(String.fromCharCode(byte)); + } +} + +/// A non-standrd message for JSON data +/// +/// This is mainly used to deliver wal2json messages which is a popular plugin +/// for decoding logical replication output. +class JsonMessage implements LogicalReplicationMessage { + final String json; + + JsonMessage(this.json); + + @override + String toString() => json; +} + +/// A non-stnadard message for unkown messages +/// +/// This message only holds the bytes as data. +class UnknownLogicalReplicationMessage implements LogicalReplicationMessage { + final Uint8List bytes; + + UnknownLogicalReplicationMessage(this.bytes); + + @override + String toString() => 'UnknownLogicalReplicationMessage(bytes: $bytes)'; +} + +class BeginMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Begin; + + /// The final LSN of the transaction. + late final LSN finalLSN; + + /// The commit timestamp of the transaction. + late final DateTime commitTime; + + /// The transaction id + late final int xid; + + BeginMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + // reading order matters + finalLSN = reader.readLSN(); + commitTime = reader.readTime(); + xid = reader.readUint32(); + } + + @override + String toString() => + 'BeginMessage(finalLSN: $finalLSN, commitTime: $commitTime, xid: $xid)'; +} + +// CommitMessage is a commit message. +class CommitMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Commit; + + // Flags currently unused (must be 0). + late final int flags; + + /// The LSN of the commit. + late final LSN commitLSN; + + /// The end LSN of the transaction. + late final LSN transactionEndLSN; + + /// The commit timestamp of the transaction. + late final DateTime commitTime; + + CommitMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + // reading order matters + flags = reader.readUint8(); + commitLSN = reader.readLSN(); + transactionEndLSN = reader.readLSN(); + commitTime = reader.readTime(); + } + + @override + String toString() { + return 'CommitMessage(flags: $flags, commitLSN: $commitLSN, transactionEndLSN: $transactionEndLSN, commitTime: $commitTime)'; + } +} + +class OriginMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Origin; + + /// The LSN of the commit on the origin server. + late final LSN commitLSN; + + late final String name; + + OriginMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + // reading order matters + commitLSN = reader.readLSN(); + name = reader.decodeString(); + } + + @override + String toString() => 'OriginMessage(commitLSN: $commitLSN, name: $name)'; +} + +class RelationMessageColumn { + /// Flags for the column. Currently can be either 0 for no flags or 1 which + /// marks the column as part of the key. + final int flags; + + final String name; + + /// The ID of the column's data type. + final int dataType; + + /// type modifier of the column (atttypmod). + final int typeModifier; + + RelationMessageColumn({ + required this.flags, + required this.name, + required this.dataType, + required this.typeModifier, + }); + + @override + String toString() { + return 'RelationMessageColumn(flags: $flags, name: $name, dataType: $dataType, typeModifier: $typeModifier)'; + } +} + +class RelationMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Relation; + late final int relationID; + late final String nameSpace; + late final String relationName; + late final int replicaIdentity; + late final int columnNum; + late final columns = []; + + RelationMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + // reading order matters + relationID = reader.readUint32(); + nameSpace = reader.decodeString(); + relationName = reader.decodeString(); + replicaIdentity = reader.readUint8(); + columnNum = reader.readUint16(); + + for (var i = 0; i < columnNum; i++) { + // reading order matters + final flags = reader.readUint8(); + final name = reader.decodeString(); + final dataType = reader.readUint32(); + final typeModifier = reader.readUint32(); + columns.add( + RelationMessageColumn( + flags: flags, + name: name, + dataType: dataType, + typeModifier: typeModifier, + ), + ); + } + } + + @override + String toString() { + return 'RelationMessage(relationID: $relationID, nameSpace: $nameSpace, relationName: $relationName, replicaIdentity: $replicaIdentity, columnNum: $columnNum, columns: $columns)'; + } +} + +class TypeMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Type; + + /// This is the type OID + // TODO: create a getter for the type as a string + late final int dataType; + + late final String nameSpace; + + late final String name; + + TypeMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + // reading order matters + dataType = reader.readUint32(); + nameSpace = reader.decodeString(); + name = reader.decodeString(); + } + + @override + String toString() => + 'TypeMessage(dataType: $dataType, nameSpace: $nameSpace, name: $name)'; +} + +enum TupleDataType { + nullType('n'), + toastType('u'), + textType('t'), + binaryType('b'); + + final String id; + const TupleDataType(this.id); + static TupleDataType fromID(String id) { + return TupleDataType.values.firstWhere((element) => element.id == id); + } + + static TupleDataType fromByte(int byte) { + return fromID(String.fromCharCode(byte)); + } +} + +class TupleDataColumn { + /// Indicates the how does the data is stored. + /// Byte1('n') Identifies the data as NULL value. + /// Or + /// Byte1('u') Identifies unchanged TOASTed value (the actual value is not sent). + /// Or + /// Byte1('t') Identifies the data as text formatted value. + /// Or + /// Byte1('b') Identifies the data as binary value. + final int dataType; + final int length; + + String get dataTypeName => + PostgresBinaryDecoder.typeMap[dataType]?.name ?? dataType.toString(); + + /// Data is the value of the column, in text format. + /// n is the above length. + final String data; + + TupleDataColumn({ + required this.dataType, + required this.length, + required this.data, + }); + + @override + String toString() => + 'TupleDataColumn(dataType: $dataTypeName, length: $length, data: $data)'; +} + +class TupleData { + /// The message type + // late final ReplicationMessageTypes baseMessage; + + late final int columnNum; + late final columns = []; + + /// TupleData does not consume the entire bytes + /// + /// It'll read until the types are generated. + TupleData(ByteDataReader reader) { + columnNum = reader.readUint16(); + for (var i = 0; i < columnNum; i++) { + // reading order matters + final dataType = reader.readUint8(); + final tupleDataType = TupleDataType.fromByte(dataType); + late final int length; + late final Uint8List data; + switch (tupleDataType) { + case TupleDataType.textType: + case TupleDataType.binaryType: + length = reader.readUint32(); + data = reader.read(length); + break; + case TupleDataType.nullType: + case TupleDataType.toastType: + length = 0; + data = Uint8List(0); + break; + } + columns.add( + TupleDataColumn( + dataType: dataType, + length: length, + data: utf8.decode(data), + ), + ); + } + } + + @override + String toString() => 'TupleData(columnNum: $columnNum, columns: $columns)'; +} + +class InsertMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Insert; + + /// The ID of the relation corresponding to the ID in the relation message. + late final int relationID; + late final TupleData tuple; + + InsertMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + relationID = reader.readUint32(); + final tupleType = reader.readUint8(); + if (tupleType != 'N'.codeUnitAt(0)) { + throw Exception("InsertMessage must have 'N' tuple type"); + } + tuple = TupleData(reader); + } + + @override + String toString() => 'InsertMessage(relationID: $relationID, tuple: $tuple)'; +} + +enum UpdateMessageTuple { + noneType('0'), // This is Zero not the letter 'O' + keyType('K'), + oldType('O'), + newType('N'); + + final String id; + const UpdateMessageTuple(this.id); + static UpdateMessageTuple fromID(String id) { + return UpdateMessageTuple.values + .firstWhere((element) => element.id == id, orElse: () => noneType); + } + + static UpdateMessageTuple fromByte(int byte) { + if (byte == 0) { + return noneType; + } + return fromID(String.fromCharCode(byte)); + } +} + +class UpdateMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Update; + + late final int relationID; + + /// OldTupleType + /// Byte1('K'): + /// Identifies the following TupleData submessage as a key. + /// This field is optional and is only present if the update changed data + /// in any of the column(s) that are part of the REPLICA IDENTITY index. + /// + /// Byte1('O'): + /// Identifies the following TupleData submessage as an old tuple. + /// This field is optional & is only present if table in which the update + /// happened has REPLICA IDENTITY set to FULL. + /// + /// The Update message may contain either a 'K' message part or an 'O' message + /// part or neither of them, but never both of them. + late final UpdateMessageTuple? oldTupleType; + late final TupleData? oldTuple; + + /// NewTuple is the contents of a new tuple. + /// Byte1('N'): Identifies the following TupleData message as a new tuple. + late final TupleData? newTuple; + + UpdateMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + // reading order matters + relationID = reader.readUint32(); + var tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); + + if (tupleType == UpdateMessageTuple.oldType || + tupleType == UpdateMessageTuple.keyType) { + oldTupleType = tupleType; + oldTuple = TupleData(reader); + tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); + } else { + oldTupleType = null; + oldTuple = null; + } + + if (tupleType == UpdateMessageTuple.newType) { + newTuple = TupleData(reader); + } else { + throw Exception('Invalid Tuple Type for UpdateMessage'); + } + } + + @override + String toString() { + return 'UpdateMessage(relationID: $relationID, oldTupleType: $oldTupleType, oldTuple: $oldTuple, newTuple: $newTuple)'; + } +} + +enum DeleteMessageTuple { + keyType('K'), + oldType('O'), + unknown(''); + + final String id; + const DeleteMessageTuple(this.id); + static DeleteMessageTuple fromID(String id) { + return DeleteMessageTuple.values.firstWhere( + (element) => element.id == id, + orElse: () => unknown, + ); + } + + static DeleteMessageTuple fromByte(int byte) { + return fromID(String.fromCharCode(byte)); + } +} + +class DeleteMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Delete; + + late final int relationID; + + /// OldTupleType + /// Byte1('K'): + /// Identifies the following TupleData submessage as a key. + /// This field is optional and is only present if the update changed data + /// in any of the column(s) that are part of the REPLICA IDENTITY index. + /// + /// Byte1('O'): + /// Identifies the following TupleData submessage as an old tuple. + /// This field is optional & is only present if table in which the update + /// happened has REPLICA IDENTITY set to FULL. + /// + /// The Update message may contain either a 'K' message part or an 'O' message + /// part or neither of them, but never both of them. + late final DeleteMessageTuple oldTupleType; + + /// NewTuple is the contents of a new tuple. + /// Byte1('N'): Identifies the following TupleData message as a new tuple. + late final TupleData oldTuple; + + DeleteMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + relationID = reader.readUint32(); + oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8()); + + switch (oldTupleType) { + case DeleteMessageTuple.keyType: + case DeleteMessageTuple.oldType: + oldTuple = TupleData(reader); + break; + default: + throw Exception('Unknown tuple type for DeleteMessage'); + } + } + + @override + String toString() => + 'DeleteMessage(relationID: $relationID, oldTupleType: $oldTupleType, oldTuple: $oldTuple)'; +} + +// see https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html +enum TruncateOptions { + cascade(1), + restartIdentity(2), + none(0); + + final int value; + const TruncateOptions(this.value); + + static TruncateOptions fromValue(int value) { + return TruncateOptions.values + .firstWhere((element) => element.value == value, orElse: () => none); + } +} + +class TruncateMessage implements LogicalReplicationMessage { + /// The message type + late final baseMessage = LogicalReplicationMessageTypes.Truncate; + + late final int relationNum; + + late final TruncateOptions option; + + final relationIds = []; + + TruncateMessage(Uint8List bytes) { + final reader = ByteDataReader()..add(bytes); + relationNum = reader.readUint32(); + option = TruncateOptions.fromValue(reader.readUint8()); + for (var i = 0; i < relationNum; i++) { + final id = reader.readUint32(); + relationIds.add(id); + } + } + + @override + String toString() => + 'TruncateMessage(relationNum: $relationNum, option: $option, relationIds: $relationIds)'; +} + +/// Extension contain commonly used methods within this file +extension on ByteDataReader { + LSN readLSN() { + return LSN(readUint64()); + } + + DateTime readTime() { + return dateTimeFromMicrosecondsSinceY2k(readUint64()); + } + + /// Decodes a string from reader current offset + /// + /// String type definition: https://www.postgresql.org/docs/current/protocol-message-types.html + /// String(s) + /// A null-terminated string (C-style string). There is no specific length limitation on strings. + /// If s is specified it is the exact value that will appear, otherwise the value is variable. + /// Eg. String, String("user"). + /// + /// If there is no null byte, return empty string. + String decodeString() { + var foundNullByte = false; + final string = []; + while (remainingLength > 0) { + final byte = readUint8(); + if (byte == 0) { + foundNullByte = true; + break; + } + string.add(byte); + } + + if (!foundNullByte) { + return ''; + } + + return utf8.decode(string); + } +} From fd828c377f7d0c585b08fe2349dfa38cc72874cd Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Tue, 6 Sep 2022 22:20:10 +0300 Subject: [PATCH 5/6] export messages as a stand-alone library This is exported separately to avoid polluting the namespace when the user is not interested in the messages. Yet, this is necessary for those needs to communicate in replication mode --- lib/messages.dart | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 lib/messages.dart diff --git a/lib/messages.dart b/lib/messages.dart new file mode 100644 index 00000000..0c67c425 --- /dev/null +++ b/lib/messages.dart @@ -0,0 +1,6 @@ +library postgres.messages; + +export 'src/client_messages.dart'; +export 'src/logical_replication_messages.dart'; +export 'src/server_messages.dart'; +export 'src/shared_messages.dart'; From a7cb727e685e471d5a229379c88191867570ef9f Mon Sep 17 00:00:00 2001 From: osaxma <46427323+osaxma@users.noreply.github.com> Date: Sat, 10 Sep 2022 12:27:34 +0300 Subject: [PATCH 6/6] fix tyop --- lib/src/client_messages.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/client_messages.dart b/lib/src/client_messages.dart index f0762cf7..fdbd77d2 100644 --- a/lib/src/client_messages.dart +++ b/lib/src/client_messages.dart @@ -263,7 +263,7 @@ class StandbyStatusUpdateMessage extends ClientMessage /// /// The only required field is [walWritePosition]. If either [walFlushPosition] /// or [walApplyPosition] are `null`, [walWritePosition] will be assigned to them. - /// If [clientTime], then the current time is used. + /// If [clientTime] is not given, then the current time is used. /// /// When sending this message, it must be wrapped within [CopyDataMessage] StandbyStatusUpdateMessage({