Skip to content

Commit

Permalink
feat: Use an explicitly closeable async iterator (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
taion authored Aug 13, 2018
1 parent 09ad39c commit 4408ddf
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 116 deletions.
84 changes: 51 additions & 33 deletions src/AsyncUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,29 @@ export async function* filter<T>(
}

export type AsyncQueueOptions = {
setup?: () => Promise<void> | void,
teardown?: () => void,
setup?: () => void | Promise<void>,
teardown?: () => void | Promise<void>,
};

export class AsyncQueue {
options: AsyncQueueOptions;

values: any[];
closed: boolean = false;
promise: Promise<void>;
options: AsyncQueueOptions;
resolvePromise: () => void;
iterable: Promise<AsyncGenerator<any, void, void>>;

closed: boolean;
iterator: Promise<AsyncIterator<any>>;
setupPromise: void | Promise<void>;

constructor(options?: AsyncQueueOptions = {}) {
this.values = [];
this.options = options;
this.createPromise();

this.iterable = this.createIterable();
}
this.values = [];
this.createPromise();

close(): void | Promise<void> {
if (this.closed) return;
this.closed = true;
if (this.options.teardown) this.options.teardown();
this.closed = false;
this.iterator = this.createIterator();
}

createPromise() {
Expand All @@ -52,37 +51,56 @@ export class AsyncQueue {
});
}

async *createIterableRaw(): AsyncGenerator<any, void, void> {
try {
if (this.options.setup) await this.options.setup();
yield null;
async createIterator(): Promise<AsyncIterator<any>> {
const iterator = this.createIteratorRaw();

while (true) {
await this.promise;
// Wait for setup.
await iterator.next();
return iterator;
}

for (const value of this.values) {
yield value;
}
async *createIteratorRaw(): AsyncIterator<any> {
if (this.options.setup) {
this.setupPromise = this.options.setup();
}

this.values.length = 0;
this.createPromise();
}
} finally {
await this.close();
if (this.setupPromise) {
await this.setupPromise;
}
}

async createIterable(): Promise<AsyncGenerator<any, void, void>> {
const iterableRaw = this.createIterableRaw();
yield null;

// wait for the first synthetic yield after setup
await iterableRaw.next();
while (true) {
await this.promise;

for (const value of this.values) {
if (this.closed) {
return;
}

return iterableRaw;
yield value;
}

this.values.length = 0;
this.createPromise();
}
}

push(value: any) {
this.values.push(value);
this.resolvePromise();
}

close = async (): Promise<void> => {
if (this.setupPromise) {
await this.setupPromise;
}

if (this.options.teardown) {
await this.options.teardown();
}

this.closed = true;
this.push(null);
};
}
57 changes: 28 additions & 29 deletions src/AuthorizedSocketConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import {
specifiedRules,
validate,
} from 'graphql';
import type {
ExecutionResult,
GraphQLSchema,
ValidationContext,
} from 'graphql';
import type { GraphQLSchema, ValidationContext } from 'graphql';
import type IoServer from 'socket.io';

import * as AsyncUtils from './AsyncUtils';
import type { CredentialsManager } from './CredentialsManager';
import type { Logger, CreateLogger } from './Logger';
import type { Subscriber } from './Subscriber';
import SubscriptionContext from './SubscriptionContext';

export type CreateValidationRules = ({
variables: Object,
Expand All @@ -31,10 +28,6 @@ type Subscription = {
variables: Object,
};

type MaybeSubscription = Promise<
AsyncGenerator<ExecutionResult, void, void> | ExecutionResult,
>;

type AuthorizedSocketOptions<TContext, TCredentials> = {|
schema: GraphQLSchema,
subscriber: Subscriber,
Expand Down Expand Up @@ -63,7 +56,7 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
config: AuthorizedSocketOptions<TContext, TCredentials>;

log: Logger;
subscriptions: Map<string, MaybeSubscription>;
subscriptionContexts: Map<string, SubscriptionContext>;

constructor(
socket: IoServer.socket,
Expand All @@ -73,7 +66,7 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
this.config = config;

this.log = config.createLogger('@4c/SubscriptionServer::AuthorizedSocket');
this.subscriptions = new Map();
this.subscriptionContexts = new Map();

this.socket
.on('authenticate', this.handleAuthenticate)
Expand Down Expand Up @@ -127,7 +120,8 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
try {
if (
this.config.maxSubscriptionsPerConnection != null &&
this.subscriptions.size >= this.config.maxSubscriptionsPerConnection
this.subscriptionContexts.size >=
this.config.maxSubscriptionsPerConnection
) {
this.log('debug', 'subscription limit reached', {
maxSubscriptionsPerConnection: this.config
Expand All @@ -139,7 +133,7 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
return;
}

if (this.subscriptions.has(id)) {
if (this.subscriptionContexts.has(id)) {
this.log('debug', 'duplicate subscription attempted', { id });

this.emitError({
Expand Down Expand Up @@ -171,35 +165,39 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
return;
}

const subscriptionContext = new SubscriptionContext(
this.config.subscriber,
);

const sourcePromise = createSourceEventStream(
this.config.schema,
document,
null,
{
subscribe: async (...args) =>
AsyncUtils.filter(
await this.config.subscriber.subscribe(...args),
await subscriptionContext.subscribe(...args),
this.isAuthorized,
),
},
variables,
);

this.subscriptions.set(id, sourcePromise);
this.subscriptionContexts.set(id, subscriptionContext);

try {
resultOrStream = await sourcePromise;
} catch (err) {
if (err instanceof GraphQLError) {
resultOrStream = { errors: [err] };
} else {
this.subscriptions.delete(id);
this.subscriptionContexts.delete(id);
throw err;
}
}

if (resultOrStream.errors != null) {
this.subscriptions.delete(id);
this.subscriptionContexts.delete(id);
this.emitError({
code: 'subscribe_failed.gql_error',
// $FlowFixMe
Expand Down Expand Up @@ -241,24 +239,25 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
};

handleUnsubscribe = async (id: string) => {
const subscription = await this.subscriptions.get(id);
if (subscription && typeof subscription.return === 'function') {
subscription.return();
const subscriptionContext = this.subscriptionContexts.get(id);
if (!subscriptionContext) {
return;
}

this.log('debug', 'client unsubscribed', { id });
this.subscriptions.delete(id);

await subscriptionContext.close();
this.subscriptionContexts.delete(id);
};

handleDisconnect = async () => {
this.log('debug', 'client disconnected');
await this.config.credentialsManager.unauthenticate();

this.subscriptions.forEach(async subscriptionPromise => {
const subscription = await subscriptionPromise;

if (subscription && typeof subscription.return === 'function') {
subscription.return();
}
});
await Promise.all([
this.config.credentialsManager.unauthenticate(),
...Array.from(this.subscriptionContexts.values(), subscriptionContext =>
subscriptionContext.close(),
),
]);
};
}
4 changes: 2 additions & 2 deletions src/CredentialsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

export interface CredentialsManager<TCredentials> {
getCredentials(): ?TCredentials;
authenticate(authorization: string): Promise<mixed>;
unauthenticate(): Promise<mixed>; // allow for redis etc down the line
authenticate(authorization: string): void | Promise<void>;
unauthenticate(): void | Promise<void>; // allow for redis etc down the line
}
5 changes: 2 additions & 3 deletions src/EventSubscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ export default class EventSubscriber implements Subscriber {
});

eventQueues.add(queue);

return queue.iterable;
return queue;
}

async close() {
close() {
this._listeners.forEach((fn, event) => {
this.emitter.removeListener(event, fn);
});
Expand Down
2 changes: 1 addition & 1 deletion src/JwtCredentialsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export default class JwtCredentialsManager<TCredentials: JwtCredentials>
await this.updateCredentials();
}

async unauthenticate() {
unauthenticate() {
if (this.renewHandle) {
clearTimeout(this.renewHandle);
}
Expand Down
39 changes: 24 additions & 15 deletions src/RedisSubscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type RedisConfigOptions = redis.ClientOpts & {

export default class RedisSubscriber implements Subscriber {
redis: redis.RedisClient;
_parseMessage: ?(data: string) => any;
_parseMessage: ?(string) => any;
_queues: Map<Channel, Set<AsyncQueue>>;
_channels: Set<string>;

Expand All @@ -26,33 +26,33 @@ export default class RedisSubscriber implements Subscriber {

this.redis.on('message', (channel, message) => {
const queues = this._queues.get(channel);
if (!queues) return;
if (!queues) {
return;
}

queues.forEach(queue => {
queue.push(message);
});
});
}

_redisSubscribe(channel: string) {
return promisify(cb => this.redis.subscribe(channel, cb))();
}

async _subscribeToChannel(channel: string) {
if (this._channels.has(channel)) return;
if (this._channels.has(channel)) {
return;
}

this._channels.add(channel);
await this._redisSubscribe(channel);
await promisify(cb => this.redis.subscribe(channel, cb))();
}

async subscribe(
subscribe(
channel: Channel,
parseMessage: ?(data: string) => any = this._parseMessage,
parseMessage: ?(string) => any = this._parseMessage,
) {
let channelQueues = this._queues.get(channel);

if (!channelQueues) {
channelQueues = new Set();
this._queues.set(channel, channelQueues);
await this._redisSubscribe(channel);
}

const queue = new AsyncQueue({
Expand All @@ -73,10 +73,19 @@ export default class RedisSubscriber implements Subscriber {

channelQueues.add(queue);

const iterable = await queue.iterable;
if (!parseMessage) return iterable;
let iteratorPromise = queue.iterator;
if (parseMessage) {
// Workaround for Flow.
const parseMessageFn: string => any = parseMessage;
iteratorPromise = iteratorPromise.then(iterator =>
map(iterator, parseMessageFn),
);
}

return map(iterable, parseMessage);
return {
iterator: iteratorPromise,
close: queue.close,
};
}

async close() {
Expand Down
4 changes: 3 additions & 1 deletion src/Subscriber.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/** @flow */

export interface Subscriber {
subscribe(...any[]): Promise<AsyncGenerator<any, void, void>>;
subscribe(
...any[]
): { iterator: Promise<AsyncIterator<any>>, close: () => Promise<void> };
close(): void | Promise<void>;
}
Loading

0 comments on commit 4408ddf

Please sign in to comment.