diff --git a/system-test/pubsub.ts b/system-test/pubsub.ts index 83ec943ee..31039e7de 100644 --- a/system-test/pubsub.ts +++ b/system-test/pubsub.ts @@ -873,52 +873,71 @@ describe('pubsub', () => { let subscription: Subscription; let snapshot: Snapshot; let messageId: string; + let errorPromise: Promise<{}>; beforeEach(async () => { subscription = topic.subscription(generateSubName()); snapshot = subscription.snapshot(generateSnapshotName()); - return subscription - .create() - .then(() => { - return snapshot.create(); - }) - .then(() => { - return topic.publish(Buffer.from('Hello, world!')); - }) - .then((_messageId: string) => { - messageId = _messageId; - }); + await subscription.create(); + await snapshot.create(); + + errorPromise = new Promise((_, reject) => + subscription.on('error', reject) + ); }); - it('should seek to a snapshot', done => { + // This creates a Promise that hooks the 'message' callback of the + // subscription above, and resolves when that callback calls `resolve`. + type WorkCallback = (arg: T, resolve: Function) => void; + function makeMessagePromise( + workCallback: WorkCallback + ): Promise { + return new Promise(resolve => { + subscription.on('message', (arg: T) => { + workCallback(arg, resolve); + }); + }); + } + + async function publishTestMessage() { + messageId = await topic.publish(Buffer.from('Hello, world!')); + } + + it('should seek to a snapshot', async () => { let messageCount = 0; - subscription.on('error', done); - subscription.on('message', (message: {id: string; ack: () => void}) => { - if (message.id !== messageId) { - return; - } - message.ack(); + type EventParameter = {id: string; ack: () => void}; + const messagePromise = makeMessagePromise( + async (message: EventParameter, resolve) => { + if (message.id !== messageId) { + return; + } + message.ack(); - if (++messageCount === 1) { - snapshot!.seek(assert.ifError); - return; + if (++messageCount === 1) { + await snapshot!.seek(); + return; + } + + assert.strictEqual(messageCount, 2); + await subscription.close(); + + resolve(); } + ); - assert.strictEqual(messageCount, 2); - subscription.close(done); - }); + await publishTestMessage(); + await Promise.race([errorPromise, messagePromise]); }); - it('should seek to a date', done => { + it('should seek to a date', async () => { let messageCount = 0; - subscription.on('error', done); - subscription.on( - 'message', - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (message: {id: string; ack: () => void; publishTime: any}) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + type EventParameter = {id: string; ack: () => void; publishTime: any}; + const messagePromise = makeMessagePromise( + async (message: EventParameter, resolve) => { if (message.id !== messageId) { return; } @@ -936,36 +955,43 @@ describe('pubsub', () => { } assert.strictEqual(messageCount, 2); - subscription.close(done); + await subscription.close(); + + resolve(); } ); + + await publishTestMessage(); + await Promise.race([errorPromise, messagePromise]); }); - it('should seek to a future date (purge)', done => { + it('should seek to a future date (purge)', async () => { const testText = 'Oh no!'; + await publishTestMessage(); + // Forward-seek to remove any messages from the queue (those were // placed there in before()). // // We... probably won't be using this in 3000? - subscription - .seek(new Date('3000-01-01')) - .then(() => { - // Drop a second message and make sure it's the right ID. - return topic.publish(Buffer.from(testText)); - }) - .then(() => { - subscription.on('error', done); - subscription.on( - 'message', - (message: {data: {toString: () => string}; ack: () => void}) => { - // If we get the default message from before() then this fails. - assert.equal(message.data.toString(), testText); - message.ack(); - subscription.close(done); - } - ); - }); + await subscription.seek(new Date('3000-01-01')); + + // Drop a second message and make sure it's the right ID. + await topic.publish(Buffer.from(testText)); + + type EventParameter = {data: {toString: () => string}; ack: () => void}; + const messagePromise = makeMessagePromise( + async (message: EventParameter, resolve) => { + // If we get the default message from before() then this fails. + assert.equal(message.data.toString(), testText); + message.ack(); + await subscription.close(); + + resolve(); + } + ); + + await Promise.race([errorPromise, messagePromise]); }); }); });