From 35527ed133d08911a4d523a01a2cc388ce9c6f51 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Mon, 12 Dec 2022 09:41:46 -0600 Subject: [PATCH] Support Queue events in tail Queue events have an "event" section containing two fields -- "queue" and "batchSize", which contain the queue name and number of messages in the batch. Queue events can be distinguished from other events by the presence of the "queue" field, so that's what I do here. I've followed the example of RequestEvent more closely than either ScheduledEvent or AlarmEvent here, but am happy to change up the format if there's a reason to do so. --- .../__tests__/pages-deployment-tail.test.ts | 57 ++++++++++++++++++- packages/wrangler/src/__tests__/tail.test.ts | 51 ++++++++++++++++- packages/wrangler/src/tail/createTail.ts | 24 +++++++- packages/wrangler/src/tail/printing.ts | 15 +++++ 4 files changed, 144 insertions(+), 3 deletions(-) diff --git a/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts b/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts index 0a5bba79b8d0..917abf3f6814 100644 --- a/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts +++ b/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts @@ -12,6 +12,7 @@ import type { RequestEvent, ScheduledEvent, AlarmEvent, + QueueEvent, } from "../tail/createTail"; import type { RequestInit } from "undici"; import type WebSocket from "ws"; @@ -355,6 +356,20 @@ describe("pages deployment tail", () => { expect(std.out).toMatch(deserializeToJson(serializedMessage)); }); + it("logs queue messages in json format", async () => { + const api = mockTailAPIs(); + await runWrangler( + "pages deployment tail mock-deployment-id --project-name mock-project --format json" + ); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect(std.out).toMatch(deserializeToJson(serializedMessage)); + }); + it("logs request messages in pretty format", async () => { const api = mockTailAPIs(); await runWrangler( @@ -436,6 +451,33 @@ describe("pages deployment tail", () => { `); }); + it("logs queue messages in pretty format", async () => { + const api = mockTailAPIs(); + await runWrangler( + "pages deployment tail mock-deployment-id --project-name mock-project --format pretty" + ); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect( + std.out + .replace( + new Date(mockEventTimestamp).toLocaleString(), + "[mock timestamp string]" + ) + .replace( + mockTailExpiration.toLocaleString(), + "[mock expiration date]" + ) + ).toMatchInlineSnapshot(` + "Connected to deployment mock-deployment-id, waiting for logs... + Queue my-queue123 (7 messages) - Ok @ [mock timestamp string]" + `); + }); + it("should not crash when the tail message has a void event", async () => { const api = mockTailAPIs(); await runWrangler( @@ -608,7 +650,13 @@ function serialize(message: TailEventMessage): WebSocket.RawData { * @returns true if `event` is a RequestEvent */ function isRequest( - event: ScheduledEvent | RequestEvent | AlarmEvent | undefined | null + event: + | ScheduledEvent + | RequestEvent + | AlarmEvent + | QueueEvent + | undefined + | null ): event is RequestEvent { return Boolean(event && "request" in event); } @@ -882,3 +930,10 @@ function generateMockAlarmEvent(opts?: Partial): AlarmEvent { scheduledTime: opts?.scheduledTime || mockEventScheduledTime, }; } + +function generateMockQueueEvent(opts?: Partial): QueueEvent { + return { + queue: opts?.queue || "my-queue123", + batchSize: opts?.batchSize || 7, + }; +} \ No newline at end of file diff --git a/packages/wrangler/src/__tests__/tail.test.ts b/packages/wrangler/src/__tests__/tail.test.ts index 39b479d608bc..c285931177c0 100644 --- a/packages/wrangler/src/__tests__/tail.test.ts +++ b/packages/wrangler/src/__tests__/tail.test.ts @@ -12,6 +12,7 @@ import type { RequestEvent, ScheduledEvent, AlarmEvent, + QueueEvent, } from "../tail/createTail"; import type { RequestInit } from "undici"; import type WebSocket from "ws"; @@ -398,6 +399,18 @@ describe("tail", () => { expect(std.out).toMatch(deserializeToJson(serializedMessage)); }); + it("logs queue messages in json format", async () => { + const api = mockWebsocketAPIs(); + await runWrangler("tail test-worker --format json"); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect(std.out).toMatch(deserializeToJson(serializedMessage)); + }); + it("logs request messages in pretty format", async () => { const api = mockWebsocketAPIs(); await runWrangler("tail test-worker --format pretty"); @@ -467,6 +480,29 @@ describe("tail", () => { `); }); + it("logs queue messages in pretty format", async () => { + const api = mockWebsocketAPIs(); + await runWrangler("tail test-worker --format pretty"); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect( + std.out + .replace( + new Date(mockEventTimestamp).toLocaleString(), + "[mock timestamp string]" + ) + .replace(mockTailExpiration.toISOString(), "[mock expiration date]") + ).toMatchInlineSnapshot(` + "Successfully created tail, expires at [mock expiration date] + Connected to test-worker, waiting for logs... + Queue my-queue123 (7 messages) - Ok @ [mock timestamp string]" + `); + }); + it("should not crash when the tail message has a void event", async () => { const api = mockWebsocketAPIs(); await runWrangler("tail test-worker --format pretty"); @@ -628,7 +664,13 @@ function serialize(message: TailEventMessage): WebSocket.RawData { * @returns true if `event` is a RequestEvent */ function isRequest( - event: ScheduledEvent | RequestEvent | AlarmEvent | undefined | null + event: + | ScheduledEvent + | RequestEvent + | AlarmEvent + | QueueEvent + | undefined + | null ): event is RequestEvent { return Boolean(event && "request" in event); } @@ -897,3 +939,10 @@ function generateMockAlarmEvent(opts?: Partial): AlarmEvent { scheduledTime: opts?.scheduledTime || mockEventScheduledTime, }; } + +function generateMockQueueEvent(opts?: Partial): QueueEvent { + return { + queue: opts?.queue || "my-queue123", + batchSize: opts?.batchSize || 7, + }; +} \ No newline at end of file diff --git a/packages/wrangler/src/tail/createTail.ts b/packages/wrangler/src/tail/createTail.ts index 535f6529bcae..5ae9bb24c7ac 100644 --- a/packages/wrangler/src/tail/createTail.ts +++ b/packages/wrangler/src/tail/createTail.ts @@ -242,11 +242,18 @@ export type TailEventMessage = { * The event that triggered the worker. In the case of an HTTP request, * this will be a RequestEvent. If it's a cron trigger, it'll be a * ScheduledEvent. If it's a durable object alarm, it's an AlarmEvent. + * If it's a Queue consumer event, it's a QueueEvent. * * Until workers-types exposes individual types for export, we'll have * to just re-define these types ourselves. */ - event: RequestEvent | ScheduledEvent | AlarmEvent | undefined | null; + event: + | RequestEvent + | ScheduledEvent + | AlarmEvent + | QueueEvent + | undefined + | null; }; /** @@ -377,3 +384,18 @@ export type AlarmEvent = { */ scheduledTime: string; }; + +/** + * A event that was triggered by receiving a batch of messages from a Queue for consumption. + */ +export type QueueEvent = { + /** + * The name of the queue that the message batch came from. + */ + queue: string; + + /** + * The number of messages in the batch. + */ + batchSize: number; +}; diff --git a/packages/wrangler/src/tail/printing.ts b/packages/wrangler/src/tail/printing.ts index d06e0d45f1e9..f33513f2085b 100644 --- a/packages/wrangler/src/tail/printing.ts +++ b/packages/wrangler/src/tail/printing.ts @@ -1,6 +1,7 @@ import { logger } from "../logger"; import type { AlarmEvent, + QueueEvent, RequestEvent, ScheduledEvent, TailEventMessage, @@ -37,6 +38,16 @@ export function prettyPrintLogs(data: WebSocket.RawData): void { ).toLocaleString(); logger.log(`Alarm @ ${datetime} - ${outcome}`); + } else if (isQueueEvent(eventMessage.event)) { + const outcome = prettifyOutcome(eventMessage.outcome); + const datetime = new Date(eventMessage.eventTimestamp).toLocaleString(); + const queueName = eventMessage.event.queue; + const batchSize = eventMessage.event.batchSize; + const batchSizeMsg = `${batchSize} message${batchSize !== 1 ? "s" : ""}`; + + logger.log( + `Queue ${queueName} (${batchSizeMsg}) - ${outcome} @ ${datetime}` + ); } else { // Unknown event type const outcome = prettifyOutcome(eventMessage.outcome); @@ -74,6 +85,10 @@ function isScheduledEvent( return Boolean(event && "cron" in event); } +function isQueueEvent(event: TailEventMessage["event"]): event is QueueEvent { + return Boolean(event && "queue" in event); +} + /** * Check to see if an event sent from a worker is an AlarmEvent. *