Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding subscription handling to dwn server. #68

Closed
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ export const config = {

// log level - trace/debug/info/warn/error
logLevel: process.env.DWN_SERVER_LOG_LEVEL || 'INFO',

subscriptionsEnabled:
{ on: true, off: false }[process.env.SUBSCRIPTIONS] ?? true,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please change the environment variable to DWN_SUBSCRIPTIONS. I realize not all environment variables currently follow that convention, but I would like to move in that direction.

// where to store persistant data
};
55 changes: 41 additions & 14 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import { v4 as uuidv4 } from 'uuid';
import type { Readable as IsomorphicReadable } from 'readable-stream';
import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js';
import type {
RecordsReadReply,
SubscriptionRequestReply,

Check failure on line 5 in src/json-rpc-handlers/dwn/process-message.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequestReply'.
} from '@tbd54566975/dwn-sdk-js';
import {
DwnInterfaceName,
DwnMethodName,
SubscriptionRequest,

Check failure on line 10 in src/json-rpc-handlers/dwn/process-message.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequest'.
} from '@tbd54566975/dwn-sdk-js';
import type {
HandlerResponse,
JsonRpcHandler,
} from '../../lib/json-rpc-router.js';

import { v4 as uuidv4 } from 'uuid';
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';

import {
JsonRpcErrorCodes,
createJsonRpcErrorResponse,
createJsonRpcSuccessResponse,
JsonRpcErrorCodes,
} from '../../lib/json-rpc.js';

export const handleDwnProcessMessage: JsonRpcHandler = async (
Expand All @@ -21,21 +26,45 @@
const { dwn, dataStream } = context;
const { target, message } = dwnRequest.params;
const requestId = dwnRequest.id ?? uuidv4();

try {
let reply;
let reply: any;

const messageType =
message?.descriptor?.interface + message?.descriptor?.method;

// When a record is deleted via `RecordsDelete`, the initial RecordsWrite is kept as a tombstone _in addition_
// to the RecordsDelete message. the data associated to that initial RecordsWrite is deleted. If a record was written
// _and_ deleted before it ever got to dwn-server, we end up in a situation where we still need to process the tombstone
// so that we can process the RecordsDelete.
if (
messageType === DwnInterfaceName.Records + DwnMethodName.Write &&
!dataStream
) {
console.log('sending');
reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message);
} else if (
messageType ===
DwnInterfaceName.Subscriptions + DwnMethodName.Request

Check failure on line 43 in src/json-rpc-handlers/dwn/process-message.ts

View workflow job for this annotation

GitHub Actions / test

Property 'Subscriptions' does not exist on type 'typeof DwnInterfaceName'.
Copy link
Contributor Author

@andorsk andorsk Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to handle subscription requests differently than a normal request, because we also need to setup and manage the socket connection, unlike most messages which terminate after the data is sent.

) {
reply = (await dwn.processMessage(
target,
message,
)) as SubscriptionRequestReply;
if (!context.subscriptionManager || !context.socket) {
throw new Error(
'setup failure. improper context provided for subscription',
);
}

// FIXME: How to handle subscription requests?
const request = await SubscriptionRequest.create({});
const req = {
socket: context.socket,
from: message.descriptor.author,
request: request,
};
reply = await context.subscriptionManager.subscribe(req);
const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, {
reply,
});
const responsePayload: HandlerResponse = { jsonRpcResponse };
return responsePayload;
} else {
reply = (await dwn.processMessage(
target,
Expand All @@ -44,7 +73,6 @@
)) as RecordsReadReply;
}

// RecordsRead messages return record data as a stream to for accommodate large amounts of data
let recordDataStream;
if (reply?.record?.data !== undefined) {
recordDataStream = reply.record.data;
Expand All @@ -64,7 +92,6 @@
JsonRpcErrorCodes.InternalError,
e.message,
);

return { jsonRpcResponse } as HandlerResponse;
}
};
7 changes: 6 additions & 1 deletion src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';

