Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Replication 7th] Replication Mode Messages Handling #58

Merged
merged 8 commits into from
Sep 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/postgres.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ library postgres;
export 'src/connection.dart';
export 'src/execution_context.dart';
export 'src/models.dart';
export 'src/replication.dart' show ReplicationMode;
export 'src/substituter.dart';
export 'src/types.dart';
58 changes: 44 additions & 14 deletions lib/src/logical_replication_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,93 @@ import 'types.dart';
abstract class LogicalReplicationMessage
implements ReplicationMessage, ServerMessage {}

class XLogDataLogicalMessage extends XLogDataMessage {
late final LogicalReplicationMessage _message;
class XLogDataLogicalMessage implements XLogDataMessage {
@override
final Uint8List bytes;

@override
LogicalReplicationMessage get data => _message;
final DateTime time;

XLogDataLogicalMessage(Uint8List bytes) : super(bytes) {
_message = parseLogicalReplicationMessage(this.bytes);
}
@override
final LSN walEnd;

@override
final LSN walStart;

@override
LSN get walDataLength => LSN(bytes.length);

late final LogicalReplicationMessage message;

@override
LogicalReplicationMessage get data => message;

XLogDataLogicalMessage({
required this.message,
required this.bytes,
required this.time,
required this.walEnd,
required this.walStart,
});

@override
String toString() => super.toString();
}

LogicalReplicationMessage parseLogicalReplicationMessage(Uint8List bytesList) {
/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
LogicalReplicationMessage? tryParseLogicalReplicationMessage(
Uint8List bytesList) {
// the first byte is the msg type
final firstByte = bytesList.first;
final msgType = LogicalReplicationMessageTypes.fromByte(firstByte);
// remaining bytes are the data
// remaining bytes are the data
final bytes = bytesList.sublist(1);

switch (msgType) {
case LogicalReplicationMessageTypes.Begin:
return BeginMessage(bytes);

case LogicalReplicationMessageTypes.Commit:
return CommitMessage(bytes);

case LogicalReplicationMessageTypes.Origin:
return OriginMessage(bytes);

case LogicalReplicationMessageTypes.Relation:
return RelationMessage(bytes);

case LogicalReplicationMessageTypes.Type:
return TypeMessage(bytes);

case LogicalReplicationMessageTypes.Insert:
return InsertMessage(bytes);

case LogicalReplicationMessageTypes.Update:
return UpdateMessage(bytes);

case LogicalReplicationMessageTypes.Delete:
return DeleteMessage(bytes);

case LogicalReplicationMessageTypes.Truncate:
return TruncateMessage(bytes);

case LogicalReplicationMessageTypes.Unsupported:
default:
return _parseJsonMessageOrReturnUnknownMessage(bytes);
// note this needs the full set of bytes unlike other cases
return _tryParseJsonMessage(bytesList);
}
}

LogicalReplicationMessage _parseJsonMessageOrReturnUnknownMessage(
Uint8List bytes) {
LogicalReplicationMessage? _tryParseJsonMessage(Uint8List bytes) {
// wal2json messages starts with `{` as the first byte
if (bytes.first == '{'.codeUnits.first) {
try {
return JsonMessage(utf8.decode(bytes));
} catch (e) {
return UnknownLogicalReplicationMessage(bytes);
return null;
}
}
return UnknownLogicalReplicationMessage(bytes);
return null;
}

enum LogicalReplicationMessageTypes {
Expand Down
47 changes: 42 additions & 5 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:typed_data';
import 'package:buffer/buffer.dart';

import 'server_messages.dart';
import 'shared_messages.dart';

const int _headerByteSize = 5;
final _emptyData = Uint8List(0);
Expand All @@ -21,7 +22,9 @@ Map<int, _ServerMessageFn> _messageTypeMap = {
82: (d) => AuthenticationMessage(d),
83: (d) => ParameterStatusMessage(d),
84: (d) => RowDescriptionMessage(d),
87: (d) => CopyBothResponseMessage(d),
90: (d) => ReadyForQueryMessage(d),
100: (d) => CopyDataMessage(d),
110: (d) => NoDataMessage(),
116: (d) => ParameterDescriptionMessage(d),
};
Expand Down Expand Up @@ -51,20 +54,54 @@ class MessageFramer {
_expectedLength = _reader.readUint32() - 4;
}

if (_hasReadHeader && _isComplete) {
// special case
if (_type == SharedMessages.copyDoneIdentifier) {
// unlike other messages, CopyDoneMessage only takes the length as an
// argument (must be the full length including the length bytes)
final msg = CopyDoneMessage(_expectedLength + 4);
_addMsg(msg);
evaluateNextMessage = true;
} else if (_hasReadHeader && _isComplete) {
final data =
_expectedLength == 0 ? _emptyData : _reader.read(_expectedLength);
final msgMaker = _messageTypeMap[_type];
final msg =
var msg =
msgMaker == null ? UnknownMessage(_type, data) : msgMaker(data);
messageQueue.add(msg);
_type = null;
_expectedLength = 0;

// Copy Data message is a wrapper around data stream messages
// such as replication messages.
if (msg is CopyDataMessage) {
// checks if it's a replication message, otherwise returns given msg
msg = _extractReplicationMessageIfAny(msg);
}

_addMsg(msg);
evaluateNextMessage = true;
}
}
}

void _addMsg(ServerMessage msg) {
messageQueue.add(msg);
_type = null;
_expectedLength = 0;
}

/// Returns a [ReplicationMessage] if the [CopyDataMessage] contains such message.
/// Otherwise, it'll just return the provided [copyData].
ServerMessage _extractReplicationMessageIfAny(CopyDataMessage copyData) {
final bytes = copyData.bytes;
final code = bytes.first;
final data = bytes.sublist(1);
if (code == ReplicationMessage.primaryKeepAliveIdentifier) {
return PrimaryKeepAliveMessage(data);
} else if (code == ReplicationMessage.xLogDataIdentifier) {
return XLogDataMessage.parse(data);
} else {
return copyData;
}
}

bool get hasMessage => messageQueue.isNotEmpty;

ServerMessage popMessage() {
Expand Down
50 changes: 41 additions & 9 deletions lib/src/server_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:convert';
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:postgres/messages.dart';

import 'connection.dart';
import 'query.dart';
Expand Down Expand Up @@ -278,24 +279,55 @@ class PrimaryKeepAliveMessage implements ReplicationMessage, ServerMessage {
}

class XLogDataMessage implements ReplicationMessage, ServerMessage {
late final LSN walStart;
late final LSN walEnd;
late final DateTime time;
late final Uint8List bytes;
final LSN walStart;
final LSN walEnd;
final DateTime time;
final Uint8List bytes;
// this is used for standby msg
LSN get walDataLength => LSN(bytes.length);

/// For physical replication, this is the raw [bytes]
/// For logical replication, see [XLogDataLogicalMessage]
Object get data => bytes;

XLogDataMessage(Uint8List bytes) {
XLogDataMessage({
required this.walStart,
required this.walEnd,
required this.time,
required this.bytes,
});

/// Parses the XLogDataMessage
///
/// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method
/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll
/// return [XLogDataMessage] with raw data.
static XLogDataMessage parse(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
walStart = LSN(reader.readUint64());
walEnd = LSN(reader.readUint64());
time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64());
this.bytes = reader.read(reader.remainingLength);
final walStart = LSN(reader.readUint64());
final walEnd = LSN(reader.readUint64());
final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64());
final data = reader.read(reader.remainingLength);

final message = tryParseLogicalReplicationMessage(data);
if (message != null) {
return XLogDataLogicalMessage(
message: message,
bytes: bytes,
time: time,
walEnd: walEnd,
walStart: walStart,
);
} else {
return XLogDataMessage(
bytes: bytes,
time: time,
walEnd: walEnd,
walStart: walStart,
);
}
}

@override
String toString() =>
'XLogDataMessage(walStart: $walStart, walEnd: $walEnd, time: $time, data: $data)';
Expand Down
Loading