From 2b39253b0675557ee612592afda090933b39b18c Mon Sep 17 00:00:00 2001 From: Govorunb Date: Thu, 13 Jun 2024 18:17:01 +1000 Subject: [PATCH] add stress test re-send unsent messages from queue --- ebs/src/modules/game/connection.ts | 85 +++++++++----- ebs/src/modules/game/index.ts | 37 +++++- ebs/src/modules/game/stresstest.ts | 181 +++++++++++++++++++++++++++++ ebs/src/modules/transactions.ts | 7 +- ebs/src/util/jwt.ts | 4 + ebs/src/util/middleware.ts | 2 +- 6 files changed, 282 insertions(+), 34 deletions(-) create mode 100644 ebs/src/modules/game/stresstest.ts diff --git a/ebs/src/modules/game/connection.ts b/ebs/src/modules/game/connection.ts index 3bb736d..43cc3fd 100644 --- a/ebs/src/modules/game/connection.ts +++ b/ebs/src/modules/game/connection.ts @@ -17,6 +17,8 @@ export class GameConnection { private outstandingRedeems: Map = new Map(); private resultHandlers: Map = new Map(); static resultWaitTimeout: number = 10000; + private resendIntervalHandle?: number; + private resendInterval = 500; public isConnected() { return this.socket?.readyState == ServerWS.OPEN; @@ -29,9 +31,9 @@ export class GameConnection { if (!ws) { return; } - ws.on('connection', () => { - this.handshake = false; - }) + console.log("Connected to game"); + this.handshake = false; + this.resendIntervalHandle = +setInterval(() => this.tryResendFromQueue(), this.resendInterval); ws.on('message', async (message) => { const msgText = message.toString(); let msg: GameMessage; @@ -46,11 +48,15 @@ export class GameConnection { this.processMessage(msg); }); ws.on("close", (code, reason) => { - console.log(`Connection closed with code ${code} and reason ${reason}`); + const reasonStr = reason ? `reason '${reason}'` : "no reason" + console.log(`Game socket closed with code ${code} and ${reasonStr}`); setIngame(false); + if (this.resendIntervalHandle) { + clearInterval(this.resendIntervalHandle); + } }) ws.on("error", (error) => { - console.log(`Connection error ${error}`); + console.log(`Game socket error\n${error}`); }) } public async processMessage(msg: GameMessage) { @@ -61,10 +67,10 @@ export class GameConnection { ...this.makeMessage(MessageType.HelloBack), allowed: msg.version == VERSION, } - this.sendMessage(reply); + this.sendMessage(reply).then().catch(e => e); break; case MessageType.Ping: - this.sendMessage(this.makeMessage(MessageType.Pong)); + this.sendMessage(this.makeMessage(MessageType.Pong)).then().catch(e => e); break; case MessageType.Result: if (!this.outstandingRedeems.has(msg.guid)) { @@ -89,21 +95,32 @@ export class GameConnection { } } - public sendMessage(msg: ServerMessage) { - if (!this.socket) { - this.msgSendError(msg, `Tried to send message without a connected socket`); - return; - } - if (!this.handshake) { - this.msgSendError(msg, `Tried to send message before handshake was complete`); - return; - } - this.socket.send(JSON.stringify(msg), { binary: false, fin: true }, (err) => { - if (err) - console.error(err); + public sendMessage(msg: ServerMessage): Promise { + return new Promise((resolve, reject) => { + if (!this.isConnected()) { + const error = `Tried to send message without a connected socket`; + this.msgSendError(msg, error); + reject(error); + return; + } + // allow pong for stress test + if (!this.handshake && msg.messageType !== MessageType.Pong) { + const error = `Tried to send message before handshake was complete`; + this.msgSendError(msg, error); + reject(error); + return; + } + this.socket!.send(JSON.stringify(msg), { binary: false, fin: true }, (err) => { + if (err) { + this.msgSendError(msg, `${err.name}: ${err.message}`); + reject(err); + return; + } + if (msg.messageType !== MessageType.Pong) + console.debug(`Sent message ${JSON.stringify(msg)}`); + resolve(); + }); }); - if (msg.messageType !== MessageType.Pong) - console.debug(`Sent message ${JSON.stringify(msg)}`); } public makeMessage(type: MessageType, guid?: string): Message { return { @@ -114,7 +131,7 @@ export class GameConnection { } public redeem(redeem: Redeem, cart: Cart, user: TwitchUser, transactionId: string) : Promise { return Promise.race([ - new Promise((_, reject) => setTimeout(() => reject(`Timed out waiting for result`), GameConnection.resultWaitTimeout)), + new Promise((_, reject) => setTimeout(() => reject(`Timed out waiting for result. The redeem may still go through later, contact Alexejhero if it doesn't.`), GameConnection.resultWaitTimeout)), new Promise((resolve, reject) => { if (!transactionId) { reject(`Tried to redeem without transaction ID`); @@ -136,14 +153,9 @@ export class GameConnection { return; } this.outstandingRedeems.set(msg.guid, msg); - - if (!this.isConnected()) { - reject(`Redeemed without active connection`); - return; - } this.resultHandlers.set(msg.guid, resolve); - this.sendMessage(msg); + this.sendMessage(msg).then().catch(e => e); // will get queued to re-send later }) ]); } @@ -154,6 +166,21 @@ export class GameConnection { private msgSendError(msg: ServerMessage, error: any) { this.unsentQueue.push(msg); - console.error(error + `\n${JSON.stringify(msg)}`); + console.error(`Error sending message\n\tMessage: ${JSON.stringify(msg)}\n\tError: ${error}`); + console.log(`Position ${this.unsentQueue.length} in queue`); + } + + private tryResendFromQueue() { + const msg = this.unsentQueue.shift(); + if (msg === undefined) { + //console.log("Nothing to re-send"); + return; + } + + console.log(`Re-sending message ${JSON.stringify(msg)}`); + this.sendMessage(msg).then().catch(e => e); + } + public stressTestSetHandshake(handshake: boolean) { + this.handshake = handshake; } } diff --git a/ebs/src/modules/game/index.ts b/ebs/src/modules/game/index.ts index d934e2e..562cc40 100644 --- a/ebs/src/modules/game/index.ts +++ b/ebs/src/modules/game/index.ts @@ -3,6 +3,7 @@ import { GameConnection } from "./connection"; import { MessageType } from "./messages"; import { ResultMessage } from "./messages.game"; import { CommandInvocationSource, RedeemMessage } from "./messages.server"; +import { StressTestRequest, isStressTesting, startStressTest } from "./stresstest"; export let connection: GameConnection = new GameConnection(); @@ -22,8 +23,12 @@ app.post("/private/redeem", async (req, res) => { return; } - connection.sendMessage(msg); - res.status(201).send(JSON.stringify(msg)); + try { + await connection.sendMessage(msg); + res.status(201).send(JSON.stringify(msg)); + } catch (e) { + res.status(500).send(e); + } }) app.post("/private/setresult", async (req, res) => { @@ -39,4 +44,30 @@ app.post("/private/setresult", async (req, res) => { connection.processMessage(msg); res.sendStatus(200); -}); \ No newline at end of file +}); + +app.post("/private/stress", async (req, res) => { + if (!process.env.ENABLE_STRESS_TEST) { + res.status(403).send("Disabled unless you set the ENABLE_STRESS_TEST env var"); + return; + } + + if (isStressTesting()) { + res.status(400).send("Already stress testing"); + return; + } + + if (!connection.isConnected()) { + res.status(500).send("Not connected"); + return; + } + + const reqObj = req.body as StressTestRequest; + if (reqObj.type === undefined || reqObj.duration === undefined || reqObj.interval === undefined) { + res.status(400).send("Must have type, duration, and interval"); + return; + } + console.log(reqObj); + startStressTest(reqObj.type, reqObj.duration, reqObj.interval); + res.sendStatus(200); +}) diff --git a/ebs/src/modules/game/stresstest.ts b/ebs/src/modules/game/stresstest.ts new file mode 100644 index 0000000..370d03f --- /dev/null +++ b/ebs/src/modules/game/stresstest.ts @@ -0,0 +1,181 @@ +import { IdentifiableCart } from "common/types"; +import { connection } from "."; +import { getConfig } from "../config"; +import { v4 as uuid } from "uuid"; +import { signJWT } from "../../util/jwt"; +import { AuthorizationPayload, BitsTransactionPayload } from "../../types"; + +export enum StressTestType { + GameSpawnQueue, + GameUnsentQueue, + TransactionSpam, +} + +export type StressTestRequest = { + type: StressTestType; + duration: number; + interval: number; +} + +let inStressTest: boolean = false; + +export function isStressTesting(): boolean { + return inStressTest; +} + +let activeInterval: number; + +export async function startStressTest(type: StressTestType, duration: number, interval: number) { + console.log(`Starting stress test ${StressTestType[type]} for ${duration}ms`) + switch (type) { + case StressTestType.GameSpawnQueue: + activeInterval = +setInterval(() => sendSpawnRedeem().then(), interval); + break; + case StressTestType.GameUnsentQueue: + connection.stressTestSetHandshake(false); + const count = Math.floor(duration / interval); + console.log(`Sending ${count} spawns...`); + for (let i = 0; i < count; i++) { + sendSpawnRedeem().then().catch(e => e); + } + break; + case StressTestType.TransactionSpam: + activeInterval = +setInterval(() => sendTransaction().then(), interval); + break; + } + inStressTest = true; + setTimeout(() => { + inStressTest = false; + if (type === StressTestType.GameUnsentQueue) + connection.stressTestSetHandshake(true); + return clearInterval(activeInterval); + }, duration); +} + +const redeemId: string = "spawn_passive"; +const user = { + id: "stress", + login: "stresstest", + displayName: "Stress Test", +}; +const cart: IdentifiableCart = { + userId: "stress", + version: 1, + id: redeemId, + sku: "bits1", + args: { + "creature": "0", + "behind": false, + } +}; +async function sendSpawnRedeem() { + const config = await getConfig(); + const redeem = config.redeems![redeemId]; + + connection.redeem(redeem, cart, user, uuid()).then().catch(err => { + console.log(err); + }); +} + +const invalidAuth: AuthorizationPayload = { + channel_id: "stress", + exp: Date.now() + 1000, + is_unlinked: false, + opaque_user_id: "Ustress", + pubsub_perms: { + listen: [], + send: [], + }, + role: "viewer", +}; +const validAuth: AuthorizationPayload = { + ...invalidAuth, + user_id: "stress", +} +const signedValidJWT = signJWT(validAuth); +const signedInvalidJWT = signJWT(invalidAuth); +const invalidJWT = "trust me bro"; + +async function sendTransaction() { + // we have to go through the http flow because the handler is scuffed + // and we need to stress the logging webhook as well + const urlPrepurchase = "http://localhost:3000/public/prepurchase"; + const urlTransaction = "http://localhost:3000/public/transaction"; + + const jwtChoice = Math.floor(3*Math.random()); + const token = jwtChoice == 0 ? signedValidJWT + : jwtChoice == 1 ? signedInvalidJWT + : invalidJWT; + const auth = `Bearer ${token}`; + console.log(`Prepurchasing with ${jwtChoice == 0 ? "signed valid" : jwtChoice == 1 ? "signed invalid" : "unsigned invalid"} JWT`); + + const prepurchase = await fetch(urlPrepurchase, { + method: "POST", + headers: { + "Authorization": auth, + "Content-Type": "application/json", + }, + body: JSON.stringify(cart), + }); + switch (jwtChoice) { + case 0: + if (!prepurchase.ok) + console.error("Valid JWT should have succeeded"); + break; + case 1: + if (prepurchase.ok) + console.error("JWT without user ID should have failed"); + break; + case 2: + if (prepurchase.ok) + console.error("Invalid bearer token should have failed"); + break; + } + const transactionId = await prepurchase.text(); + + const receipt: BitsTransactionPayload = { + exp: Date.now() + 1000, + topic: "topic", + data: { + transactionId, + product: { + sku: "bits1", + cost: { + amount: 1, + type: "bits" + }, + displayName: "", + domainId: "" + }, + userId: "stress", + time: "time" + } + }; + + console.log(`Sending transaction (${jwtChoice})`); + const transaction = await fetch(urlTransaction, { + method: "POST", + headers: { + "Authorization": auth, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + token: transactionId, + receipt: signJWT(receipt), + }), + }); + switch (jwtChoice) { + case 0: + if (prepurchase.ok && !transaction.ok) + console.error("Valid JWT should have succeeded"); + break; + case 1: + if (transaction.ok) + console.error("JWT without user ID should have failed"); + break; + case 2: + if (transaction.ok) + console.error("Invalid bearer token should have failed"); + break; + } +} diff --git a/ebs/src/modules/transactions.ts b/ebs/src/modules/transactions.ts index adcc6cd..bf08e55 100644 --- a/ebs/src/modules/transactions.ts +++ b/ebs/src/modules/transactions.ts @@ -224,7 +224,12 @@ app.post("/public/transaction", async (req, res) => { return; } - let userInfo = await getTwitchUser(cart.userId); + let userInfo: TwitchUser | null; + try { + userInfo = await getTwitchUser(cart.userId); + } catch { + userInfo = null; + } if (!userInfo) { logToDiscord({ transactionToken: transaction.token, diff --git a/ebs/src/util/jwt.ts b/ebs/src/util/jwt.ts index da9cb77..cdb0ce9 100644 --- a/ebs/src/util/jwt.ts +++ b/ebs/src/util/jwt.ts @@ -25,3 +25,7 @@ function getJwtSecretBuffer() { if (cachedBuffer) return cachedBuffer; return cachedBuffer = Buffer.from(process.env.JWT_SECRET!, "base64"); } + +export function signJWT(payload: object, buffer: Buffer = getJwtSecretBuffer()) { + return jwt.sign(payload, buffer); +} diff --git a/ebs/src/util/middleware.ts b/ebs/src/util/middleware.ts index 3d76885..8e9863a 100644 --- a/ebs/src/util/middleware.ts +++ b/ebs/src/util/middleware.ts @@ -31,7 +31,7 @@ export function publicApiAuth(req: Request, res: Response, next: NextFunction) { }, ], }).then(); - res.status(500).send("Missing required data in JTW"); + res.status(500).send("Missing required data in JWT"); return; }