Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Replication - 1st] Introduce new methods and getter to PostgreSQLConnection #51

Merged
merged 6 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ doc/api/
*.iml
*.ipr
*.iws

# VS Code IDE
.vscode/
95 changes: 90 additions & 5 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class PostgreSQLConnection extends Object
final StreamController<Notification> _notifications =
StreamController<Notification>.broadcast();

final StreamController<ServerMessage> _messages =
StreamController<ServerMessage>.broadcast();

/// Hostname of database this connection refers to.
final String host;

Expand Down Expand Up @@ -104,6 +107,14 @@ class PostgreSQLConnection extends Object
/// to [Notification.processID].
Stream<Notification> get notifications => _notifications.stream;

/// Stream of server messages
///
/// Listen to this [Stream] to receive events for all PostgreSQL server messages
///
/// This includes all messages whether from Extended Query Protocol, Simple Query Protocol
/// or Streaming Replication Protocol.
Stream<ServerMessage> get messages => _messages.stream;
isoos marked this conversation as resolved.
Show resolved Hide resolved

/// Reports on the latest known status of the connection: whether it was open or failed for some reason.
///
/// This is `true` when this instance is first created and after it has been closed or encountered an unrecoverable error.
Expand Down Expand Up @@ -192,6 +203,19 @@ class PostgreSQLConnection extends Object
/// After the returned [Future] completes, this connection can no longer be used to execute queries. Any queries in progress or queued are cancelled.
Future close() => _close();

/// Adds a Client Message to the existing connection
///
/// This is a low level API and the message must follow the protocol of this
/// connection. It's the responsiblity of the caller to ensure this message
/// does not interfere with any running queries or transactions.
void addMessage(ClientMessage message) {
isoos marked this conversation as resolved.
Show resolved Hide resolved
if (isClosed) {
throw PostgreSQLException(
'Attempting to add a message, but connection is not open.');
}
_socket!.add(message.asBytes());
}

/// Executes a series of queries inside a transaction on this connection.
///
/// Queries executed inside [queryBlock] will be grouped together in a transaction. The return value of the [queryBlock]
Expand Down Expand Up @@ -264,6 +288,8 @@ class PostgreSQLConnection extends Object
}
await _notifications.close();

await _messages.close();

_queue.cancel(error, trace);
}

