Skip to content

Commit

Permalink
Regression: Unify isLastDocDelayed logic (#27120)
Browse files Browse the repository at this point in the history
  • Loading branch information
sampaiodiego authored Oct 21, 2022
1 parent 0c6871e commit 205a64c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
11 changes: 11 additions & 0 deletions apps/meteor/server/database/DatabaseWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import { convertChangeStreamPayload } from './convertChangeStreamPayload';
import { convertOplogPayload } from './convertOplogPayload';
import { watchCollections } from './watchCollections';

const instancePing = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10000;

const maxDocMs = instancePing * 4; // 4 times the ping interval

export type RealTimeData<T> = {
id: string;
action: 'insert' | 'update' | 'remove';
Expand Down Expand Up @@ -195,4 +199,11 @@ export class DatabaseWatcher extends EventEmitter {
getLastDocDelta(): number {
return this.lastDocTS ? Date.now() - this.lastDocTS.getTime() : Infinity;
}

/**
* @returns Indicates if the last document received is older than it should be. If that happens, it means that the oplog is not working properly
*/
isLastDocDelayed(): boolean {
return this.getLastDocDelta() > maxDocMs;
}
}
10 changes: 2 additions & 8 deletions apps/meteor/server/startup/watchDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,18 @@ import { SystemLogger } from '../lib/logger/system';

const { mongo } = MongoInternals.defaultRemoteCollectionDriver();

const instancePing = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10000;

const maxDocMs = instancePing * 4; // 4 times the ping interval

const watcher = new DatabaseWatcher({ db, _oplogHandle: (mongo as any)._oplogHandle, metrics });

initWatchers(watcher, api.broadcastLocal.bind(api));

watcher.watch();

setInterval(function _checkDatabaseWatcher() {
if (isLastDocDelayed()) {
if (watcher.isLastDocDelayed()) {
SystemLogger.error('No real time data received recently');
}
}, 20000);

export function isLastDocDelayed(): boolean {
const lastDocMs = watcher.getLastDocDelta();

return lastDocMs > maxDocMs;
return watcher.isLastDocDelayed();
}
7 changes: 1 addition & 6 deletions ee/apps/stream-hub-service/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import { registerServiceModels } from '../../../../apps/meteor/ee/server/lib/reg

const PORT = process.env.PORT || 3035;

const instancePing = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10000;

const maxDocMs = instancePing * 4; // 4 times the ping interval

(async () => {
const db = await getConnection();

Expand All @@ -36,8 +32,7 @@ const maxDocMs = instancePing * 4; // 4 times the ping interval
try {
await api.nodeList();

const lastDocMs = watcher.getLastDocDelta();
if (lastDocMs > maxDocMs) {
if (watcher.isLastDocDelayed()) {
throw new Error('not healthy');
}
} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -23520,7 +23520,7 @@ __metadata:
optional: true
bin:
lessc: ./bin/lessc
checksum: c9b8c0e865427112c48a9cac36f14964e130577743c29d56a6d93b5812b70846b04ccaa364acf1e8d75cee3855215ec0a2d8d9de569c80e774f10b6245f39b7d
checksum: 61568b56b5289fdcfe3d51baf3c13e7db7140022c0a37ef0ae343169f0de927a4b4f4272bc10c20101796e8ee79e934e024051321bba93b3ae071f734309bd98
languageName: node
linkType: hard

Expand Down

0 comments on commit 205a64c

Please sign in to comment.