Skip to content

Commit

Permalink
[Streaming Replication 6th] Add new message types for replication (#57)
Browse files Browse the repository at this point in the history
* Add shared messages between client and server

* Add new client messages

* Add new server messages

* Add Logical Replication Messages Formats

* export messages as a stand-alone library

This is exported separately to avoid polluting the
namespace when the user is not interested in the
messages. Yet, this is necessary for those needs to
communicate in replication mode

* fix tyop
  • Loading branch information
osaxma authored Sep 10, 2022
1 parent f0fe101 commit 51c2874
Show file tree
Hide file tree
Showing 5 changed files with 867 additions and 0 deletions.
6 changes: 6 additions & 0 deletions lib/messages.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
library postgres.messages;

export 'src/client_messages.dart';
export 'src/logical_replication_messages.dart';
export 'src/server_messages.dart';
export 'src/shared_messages.dart';
92 changes: 92 additions & 0 deletions lib/src/client_messages.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:postgres/src/time_converters.dart';

import 'constants.dart';
import 'query.dart';
import 'replication.dart';

import 'shared_messages.dart';
import 'types.dart';
import 'utf8_backed_string.dart';

abstract class ClientMessage {
Expand Down Expand Up @@ -237,3 +241,91 @@ class SyncMessage extends ClientMessage {
buffer.writeUint32(4);
}
}

class StandbyStatusUpdateMessage extends ClientMessage
implements ReplicationMessage {
/// The WAL position that's been locally written
final LSN walWritePosition;

/// The WAL position that's been locally flushed
late final LSN walFlushPosition;

/// The WAL position that's been locally applied
late final LSN walApplyPosition;

/// Client system clock time
late final DateTime clientTime;

/// Request server to reply immediately.
final bool mustReply;

/// StandbyStatusUpdate to the PostgreSQL server.
///
/// The only required field is [walWritePosition]. If either [walFlushPosition]
/// or [walApplyPosition] are `null`, [walWritePosition] will be assigned to them.
/// If [clientTime] is not given, then the current time is used.
///
/// When sending this message, it must be wrapped within [CopyDataMessage]
StandbyStatusUpdateMessage({
required this.walWritePosition,
LSN? walFlushPosition,
LSN? walApplyPosition,
DateTime? clientTime,
this.mustReply = false,
}) {
this.walFlushPosition = walFlushPosition ?? walWritePosition;
this.walApplyPosition = walApplyPosition ?? walWritePosition;
this.clientTime = clientTime ?? DateTime.now().toUtc();
}

@override
void applyToBuffer(ByteDataWriter buffer) {
buffer.writeUint8(ReplicationMessage.standbyStatusUpdateIdentifier);
buffer.writeUint64(walWritePosition.value);
buffer.writeUint64(walFlushPosition.value);
buffer.writeUint64(walApplyPosition.value);
buffer.writeUint64(dateTimeToMicrosecondsSinceY2k(clientTime));
buffer.writeUint8(mustReply ? 1 : 0);
}
}

class HotStandbyFeedbackMessage extends ClientMessage
implements ReplicationMessage {
/// The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
final DateTime clientTime;

/// The standby's current global xmin, excluding the catalog_xmin from any
/// replication slots. If both this value and the following catalog_xmin are 0
/// this is treated as a notification that Hot Standby feedback will no longer
/// be sent on this connection. Later non-zero messages may reinitiate the
/// feedback mechanism
final int currentGlobalXmin;

/// The epoch of the global xmin xid on the standby.
final int epochGlobalXminXid;

/// The lowest catalog_xmin of any replication slots on the standby. Set to 0
/// if no catalog_xmin exists on the standby or if hot standby feedback is
/// being disabled.
final int lowestCatalogXmin;

/// The epoch of the catalog_xmin xid on the standby.
final int epochCatalogXminXid;

HotStandbyFeedbackMessage(
this.clientTime,
this.currentGlobalXmin,
this.epochGlobalXminXid,
this.lowestCatalogXmin,
this.epochCatalogXminXid);

@override
void applyToBuffer(ByteDataWriter buffer) {
buffer.writeUint8(ReplicationMessage.hotStandbyFeedbackIdentifier);
buffer.writeUint64(dateTimeToMicrosecondsSinceY2k(clientTime));
buffer.writeUint32(currentGlobalXmin);
buffer.writeUint32(epochGlobalXminXid);
buffer.writeUint32(lowestCatalogXmin);
buffer.writeUint32(epochCatalogXminXid);
}
}
Loading

0 comments on commit 51c2874

Please sign in to comment.