Skip to content

Commit

Permalink
Merge pull request #582 from nicomoya123/master
Browse files Browse the repository at this point in the history
Fix: buffer messages from external resource
  • Loading branch information
davidyaha authored May 2, 2024
2 parents 1775b7c + 8be58eb commit 938fc71
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
22 changes: 17 additions & 5 deletions src/redis-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ export class RedisPubSub implements PubSubEngine {
}

public async publish<T>(trigger: string, payload: T): Promise<void> {
await this.redisPublisher.publish(trigger, this.serializer ? this.serializer(payload) : JSON.stringify(payload));
if(this.serializer) {
await this.redisPublisher.publish(trigger, this.serializer(payload));
} else if (payload instanceof Buffer){
await this.redisPublisher.publish(trigger, payload);
} else {
await this.redisPublisher.publish(trigger, JSON.stringify(payload));
}
}

public subscribe<T = any>(
Expand Down Expand Up @@ -169,17 +175,23 @@ export class RedisPubSub implements PubSubEngine {
private readonly subsRefsMap: Map<string, Set<number>>;
private currentSubscriptionId: number;

private onMessage(pattern: string, channel: string, message: string) {
private onMessage(pattern: string, channel: string | Buffer, message: string | Buffer) {
if(typeof channel === 'object') channel = channel.toString('utf8');

const subscribers = this.subsRefsMap.get(pattern || channel);

// Don't work for nothing..
if (!subscribers?.size) return;

let parsedMessage;
try {
parsedMessage = this.deserializer
? this.deserializer(message, { pattern, channel })
: JSON.parse(message, this.reviver);
if(this.deserializer){
parsedMessage = this.deserializer(Buffer.from(message), { pattern, channel })
} else if(typeof message === 'string'){
parsedMessage = JSON.parse(message, this.reviver);
} else {
parsedMessage = message;
}
} catch (e) {
parsedMessage = message;
}
Expand Down
23 changes: 23 additions & 0 deletions src/test/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,29 @@ describe('PubSubAsyncIterator', function() {
}));
});

describe('Subscribe to buffer', () => {
it('can publish buffers as well' , done => {
// when using messageBuffer, with redis instance the channel name is not a string but a buffer
const pubSub = new RedisPubSub({ messageEventName: 'messageBuffer'});
const payload = 'This is amazing';
pubSub.subscribe('Posts', message => {
try {
expect(message).to.be.instanceOf(Buffer);
expect(message.toString('utf-8')).to.be.equal(payload);
done();
} catch (e) {
done(e);
}
}).then(async subId => {
try {
await pubSub.publish('Posts', Buffer.from(payload, 'utf-8'));
pubSub.unsubscribe(subId);
} catch (e) {
done(e);
}
});
});
})

describe('PubSubCluster', () => {
const nodes = [7006, 7001, 7002, 7003, 7004, 7005].map(port => ({ host: '127.0.0.1', port }));
Expand Down

0 comments on commit 938fc71

Please sign in to comment.