Skip to content

Commit

Permalink
MatrixRTCSession: handle rate limit errors (#4494)
Browse files Browse the repository at this point in the history
* MatrixRTCSession: handle rate limit errors

* Lint

* Handle ratelimiting for non-legacy state setting

Each request must be retried, as the non-legacy flow involves a sequence
of requests that must resolve in order.

* Fix broken test

* Check for MSC3757 instead of the unmerged MSC3779

* Move helper out of beforeEach

* Test ratelimit errors
  • Loading branch information
AndrewFerr authored Nov 11, 2024
1 parent 98f7637 commit 10a4fd8
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 35 deletions.
51 changes: 46 additions & 5 deletions spec/unit/matrixrtc/MatrixRTCSession.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,54 @@ describe("MatrixRTCSession", () => {
const activeFocus = { type: "livekit", focus_selection: "oldest_membership" };

async function testJoin(useOwnedStateEvents: boolean): Promise<void> {
const realSetTimeout = setTimeout;
if (useOwnedStateEvents) {
mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3779.default");
mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3757.default");
}

jest.useFakeTimers();

// preparing the delayed disconnect should handle ratelimiting
const sendDelayedStateAttempt = new Promise<void>((resolve) => {
const error = new MatrixError({ errcode: "M_LIMIT_EXCEEDED" });
sendDelayedStateMock.mockImplementationOnce(() => {
resolve();
return Promise.reject(error);
});
});

// setting the membership state should handle ratelimiting (also with a retry-after value)
const sendStateEventAttempt = new Promise<void>((resolve) => {
const error = new MatrixError(
{ errcode: "M_LIMIT_EXCEEDED" },
429,
undefined,
undefined,
new Headers({ "Retry-After": "1" }),
);
sendStateEventMock.mockImplementationOnce(() => {
resolve();
return Promise.reject(error);
});
});

// needed to advance the mock timers properly
const scheduledDelayDisconnection = new Promise<void>((resolve) => {
const originalFn: () => void = (sess as any).scheduleDelayDisconnection;
(sess as any).scheduleDelayDisconnection = jest.fn(() => {
originalFn.call(sess);
resolve();
});
});

sess!.joinRoomSession([activeFocusConfig], activeFocus, { useLegacyMemberEvents: false });
await Promise.race([sentStateEvent, new Promise((resolve) => realSetTimeout(resolve, 500))]);

await sendDelayedStateAttempt;
jest.advanceTimersByTime(5000);

await sendStateEventAttempt.then(); // needed to resolve after resendIfRateLimited catches
jest.advanceTimersByTime(1000);

await sentStateEvent;
expect(client.sendStateEvent).toHaveBeenCalledWith(
mockRoom!.roomId,
EventType.GroupCallMemberPrefix,
Expand All @@ -493,9 +533,10 @@ describe("MatrixRTCSession", () => {
} satisfies SessionMembershipData,
`${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`,
);
await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]);
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
await sentDelayedState;

// should have prepared the heartbeat to keep delaying the leave event while still connected
await scheduledDelayDisconnection;
// should have tried updating the delayed leave to test that it wasn't replaced by own state
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1);
// should update delayed disconnect
Expand Down
94 changes: 64 additions & 30 deletions src/matrixrtc/MatrixRTCSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { randomString, secureRandomBase64Url } from "../randomstring.ts";
import { EncryptionKeysEventContent } from "./types.ts";
import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
import { KnownMembership } from "../@types/membership.ts";
import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
import { HTTPError, MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
import { MatrixEvent } from "../models/event.ts";
import { isLivekitFocusActive } from "./LivekitFocus.ts";
import { ExperimentalGroupCallRoomMemberState } from "../webrtc/groupCall.ts";
Expand Down Expand Up @@ -1031,39 +1031,39 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
const prepareDelayedDisconnection = async (): Promise<void> => {
try {
// TODO: If delayed event times out, re-join!
const res = await this.client._unstable_sendDelayedStateEvent(
this.room.roomId,
{
delay: 8000,
},
EventType.GroupCallMemberPrefix,
{}, // leave event
stateKey,
const res = await resendIfRateLimited(() =>
this.client._unstable_sendDelayedStateEvent(
this.room.roomId,
{
delay: 8000,
},
EventType.GroupCallMemberPrefix,
{}, // leave event
stateKey,
),
);
this.disconnectDelayId = res.delay_id;
} catch (e) {
// TODO: Retry if rate-limited
logger.error("Failed to prepare delayed disconnection event:", e);
}
};
await prepareDelayedDisconnection();
// Send join event _after_ preparing the delayed disconnection event
await this.client.sendStateEvent(
this.room.roomId,
EventType.GroupCallMemberPrefix,
newContent,
stateKey,
await resendIfRateLimited(() =>
this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey),
);
// If sending state cancels your own delayed state, prepare another delayed state
// TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state
if (this.disconnectDelayId !== undefined) {
try {
await this.client._unstable_updateDelayedEvent(
this.disconnectDelayId,
UpdateDelayedEventAction.Restart,
const knownDisconnectDelayId = this.disconnectDelayId;
await resendIfRateLimited(() =>
this.client._unstable_updateDelayedEvent(
knownDisconnectDelayId,
UpdateDelayedEventAction.Restart,
),
);
} catch (e) {
// TODO: Make embedded client include errcode, and retry only if not M_NOT_FOUND (or rate-limited)
logger.warn("Failed to update delayed disconnection event, prepare it again:", e);
this.disconnectDelayId = undefined;
await prepareDelayedDisconnection();
Expand All @@ -1076,23 +1076,27 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
let sentDelayedDisconnect = false;
if (this.disconnectDelayId !== undefined) {
try {
await this.client._unstable_updateDelayedEvent(
this.disconnectDelayId,
UpdateDelayedEventAction.Send,
const knownDisconnectDelayId = this.disconnectDelayId;
await resendIfRateLimited(() =>
this.client._unstable_updateDelayedEvent(
knownDisconnectDelayId,
UpdateDelayedEventAction.Send,
),
);
sentDelayedDisconnect = true;
} catch (e) {
// TODO: Retry if rate-limited
logger.error("Failed to send our delayed disconnection event:", e);
}
this.disconnectDelayId = undefined;
}
if (!sentDelayedDisconnect) {
await this.client.sendStateEvent(
this.room.roomId,
EventType.GroupCallMemberPrefix,
{},
this.makeMembershipStateKey(localUserId, localDeviceId),
await resendIfRateLimited(() =>
this.client.sendStateEvent(
this.room.roomId,
EventType.GroupCallMemberPrefix,
{},
this.makeMembershipStateKey(localUserId, localDeviceId),
),
);
}
}
Expand All @@ -1111,10 +1115,12 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M

private readonly delayDisconnection = async (): Promise<void> => {
try {
await this.client._unstable_updateDelayedEvent(this.disconnectDelayId!, UpdateDelayedEventAction.Restart);
const knownDisconnectDelayId = this.disconnectDelayId!;
await resendIfRateLimited(() =>
this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart),
);
this.scheduleDelayDisconnection();
} catch (e) {
// TODO: Retry if rate-limited
logger.error("Failed to delay our disconnection event:", e);
}
};
Expand Down Expand Up @@ -1162,3 +1168,31 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
this.sendEncryptionKeysEvent(newKeyIndex);
};
}

async function resendIfRateLimited<T>(func: () => Promise<T>, numRetriesAllowed: number = 1): Promise<T> {
// eslint-disable-next-line no-constant-condition
while (true) {
try {
return await func();
} catch (e) {
if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) {
numRetriesAllowed--;
let resendDelay: number;
const defaultMs = 5000;
try {
resendDelay = e.getRetryAfterMs() ?? defaultMs;
logger.info(`Rate limited by server, retrying in ${resendDelay}ms`);
} catch (e) {
logger.warn(
`Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`,
e,
);
resendDelay = defaultMs;
}
await sleep(resendDelay);
} else {
throw e;
}
}
}
}

0 comments on commit 10a4fd8

Please sign in to comment.