Skip to content

Commit

Permalink
fix(rabbitmq): fix library asserting queues with empty names
Browse files Browse the repository at this point in the history
This is an attempt to fix the issue where this library doesn't pass the right arguments to
setupQueue in setupQueuesWithBindings. In addition, this removes the duplicate setup that
setupQueuesWithBindings was doing that setupQueue alredy does.

re golevelup#667
  • Loading branch information
ttshivers committed Jan 16, 2024
1 parent 1891b32 commit 7081feb
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 32 deletions.
63 changes: 63 additions & 0 deletions integration/rabbitmq/e2e/configuration.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ describe('Module Configuration', () => {

afterEach(async () => {
jest.clearAllMocks();
jest.restoreAllMocks();
await app?.close();
});

Expand Down Expand Up @@ -227,6 +228,68 @@ describe('Module Configuration', () => {
).toBeDefined();
await app.close();
});

it('should not assert queue with empty name', async () => {
const originalConnect = amqplib.connect;
let assertQueueSpy;

const connectSpy = jest
.spyOn(amqplib, 'connect')
.mockImplementation((...args) => {
const result = originalConnect(...args);
result.then((conn) => {
const originalCreateConfirmChannel = conn.createConfirmChannel;
jest
.spyOn(conn, 'createConfirmChannel')
.mockImplementation(function (...args) {
const result = originalCreateConfirmChannel.apply(this, args);
result.then((channel) => {
assertQueueSpy = jest.spyOn(channel, 'assertQueue');
});
return result;
});
});
return result;
});

app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: async () => {
return {
queues: [
{
name: nonExistingQueue,
createQueueIfNotExists: true,
options: {
durable: true,
},
},
],
uri,
connectionInitOptions: {
wait: true,
reject: true,
timeout: 3000,
},
};
},
}),
],
}).compile();

expect(app).toBeDefined();

expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(amqplibUri, undefined);
expect(assertQueueSpy).not.toHaveBeenCalledWith('', undefined);
expect(assertQueueSpy).toHaveBeenCalledWith(nonExistingQueue, {
durable: true,
});

await app.init();
await app.close();
});
});
});

Expand Down
48 changes: 16 additions & 32 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,36 +296,20 @@ export class AmqpConnection {
queues: RabbitMQQueueConfig[]
) {
for (const configuredQueue of queues) {
const { createQueueIfNotExists = true } = configuredQueue;

if (createQueueIfNotExists) {
this.setupQueue(configuredQueue, channel);
}
await channel.assertQueue(configuredQueue.name, configuredQueue.options);

let routingKeys: string[] = [];
if (Array.isArray(configuredQueue.routingKey)) {
routingKeys = configuredQueue.routingKey;
} else {
if (configuredQueue.routingKey != null) {
routingKeys = [configuredQueue.routingKey];
}
}

if (routingKeys.length > 0) {
await Promise.all(
routingKeys.map((routingKey) => {
if (configuredQueue.exchange != undefined) {
channel.bindQueue(
configuredQueue.name,
configuredQueue.exchange,
routingKey,
configuredQueue.bindQueueArguments
);
}
})
);
}
const { name, options, bindQueueArguments, ...rest } = configuredQueue;
const queueOptions = {
...options,
...(bindQueueArguments !== undefined && { bindQueueArguments }),
};

await this.setupQueue(
{
...rest,
...(name !== undefined && { queue: name }),
queueOptions,
},
channel
);
}
}

Expand Down Expand Up @@ -693,8 +677,8 @@ export class AmqpConnection {
}

let bindQueueArguments: any;
if (subscriptionOptions.queueOptions) {
bindQueueArguments = subscriptionOptions.queueOptions.bindQueueArguments;
if (queueOptions) {
bindQueueArguments = queueOptions.bindQueueArguments;
}

const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];
Expand Down

0 comments on commit 7081feb

Please sign in to comment.