Expand All @@ -277,6 +303,9 @@ class PostgreSQLConnection extends Object
while (_framer.hasMessage) {
final msg = _framer.popMessage();
try {
if (_messages.hasListener) {
_messages.add(msg);
}
if (msg is ErrorResponseMessage) {
_transitionToState(_connectionState.onErrorResponse(msg));
} else if (msg is NotificationResponseMessage) {
Expand Down Expand Up @@ -423,11 +452,13 @@ abstract class _PostgreSQLExecutionContextMixin
Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds,
bool? useSimpleQueryProtocol,
}) =>
_query(
fmtString,
substitutionValues: substitutionValues,
allowReuse: allowReuse ?? true,
useSimpleQueryProtocol: useSimpleQueryProtocol ?? false,
timeoutInSeconds: timeoutInSeconds,
);

Expand All @@ -437,9 +468,20 @@ abstract class _PostgreSQLExecutionContextMixin
required bool allowReuse,
int? timeoutInSeconds,
bool resolveOids = true,
bool useSimpleQueryProtocol = false,
}) async {
timeoutInSeconds ??= _connection.queryTimeoutInSeconds;

if (useSimpleQueryProtocol) {
// re-route the query to the `_execute` method which will execute the query
// using the Simple Query Protocol.
return _execute(
fmtString,
timeoutInSeconds: timeoutInSeconds,
onlyReturnAffectedRows: false,
);
}

if (_connection.isClosed) {
throw PostgreSQLException(
'Attempting to execute query, but connection is not open.');
Expand Down Expand Up @@ -493,18 +535,61 @@ abstract class _PostgreSQLExecutionContextMixin
Future<int> execute(String fmtString,
{Map<String, dynamic>? substitutionValues = const {},
int? timeoutInSeconds}) async {
timeoutInSeconds ??= _connection.queryTimeoutInSeconds;
final result = await _execute(
fmtString,
substitutionValues: substitutionValues,
timeoutInSeconds: _connection.queryTimeoutInSeconds,
onlyReturnAffectedRows: true,
);
return result.affectedRowCount;
}

// TODO: replace [execute] with [_execute] and remove `useSimpleQueryProtocol`
// from the [query] method in the future major breaking change.
Future<PostgreSQLResult> _execute(
String fmtString, {
Map<String, dynamic>? substitutionValues = const {},
required int timeoutInSeconds,
required bool onlyReturnAffectedRows,
}) async {
if (_connection.isClosed) {
throw PostgreSQLException(
'Attempting to execute query, but connection is not open.');
}

final query = Query<void>(fmtString, substitutionValues, _connection,
_transaction, StackTrace.current,
onlyReturnAffectedRowCount: true);
final query = Query<dynamic>(
fmtString,
substitutionValues,
_connection,
_transaction,
StackTrace.current,
useSendSimple: true,
// TODO: this could be removed from Query since useSendSimple covers the
// functionality.
onlyReturnAffectedRowCount: onlyReturnAffectedRows,
);

final result = await _enqueue(query, timeoutInSeconds: timeoutInSeconds);
return result.affectedRowCount;

final affectedRowCount = result.affectedRowCount;
final columnDescriptions = query.fieldDescriptions ?? [];
final metaData = _PostgreSQLResultMetaData(columnDescriptions);

final value = result.value;
late final List<PostgreSQLResultRow> rows;
if (value != null && value is List<List>) {
rows = value
.map((columns) => _PostgreSQLResultRow(metaData, columns))
.toList();
} else {
rows = [];
}

return _PostgreSQLResult(
affectedRowCount,
metaData,
rows,
);
}

@override
Expand Down
4 changes: 2 additions & 2 deletions lib/src/connection_fsm.dart
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class _PostgreSQLConnectionStateIdle extends _PostgreSQLConnectionState {

_PostgreSQLConnectionState processQuery(Query<dynamic> q) {
try {
if (q.onlyReturnAffectedRowCount) {
if (q.onlyReturnAffectedRowCount || q.useSendSimple) {
q.sendSimple(connection!._socket!);
return _PostgreSQLConnectionStateBusy(q);
}
Expand Down Expand Up @@ -333,7 +333,7 @@ class _PostgreSQLConnectionStateReadyInTransaction

_PostgreSQLConnectionState processQuery(Query<dynamic> q) {
try {
if (q.onlyReturnAffectedRowCount) {
if (q.onlyReturnAffectedRowCount || q.useSendSimple) {
q.sendSimple(connection!._socket!);
return _PostgreSQLConnectionStateBusy(q);
}
Expand Down
24 changes: 23 additions & 1 deletion lib/src/execution_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,31 @@ abstract class PostgreSQLExecutionContext {
/// By default, instances of this class will reuse queries. This allows significantly more efficient transport to and from the database. You do not have to do
/// anything to opt in to this behavior, this connection will track the necessary information required to reuse queries without intervention. (The [fmtString] is
/// the unique identifier to look up reuse information.) You can disable reuse by passing false for [allowReuse].
///
/// [useSimpleQueryProtocol] indicates that the query will be executed using
/// the [Simple Query Protocol][]. This is similar to runing [execute] but
/// instead of receiving the `affectedRowCount` only, this method will return
/// [PostgreSQLResult] which contains `affectedRowCount` in addition to any
/// data returned by the executed statement.
///
/// It's important to understand that when [useSimpleQueryProtocol] is `true`,
/// all values will be of type [String] even if they have different type in the
/// database. For instance, the value of an `int4` column will be returned as
/// a [String] instead of an [int].
///
/// Setting [useSimpleQueryProtocol] to `true` is mainly useful for when the
/// connection is established using the Streaming Replication Protocol. When
/// the connection is in replication mode, the default Extended Query Protocol
/// cannot be used as the database will throw an error and drop the connection.
/// In other words, only the Simple Query Protocol can be used with Streaming
/// Replication Protocol.
///
/// [Simple Query Protocol]: https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4
Future<PostgreSQLResult> query(String fmtString,
{Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds});
int? timeoutInSeconds,
bool? useSimpleQueryProtocol});

/// Executes a query on this context.
///
Expand Down Expand Up @@ -85,6 +106,7 @@ abstract class PostgreSQLExecutionContext {
{Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds});

}

/// A description of a column.
Expand Down
19 changes: 19 additions & 0 deletions lib/src/query.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ class Query<T> {
this.transaction,
this.queryStackTrace, {
this.onlyReturnAffectedRowCount = false,
this.useSendSimple = false,
});

final bool onlyReturnAffectedRowCount;

final bool useSendSimple;

String? statementIdentifier;

Future<QueryResult<T>?> get future => _onComplete.future;
Expand Down Expand Up @@ -142,6 +145,22 @@ class Query<T> {
return;
}

// Simple queries do not follow the same binary codecs. All values will be
// returned as strings.
//
// For instance, a column can be defined as `int4` which is expected to be
// 4 bytes long (i.e. decoded using bytes.getUint32) but when using simple
// query (i.e. sendSimple), the value will be returned as a string.
//
// See Simple Query section in Protocol Message Flow:
// "In simple Query mode, the format of retrieved values is always text"
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4
if (useSendSimple) {
final data = rawRowData.map((e) => utf8.decode(e!));
rows.add(data.toList());
return;
}

final iterator = fieldDescriptions!.iterator;
final lazyDecodedData = rawRowData.map((bd) {
iterator.moveNext();
Expand Down