Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: fix zalgo flakiness in seek tests, and convert them to async/promises to make the logic more rigorous #1112

Merged
merged 1 commit into from
Sep 28, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 76 additions & 50 deletions system-test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -873,52 +873,71 @@ describe('pubsub', () => {
let subscription: Subscription;
let snapshot: Snapshot;
let messageId: string;
let errorPromise: Promise<{}>;

beforeEach(async () => {
subscription = topic.subscription(generateSubName());
snapshot = subscription.snapshot(generateSnapshotName());

return subscription
.create()
.then(() => {
return snapshot.create();
})
.then(() => {
return topic.publish(Buffer.from('Hello, world!'));
})
.then((_messageId: string) => {
messageId = _messageId;
});
await subscription.create();
await snapshot.create();

errorPromise = new Promise((_, reject) =>
subscription.on('error', reject)
);
});

it('should seek to a snapshot', done => {
// This creates a Promise that hooks the 'message' callback of the
// subscription above, and resolves when that callback calls `resolve`.
type WorkCallback<T> = (arg: T, resolve: Function) => void;
function makeMessagePromise<T>(
workCallback: WorkCallback<T>
): Promise<void> {
return new Promise(resolve => {
subscription.on('message', (arg: T) => {
workCallback(arg, resolve);
});
});
}

async function publishTestMessage() {
messageId = await topic.publish(Buffer.from('Hello, world!'));
}

it('should seek to a snapshot', async () => {
let messageCount = 0;

subscription.on('error', done);
subscription.on('message', (message: {id: string; ack: () => void}) => {
if (message.id !== messageId) {
return;
}
message.ack();
type EventParameter = {id: string; ack: () => void};
const messagePromise = makeMessagePromise(
async (message: EventParameter, resolve) => {
if (message.id !== messageId) {
return;
}
message.ack();

if (++messageCount === 1) {
snapshot!.seek(assert.ifError);
return;
if (++messageCount === 1) {
await snapshot!.seek();
return;
}

assert.strictEqual(messageCount, 2);
await subscription.close();

resolve();
}
);

assert.strictEqual(messageCount, 2);
subscription.close(done);
});
await publishTestMessage();
await Promise.race([errorPromise, messagePromise]);
});

it('should seek to a date', done => {
it('should seek to a date', async () => {
let messageCount = 0;

subscription.on('error', done);
subscription.on(
'message',
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(message: {id: string; ack: () => void; publishTime: any}) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type EventParameter = {id: string; ack: () => void; publishTime: any};
const messagePromise = makeMessagePromise(
async (message: EventParameter, resolve) => {
if (message.id !== messageId) {
return;
}
Expand All @@ -936,36 +955,43 @@ describe('pubsub', () => {
}

assert.strictEqual(messageCount, 2);
subscription.close(done);
await subscription.close();

resolve();
}
);

await publishTestMessage();
await Promise.race([errorPromise, messagePromise]);
});

it('should seek to a future date (purge)', done => {
it('should seek to a future date (purge)', async () => {
const testText = 'Oh no!';

await publishTestMessage();

// Forward-seek to remove any messages from the queue (those were
// placed there in before()).
//
// We... probably won't be using this in 3000?
subscription
.seek(new Date('3000-01-01'))
.then(() => {
// Drop a second message and make sure it's the right ID.
return topic.publish(Buffer.from(testText));
})
.then(() => {
subscription.on('error', done);
subscription.on(
'message',
(message: {data: {toString: () => string}; ack: () => void}) => {
// If we get the default message from before() then this fails.
assert.equal(message.data.toString(), testText);
message.ack();
subscription.close(done);
}
);
});
await subscription.seek(new Date('3000-01-01'));

// Drop a second message and make sure it's the right ID.
await topic.publish(Buffer.from(testText));

type EventParameter = {data: {toString: () => string}; ack: () => void};
const messagePromise = makeMessagePromise(
async (message: EventParameter, resolve) => {
// If we get the default message from before() then this fails.
assert.equal(message.data.toString(), testText);
message.ack();
await subscription.close();

resolve();
}
);

await Promise.race([errorPromise, messagePromise]);
});
});
});
Expand Down