Skip to content

Commit

Permalink
chore: scalable logic with single listener
Browse files Browse the repository at this point in the history
  • Loading branch information
nunogois committed Dec 3, 2024
1 parent e231f27 commit 641b9ba
Showing 1 changed file with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { NONE } from '../../types/permissions';
import type ConfigurationRevisionService from '../feature-toggle/configuration-revision-service';
import { UPDATE_REVISION } from '../feature-toggle/configuration-revision-service';

type ResponseWithFlush = Response & { flush: Function };
type SSEClientResponse = Response & { flush: Function };

export class FeatureStreamingController extends Controller {
private readonly logger: Logger;
Expand All @@ -20,6 +20,8 @@ export class FeatureStreamingController extends Controller {

private flagResolver: IFlagResolver;

private activeConnections: Set<SSEClientResponse>;

constructor(
{
configurationRevisionService,
Expand All @@ -31,6 +33,13 @@ export class FeatureStreamingController extends Controller {
this.flagResolver = config.flagResolver;
this.logger = config.getLogger('client-api/streaming.js');

this.activeConnections = new Set();
this.onUpdateRevisionEvent = this.onUpdateRevisionEvent.bind(this);
this.configurationRevisionService.on(
UPDATE_REVISION,
this.onUpdateRevisionEvent,
);

this.route({
method: 'get',
path: '',
Expand All @@ -42,7 +51,7 @@ export class FeatureStreamingController extends Controller {

async getFeatureStream(
req: IAuthRequest,
res: ResponseWithFlush,
res: SSEClientResponse,
): Promise<void> {
if (!this.flagResolver.isEnabled('streaming')) {
res.status(403).end();
Expand All @@ -59,20 +68,22 @@ export class FeatureStreamingController extends Controller {
res.write(`data: CONNECTED\n\n`);
res.flush();

this.configurationRevisionService.on(UPDATE_REVISION, (e) =>
this.onUpdateRevisionEvent(res, e),
);
this.activeConnections.add(res);

res.on('close', () => {
this.configurationRevisionService.removeListener(
UPDATE_REVISION,
(e) => this.onUpdateRevisionEvent(res, e),
);
this.activeConnections.delete(res);
});
}

private onUpdateRevisionEvent(res: ResponseWithFlush, event: any): void {
res.write(`data: UPDATE\n\n`);
res.flush();
private onUpdateRevisionEvent() {
for (const res of this.activeConnections) {
try {
res.write(`data: UPDATE\n\n`);
res.flush();
} catch (err) {
this.logger.info('Failed to send event. Dropping connection.');
this.activeConnections.delete(res);
}
}
}
}

0 comments on commit 641b9ba

Please sign in to comment.