Skip to content

Commit

Permalink
Merge pull request #626 from seb-oss/fix/pubsub-subscriber-topic-not-set
Browse files Browse the repository at this point in the history
fix: pubsub subscriber topic not set
  • Loading branch information
alexanderczigler authored Nov 7, 2024
2 parents 1263c46 + 7fc19cb commit 2ec5206
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/quiet-papayas-greet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@sebspark/pubsub": patch
---

Fixed bug that broke topic.initiate, leading to subscription creation failing.
67 changes: 50 additions & 17 deletions packages/pubsub/src/lib/subscriber.spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import { beforeEach } from 'node:test'
import { PubSub, type Subscription, type Topic } from '@google-cloud/pubsub'
import type { Schema } from 'avsc'
import { type MockedObject, afterAll, describe, expect, it, vi } from 'vitest'
import { type MockedObject, beforeAll, describe, expect, it, vi } from 'vitest'
import { createSubscriber } from './subscriber'

type ExampleMessage = {
messageType: string
message: string
}

const message = {
messageType: 'type of message',
message: 'message data',
} satisfies ExampleMessage

type ExamplePubsubChannels = {
example: ExampleMessage
}
Expand Down Expand Up @@ -77,25 +73,62 @@ describe('subscriber', () => {
const topicName = 'example'
const subscriptionName = 'example-subscription'

it('uses an existing subscription if it exists', async () => {
const topicMock = new PubSub().topic(topicName) as MockedObject<Topic>
const subscriptionMock = topicMock.subscription(
let topicMock: MockedObject<Topic>
let subscriptionMock: MockedObject<Subscription>

beforeAll(() => {
topicMock = new PubSub().topic(topicName) as MockedObject<Topic>
subscriptionMock = topicMock.subscription(
subscriptionName
) as MockedObject<Subscription>
})

beforeEach(() => {})

subscriptionMock.exists.mockImplementation(() => [true])
describe('subscribe', () => {
it('uses an existing subscription if it exists', async () => {
const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
})

const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
topicMock.createSubscription.mockClear()

await subscriber.topic('example').subscribe('existing-subscription', {
onMessage: () => Promise.resolve(),
})

expect(topicMock.subscription).toHaveBeenCalled()
expect(topicMock.createSubscription.mock.calls.length).toBe(0)
})
})

describe('initiate', () => {
it('does not create a subscription if it exists', async () => {
subscriptionMock.exists.mockImplementationOnce(() => [true])

topicMock.createSubscription.mockClear()
const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
})

await subscriber.topic('example').subscribe('existing-subscription', {
onMessage: () => Promise.resolve(),
topicMock.createSubscription.mockClear()

await subscriber.topic('example').initiate('existing-subscription')

expect(topicMock.createSubscription.mock.calls.length).toBe(0)
})

expect(topicMock.subscription).toHaveBeenCalled()
expect(topicMock.createSubscription.mock.calls.length).toBe(0)
it('creates a subscription if it does not exist', async () => {
subscriptionMock.exists.mockImplementationOnce(() => [false])

const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
})

topicMock.createSubscription.mockClear()

await subscriber.topic('example').initiate('example-subscription')

expect(topicMock.createSubscription).toHaveBeenCalled()
})
})
})
9 changes: 3 additions & 6 deletions packages/pubsub/src/lib/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
type Topic,
} from '@google-cloud/pubsub'

const makeSureSubacriptionExists = async (
const makeSureSubscriptionExists = async (
topic: Topic,
name: string,
options?: PubSubOptions
Expand Down Expand Up @@ -68,16 +68,13 @@ export const createSubscriber = <T extends Record<string, unknown>>(

const typedClient: SubscriptionClient<T> = {
topic: (name) => {
let _topic: Topic
const _topic: Topic = client.topic(name as string)

return {
initiate: async (subscriptionName, options) => {
await makeSureSubacriptionExists(_topic, subscriptionName, options)
await makeSureSubscriptionExists(_topic, subscriptionName, options)
},
subscribe: async (subscriptionName, callbacks, options) => {
if (!_topic) {
_topic = client.topic(name as string)
}
const subscription = _topic.subscription(subscriptionName)
subscription.on('message', async (msg) => {
const data = JSON.parse(msg.data.toString('utf8'))
Expand Down

0 comments on commit 2ec5206

Please sign in to comment.