Skip to content

Commit

Permalink
Merge pull request #108 from seb-neo/feat/multiple-project-sub
Browse files Browse the repository at this point in the history
feat: support initiating clients with projectId
  • Loading branch information
Alexander Czigler authored Aug 11, 2023
2 parents cd0cb83 + d6d7b90 commit ab0e2b5
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 13 deletions.
5 changes: 5 additions & 0 deletions .changeset/fluffy-fishes-remember.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@sebspark/pubsub': minor
---

Support initiating PubSub with projectId
30 changes: 27 additions & 3 deletions packages/pubsub/src/lib/client.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,43 @@
import type { Topic } from '@google-cloud/pubsub'
import { PubSub } from '@google-cloud/pubsub'

export const pubsub = new PubSub()
export type ClientConfig = {
projectId: string
}

const localProjectId = 'local'
const clients: Record<string, PubSub> = {}

const init = ({ projectId }: ClientConfig = { projectId: localProjectId }) => {
if (!clients[projectId]) {
if (projectId === localProjectId) {
// Create a default client when there is no config.
clients[projectId] = new PubSub()
} else {
clients[projectId] = new PubSub({
projectId,
})
}
}
}

export const getOrCreateTopic = async (
topicName: string,
config?: ClientConfig,
tries = 0,
): Promise<Topic> => {
// Ensure there is always a client for desired project, as specified in config.
init(config)

try {
const [t] = await pubsub.topic(topicName).get({ autoCreate: true })
const [t] = await clients[config?.projectId || localProjectId]
.topic(topicName)
.get({ autoCreate: true })
return t
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (err: any) {
if (err.code && err?.code === 6 && tries < 3) {
return getOrCreateTopic(topicName, tries + 1)
return getOrCreateTopic(topicName, config, tries + 1)
} else {
throw err
}
Expand Down
5 changes: 3 additions & 2 deletions packages/pubsub/src/lib/publisher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getOrCreateTopic } from './client'
import { ClientConfig, getOrCreateTopic } from './client'

interface Publisher<T, Headers, raw> {
(message: T, headers?: Headers, raw?: raw): Promise<string>
Expand All @@ -16,9 +16,10 @@ export const publisher =
Headers extends Record<string, unknown>,
>(
topicName: TopicName,
config?: ClientConfig,
): Publisher<Msg, Headers, boolean> =>
async (message, headers?, raw?) => {
const topic = await getOrCreateTopic(topicName.toString())
const topic = await getOrCreateTopic(topicName.toString(), config)
const msg: PubsubMessage<Msg, Headers> = {
message,
headers,
Expand Down
53 changes: 51 additions & 2 deletions packages/pubsub/src/lib/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ const setup = () => {
off: vi.fn(),
}

const mockConfig = {
projectId: 'test-project-id',
}

mockTopic.get.mockImplementation(async () => [mockTopic])
mockTopic.publishMessage.mockImplementation(async () => 'ok')
mockTopic.createSubscription.mockImplementation(async () => [subscription])
Expand Down Expand Up @@ -94,6 +98,7 @@ const setup = () => {
mockTopic,
subscription,
pubsub,
mockConfig,
...testMessage,
}
}
Expand All @@ -110,8 +115,52 @@ afterAll(() => {
delete process.env.PUBSUB_PUSH_HOST
})

it('creates an instance of PubSub', () => {
expect(PubSub).toHaveBeenCalled()
describe('creates an instance of PubSub', () => {
describe('without configuration', () => {
it('when publishing', async () => {
const { createdPubsub, topicData, topicName } = setup()

await createdPubsub.topic(topicName).publish(topicData)

expect(PubSub).toHaveBeenCalled()
})

it('when subscribing', async () => {
const { createdPubsub, topicName } = setup()

await createdPubsub.topic(topicName).subscribe({
subscriberName: 'gateway',
onSuccess: () => undefined,
})

expect(PubSub).toHaveBeenCalled()
})
})

describe('with projectId when passed config', () => {
it('when publishing', async () => {
const { createdPubsub, mockConfig, topicData, topicName } = setup()

await createdPubsub.topic(topicName, mockConfig).publish(topicData)

expect(PubSub).toHaveBeenCalledWith({
projectId: mockConfig.projectId,
})
})

it('when subscribing', async () => {
const { createdPubsub, mockConfig, topicName } = setup()

await createdPubsub.topic(topicName, mockConfig).subscribe({
subscriberName: 'gateway',
onSuccess: () => undefined,
})

expect(PubSub).toHaveBeenCalledWith({
projectId: mockConfig.projectId,
})
})
})
})

describe('#topic', () => {
Expand Down
16 changes: 12 additions & 4 deletions packages/pubsub/src/lib/pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ClientConfig } from './client'
import { publisher } from './publisher'
import type { Subscriber, SubscriberHandler, Unsubscriber } from './subscriber'
import { subscriber } from './subscriber'
Expand All @@ -22,17 +23,24 @@ export const createPubsub = <
>() => {
type TopicName = keyof Topics

const topic = (name: TopicName): PubSubTopic<Topics[TopicName], Topics> => {
const topic = (
name: TopicName,
config?: ClientConfig,
): PubSubTopic<Topics[TopicName], Topics> => {
return {
publish: publisher<Topics[TopicName], TopicName, Record<string, unknown>>(
name,
config,
),
subscribe: subscriber<Topics[TopicName], TopicName>(name),
subscribe: subscriber<Topics[TopicName], TopicName>(name, config),
name: name,
}
}

const subscribeToMultipleAs = (name: SubscriberName) => {
const subscribeToMultipleAs = (
name: SubscriberName,
config: ClientConfig,
) => {
const promises: Promise<Unsubscriber>[] = []
const obj = {
wait: async () => await Promise.all(promises),
Expand All @@ -41,7 +49,7 @@ export const createPubsub = <
{ onSuccess, onError }: SubscriberHandler<Topics[TopicName]>,
) => {
promises.push(
topic(topicName.toString()).subscribe({
topic(topicName.toString(), config).subscribe({
subscriberName: name,
onSuccess,
onError,
Expand Down
5 changes: 3 additions & 2 deletions packages/pubsub/src/lib/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type {
Subscription,
Topic,
} from '@google-cloud/pubsub'
import { getOrCreateTopic } from './client'
import { ClientConfig, getOrCreateTopic } from './client'

const ALREADY_EXISTS_ERROR = '6 ALREADY_EXISTS'

Expand Down Expand Up @@ -145,9 +145,10 @@ const createOrGetSubscription = async (
export const subscriber =
<Msg, TopicName extends string | number | symbol>(
topicName: TopicName,
config?: ClientConfig,
): Subscriber<Msg> =>
async ({ subscriberName, onSuccess, onError }) => {
const topic = await getOrCreateTopic(topicName.toString())
const topic = await getOrCreateTopic(topicName.toString(), config)
const subscriptionName = `${topicName.toString()}.${subscriberName}`
const subscription = await createOrGetSubscription(subscriptionName, topic)
const messageHandler = async (message: Message) => {
Expand Down

0 comments on commit ab0e2b5

Please sign in to comment.