diff --git a/.changeset/warm-queens-taste.md b/.changeset/warm-queens-taste.md new file mode 100644 index 000000000000..416ea83d8f3b --- /dev/null +++ b/.changeset/warm-queens-taste.md @@ -0,0 +1,8 @@ +--- +"wrangler": minor +--- + +feature: Support Queue consumer events in tail + +So that it's less confusing when tailing a worker that consumes events from a +Queue. diff --git a/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts b/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts index d7f190c4aa87..5b90bdf48724 100644 --- a/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts +++ b/packages/wrangler/src/__tests__/pages-deployment-tail.test.ts @@ -15,6 +15,7 @@ import type { EmailEvent, TailEvent, TailInfo, + QueueEvent, } from "../tail/createTail"; import type { RequestInit } from "undici"; import type WebSocket from "ws"; @@ -379,6 +380,20 @@ describe("pages deployment tail", () => { expect(std.out).toMatch(deserializeToJson(serializedMessage)); }); + it("logs queue messages in json format", async () => { + const api = mockTailAPIs(); + await runWrangler( + "pages deployment tail mock-deployment-id --project-name mock-project --format json" + ); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect(std.out).toMatch(deserializeToJson(serializedMessage)); + }); + it("logs request messages in pretty format", async () => { const api = mockTailAPIs(); await runWrangler( @@ -487,6 +502,33 @@ describe("pages deployment tail", () => { `); }); + it("logs queue messages in pretty format", async () => { + const api = mockTailAPIs(); + await runWrangler( + "pages deployment tail mock-deployment-id --project-name mock-project --format pretty" + ); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect( + std.out + .replace( + new Date(mockEventTimestamp).toLocaleString(), + "[mock timestamp string]" + ) + .replace( + mockTailExpiration.toLocaleString(), + "[mock expiration date]" + ) + ).toMatchInlineSnapshot(` + "Connected to deployment mock-deployment-id, waiting for logs... + Queue my-queue123 (7 messages) - Ok @ [mock timestamp string]" + `); + }); + it("should not crash when the tail message has a void event", async () => { const api = mockTailAPIs(); await runWrangler( @@ -666,6 +708,7 @@ function isRequest( | EmailEvent | TailEvent | TailInfo + | QueueEvent | undefined | null ): event is RequestEvent { @@ -964,3 +1007,9 @@ function generateMockEmailEvent(opts?: Partial): EmailEvent { rawSize: opts?.rawSize || mockEmailEventSize, }; } +function generateMockQueueEvent(opts?: Partial): QueueEvent { + return { + queue: opts?.queue || "my-queue123", + batchSize: opts?.batchSize || 7, + }; +} diff --git a/packages/wrangler/src/__tests__/tail.test.ts b/packages/wrangler/src/__tests__/tail.test.ts index 99b66936bac8..f6d41a937041 100644 --- a/packages/wrangler/src/__tests__/tail.test.ts +++ b/packages/wrangler/src/__tests__/tail.test.ts @@ -16,6 +16,7 @@ import type { EmailEvent, TailEvent, TailInfo, + QueueEvent, } from "../tail/createTail"; import type { RequestInit } from "undici"; import type WebSocket from "ws"; @@ -439,6 +440,18 @@ describe("tail", () => { expect(std.out).toMatch(deserializeToJson(serializedMessage)); }); + it("logs queue messages in json format", async () => { + const api = mockWebsocketAPIs(); + await runWrangler("tail test-worker --format json"); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect(std.out).toMatch(deserializeToJson(serializedMessage)); + }); + it("logs request messages in pretty format", async () => { const api = mockWebsocketAPIs(); await runWrangler("tail test-worker --format pretty"); @@ -583,6 +596,29 @@ describe("tail", () => { `); }); + it("logs queue messages in pretty format", async () => { + const api = mockWebsocketAPIs(); + await runWrangler("tail test-worker --format pretty"); + + const event = generateMockQueueEvent(); + const message = generateMockEventMessage({ event }); + const serializedMessage = serialize(message); + + api.ws.send(serializedMessage); + expect( + std.out + .replace( + new Date(mockEventTimestamp).toLocaleString(), + "[mock timestamp string]" + ) + .replace(mockTailExpiration.toISOString(), "[mock expiration date]") + ).toMatchInlineSnapshot(` + "Successfully created tail, expires at [mock expiration date] + Connected to test-worker, waiting for logs... + Queue my-queue123 (7 messages) - Ok @ [mock timestamp string]" + `); + }); + it("should not crash when the tail message has a void event", async () => { const api = mockWebsocketAPIs(); await runWrangler("tail test-worker --format pretty"); @@ -751,6 +787,7 @@ function isRequest( | EmailEvent | TailEvent | TailInfo + | QueueEvent | undefined | null ): event is RequestEvent { @@ -1054,3 +1091,10 @@ function generateTailInfo(overload: boolean): TailInfo { type: "overload-stop", }; } + +function generateMockQueueEvent(opts?: Partial): QueueEvent { + return { + queue: opts?.queue || "my-queue123", + batchSize: opts?.batchSize || 7, + }; +} diff --git a/packages/wrangler/src/tail/createTail.ts b/packages/wrangler/src/tail/createTail.ts index 9d5143370585..941f0e9282b4 100644 --- a/packages/wrangler/src/tail/createTail.ts +++ b/packages/wrangler/src/tail/createTail.ts @@ -242,7 +242,8 @@ export type TailEventMessage = { * The event that triggered the worker. In the case of an HTTP request, * this will be a RequestEvent. If it's a cron trigger, it'll be a * ScheduledEvent. If it's a durable object alarm, it's an AlarmEvent. - * If it's a email, it'a an EmailEvent + * If it's a email, it'a an EmailEvent. If it's a Queue consumer event, + * it's a QueueEvent. * * Until workers-types exposes individual types for export, we'll have * to just re-define these types ourselves. @@ -254,6 +255,7 @@ export type TailEventMessage = { | EmailEvent | TailEvent | TailInfo + | QueueEvent | undefined | null; }; @@ -430,3 +432,18 @@ export type TailInfo = { message: string; type: string; }; + +/* + * A event that was triggered by receiving a batch of messages from a Queue for consumption. + */ +export type QueueEvent = { + /** + * The name of the queue that the message batch came from. + */ + queue: string; + + /** + * The number of messages in the batch. + */ + batchSize: number; +}; diff --git a/packages/wrangler/src/tail/printing.ts b/packages/wrangler/src/tail/printing.ts index f2be8cae2a0c..1c837e8c8784 100644 --- a/packages/wrangler/src/tail/printing.ts +++ b/packages/wrangler/src/tail/printing.ts @@ -3,6 +3,7 @@ import { logger } from "../logger"; import type { AlarmEvent, EmailEvent, + QueueEvent, RequestEvent, ScheduledEvent, TailEvent, @@ -71,6 +72,16 @@ export function prettyPrintLogs(data: WebSocket.RawData): void { } else if (eventMessage.event.type === "overload-stop") { logger.log(`${chalk.yellow.bold(eventMessage.event.message)}`); } + } else if (isQueueEvent(eventMessage.event)) { + const outcome = prettifyOutcome(eventMessage.outcome); + const datetime = new Date(eventMessage.eventTimestamp).toLocaleString(); + const queueName = eventMessage.event.queue; + const batchSize = eventMessage.event.batchSize; + const batchSizeMsg = `${batchSize} message${batchSize !== 1 ? "s" : ""}`; + + logger.log( + `Queue ${queueName} (${batchSizeMsg}) - ${outcome} @ ${datetime}` + ); } else { // Unknown event type const outcome = prettifyOutcome(eventMessage.outcome); @@ -112,6 +123,10 @@ function isEmailEvent(event: TailEventMessage["event"]): event is EmailEvent { return Boolean(event && "mailFrom" in event); } +function isQueueEvent(event: TailEventMessage["event"]): event is QueueEvent { + return Boolean(event && "queue" in event); +} + /** * Check to see if an event sent from a worker is an AlarmEvent. *