Skip to content

Commit

Permalink
Spawn runSender logic in an Isolate
Browse files Browse the repository at this point in the history
This still ignores UI updates from the spawned isolate and doesn't even
have a proper splash screen for a confirmed send.

It has also yet to be smoke tested
  • Loading branch information
DanGould committed Dec 20, 2024
1 parent a262d70 commit 4b49339
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 96 deletions.
233 changes: 169 additions & 64 deletions lib/_pkg/payjoin/manager.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import 'dart:async';
import 'dart:isolate';
import 'dart:math';

import 'package:bb_mobile/_model/transaction.dart';
import 'package:bb_mobile/_model/wallet.dart';
import 'package:bb_mobile/_pkg/error.dart';
import 'package:bb_mobile/_pkg/wallet/transaction.dart';
import 'package:dio/dio.dart';
import 'package:payjoin_flutter/common.dart';
import 'package:payjoin_flutter/send.dart';
import 'package:payjoin_flutter/src/generated/frb_generated.dart';
import 'package:payjoin_flutter/uri.dart' as pj_uri;

const List<String> _ohttpRelayUrls = [
Expand All @@ -12,6 +18,11 @@ const List<String> _ohttpRelayUrls = [
];

class PayjoinManager {
PayjoinManager(this._walletTx);
final WalletTx _walletTx;
final Map<String, Isolate> _activePollers = {};
final Map<String, ReceivePort> _activePorts = {};

Future<Sender> initSender(
String pjUriString,
int networkFeesSatPerVb,
Expand Down Expand Up @@ -42,81 +53,175 @@ class PayjoinManager {
}
}

/// Sends a payjoin using the v2 protocol given an initialized Sender.
/// V2 protocol first attempts a v2 request, but if one cannot be extracted
/// from the given bitcoin URI, it will attempt to send a v1 request.
Future<String?> runSender(Sender sender) async {
Request postReq;
V2PostContext postReqCtx;
final dio = Dio();

Future<Err?> spawnSender({
required bool isTestnet,
required Sender sender,
required Wallet wallet,
required Transaction transaction,
required String address,
}) async {
try {
final result =
await sender.extractV2(ohttpProxyUrl: await _randomOhttpRelayUrl());
postReq = result.$1;
postReqCtx = result.$2;
} catch (e) {
// extract v2 failed, attempt to send v1
return await _runSenderV1(sender, dio);
}
final completer = Completer<Err?>();
final receivePort = ReceivePort();

try {
final postRes = await _postRequest(dio, postReq);
final getCtx = await postReqCtx.processResponse(
response: postRes.data as List<int>,
);
while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
return await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
} catch (e) {
// loop
// TODO Create unique ID for this payjoin session
const sessionId = 'TODO_SENDER_ENDPOINT';

receivePort.listen((message) async {
if (message is Map<String, dynamic>) {
if (message['type'] == 'psbt_to_sign') {
final proposalPsbt = message['psbt'] as String;
final (wtxid, err) = await _walletTx.signAndBroadcastPsbt(
psbt: proposalPsbt,
wallet: wallet,
transaction: transaction,
address: address,
);
// TODO propagate success or failure to the UI including this logic
// from when the signing and broadcasting was done in the send cubit

// final txWithId = state.tx?.copyWith(txid: wtxid?.$2 ?? '');
// emit(state.copyWith(tx: txWithId));

// final (updatedWallet, _) = wtxid!;
// state.selectedWalletBloc!.add(
// UpdateWallet(
// updatedWallet,
// updateTypes: [
// UpdateWalletTypes.addresses,
// UpdateWalletTypes.transactions,
// UpdateWalletTypes.swaps,
// ],
// ),
// );
if (err != null) {
completer.complete(err);
return;
}
await _cleanupSession(sessionId);
} else if (message is Err) {
// TODO propagate this error to the UI
await _cleanupSession(sessionId);
}
}
}
});

final args = [
receivePort.sendPort,
sender.toJson(),
];

final isolate = await Isolate.spawn(
_isolateSender,
args,
);
_activePollers[sessionId] = isolate;
_activePorts[sessionId] = receivePort;
return completer.future;
} catch (e) {
throw Exception('Error polling payjoin sender: $e');
return Err(e.toString());
}
}

/// Returns a random OHTTP proxy URL from the list of available URLs.
/// Random proxying makes it more difficult for a single ohttp relay or
/// payjoin directory to conduct attacks based on timing metadata.
Future<pj_uri.Url> _randomOhttpRelayUrl() async {
return await pj_uri.Url.fromStr(
_ohttpRelayUrls[Random.secure().nextInt(_ohttpRelayUrls.length)],
);
Future<void> _cleanupSession(String sessionId) async {
_activePollers[sessionId]?.kill();
_activePollers.remove(sessionId);
_activePorts[sessionId]?.close();
_activePorts.remove(sessionId);
}
}

// Attempt to send a payjoin using the v1 protocol as fallback.
Future<String> _runSenderV1(Sender sender, Dio dio) async {
try {
final (req, v1Ctx) = await sender.extractV1();
final response = await _postRequest(dio, req);
final proposalPsbt =
await v1Ctx.processResponse(response: response.data as List<int>);
return proposalPsbt;
} catch (e) {
throw Exception('Send V1 payjoin error: $e');
}
// Top-level function to generate random OHTTP relay URL
Future<pj_uri.Url> _randomOhttpRelayUrl() async {
return await pj_uri.Url.fromStr(
_ohttpRelayUrls[Random.secure().nextInt(_ohttpRelayUrls.length)],
);
}

/// Top-level function that runs inside the isolate.
/// It should not reference instance-specific variables or methods.
Future<void> _isolateSender(List<dynamic> args) async {
// Initialize any core dependencies here if required
await core.init();

final sendPort = args[0] as SendPort;
final senderJson = args[1] as String;

// Reconstruct the Sender from the JSON
final sender = Sender.fromJson(senderJson);

// Run the sender logic inside the isolate
try {
final proposalPsbt = await _runSender(sender);
sendPort.send({
'type': 'psbt_to_sign',
'psbt': proposalPsbt,
});
} catch (e) {
sendPort.send(Err(e.toString()));
}
}

/// Top-level function that attempts to run payjoin sender (V2 protocol first, fallback to V1).
Future<String?> _runSender(Sender sender) async {
final dio = Dio();

try {
final result = await sender.extractV2(
ohttpProxyUrl: await _randomOhttpRelayUrl(),
);
final postReq = result.$1;
final postReqCtx = result.$2;

/// Take a Request from the payjoin sender and post it over OHTTP.
Future<Response<dynamic>> _postRequest(Dio dio, Request req) async {
return await dio.post(
req.url.asString(),
options: Options(
headers: {
'Content-Type': req.contentType,
},
responseType: ResponseType.bytes,
),
data: req.body,
// Attempt V2
final postRes = await _postRequest(dio, postReq);
final getCtx = await postReqCtx.processResponse(
response: postRes.data as List<int>,
);

while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
return await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
} catch (e) {
// Loop until a valid response is found
}
}
} catch (e) {
// If V2 fails, attempt V1
return await _runSenderV1(sender, dio);
}
}

/// Attempt to send payjoin using the V1 protocol.
Future<String> _runSenderV1(Sender sender, Dio dio) async {
try {
final (req, v1Ctx) = await sender.extractV1();
final response = await _postRequest(dio, req);
final proposalPsbt =
await v1Ctx.processResponse(response: response.data as List<int>);
return proposalPsbt;
} catch (e) {
throw Exception('Send V1 payjoin error: $e');
}
}

/// Posts a request via dio and returns the response.
Future<Response<dynamic>> _postRequest(Dio dio, Request req) async {
return await dio.post(
req.url.asString(),
options: Options(
headers: {
'Content-Type': req.contentType,
},
responseType: ResponseType.bytes,
),
data: req.body,
);
}
2 changes: 1 addition & 1 deletion lib/locator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ Future _setupBlocs() async {
);

locator.registerSingleton<PayjoinManager>(
PayjoinManager(),
PayjoinManager(locator<WalletTx>()),
);

locator.registerSingleton<NetworkFeesCubit>(
Expand Down
32 changes: 4 additions & 28 deletions lib/send/bloc/send_cubit.dart
Original file line number Diff line number Diff line change
Expand Up @@ -956,37 +956,13 @@ class SendCubit extends Cubit<SendState> {
// TODO copy originalPsbt.extractTx() to state.tx
// emit(state.copyWith(tx: originalPsbtTxWithId));
emit(state.copyWith(sending: true, sent: false));
final proposalPsbt = await _payjoinManager.runSender(
state.payjoinSender!,
);
final (wtxid, errSignBroadcast) = await _walletTx.signAndBroadcastPsbt(
await _payjoinManager.spawnSender(
isTestnet: _networkCubit.state.testnet,
sender: state.payjoinSender!,
wallet: wallet,
psbt: proposalPsbt!,
address: address,
note: state.note,
transaction: state.tx!,
address: address,
);
if (errSignBroadcast != null) {
emit(state.copyWith(
errSending: errSignBroadcast.toString(), sending: false));
return;
}

final txWithId = state.tx?.copyWith(txid: wtxid?.$2 ?? '');
emit(state.copyWith(tx: txWithId));

final (updatedWallet, _) = wtxid!;
state.selectedWalletBloc!.add(
UpdateWallet(
updatedWallet,
updateTypes: [
UpdateWalletTypes.addresses,
UpdateWalletTypes.transactions,
UpdateWalletTypes.swaps,
],
),
);

Future.delayed(150.ms);
state.selectedWalletBloc!.add(SyncWallet());

Expand Down
6 changes: 3 additions & 3 deletions pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ packages:
source: hosted
version: "1.18.0"
convert:
dependency: transitive
dependency: "direct main"
description:
name: convert
sha256: b30acd5944035672bc15c6b7a8b47d773e41e2f17de064350988c5d02adb1c68
Expand All @@ -280,7 +280,7 @@ packages:
source: hosted
version: "0.3.4+2"
crypto:
dependency: transitive
dependency: "direct main"
description:
name: crypto
sha256: "1e445881f28f22d6140f181e07737b22f1e099a5e1ff94b0af2f9e4a463f4855"
Expand Down Expand Up @@ -644,7 +644,7 @@ packages:
source: hosted
version: "2.5.7"
freezed_annotation:
dependency: transitive
dependency: "direct main"
description:
name: freezed_annotation
sha256: c2e2d632dd9b8a2b7751117abcfc2b4888ecfe181bd9fca7170d9ef02e595fe2
Expand Down

0 comments on commit 4b49339

Please sign in to comment.