Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Replication 6th] Add new message types for replication #57

Merged
merged 6 commits into from
Sep 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/messages.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
library postgres.messages;

export 'src/client_messages.dart';
export 'src/logical_replication_messages.dart';
export 'src/server_messages.dart';
export 'src/shared_messages.dart';
92 changes: 92 additions & 0 deletions lib/src/client_messages.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:postgres/src/time_converters.dart';

import 'constants.dart';
import 'query.dart';
import 'replication.dart';

import 'shared_messages.dart';
import 'types.dart';
import 'utf8_backed_string.dart';

abstract class ClientMessage {
Expand Down Expand Up @@ -237,3 +241,91 @@ class SyncMessage extends ClientMessage {
buffer.writeUint32(4);
}
}

class StandbyStatusUpdateMessage extends ClientMessage
implements ReplicationMessage {
/// The WAL position that's been locally written
final LSN walWritePosition;

/// The WAL position that's been locally flushed
late final LSN walFlushPosition;

/// The WAL position that's been locally applied
late final LSN walApplyPosition;

/// Client system clock time
late final DateTime clientTime;

/// Request server to reply immediately.
final bool mustReply;

/// StandbyStatusUpdate to the PostgreSQL server.
///
/// The only required field is [walWritePosition]. If either [walFlushPosition]
/// or [walApplyPosition] are `null`, [walWritePosition] will be assigned to them.
/// If [clientTime] is not given, then the current time is used.
///
/// When sending this message, it must be wrapped within [CopyDataMessage]
StandbyStatusUpdateMessage({
required this.walWritePosition,
LSN? walFlushPosition,
LSN? walApplyPosition,
DateTime? clientTime,
this.mustReply = false,
}) {
this.walFlushPosition = walFlushPosition ?? walWritePosition;
this.walApplyPosition = walApplyPosition ?? walWritePosition;
this.clientTime = clientTime ?? DateTime.now().toUtc();
}

@override
void applyToBuffer(ByteDataWriter buffer) {
buffer.writeUint8(ReplicationMessage.standbyStatusUpdateIdentifier);
buffer.writeUint64(walWritePosition.value);
buffer.writeUint64(walFlushPosition.value);
buffer.writeUint64(walApplyPosition.value);
buffer.writeUint64(dateTimeToMicrosecondsSinceY2k(clientTime));
buffer.writeUint8(mustReply ? 1 : 0);
}
}

class HotStandbyFeedbackMessage extends ClientMessage
implements ReplicationMessage {
/// The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
final DateTime clientTime;

/// The standby's current global xmin, excluding the catalog_xmin from any
/// replication slots. If both this value and the following catalog_xmin are 0
/// this is treated as a notification that Hot Standby feedback will no longer
/// be sent on this connection. Later non-zero messages may reinitiate the
/// feedback mechanism
final int currentGlobalXmin;

/// The epoch of the global xmin xid on the standby.
final int epochGlobalXminXid;

/// The lowest catalog_xmin of any replication slots on the standby. Set to 0
/// if no catalog_xmin exists on the standby or if hot standby feedback is
/// being disabled.
final int lowestCatalogXmin;

/// The epoch of the catalog_xmin xid on the standby.
final int epochCatalogXminXid;

HotStandbyFeedbackMessage(
this.clientTime,
this.currentGlobalXmin,
this.epochGlobalXminXid,
this.lowestCatalogXmin,
this.epochCatalogXminXid);

@override
void applyToBuffer(ByteDataWriter buffer) {
buffer.writeUint8(ReplicationMessage.hotStandbyFeedbackIdentifier);
buffer.writeUint64(dateTimeToMicrosecondsSinceY2k(clientTime));
buffer.writeUint32(currentGlobalXmin);
buffer.writeUint32(epochGlobalXminXid);
buffer.writeUint32(lowestCatalogXmin);
buffer.writeUint32(epochCatalogXminXid);
}
}
Loading