Skip to content

Commit

Permalink
fix: cleaning up Monitor resources
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Jul 6, 2023
1 parent ed6f3a3 commit 9ed44df
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 48 deletions.
78 changes: 39 additions & 39 deletions src/QUICConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import type {
VerifyCallback,
} from './types';
import type { Connection, ConnectionErrorCode, SendInfo } from './native/types';
import { Lock, LockBox, Monitor, RWLockWriter } from '@matrixai/async-locks';
import type { Monitor } from '@matrixai/async-locks';
import { Lock, LockBox, RWLockWriter } from '@matrixai/async-locks';
import {
ready,
running,
Expand Down Expand Up @@ -515,29 +516,34 @@ class QUICConnection extends EventTarget {
this.logger.debug('streams destroyed');
this.stopKeepAliveIntervalTimer();

mon = mon ?? new Monitor<RWLockWriter>(this.lockbox, RWLockWriter);
// Trigger closing connection in the background and await close later.
void mon.withF(this.lockCode, async (mon) => {
// If this is already closed, then `Done` will be thrown
// Otherwise it can send `CONNECTION_CLOSE` frame
// This can be 0x1c close at the QUIC layer or no errors
// Or it can be 0x1d for application close with an error
// Upon receiving a `CONNECTION_CLOSE`, you can send back
// 1 packet containing a `CONNECTION_CLOSE` frame too
// (with `NO_ERROR` code if appropriate)
// It must enter into a draining state, and no other packets can be sent
try {
this.conn.close(applicationError, errorCode, Buffer.from(errorMessage));
// If we get a `Done` exception we don't bother calling send
// The send only gets sent if the `Done` is not the case
await this.send(mon);
} catch (e) {
// Ignore 'Done' if already closed
if (e.message !== 'Done') {
// No other exceptions are expected
never();
void utils.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
await mon.withF(this.lockCode, async (mon) => {
// If this is already closed, then `Done` will be thrown
// Otherwise it can send `CONNECTION_CLOSE` frame
// This can be 0x1c close at the QUIC layer or no errors
// Or it can be 0x1d for application close with an error
// Upon receiving a `CONNECTION_CLOSE`, you can send back
// 1 packet containing a `CONNECTION_CLOSE` frame too
// (with `NO_ERROR` code if appropriate)
// It must enter into a draining state, and no other packets can be sent
try {
this.conn.close(
applicationError,
errorCode,
Buffer.from(errorMessage),
);
// If we get a `Done` exception we don't bother calling send
// The send only gets sent if the `Done` is not the case
await this.send(mon);
} catch (e) {
// Ignore 'Done' if already closed
if (e.message !== 'Done') {
// No other exceptions are expected
never();
}
}
}
});
});

if (this.conn.isClosed()) {
Expand Down Expand Up @@ -750,17 +756,14 @@ class QUICConnection extends EventTarget {
* Any errors must be emitted as events.
* @internal
*/
public async send(
mon: Monitor<RWLockWriter> = new Monitor<RWLockWriter>(
this.lockbox,
RWLockWriter,
),
): Promise<void> {
if (!mon.isLocked(this.lockCode)) {
return mon.withF(this.lockCode, async (mon) => {
return this.send(mon);
});
}
public async send(mon?: Monitor<RWLockWriter>): Promise<void> {
await utils.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
if (!mon.isLocked(this.lockCode)) {
return mon.withF(this.lockCode, async (mon) => {
return this.send(mon);
});
}
});

const sendBuffer = new Uint8Array(quiche.MAX_DATAGRAM_SIZE);
let sendLength: number;
Expand Down Expand Up @@ -914,17 +917,15 @@ class QUICConnection extends EventTarget {
this.resolveClosedP();
// If we are still running and not stopping then we need to stop
if (this[running] && this[status] !== 'stopping') {
const mon = new Monitor(this.lockbox, RWLockWriter);
// Background stopping, we don't want to block the timer resolving
void this.stop({ force: true }, mon);
void this.stop({ force: true });
}
logger.debug('CLEANING UP TIMER');
return;
}

const mon = new Monitor(this.lockbox, RWLockWriter);
// There may be data to send after timing out
void this.send(mon);
void this.send();

// Note that a `0` timeout is still a valid timeout
const timeout = this.conn.timeout();
Expand Down Expand Up @@ -982,9 +983,8 @@ class QUICConnection extends EventTarget {
// Intelligently schedule a PING frame.
// If the connection has already sent ack-eliciting frames
// then this is a noop.
const mon = new Monitor(this.lockbox, RWLockWriter);
this.conn.sendAckEliciting();
await this.send(mon);
await this.send();
this.keepAliveIntervalTimer = new Timer({
delay: ms,
handler: keepAliveHandler,
Expand Down
22 changes: 14 additions & 8 deletions src/QUICSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import dgram from 'dgram';
import Logger from '@matrixai/logger';
import { running } from '@matrixai/async-init';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import { Monitor, RWLockWriter } from '@matrixai/async-locks';
import { RWLockWriter } from '@matrixai/async-locks';
import { status } from '@matrixai/async-init/dist/utils';
import QUICConnectionId from './QUICConnectionId';
import QUICConnectionMap from './QUICConnectionMap';
Expand Down Expand Up @@ -107,13 +107,19 @@ class QUICSocket extends EventTarget {
// Acquire the conn lock, this ensures mutual exclusion
// for state changes on the internal connection
try {
const mon = new Monitor<RWLockWriter>(connection.lockbox, RWLockWriter);
await mon.withF(connection.lockCode, async (mon) => {
// Even if we are `stopping`, the `quiche` library says we need to
// continue processing any packets.
await connection.recv(data, remoteInfo_, mon);
await connection.send(mon);
});
await utils.withMonitor(
undefined,
connection.lockbox,
RWLockWriter,
async (mon) => {
await mon.withF(connection.lockCode, async (mon) => {
// Even if we are `stopping`, the `quiche` library says we need to
// continue processing any packets.
await connection.recv(data, remoteInfo_, mon);
await connection.send(mon);
});
},
);
} catch (e) {
// Race condition with destroying socket, just ignore
if (!(e instanceof errors.ErrorQUICSocketNotRunning)) throw e;
Expand Down
16 changes: 16 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import type {
ServerCrypto,
} from './types';
import type { Connection } from '@/native';
import type { LockBox, RWLockWriter } from '@matrixai/async-locks';
import dns from 'dns';
import { IPv4, IPv6, Validator } from 'ip-num';
import { Monitor } from '@matrixai/async-locks';
import QUICConnectionId from './QUICConnectionId';
import * as errors from './errors';

Expand Down Expand Up @@ -428,6 +430,19 @@ function streamStats(
`;
}

async function withMonitor<T>(
mon: Monitor<RWLockWriter> | undefined,
lockBox: LockBox<RWLockWriter>,
lockConstructor: { new (): RWLockWriter },
fun: (mon: Monitor<RWLockWriter>) => Promise<T>,
locksPending?: Map<string, { count: number }>,
): Promise<T> {
const _mon = mon ?? new Monitor(lockBox, lockConstructor, locksPending);
const result = await fun(_mon);
if (mon != null) await _mon.unlockAll();
return result;
}

export {
isIPv4,
isIPv6,
Expand Down Expand Up @@ -455,4 +470,5 @@ export {
validateToken,
sleep,
streamStats,
withMonitor,
};
2 changes: 1 addition & 1 deletion tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ async function generateCertificate({
return await x509.X509CertificateGenerator.create(certConfig);
}

// async function createTLSConfigWithChain(
// Async function createTLSConfigWithChain(
// keyPairs: Array<{
// publicKey: JsonWebKey;
// privateKey: JsonWebKey;
Expand Down

0 comments on commit 9ed44df

Please sign in to comment.