Skip to content

Commit

Permalink
add stress test
Browse files Browse the repository at this point in the history
re-send unsent messages from queue
  • Loading branch information
Govorunb committed Jun 13, 2024
1 parent 0c051eb commit 2b39253
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 34 deletions.
85 changes: 56 additions & 29 deletions ebs/src/modules/game/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export class GameConnection {
private outstandingRedeems: Map<string, RedeemMessage> = new Map();
private resultHandlers: Map<string, ResultHandler> = new Map();
static resultWaitTimeout: number = 10000;
private resendIntervalHandle?: number;
private resendInterval = 500;

public isConnected() {
return this.socket?.readyState == ServerWS.OPEN;
Expand All @@ -29,9 +31,9 @@ export class GameConnection {
if (!ws) {
return;
}
ws.on('connection', () => {
this.handshake = false;
})
console.log("Connected to game");
this.handshake = false;
this.resendIntervalHandle = +setInterval(() => this.tryResendFromQueue(), this.resendInterval);
ws.on('message', async (message) => {
const msgText = message.toString();
let msg: GameMessage;
Expand All @@ -46,11 +48,15 @@ export class GameConnection {
this.processMessage(msg);
});
ws.on("close", (code, reason) => {
console.log(`Connection closed with code ${code} and reason ${reason}`);
const reasonStr = reason ? `reason '${reason}'` : "no reason"
console.log(`Game socket closed with code ${code} and ${reasonStr}`);
setIngame(false);
if (this.resendIntervalHandle) {
clearInterval(this.resendIntervalHandle);
}
})
ws.on("error", (error) => {
console.log(`Connection error ${error}`);
console.log(`Game socket error\n${error}`);
})
}
public async processMessage(msg: GameMessage) {
Expand All @@ -61,10 +67,10 @@ export class GameConnection {
...this.makeMessage(MessageType.HelloBack),
allowed: msg.version == VERSION,
}
this.sendMessage(reply);
this.sendMessage(reply).then().catch(e => e);
break;
case MessageType.Ping:
this.sendMessage(this.makeMessage(MessageType.Pong));
this.sendMessage(this.makeMessage(MessageType.Pong)).then().catch(e => e);
break;
case MessageType.Result:
if (!this.outstandingRedeems.has(msg.guid)) {
Expand All @@ -89,21 +95,32 @@ export class GameConnection {
}
}

public sendMessage(msg: ServerMessage) {
if (!this.socket) {
this.msgSendError(msg, `Tried to send message without a connected socket`);
return;
}
if (!this.handshake) {
this.msgSendError(msg, `Tried to send message before handshake was complete`);
return;
}
this.socket.send(JSON.stringify(msg), { binary: false, fin: true }, (err) => {
if (err)
console.error(err);
public sendMessage(msg: ServerMessage): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.isConnected()) {
const error = `Tried to send message without a connected socket`;
this.msgSendError(msg, error);
reject(error);
return;
}
// allow pong for stress test
if (!this.handshake && msg.messageType !== MessageType.Pong) {
const error = `Tried to send message before handshake was complete`;
this.msgSendError(msg, error);
reject(error);
return;
}
this.socket!.send(JSON.stringify(msg), { binary: false, fin: true }, (err) => {
if (err) {
this.msgSendError(msg, `${err.name}: ${err.message}`);
reject(err);
return;
}
if (msg.messageType !== MessageType.Pong)
console.debug(`Sent message ${JSON.stringify(msg)}`);
resolve();
});
});
if (msg.messageType !== MessageType.Pong)
console.debug(`Sent message ${JSON.stringify(msg)}`);
}
public makeMessage(type: MessageType, guid?: string): Message {
return {
Expand All @@ -114,7 +131,7 @@ export class GameConnection {
}
public redeem(redeem: Redeem, cart: Cart, user: TwitchUser, transactionId: string) : Promise<ResultMessage> {
return Promise.race([
new Promise<any>((_, reject) => setTimeout(() => reject(`Timed out waiting for result`), GameConnection.resultWaitTimeout)),
new Promise<any>((_, reject) => setTimeout(() => reject(`Timed out waiting for result. The redeem may still go through later, contact Alexejhero if it doesn't.`), GameConnection.resultWaitTimeout)),
new Promise<ResultMessage>((resolve, reject) => {
if (!transactionId) {
reject(`Tried to redeem without transaction ID`);
Expand All @@ -136,14 +153,9 @@ export class GameConnection {
return;
}
this.outstandingRedeems.set(msg.guid, msg);

if (!this.isConnected()) {
reject(`Redeemed without active connection`);
return;
}
this.resultHandlers.set(msg.guid, resolve);

this.sendMessage(msg);
this.sendMessage(msg).then().catch(e => e); // will get queued to re-send later
})
]);
}
Expand All @@ -154,6 +166,21 @@ export class GameConnection {

private msgSendError(msg: ServerMessage, error: any) {
this.unsentQueue.push(msg);
console.error(error + `\n${JSON.stringify(msg)}`);
console.error(`Error sending message\n\tMessage: ${JSON.stringify(msg)}\n\tError: ${error}`);
console.log(`Position ${this.unsentQueue.length} in queue`);
}

private tryResendFromQueue() {
const msg = this.unsentQueue.shift();
if (msg === undefined) {
//console.log("Nothing to re-send");
return;
}

console.log(`Re-sending message ${JSON.stringify(msg)}`);
this.sendMessage(msg).then().catch(e => e);
}
public stressTestSetHandshake(handshake: boolean) {
this.handshake = handshake;
}
}
37 changes: 34 additions & 3 deletions ebs/src/modules/game/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { GameConnection } from "./connection";
import { MessageType } from "./messages";
import { ResultMessage } from "./messages.game";
import { CommandInvocationSource, RedeemMessage } from "./messages.server";
import { StressTestRequest, isStressTesting, startStressTest } from "./stresstest";

export let connection: GameConnection = new GameConnection();

Expand All @@ -22,8 +23,12 @@ app.post("/private/redeem", async (req, res) => {
return;
}

connection.sendMessage(msg);
res.status(201).send(JSON.stringify(msg));
try {
await connection.sendMessage(msg);
res.status(201).send(JSON.stringify(msg));
} catch (e) {
res.status(500).send(e);
}
})

app.post("/private/setresult", async (req, res) => {
Expand All @@ -39,4 +44,30 @@ app.post("/private/setresult", async (req, res) => {

connection.processMessage(msg);
res.sendStatus(200);
});
});

app.post("/private/stress", async (req, res) => {
if (!process.env.ENABLE_STRESS_TEST) {
res.status(403).send("Disabled unless you set the ENABLE_STRESS_TEST env var");
return;
}

if (isStressTesting()) {
res.status(400).send("Already stress testing");
return;
}

if (!connection.isConnected()) {
res.status(500).send("Not connected");
return;
}

const reqObj = req.body as StressTestRequest;
if (reqObj.type === undefined || reqObj.duration === undefined || reqObj.interval === undefined) {
res.status(400).send("Must have type, duration, and interval");
return;
}
console.log(reqObj);
startStressTest(reqObj.type, reqObj.duration, reqObj.interval);
res.sendStatus(200);
})
181 changes: 181 additions & 0 deletions ebs/src/modules/game/stresstest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import { IdentifiableCart } from "common/types";
import { connection } from ".";
import { getConfig } from "../config";
import { v4 as uuid } from "uuid";
import { signJWT } from "../../util/jwt";
import { AuthorizationPayload, BitsTransactionPayload } from "../../types";

export enum StressTestType {
GameSpawnQueue,
GameUnsentQueue,
TransactionSpam,
}

export type StressTestRequest = {
type: StressTestType;
duration: number;
interval: number;
}

let inStressTest: boolean = false;

export function isStressTesting(): boolean {
return inStressTest;
}

let activeInterval: number;

export async function startStressTest(type: StressTestType, duration: number, interval: number) {
console.log(`Starting stress test ${StressTestType[type]} for ${duration}ms`)
switch (type) {
case StressTestType.GameSpawnQueue:
activeInterval = +setInterval(() => sendSpawnRedeem().then(), interval);
break;
case StressTestType.GameUnsentQueue:
connection.stressTestSetHandshake(false);
const count = Math.floor(duration / interval);
console.log(`Sending ${count} spawns...`);
for (let i = 0; i < count; i++) {
sendSpawnRedeem().then().catch(e => e);
}
break;
case StressTestType.TransactionSpam:
activeInterval = +setInterval(() => sendTransaction().then(), interval);
break;
}
inStressTest = true;
setTimeout(() => {
inStressTest = false;
if (type === StressTestType.GameUnsentQueue)
connection.stressTestSetHandshake(true);
return clearInterval(activeInterval);
}, duration);
}

const redeemId: string = "spawn_passive";
const user = {
id: "stress",
login: "stresstest",
displayName: "Stress Test",
};
const cart: IdentifiableCart = {
userId: "stress",
version: 1,
id: redeemId,
sku: "bits1",
args: {
"creature": "0",
"behind": false,
}
};
async function sendSpawnRedeem() {
const config = await getConfig();
const redeem = config.redeems![redeemId];

connection.redeem(redeem, cart, user, uuid()).then().catch(err => {
console.log(err);
});
}

const invalidAuth: AuthorizationPayload = {
channel_id: "stress",
exp: Date.now() + 1000,
is_unlinked: false,
opaque_user_id: "Ustress",
pubsub_perms: {
listen: [],
send: [],
},
role: "viewer",
};
const validAuth: AuthorizationPayload = {
...invalidAuth,
user_id: "stress",
}
const signedValidJWT = signJWT(validAuth);
const signedInvalidJWT = signJWT(invalidAuth);
const invalidJWT = "trust me bro";

async function sendTransaction() {
// we have to go through the http flow because the handler is scuffed
// and we need to stress the logging webhook as well
const urlPrepurchase = "http://localhost:3000/public/prepurchase";
const urlTransaction = "http://localhost:3000/public/transaction";

const jwtChoice = Math.floor(3*Math.random());
const token = jwtChoice == 0 ? signedValidJWT
: jwtChoice == 1 ? signedInvalidJWT
: invalidJWT;
const auth = `Bearer ${token}`;
console.log(`Prepurchasing with ${jwtChoice == 0 ? "signed valid" : jwtChoice == 1 ? "signed invalid" : "unsigned invalid"} JWT`);

const prepurchase = await fetch(urlPrepurchase, {
method: "POST",
headers: {
"Authorization": auth,
"Content-Type": "application/json",
},
body: JSON.stringify(cart),
});
switch (jwtChoice) {
case 0:
if (!prepurchase.ok)
console.error("Valid JWT should have succeeded");
break;
case 1:
if (prepurchase.ok)
console.error("JWT without user ID should have failed");
break;
case 2:
if (prepurchase.ok)
console.error("Invalid bearer token should have failed");
break;
}
const transactionId = await prepurchase.text();

const receipt: BitsTransactionPayload = {
exp: Date.now() + 1000,
topic: "topic",
data: {
transactionId,
product: {
sku: "bits1",
cost: {
amount: 1,
type: "bits"
},
displayName: "",
domainId: ""
},
userId: "stress",
time: "time"
}
};

console.log(`Sending transaction (${jwtChoice})`);
const transaction = await fetch(urlTransaction, {
method: "POST",
headers: {
"Authorization": auth,
"Content-Type": "application/json",
},
body: JSON.stringify({
token: transactionId,
receipt: signJWT(receipt),
}),
});
switch (jwtChoice) {
case 0:
if (prepurchase.ok && !transaction.ok)
console.error("Valid JWT should have succeeded");
break;
case 1:
if (transaction.ok)
console.error("JWT without user ID should have failed");
break;
case 2:
if (transaction.ok)
console.error("Invalid bearer token should have failed");
break;
}
}
7 changes: 6 additions & 1 deletion ebs/src/modules/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ app.post("/public/transaction", async (req, res) => {
return;
}

let userInfo = await getTwitchUser(cart.userId);
let userInfo: TwitchUser | null;
try {
userInfo = await getTwitchUser(cart.userId);
} catch {
userInfo = null;
}
if (!userInfo) {
logToDiscord({
transactionToken: transaction.token,
Expand Down
Loading

0 comments on commit 2b39253

Please sign in to comment.