diff --git a/.changeset/fluffy-fishes-remember.md b/.changeset/fluffy-fishes-remember.md new file mode 100644 index 00000000..02658f58 --- /dev/null +++ b/.changeset/fluffy-fishes-remember.md @@ -0,0 +1,5 @@ +--- +'@sebspark/pubsub': minor +--- + +Support initiating PubSub with projectId diff --git a/packages/pubsub/src/lib/client.ts b/packages/pubsub/src/lib/client.ts index afc9766f..a3d484f3 100644 --- a/packages/pubsub/src/lib/client.ts +++ b/packages/pubsub/src/lib/client.ts @@ -1,19 +1,43 @@ import type { Topic } from '@google-cloud/pubsub' import { PubSub } from '@google-cloud/pubsub' -export const pubsub = new PubSub() +export type ClientConfig = { + projectId: string +} + +const localProjectId = 'local' +const clients: Record = {} + +const init = ({ projectId }: ClientConfig = { projectId: localProjectId }) => { + if (!clients[projectId]) { + if (projectId === localProjectId) { + // Create a default client when there is no config. + clients[projectId] = new PubSub() + } else { + clients[projectId] = new PubSub({ + projectId, + }) + } + } +} export const getOrCreateTopic = async ( topicName: string, + config?: ClientConfig, tries = 0, ): Promise => { + // Ensure there is always a client for desired project, as specified in config. + init(config) + try { - const [t] = await pubsub.topic(topicName).get({ autoCreate: true }) + const [t] = await clients[config?.projectId || localProjectId] + .topic(topicName) + .get({ autoCreate: true }) return t // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (err: any) { if (err.code && err?.code === 6 && tries < 3) { - return getOrCreateTopic(topicName, tries + 1) + return getOrCreateTopic(topicName, config, tries + 1) } else { throw err } diff --git a/packages/pubsub/src/lib/publisher.ts b/packages/pubsub/src/lib/publisher.ts index af472204..1f8d331d 100644 --- a/packages/pubsub/src/lib/publisher.ts +++ b/packages/pubsub/src/lib/publisher.ts @@ -1,4 +1,4 @@ -import { getOrCreateTopic } from './client' +import { ClientConfig, getOrCreateTopic } from './client' interface Publisher { (message: T, headers?: Headers, raw?: raw): Promise @@ -16,9 +16,10 @@ export const publisher = Headers extends Record, >( topicName: TopicName, + config?: ClientConfig, ): Publisher => async (message, headers?, raw?) => { - const topic = await getOrCreateTopic(topicName.toString()) + const topic = await getOrCreateTopic(topicName.toString(), config) const msg: PubsubMessage = { message, headers, diff --git a/packages/pubsub/src/lib/pubsub.spec.ts b/packages/pubsub/src/lib/pubsub.spec.ts index 219636c4..02a96669 100644 --- a/packages/pubsub/src/lib/pubsub.spec.ts +++ b/packages/pubsub/src/lib/pubsub.spec.ts @@ -65,6 +65,10 @@ const setup = () => { off: vi.fn(), } + const mockConfig = { + projectId: 'test-project-id', + } + mockTopic.get.mockImplementation(async () => [mockTopic]) mockTopic.publishMessage.mockImplementation(async () => 'ok') mockTopic.createSubscription.mockImplementation(async () => [subscription]) @@ -94,6 +98,7 @@ const setup = () => { mockTopic, subscription, pubsub, + mockConfig, ...testMessage, } } @@ -110,8 +115,52 @@ afterAll(() => { delete process.env.PUBSUB_PUSH_HOST }) -it('creates an instance of PubSub', () => { - expect(PubSub).toHaveBeenCalled() +describe('creates an instance of PubSub', () => { + describe('without configuration', () => { + it('when publishing', async () => { + const { createdPubsub, topicData, topicName } = setup() + + await createdPubsub.topic(topicName).publish(topicData) + + expect(PubSub).toHaveBeenCalled() + }) + + it('when subscribing', async () => { + const { createdPubsub, topicName } = setup() + + await createdPubsub.topic(topicName).subscribe({ + subscriberName: 'gateway', + onSuccess: () => undefined, + }) + + expect(PubSub).toHaveBeenCalled() + }) + }) + + describe('with projectId when passed config', () => { + it('when publishing', async () => { + const { createdPubsub, mockConfig, topicData, topicName } = setup() + + await createdPubsub.topic(topicName, mockConfig).publish(topicData) + + expect(PubSub).toHaveBeenCalledWith({ + projectId: mockConfig.projectId, + }) + }) + + it('when subscribing', async () => { + const { createdPubsub, mockConfig, topicName } = setup() + + await createdPubsub.topic(topicName, mockConfig).subscribe({ + subscriberName: 'gateway', + onSuccess: () => undefined, + }) + + expect(PubSub).toHaveBeenCalledWith({ + projectId: mockConfig.projectId, + }) + }) + }) }) describe('#topic', () => { diff --git a/packages/pubsub/src/lib/pubsub.ts b/packages/pubsub/src/lib/pubsub.ts index 6e52a092..c59e5ea2 100644 --- a/packages/pubsub/src/lib/pubsub.ts +++ b/packages/pubsub/src/lib/pubsub.ts @@ -1,3 +1,4 @@ +import { ClientConfig } from './client' import { publisher } from './publisher' import type { Subscriber, SubscriberHandler, Unsubscriber } from './subscriber' import { subscriber } from './subscriber' @@ -22,17 +23,24 @@ export const createPubsub = < >() => { type TopicName = keyof Topics - const topic = (name: TopicName): PubSubTopic => { + const topic = ( + name: TopicName, + config?: ClientConfig, + ): PubSubTopic => { return { publish: publisher>( name, + config, ), - subscribe: subscriber(name), + subscribe: subscriber(name, config), name: name, } } - const subscribeToMultipleAs = (name: SubscriberName) => { + const subscribeToMultipleAs = ( + name: SubscriberName, + config: ClientConfig, + ) => { const promises: Promise[] = [] const obj = { wait: async () => await Promise.all(promises), @@ -41,7 +49,7 @@ export const createPubsub = < { onSuccess, onError }: SubscriberHandler, ) => { promises.push( - topic(topicName.toString()).subscribe({ + topic(topicName.toString(), config).subscribe({ subscriberName: name, onSuccess, onError, diff --git a/packages/pubsub/src/lib/subscriber.ts b/packages/pubsub/src/lib/subscriber.ts index 89fa434c..b4bd67fd 100644 --- a/packages/pubsub/src/lib/subscriber.ts +++ b/packages/pubsub/src/lib/subscriber.ts @@ -4,7 +4,7 @@ import type { Subscription, Topic, } from '@google-cloud/pubsub' -import { getOrCreateTopic } from './client' +import { ClientConfig, getOrCreateTopic } from './client' const ALREADY_EXISTS_ERROR = '6 ALREADY_EXISTS' @@ -145,9 +145,10 @@ const createOrGetSubscription = async ( export const subscriber = ( topicName: TopicName, + config?: ClientConfig, ): Subscriber => async ({ subscriberName, onSuccess, onError }) => { - const topic = await getOrCreateTopic(topicName.toString()) + const topic = await getOrCreateTopic(topicName.toString(), config) const subscriptionName = `${topicName.toString()}.${subscriberName}` const subscription = await createOrGetSubscription(subscriptionName, topic) const messageHandler = async (message: Message) => {