import type { Dwn } from '@tbd54566975/dwn-sdk-js';
import type { Readable } from 'node:stream';
import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { SubscriptionController } from '../subscription-manager.js';
import type { WebSocket } from 'ws';

export type RequestContext = {
dwn: Dwn;
transport: 'http' | 'ws';
dataStream?: Readable;
socket?: WebSocket;
subscriptionManager?: SubscriptionController;
};

export type HandlerResponse = {
Expand Down
175 changes: 175 additions & 0 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js';

Check failure on line 1 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionFilter'.
import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js';

Check failure on line 2 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

'"@tbd54566975/dwn-sdk-js"' has no exported member named 'EventMessage'. Did you mean 'EventsGetMessage'?

import type { JsonRpcSuccessResponse } from './lib/json-rpc.js';
import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js';

Check failure on line 5 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequest'.
import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js';

Check failure on line 6 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequestReply'.
import type WebSocket from 'ws';
import { WebSocketServer } from 'ws';
import { v4 as uuidv4 } from 'uuid';

export class Subscription {
from?: string;
subscriptionId: string;
createdAt: string;
description: string;
filters?: SubscriptionFilter[];
permissionGrant: PermissionsGrant;
connection: WebSocket;
}

export interface SubscriptionController {
clear(): Promise<void>;
close(): Promise<void>;
start(): Promise<void>;
subscribe(
request: RegisterSubscriptionRequest,
): Promise<RegisterSubscriptionReply>;
}

export type RegisterSubscriptionRequest = {
from: string; // from connection
socket: WebSocket; // socket connection
filters?: SubscriptionFilter[]; // filters, if applicable
permissionGrant?: PermissionsGrant; //permission grant, if applicable
request: SubscriptionRequest; // subscription request
};

export type RegisterSubscriptionReply = {
reply?: SubscriptionRequestReply;
subscriptionId?: string;
};

export type defaultSubscriptionChannel = 'event';

export type SubscriptionManagerOptions = {
wss?: WebSocketServer;
dwn: Dwn;
tenant: string;
};

export class SubscriptionManager {
private wss: WebSocketServer;
private dwn: Dwn;
private connections: Map<string, Subscription>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep track of all the subscriptions here. IMPORTANT is to close the subscription when it's finished.

options: SubscriptionManagerOptions;
#open: boolean;

constructor(options?: SubscriptionManagerOptions) {
this.wss = options?.wss || new WebSocketServer();
this.connections = new Map();
this.dwn = options?.dwn;
this.options = options;

this.wss.on('connection', (socket: WebSocket) => {
socket.on('message', async (data) => {
await this.handleSubscribe(socket, data);
});
});
}

async clear(): Promise<void> {
this.wss.removeAllListeners();
this.connections.clear();
}

async close(): Promise<void> {
this.#open = false;
this.connections.clear();
this.wss.close();
}

async open(): Promise<void> {
this.#open = true;
}

async start(): Promise<void> {
this.open();
}

private async createSubscription(
from: string,
request: RegisterSubscriptionRequest,
): Promise<Subscription> {
return {
from,
subscriptionId: uuidv4(),
createdAt: new Date().toISOString(),
description: 'subscription',
filters: request.filters,
permissionGrant: request.permissionGrant,
connection: request.socket,
};
}

async handleSubscribe(
socket: WebSocket,
data: any,
): Promise<RegisterSubscriptionReply> {
// parse message
const req = await SubscriptionRequest.parse(data);

return await this.subscribe({
request: req,
socket: socket,
from: req.author,
});
}

createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse {
return {
id: uuidv4(),
jsonrpc: '2.0',
result: e,
};
}

async subscribe(
req: RegisterSubscriptionRequest,
): Promise<RegisterSubscriptionReply> {
const subscriptionReply = await this.dwn.handleSubscriptionRequest(

Check failure on line 130 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

Property 'handleSubscriptionRequest' does not exist on type 'Dwn'.
req.from,
req.request.message,
);
if (subscriptionReply.status.code !== 200) {
return { reply: subscriptionReply };
}
const subscription = await this.createSubscription(req.from, req);
this.registerSubscription(subscription);
// set up forwarding.
// console.log('---------', subscriptionReply.subscription.emitter);
subscriptionReply.subscription.emitter.on(
async (e: EventMessage): Promise<void> => {
// console.log('got a record', e);
const jsonRpcResponse = this.createJSONRPCEvent(e);
const str = JSON.stringify(jsonRpcResponse);
return req.socket.send(Buffer.from(str));
},
);
return {
reply: subscriptionReply,
subscriptionId: subscription?.subscriptionId,
} as RegisterSubscriptionReply;
}

private async registerSubscription(
subscription: Subscription,
): Promise<void> {
if (!this.#open) {
throw new Error("Can't register subscription. It's not opened.");
}
if (this.connections.has(subscription.subscriptionId)) {
throw new Error(
'Failed to add connection to controller. ID already exists.',
);
}
this.connections.set(subscription.subscriptionId, subscription);
subscription.connection.on('close', () => {
this.deleteSubscription(subscription.subscriptionId);
});
}

private async deleteSubscription(id: string): Promise<void> {
this.connections.delete(id);
}
}
21 changes: 15 additions & 6 deletions src/ws-api.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { base64url } from 'multiformats/bases/base64';
import { v4 as uuidv4 } from 'uuid';
import { DataStream, type Dwn } from '@tbd54566975/dwn-sdk-js';
import type { IncomingMessage, Server } from 'http';
import { type IncomingMessage, type Server } from 'http';
import { type AddressInfo, type WebSocket, WebSocketServer } from 'ws';

import { jsonRpcApi } from './json-rpc-api.js';
Expand All @@ -12,17 +12,27 @@ import {
JsonRpcErrorCodes,
type JsonRpcResponse,
} from './lib/json-rpc.js';
import {
SubscriptionManager,
type SubscriptionController,
} from './subscription-manager.js';

const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
const HEARTBEAT_INTERVAL = 30_000;

export class WsApi {
#wsServer: WebSocketServer;
dwn: Dwn;
#subscriptionManager: SubscriptionController;

constructor(server: Server, dwn: Dwn) {
this.dwn = dwn;
this.#wsServer = new WebSocketServer({ server });
this.#subscriptionManager = new SubscriptionManager({
dwn: dwn,
tenant: 'asdf',
wss: this.#wsServer,
});
}

