Skip to content

Commit

Permalink
Close vfs when wasm database is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Jan 6, 2024
1 parent 42a0a40 commit 09c6cf0
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 27 deletions.
1 change: 1 addition & 0 deletions drift/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 2.15.0-dev

- Methods in the query builder API now respect custom types.
- Close wasm databases hosted in workers after the last client disconnects.

## 2.14.1

Expand Down
33 changes: 30 additions & 3 deletions drift/lib/src/web/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import 'dart:html';

import 'package:stream_channel/stream_channel.dart';

const _disconnectMessage = '_disconnect';

/// Extension to transform a raw [MessagePort] from web workers into a Dart
/// [StreamChannel].
extension PortToChannel on MessagePort {
Expand All @@ -10,10 +12,35 @@ extension PortToChannel on MessagePort {
///
/// This can be used to implement a remote database connection over service
/// workers.
StreamChannel<Object?> channel() {
///
/// The [explicitClose] parameter can be used to control whether a close
/// message should be sent through the channel when it is closed. This will
/// cause it to be closed on the other end as well. Note that this is not a
/// reliable way of determining channel closures though, as there is no event
/// for channels being closed due to a tab or worker being closed.
/// Both "ends" of a JS channel calling [channel] on their part must use the
/// value for [explicitClose].
StreamChannel<Object?> channel({bool explicitClose = false}) {
final controller = StreamChannelController<Object?>();
onMessage.map((event) => event.data).pipe(controller.local.sink);
controller.local.stream.listen(postMessage, onDone: close);
onMessage.listen((event) {
final message = event.data;

if (explicitClose && message == _disconnectMessage) {
// Other end has closed the connection
controller.local.sink.close();
} else {
controller.local.sink.add(message);
}
});

controller.local.stream.listen(postMessage, onDone: () {
// Closed locally, inform the other end.
if (explicitClose) {
postMessage(_disconnectMessage);
}

close();
});

return controller.foreign;
}
Expand Down
36 changes: 26 additions & 10 deletions drift/lib/src/web/wasm_setup.dart
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class WasmDatabaseOpener {
as DedicatedWorkerCompatibilityResult;

_handleCompatibilityResult(status);
dedicatedWorker.version = status.version;

if (status.supportsNestedWorkers &&
status.canAccessOpfs &&
Expand Down Expand Up @@ -142,6 +143,7 @@ class WasmDatabaseOpener {
as SharedWorkerCompatibilityResult;

_handleCompatibilityResult(sharedFeatures);
shared.version = sharedFeatures.version;

// Prefer to use the shared worker to host the database if it supports the
// necessary APIs.
Expand All @@ -160,6 +162,7 @@ class WasmDatabaseOpener {

final class _DriftWorker {
final AbstractWorker worker;
ProtocolVersion version = ProtocolVersion.legacy;

/// The message port to communicate with the worker, if it's a shared worker.
final MessagePort? portForShared;
Expand Down Expand Up @@ -225,27 +228,37 @@ final class _ProbeResult implements WasmProbeResult {
final channel = MessageChannel();
final initializer = initializeDatabase;
final initChannel = initializer != null ? MessageChannel() : null;
final local = channel.port1.channel();

final message = ServeDriftDatabase(
sqlite3WasmUri: opener.sqlite3WasmUri,
port: channel.port2,
storage: implementation,
databaseName: name,
initializationPort: initChannel?.port2,
);

ServeDriftDatabase message;
final sharedWorker = opener._sharedWorker;
final dedicatedWorker = opener._dedicatedWorker;

switch (implementation) {
case WasmStorageImplementation.opfsShared:
case WasmStorageImplementation.sharedIndexedDb:
// Forward connection request to shared worker.
message.sendTo(sharedWorker!.send);
message = ServeDriftDatabase(
sqlite3WasmUri: opener.sqlite3WasmUri,
port: channel.port2,
storage: implementation,
databaseName: name,
initializationPort: initChannel?.port2,
protocolVersion: sharedWorker!.version,
);

message.sendTo(sharedWorker.send);
case WasmStorageImplementation.opfsLocks:
case WasmStorageImplementation.unsafeIndexedDb:
if (dedicatedWorker != null) {
message = ServeDriftDatabase(
sqlite3WasmUri: opener.sqlite3WasmUri,
port: channel.port2,
storage: implementation,
databaseName: name,
initializationPort: initChannel?.port2,
protocolVersion: dedicatedWorker.version,
);

message.sendTo(dedicatedWorker.send);
} else {
// Workers seem to be broken, but we don't need them with this storage
Expand Down Expand Up @@ -276,6 +289,9 @@ final class _ProbeResult implements WasmProbeResult {
}
});

final local = channel.port1
.channel(explicitClose: message.protocolVersion >= ProtocolVersion.v1);

var connection = await connectToRemoteAndInitialize(local);
if (implementation == WasmStorageImplementation.opfsLocks) {
// We want stream queries to update for writes in other tabs. For the
Expand Down
1 change: 1 addition & 0 deletions drift/lib/src/web/wasm_setup/dedicated_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class DedicatedDriftWorker {
opfsExists: opfsExists,
indexedDbExists: indexedDbExists,
existingDatabases: existingDatabases,
version: ProtocolVersion.current,
).sendToClient(self);
case ServeDriftDatabase():
_servers.serve(message);
Expand Down
73 changes: 73 additions & 0 deletions drift/lib/src/web/wasm_setup/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,56 @@ import 'package:sqlite3/wasm.dart';

import 'types.dart';

/// Due to in-browser caching or users not updating their `drift_worker.dart`
/// file after updating drift, the main web app and the workers may be compiled
/// with different versions of drift. To avoid inconsistencies in the
/// communication channel between them, they compare their versions in a
/// handshake and only use features supported by both.
class ProtocolVersion {
final int versionCode;

const ProtocolVersion._(this.versionCode);

void writeToJs(Object object) {
setProperty(object, 'v', versionCode);
}

bool operator >=(ProtocolVersion other) {
return versionCode >= other.versionCode;
}

static ProtocolVersion negotiate(int? versionCode) {
return switch (versionCode) {
null => legacy,
<= 0 => legacy,
1 => v1,
> 1 => current,
_ => throw AssertionError(),
};
}

static ProtocolVersion fromJsObject(Object object) {
if (hasProperty(object, 'v')) {
return negotiate(getProperty<int>(object, 'v'));
} else {
return legacy;
}
}

/// The protocol version used for drift versions up to 2.14 - these don't have
/// a version marker anywhere.
static const legacy = ProtocolVersion._(0);

/// This version makes workers report their supported protocol version.
///
/// When both the client and the involved worker support this version, an
/// explicit close notification is sent from clients to workers when closing
/// databases. This allows workers to release resources more effieciently.
static const v1 = ProtocolVersion._(1);

static const current = v1;
}

typedef PostMessage = void Function(Object? msg, [List<Object>? transfer]);

/// Sealed superclass for JavaScript objects exchanged between the UI tab and
Expand Down Expand Up @@ -65,6 +115,12 @@ sealed class CompatibilityResult extends WasmInitializationMessage {
/// be used to check whether the database exists.
final List<ExistingDatabase> existingDatabases;

/// The latest protocol version spoken by the worker.
///
/// Workers only started to report their version in drift 2.15, we assume
/// [ProtocolVersion.legacy] for workers that don't report their version.
final ProtocolVersion version;

final bool indexedDbExists;
final bool opfsExists;

Expand All @@ -74,6 +130,7 @@ sealed class CompatibilityResult extends WasmInitializationMessage {
required this.existingDatabases,
required this.indexedDbExists,
required this.opfsExists,
required this.version,
});
}

Expand All @@ -96,16 +153,23 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
required super.indexedDbExists,
required super.opfsExists,
required super.existingDatabases,
required super.version,
});

factory SharedWorkerCompatibilityResult.fromJsPayload(Object payload) {
final asList = payload as List;
final asBooleans = asList.cast<bool>();

final List<ExistingDatabase> existingDatabases;
var version = ProtocolVersion.legacy;

if (asList.length > 5) {
existingDatabases =
EncodeLocations.readFromJs(asList[5] as List<dynamic>);

if (asList.length > 6) {
version = ProtocolVersion.negotiate(asList[6] as int);
}
} else {
existingDatabases = const [];
}
Expand All @@ -117,6 +181,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
indexedDbExists: asBooleans[3],
opfsExists: asBooleans[4],
existingDatabases: existingDatabases,
version: version,
);
}

Expand All @@ -129,6 +194,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
indexedDbExists,
opfsExists,
existingDatabases.encodeToJs(),
version.versionCode,
]);
}

Expand Down Expand Up @@ -175,13 +241,15 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
final WasmStorageImplementation storage;
final String databaseName;
final MessagePort? initializationPort;
final ProtocolVersion protocolVersion;

ServeDriftDatabase({
required this.sqlite3WasmUri,
required this.port,
required this.storage,
required this.databaseName,
required this.initializationPort,
required this.protocolVersion,
});

factory ServeDriftDatabase.fromJsPayload(Object payload) {
Expand All @@ -192,6 +260,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
.byName(getProperty(payload, 'storage')),
databaseName: getProperty(payload, 'database'),
initializationPort: getProperty(payload, 'initPort'),
protocolVersion: ProtocolVersion.fromJsObject(payload),
);
}

Expand All @@ -204,6 +273,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
setProperty(object, 'database', databaseName);
final initPort = initializationPort;
setProperty(object, 'initPort', initPort);
protocolVersion.writeToJs(object);

sender.sendTyped(type, object, [
port,
Expand Down Expand Up @@ -249,6 +319,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
required super.indexedDbExists,
required super.opfsExists,
required super.existingDatabases,
required super.version,
});

factory DedicatedWorkerCompatibilityResult.fromJsPayload(Object payload) {
Expand All @@ -268,6 +339,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
indexedDbExists: getProperty(payload, 'indexedDbExists'),
opfsExists: getProperty(payload, 'opfsExists'),
existingDatabases: existingDatabases,
version: ProtocolVersion.fromJsObject(payload),
);
}

Expand All @@ -283,6 +355,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
setProperty(object, 'indexedDbExists', indexedDbExists);
setProperty(object, 'opfsExists', opfsExists);
setProperty(object, 'existing', existingDatabases.encodeToJs());
version.writeToJs(object);

sender.sendTyped(type, object);
}
Expand Down
Loading

0 comments on commit 09c6cf0

Please sign in to comment.