Skip to content

Commit

Permalink
fix: Wrong IP usage on monolith TCP transporter configuration (Rocket…
Browse files Browse the repository at this point in the history
…Chat#29551)

Co-authored-by: Diego Sampaio <[email protected]>
  • Loading branch information
ggazzo and sampaiodiego authored Jun 20, 2023
1 parent 65dec98 commit afde60c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/five-clouds-obey.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@rocket.chat/meteor": patch
---

fix: Wrong IP usage on monolith TCP transporter configuration
35 changes: 35 additions & 0 deletions apps/meteor/ee/server/local-services/instance/getLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { pino } from 'pino';

export function getLogger({ MOLECULER_LOG_LEVEL: level, NODE_ENV: mode }: Record<string, unknown> = {}) {
if (!level || typeof level !== 'string') {
return {};
}

if (!['fatal', 'error', 'warn', 'info', 'debug', 'trace'].includes(level)) {
return {};
}

return {
logger: {
type: 'Pino',
options: {
level,
pino: {
options: {
timestamp: pino.stdTimeFunctions.isoTime,
...(mode !== 'production'
? {
transport: {
target: 'pino-pretty',
options: {
colorize: true,
},
},
}
: {}),
},
},
},
},
};
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export function getTransporter({ transporter, port }: { transporter?: string; port?: string } = {}) {
export function getTransporter({ transporter, port, extra }: { transporter?: string; port?: string; extra?: string } = {}) {
if (transporter) {
if (!transporter.match(/^(?:monolith\+)/)) {
throw new Error('invalid transporter');
Expand All @@ -11,5 +11,6 @@ export function getTransporter({ transporter, port }: { transporter?: string; po
return {
port: port ? port.trim() : 0,
udpDiscovery: false,
...(extra ? JSON.parse(extra) : {}),
};
}
15 changes: 13 additions & 2 deletions apps/meteor/ee/server/local-services/instance/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import { InstanceStatus } from '@rocket.chat/instance-status';
import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import type { IInstanceService } from '../../sdk/types/IInstanceService';
import { getTransporter } from './getTransporter';
import { getLogger } from './getLogger';

const hostIP = process.env.INSTANCE_IP ? String(process.env.INSTANCE_IP).trim() : 'localhost';

export class InstanceService extends ServiceClassInternal implements IInstanceService {
protected name = 'instance';
Expand All @@ -26,7 +29,7 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
constructor() {
super();

const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT });
const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT, extra: process.env.TRANSPORTER_EXTRA });
if (typeof tx === 'string') {
this.transporter = new Transporters.NATS({ url: tx });
this.isTransporterTCP = false;
Expand All @@ -37,6 +40,8 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
if (this.isTransporterTCP) {
this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => {
if (clientAction === 'removed') {
(this.broker.transit?.tx as any).nodes.disconnected(data?._id, false);
(this.broker.transit?.tx as any).nodes.nodes.delete(data?._id);
return;
}

Expand Down Expand Up @@ -78,8 +83,14 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
this.broker = new ServiceBroker({
nodeID: InstanceStatus.id(),
transporter: this.transporter,

...getLogger(process.env),
});

if ((this.broker.transit?.tx as any)?.nodes?.localNode) {
(this.broker.transit?.tx as any).nodes.localNode.ipList = [hostIP];
}

this.broker.createService({
name: 'matrix',
events: {
Expand Down Expand Up @@ -107,7 +118,7 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
await this.broker.start();

const instance = {
host: process.env.INSTANCE_IP ? String(process.env.INSTANCE_IP).trim() : 'localhost',
host: hostIP,
port: String(process.env.PORT).trim(),
tcpPort: (this.broker.transit?.tx as any)?.nodes?.localNode?.port,
os: {
Expand Down
5 changes: 5 additions & 0 deletions apps/meteor/server/modules/watchers/watchers.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallb
});

watcher.on<IInstanceStatus>(InstanceStatus.getCollectionName(), ({ clientAction, id, data, diff }) => {
if (clientAction === 'removed') {
void broadcast('watch.instanceStatus', { clientAction, id, data: { _id: id } });
return;
}

void broadcast('watch.instanceStatus', { clientAction, data, diff, id });
});

Expand Down

0 comments on commit afde60c

Please sign in to comment.