Skip to content

Commit

Permalink
responding with event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
andorsk committed Sep 28, 2023
1 parent e6ee17a commit 498531d
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions packages/api/src/dwn-api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DwnInterfaceName, DwnMethodName, EventStreamI } from '@tbd54566975/dwn-sdk-js';
import { DwnInterfaceName, DwnMethodName, EventStream, EventStreamI } from '@tbd54566975/dwn-sdk-js';
import type { DwnResponse, Web5Agent } from '@web5/agent';
import type {
EventMessage,
Expand Down Expand Up @@ -129,9 +129,9 @@ export class DwnApi {
/**
* Creates a subscription. Note: the appropriate Permissions over SubscriptionRequestPermission
* MUST be set beforehand for authorization to work.
* @param {string} target - The target for the subscription.
* @param {string} target - The DID for the subscription.
* @param {SubscriptionRequestMessage} request - The subscription request message.
* @param {(e: EventMessage) => Promise<void>} callback - The callback function to handle events.
* @param {(e: EventMessage) => Promise<void>} callback - The callback function to handle events. Optional and possibe to chain later.
* @returns {Promise<SubscriptionRequestReply>} A promise containing the subscription request reply.
*
* Example:
Expand All @@ -149,7 +149,8 @@ export class DwnApi {
* Callback will run over the returned event type.
* Alternatively, you may request the actual pipe
*/
create: async (target, request, callback) : Promise<Subscription> => {
create: async (target: string, request: SubscriptionRequestMessage, callback?: (e: EventMessage) => Promise<void>) : Promise<SubscriptionRequestReply> => {
let reply: SubscriptionRequestReply
if (this.connectedDid === target) {
// Form a request object
const agentResponse = await this.agent.processDwnRequest({
Expand All @@ -166,31 +167,36 @@ export class DwnApi {
const metadata = { author: this.connectedDid, messageCid };
// response.subscription = new Subscription(this.agent, message as SubscriptionRequestMessage, metadata);
}
response.subscription?.emitter.on((event) => {
callback(event);
});
if (callback) {
reply.subscription.emitter.on(callback);
}
return response;
} else {
// Create an event stream.

const eventStream = new EventStream();
reply.subscription.emitter = eventStream;
// Step 1: Get address via DID document (To be fixed: resolve DID document)
const addr = "127.0.0.1:9002";

// Step 2: Create WebSocket
const socket = new WebSocket(addr);

// Setup socket
socket.onmessage = (data) => {
// Parse message
const event = JSON.parse(data) as EventMessage;
// Run callback
callback(event);
};

socket.onopen = () => {
// Step 3: Send RPC request to endpoint
const request = JSON.stringify(dwnRequest)
socket.send(request);
};

socket.onmessage = (event: any) => {
// parse EventMessage and push to event stream
eventStream.add(event)
if (callback) {
callback(event);
}
}

return reply;
}
}
};
Expand Down

0 comments on commit 498531d

Please sign in to comment.