Skip to content

Commit

Permalink
[Streaming Replication 7th] Replication Mode Messages Handling (#58)
Browse files Browse the repository at this point in the history
* Encapsulate logical replication parsing

This helps letting the MessageFramer to be aganostic to ReplicationMode

* Modify MessageFramer to handle new messages

tests were added as well

* export replication mode in postgres library

* add test for logical replication

* modify message for logicial replication CI tests

* fix several comments

* Modify logical replication message parsing

* fix docs
  • Loading branch information
osaxma authored Sep 10, 2022
1 parent 51c2874 commit 4ba7680
Show file tree
Hide file tree
Showing 6 changed files with 554 additions and 28 deletions.
1 change: 1 addition & 0 deletions lib/postgres.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ library postgres;
export 'src/connection.dart';
export 'src/execution_context.dart';
export 'src/models.dart';
export 'src/replication.dart' show ReplicationMode;
export 'src/substituter.dart';
export 'src/types.dart';
58 changes: 44 additions & 14 deletions lib/src/logical_replication_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,93 @@ import 'types.dart';
abstract class LogicalReplicationMessage
implements ReplicationMessage, ServerMessage {}

class XLogDataLogicalMessage extends XLogDataMessage {
late final LogicalReplicationMessage _message;
class XLogDataLogicalMessage implements XLogDataMessage {
@override
final Uint8List bytes;

@override
LogicalReplicationMessage get data => _message;
final DateTime time;

XLogDataLogicalMessage(Uint8List bytes) : super(bytes) {
_message = parseLogicalReplicationMessage(this.bytes);
}
@override
final LSN walEnd;

@override
final LSN walStart;

@override
LSN get walDataLength => LSN(bytes.length);

late final LogicalReplicationMessage message;

@override
LogicalReplicationMessage get data => message;

XLogDataLogicalMessage({
required this.message,
required this.bytes,
required this.time,
required this.walEnd,
required this.walStart,
});

@override
String toString() => super.toString();
}

LogicalReplicationMessage parseLogicalReplicationMessage(Uint8List bytesList) {
/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
LogicalReplicationMessage? tryParseLogicalReplicationMessage(
Uint8List bytesList) {
// the first byte is the msg type
final firstByte = bytesList.first;
final msgType = LogicalReplicationMessageTypes.fromByte(firstByte);
// remaining bytes are the data
// remaining bytes are the data
final bytes = bytesList.sublist(1);

switch (msgType) {
case LogicalReplicationMessageTypes.Begin:
return BeginMessage(bytes);

case LogicalReplicationMessageTypes.Commit:
return CommitMessage(bytes);

case LogicalReplicationMessageTypes.Origin:
return OriginMessage(bytes);

case LogicalReplicationMessageTypes.Relation:
return RelationMessage(bytes);

case LogicalReplicationMessageTypes.Type:
return TypeMessage(bytes);

case LogicalReplicationMessageTypes.Insert:
return InsertMessage(bytes);

case LogicalReplicationMessageTypes.Update:
return UpdateMessage(bytes);

case LogicalReplicationMessageTypes.Delete:
return DeleteMessage(bytes);

case LogicalReplicationMessageTypes.Truncate:
return TruncateMessage(bytes);

case LogicalReplicationMessageTypes.Unsupported:
default:
return _parseJsonMessageOrReturnUnknownMessage(bytes);
// note this needs the full set of bytes unlike other cases
return _tryParseJsonMessage(bytesList);
}
}

LogicalReplicationMessage _parseJsonMessageOrReturnUnknownMessage(
Uint8List bytes) {
LogicalReplicationMessage? _tryParseJsonMessage(Uint8List bytes) {
// wal2json messages starts with `{` as the first byte
if (bytes.first == '{'.codeUnits.first) {
try {
return JsonMessage(utf8.decode(bytes));
} catch (e) {
return UnknownLogicalReplicationMessage(bytes);
return null;
}
}
return UnknownLogicalReplicationMessage(bytes);
return null;
}

enum LogicalReplicationMessageTypes {
Expand Down
47 changes: 42 additions & 5 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:typed_data';
import 'package:buffer/buffer.dart';

import 'server_messages.dart';
import 'shared_messages.dart';

const int _headerByteSize = 5;
final _emptyData = Uint8List(0);
Expand All @@ -21,7 +22,9 @@ Map<int, _ServerMessageFn> _messageTypeMap = {
82: (d) => AuthenticationMessage(d),
83: (d) => ParameterStatusMessage(d),
84: (d) => RowDescriptionMessage(d),
87: (d) => CopyBothResponseMessage(d),
90: (d) => ReadyForQueryMessage(d),
100: (d) => CopyDataMessage(d),
110: (d) => NoDataMessage(),
116: (d) => ParameterDescriptionMessage(d),
};
Expand Down Expand Up @@ -51,20 +54,54 @@ class MessageFramer {
_expectedLength = _reader.readUint32() - 4;
}

if (_hasReadHeader && _isComplete) {
// special case
if (_type == SharedMessages.copyDoneIdentifier) {
// unlike other messages, CopyDoneMessage only takes the length as an
// argument (must be the full length including the length bytes)
final msg = CopyDoneMessage(_expectedLength + 4);
_addMsg(msg);
evaluateNextMessage = true;
} else if (_hasReadHeader && _isComplete) {
final data =
_expectedLength == 0 ? _emptyData : _reader.read(_expectedLength);
final msgMaker = _messageTypeMap[_type];
final msg =
var msg =
msgMaker == null ? UnknownMessage(_type, data) : msgMaker(data);
messageQueue.add(msg);
_type = null;
_expectedLength = 0;

// Copy Data message is a wrapper around data stream messages
// such as replication messages.
if (msg is CopyDataMessage) {
// checks if it's a replication message, otherwise returns given msg
msg = _extractReplicationMessageIfAny(msg);
}

_addMsg(msg);
evaluateNextMessage = true;
}
}
}

void _addMsg(ServerMessage msg) {
messageQueue.add(msg);
_type = null;
_expectedLength = 0;
}

/// Returns a [ReplicationMessage] if the [CopyDataMessage] contains such message.
/// Otherwise, it'll just return the provided [copyData].
ServerMessage _extractReplicationMessageIfAny(CopyDataMessage copyData) {
final bytes = copyData.bytes;
final code = bytes.first;
final data = bytes.sublist(1);
if (code == ReplicationMessage.primaryKeepAliveIdentifier) {
return PrimaryKeepAliveMessage(data);
} else if (code == ReplicationMessage.xLogDataIdentifier) {
return XLogDataMessage.parse(data);
} else {
return copyData;
}
}

bool get hasMessage => messageQueue.isNotEmpty;

ServerMessage popMessage() {
Expand Down
50 changes: 41 additions & 9 deletions lib/src/server_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:convert';
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:postgres/messages.dart';

import 'connection.dart';
import 'query.dart';
Expand Down Expand Up @@ -278,24 +279,55 @@ class PrimaryKeepAliveMessage implements ReplicationMessage, ServerMessage {
}

class XLogDataMessage implements ReplicationMessage, ServerMessage {
late final LSN walStart;
late final LSN walEnd;
late final DateTime time;
late final Uint8List bytes;
final LSN walStart;
final LSN walEnd;
final DateTime time;
final Uint8List bytes;
// this is used for standby msg
LSN get walDataLength => LSN(bytes.length);

/// For physical replication, this is the raw [bytes]
/// For logical replication, see [XLogDataLogicalMessage]
Object get data => bytes;

XLogDataMessage(Uint8List bytes) {
XLogDataMessage({
required this.walStart,
required this.walEnd,
required this.time,
required this.bytes,
});

/// Parses the XLogDataMessage
///
/// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method
/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll
/// return [XLogDataMessage] with raw data.
static XLogDataMessage parse(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
walStart = LSN(reader.readUint64());
walEnd = LSN(reader.readUint64());
time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64());
this.bytes = reader.read(reader.remainingLength);
final walStart = LSN(reader.readUint64());
final walEnd = LSN(reader.readUint64());
final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64());
final data = reader.read(reader.remainingLength);

final message = tryParseLogicalReplicationMessage(data);
if (message != null) {
return XLogDataLogicalMessage(
message: message,
bytes: bytes,
time: time,
walEnd: walEnd,
walStart: walStart,
);
} else {
return XLogDataMessage(
bytes: bytes,
time: time,
walEnd: walEnd,
walStart: walStart,
);
}
}

@override
String toString() =>
'XLogDataMessage(walStart: $walStart, walEnd: $walEnd, time: $time, data: $data)';
Expand Down
Loading

0 comments on commit 4ba7680

Please sign in to comment.