diff --git a/lib/gateway.js b/lib/gateway.js index 9120b8c0a..d8bdf56fa 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -146,12 +146,21 @@ function defineResolvers (schema, typeToServiceMap, serviceMap, typeFieldsToServ } else if (typeFieldsToService[`${type}-${fieldName}`]) { const service = serviceMap[typeFieldsToService[`${type}-${fieldName}`]] if (serviceForType === null) { - field.resolve = makeResolver({ - service, - createOperation: createQueryOperation, - transformData: response => response.json.data[fieldName], - isQuery: true - }) + if (type.name === 'Subscription') { + field.subscribe = makeResolver({ + service, + createOperation: createQueryOperation, + isQuery: true, + isSubscription: true + }) + } else { + field.resolve = makeResolver({ + service, + createOperation: createQueryOperation, + transformData: response => response.json.data[fieldName], + isQuery: true + }) + } } else { field.resolve = makeResolver({ service, diff --git a/test/gateway/subscription.js b/test/gateway/subscription.js index cc2b02062..0036a999d 100644 --- a/test/gateway/subscription.js +++ b/test/gateway/subscription.js @@ -605,3 +605,165 @@ test('connection_init payload is overwritten at gateway and forwarded to the fed }) }) }) + +test('subscriptions work with scalars', async t => { + let testService + let gateway + + const schema = ` + extend type Query { + ignored: Boolean! + } + + extend type Mutation { + addTestEvent(value: Int!): Int! + } + + type TestEvent { + value: Int! + } + + extend type Subscription { + testEvent: Int! + }` + + const resolvers = { + Query: { + ignored: () => true + }, + Mutation: { + addTestEvent: async (_, { value }, { pubsub }) => { + await pubsub.publish({ + topic: 'testEvent', + payload: { testEvent: value } + }) + + return value + } + }, + Subscription: { + testEvent: { + subscribe: async (_, __, { pubsub }) => { + return await pubsub.subscribe('testEvent') + } + } + } + } + + function createTestService () { + testService = Fastify() + testService.register(GQL, { + schema, + resolvers, + federationMetadata: true, + subscription: true + }) + + return testService.listen(0) + } + + function createGateway () { + const testServicePort = testService.server.address().port + + gateway = Fastify() + gateway.register(GQL, { + subscription: true, + gateway: { + services: [{ + name: 'testService', + url: `http://localhost:${testServicePort}/graphql`, + wsUrl: `ws://localhost:${testServicePort}/graphql` + }] + } + }) + + return gateway.listen(0) + } + + function runSubscription () { + const ws = new WebSocket(`ws://localhost:${(gateway.server.address()).port}/graphql`, 'graphql-ws') + const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true }) + t.teardown(async () => { + client.destroy() + await gateway.close() + await testService.close() + }) + client.setEncoding('utf8') + + client.write(JSON.stringify({ + type: 'connection_init' + })) + + client.write(JSON.stringify({ + id: 1, + type: 'start', + payload: { + query: ` + subscription { + testEvent + } + ` + } + })) + + client.write(JSON.stringify({ + id: 2, + type: 'start', + payload: { + query: ` + subscription { + testEvent + } + ` + } + })) + + client.write(JSON.stringify({ + id: 2, + type: 'stop' + })) + + let end + + const endPromise = new Promise(resolve => { + end = resolve + }) + + client.on('data', (chunk) => { + const data = JSON.parse(chunk) + + if (data.id === 1 && data.type === 'data') { + t.equal(chunk, JSON.stringify({ + type: 'data', + id: 1, + payload: { + data: { + testEvent: 1 + } + } + })) + + client.end() + end() + } else if (data.id === 2 && data.type === 'complete') { + gateway.inject({ + method: 'POST', + url: '/graphql', + body: { + query: ` + mutation { + addTestEvent(value: 1) + } + ` + } + }) + } + }) + + return endPromise + } + + await createTestService() + await createGateway() + await runSubscription() +})