Skip to content

Commit

Permalink
Implement callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Aug 25, 2022
1 parent 99a2b50 commit ed9a3ac
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 155 deletions.
8 changes: 2 additions & 6 deletions packages/loaders/json-schema/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ export type JSONSchemaBaseOperationConfig = {
description?: string;

argTypeMap?: Record<string, string | JSONSchemaObject>;
} & (
| {
responseByStatusCode?: Record<string, JSONSchemaOperationResponseConfig>;
}
| JSONSchemaOperationResponseConfig
);
responseByStatusCode?: Record<string, JSONSchemaOperationResponseConfig>;
} & JSONSchemaOperationResponseConfig;

export type JSONSchemaBaseOperationConfigWithJSONRequest = JSONSchemaBaseOperationConfig & {
requestSchema?: string | JSONSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,39 @@ export async function getJSONSchemaOptionsFromOpenAPIOptions(
oasOrSwagger = (await dereferenceObject(oasOrSwagger)) as any;
const operations: JSONSchemaOperationConfig[] = [];

function handleCallback(callbackKey: string, callbackObj: OpenAPIV3.CallbackObject) {
for (const callbackUrlRefKey in callbackObj) {
const pubsubTopic = callbackUrlRefKey.split('$request.query').join('args').split('$request.body#/').join('args.');
const callbackOperationConfig: JSONSchemaPubSubOperationConfig = {
type: OperationTypeNode.SUBSCRIPTION,
field: '',
pubsubTopic,
};
const callbackUrlObj = callbackObj[callbackUrlRefKey];
for (const method in callbackUrlObj) {
const callbackOperation: OpenAPIV3.OperationObject = callbackUrlObj[method];
callbackOperationConfig.field = callbackOperation.operationId;
const requestBodyContents = (callbackOperation.requestBody as OpenAPIV3.RequestBodyObject)?.content;
if (requestBodyContents) {
callbackOperationConfig.responseSchema = requestBodyContents[Object.keys(requestBodyContents)[0]]
.schema as any;
}
const responses = callbackOperation.responses;
if (responses) {
const response = responses[Object.keys(responses)[0]];
if (response) {
const responseContents = (response as OpenAPIV3.ResponseObject).content;
if (responseContents) {
callbackOperationConfig.requestSchema = responseContents[Object.keys(responseContents)[0]].schema as any;
}
}
}
}
callbackOperationConfig.field = callbackOperationConfig.field || sanitizeNameForGraphQL(callbackKey);
operations.push(callbackOperationConfig);
}
}

if (!baseUrl) {
if ('servers' in oasOrSwagger) {
baseUrl = oasOrSwagger.servers[0].url;
Expand Down Expand Up @@ -435,16 +468,7 @@ export async function getJSONSchemaOptionsFromOpenAPIOptions(

if ('callbacks' in methodObj) {
for (const callbackKey in methodObj.callbacks) {
const callbackObj = methodObj.callbacks[callbackKey];
for (const _callbackUrlRefKey in callbackObj) {
const fieldName = sanitizeNameForGraphQL(operationConfig.field + '_' + callbackKey);
const callbackOperationConfig: JSONSchemaPubSubOperationConfig = {
type: OperationTypeNode.SUBSCRIPTION,
field: fieldName,
pubsubTopic: '{args.callbackUrl}',
};
operations.push(callbackOperationConfig);
}
handleCallback(callbackKey, methodObj.callbacks[callbackKey] as OpenAPIV3.CallbackObject);
}
}

Expand Down
73 changes: 27 additions & 46 deletions packages/loaders/openapi/tests/example_api7.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@ import { graphql, execute, subscribe, GraphQLSchema } from 'graphql';

import { createServer, Server } from 'http';
import { SubscriptionServer, SubscriptionClient, ServerOptions } from 'subscriptions-transport-ws';
import { MQTTPubSub } from 'graphql-mqtt-subscriptions';
import { connect, MqttClient } from 'mqtt';
import ws from 'ws';
import { fetch } from '@whatwg-node/fetch';

import { loadGraphQLSchemaFromOpenAPI } from '../src/loadGraphQLSchemaFromOpenAPI';
import { startServers, stopServers } from './example_api7_server';
import { startServers, stopServers, pubsub } from './example_api7_server';

let createdSchema: GraphQLSchema;
const TEST_PORT = 3009;
const HTTP_PORT = 3008;
const MQTT_PORT = 1885;
// Update PORT for this test case:
const baseUrl = `http://localhost:${HTTP_PORT}/api`;

// const oas = require('./fixtures/example_oas7.json')
// oas.servers[0].variables.port.default = String(HTTP_PORT)
// oas.servers[1].variables.port.default = String(MQTT_PORT)

let wsServer: Server;
let mqttClient: MqttClient;
let subscriptionServer: SubscriptionServer;

describe('OpenAPI Loader: example_api7', () => {
Expand All @@ -38,20 +31,6 @@ describe('OpenAPI Loader: example_api7', () => {

createdSchema = schema;
try {
mqttClient = connect(`mqtt://localhost:${MQTT_PORT}`, {
keepalive: 60,
reschedulePings: true,
protocolId: 'MQTT',
protocolVersion: 4,
reconnectPeriod: 2000,
connectTimeout: 5 * 1000,
clean: true,
});

const pubsub = new MQTTPubSub({
client: mqttClient,
});

wsServer = createServer((req, res) => {
res.writeHead(404);
res.end();
Expand All @@ -64,8 +43,7 @@ describe('OpenAPI Loader: example_api7', () => {
execute,
subscribe,
schema,
onConnect: (params, socket, context) => {
// Add pubsub to subscribe context
onConnect: () => {
return { pubsub };
},
} as ServerOptions,
Expand All @@ -77,7 +55,7 @@ describe('OpenAPI Loader: example_api7', () => {
} catch (e) {
console.log('error', e);
}
await startServers(HTTP_PORT, MQTT_PORT);
await startServers(HTTP_PORT);
});

function sleep(ms: number) {
Expand All @@ -93,27 +71,28 @@ describe('OpenAPI Loader: example_api7', () => {
* The timeout allows these to close properly but is there a better way?
*/
await sleep(500);
await Promise.all([subscriptionServer.close(), wsServer.close(), mqttClient.end(), stopServers()]);
await Promise.all([subscriptionServer.close(), wsServer.close(), stopServers()]);
await sleep(500);
});

test('Receive data from the subscription after creating a new instance', () => {
const userName = 'Carlos';
const deviceName = 'Bot';

const query = `subscription watchDevice($topicInput: TopicInput!) {
devicesEventListener(topicInput: $topicInput) {
const query = `subscription watchDevice($method: String!, $userName: String!) {
devicesEventListener(method: $method, userName: $userName) {
name
userName
status
}
}`;

const query2 = `mutation($deviceInput: DeviceInput!) {
createDevice(deviceInput: $deviceInput) {
name
userName
status
const query2 = `mutation($deviceInput: Device_Input!) {
createDevice(input: $deviceInput) {
... on Device {
name
userName
status
}
}
}`;

Expand All @@ -127,10 +106,8 @@ describe('OpenAPI Loader: example_api7', () => {
query,
operationName: 'watchDevice',
variables: {
topicInput: {
method: 'POST',
userName: `${userName}`,
},
method: 'POST',
userName,
},
})
.subscribe({
Expand All @@ -142,8 +119,7 @@ describe('OpenAPI Loader: example_api7', () => {
if (result.data) {
expect(result.data).toEqual({
devicesEventListener: {
name: `${deviceName}`,
userName: `${userName}`,
name: deviceName,
status: false,
},
});
Expand All @@ -154,15 +130,20 @@ describe('OpenAPI Loader: example_api7', () => {
});

setTimeout(() => {
graphql(createdSchema, query2, null, null, {
deviceInput: {
name: `${deviceName}`,
userName: `${userName}`,
status: false,
graphql({
schema: createdSchema,
source: query2,
variableValues: {
deviceInput: {
name: `${deviceName}`,
userName: `${userName}`,
status: false,
},
},
})
.then(res => {
if (!res.data) {
console.log(res.errors?.[0]);
reject(new Error('Failed mutation'));
}
})
Expand Down
70 changes: 8 additions & 62 deletions packages/loaders/openapi/tests/example_api7_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
// This file is licensed under the MIT License.
// License text available at https://opensource.org/licenses/MIT

import { Aedes, aedes, AedesOptions, PublishPacket } from 'aedes';
import { PubSub } from '@graphql-mesh/utils';
import * as bodyParser from 'body-parser';
import express from 'express';
import { Server } from 'http';
import { createServer, Server as NetSetver } from 'net';

const app = express();

let server: Server; // holds server object for shutdown
let mqttBroker: Aedes;
let tcpServer: NetSetver;

export const pubsub = new PubSub();

const Devices = {
'Audio-player': {
Expand All @@ -29,19 +28,7 @@ const Devices = {
/**
* Starts the server at the given port
*/
export function startServers(HTTP_PORT: number, MQTT_PORT: number) {
mqttBroker = aedes({
published: (packet, client, cb) => {
if (packet.topic.startsWith('$SYS')) {
return cb();
}
console.log(`MQTT packet published on ${packet.topic} by ${client ? client.id : 'broker'}`);
cb();
},
} as AedesOptions);

tcpServer = createServer(mqttBroker.handle);

export function startServers(HTTP_PORT: number) {
app.use(bodyParser.json());

app.get('/api/user', (req, res) => {
Expand All @@ -58,15 +45,7 @@ export function startServers(HTTP_PORT: number, MQTT_PORT: number) {
if (req.body.userName && req.body.name) {
const device = req.body;
Devices[device.name] = device;
const packet: PublishPacket = {
topic: `/api/${device.userName}/devices/${req.method.toUpperCase()}/${device.name}`,
payload: Buffer.from(JSON.stringify(device)),
cmd: 'publish',
qos: 0,
dup: false,
retain: false,
};
mqttBroker.publish(packet, () => {});
pubsub.publish(`/api/${device.userName}/devices/${req.method.toUpperCase()}`, device);
res.status(200).send(device);
} else {
res.status(404).send({
Expand All @@ -91,15 +70,7 @@ export function startServers(HTTP_PORT: number, MQTT_PORT: number) {
const device = req.body;
delete Devices[req.params.deviceName];
Devices[device.deviceName] = device;
const packet: PublishPacket = {
topic: `/api/${device.userName}/devices/${req.method.toUpperCase()}/${device.name}`,
payload: Buffer.from(JSON.stringify(device)),
cmd: 'publish',
qos: 0,
dup: false,
retain: false,
};
mqttBroker.publish(packet, () => {});
pubsub.publish(`/api/${device.userName}/devices/${req.method.toUpperCase()}`, device);
res.status(200).send(device);
} else {
res.status(404).send({
Expand All @@ -113,50 +84,25 @@ export function startServers(HTTP_PORT: number, MQTT_PORT: number) {
}
});

// mqttBroker.on('client', client => {
// console.log(`MQTT client connected`, client ? client.id : client)
// })

// mqttBroker.on('subscribe', (subscriptions, client) => {
// console.log(
// `MQTT client ${
// client ? client.id : client
// } subscribed to topic(s) ${subscriptions.map(s => s.topic).join('\n')}`
// )
// })

// mqttBroker.on('unsubscribe', (subscriptions, client) => {
// console.log(
// `MQTT client ${
// client ? client.id : client
// } unsubscribed from topic(s) ${subscriptions.join('\n')}`
// )
// })

return Promise.all([
new Promise(resolve => {
server = app.listen(HTTP_PORT, resolve as () => void);
}),
new Promise(resolve => {
tcpServer = app.listen(MQTT_PORT, resolve as () => void);
}),
]).then(() => {
console.log(`Example HTTP API accessible on port ${HTTP_PORT}`);
console.log(`Example MQTT API accessible on port ${MQTT_PORT}`);
});
}

/**
* Stops server.
*/
export function stopServers() {
return Promise.all([server.close(), tcpServer.close(), mqttBroker.close()]).then(() => {
return Promise.all([server.close()]).then(() => {
console.log(`Stopped HTTP API server`);
console.log(`Stopped MQTT API server`);
});
}

// If run from command line, start server:
if (require.main === module) {
startServers(3008, 1885).catch(console.error);
startServers(3008).catch(console.error);
}
Loading

0 comments on commit ed9a3ac

Please sign in to comment.