// TODO: github.com/TBD54566975/dwn-server/issues/49 Add code coverage tracker, similar to either dwn-sdk-js or to web5-js
Expand All @@ -40,6 +50,7 @@ export class WsApi {
*/
#handleConnection(socket: WebSocket, _request: IncomingMessage): void {
const dwn = this.dwn;
const subscriptionManager = this.#subscriptionManager;

socket[SOCKET_ISALIVE_SYMBOL] = true;

Expand All @@ -63,7 +74,6 @@ export class WsApi {

socket.on('message', async function (dataBuffer) {
let dwnRequest;

try {
// deserialize bytes into JSON object
dwnRequest = dataBuffer.toString();
Expand All @@ -77,15 +87,13 @@ export class WsApi {
const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse);
return socket.send(responseBuffer);
}

dwnRequest = JSON.parse(dwnRequest);
} catch (e) {
const jsonRpcResponse = createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
e.message,
);

const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse);
return socket.send(responseBuffer);
}
Expand All @@ -100,7 +108,10 @@ export class WsApi {
dwn,
transport: 'ws',
dataStream: requestDataStream,
subscriptionManager: subscriptionManager,
socket: socket,
};

const { jsonRpcResponse } = await jsonRpcApi.handle(
dwnRequest,
requestContext,
Expand Down Expand Up @@ -140,9 +151,7 @@ export class WsApi {

#setupWebSocket(): void {
this.#wsServer.on('connection', this.#handleConnection.bind(this));

const heartbeatInterval = this.#setupHeartbeat();

this.#wsServer.on('close', function close() {
clearInterval(heartbeatInterval);
});
Expand Down
Loading
Loading