Skip to content

Commit

Permalink
feat(logs): timeout old operations (#2220)
Browse files Browse the repository at this point in the history
## Describe your changes

Fixes NAN-1036

Since we have only 15days retention and syncs are unlimited it's mostly
for cosmetic and some smaller better time scoped operations (action,
oauth, webhook).

- Add expiration date to all operations
- Cron to set `timeout` to all expired operations

## How to test?

- Trigger an oauth but don't complete the process and wait 5-10minutes
or
- Add an operation, change the expiresAt manually, wait for the cron

(The query in curl)
```sh
#!/usr/bin/env bash

curl -X POST "http://localhost:55132/20240528_messages.2024-05-30/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
            "bool": {
                "must": [{ "range": { "expiresAt": { "lt": "now" } } }],
                "must_not": { "exists": { "field": "parentId" } },
                "should": [{ "term": { "state": "waiting" } }, { "term": { "state": "running" } }]
            }
        }
}
'
```
  • Loading branch information
bodinsamuel authored Jun 4, 2024
1 parent 5ffdae6 commit 856d710
Show file tree
Hide file tree
Showing 29 changed files with 193 additions and 32 deletions.
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/jobs/lib/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { deleteOldActivityLogs } from './crons/deleteOldActivities.js';
import { deleteSyncsData } from './crons/deleteSyncsData.js';
import { reconcileTemporalSchedules } from './crons/reconcileTemporalSchedules.js';
import { getLogger, stringifyError } from '@nangohq/utils';
import { timeoutLogsOperations } from './crons/timeoutLogsOperations.js';
import db from '@nangohq/database';
import { envs } from './env.js';

Expand All @@ -33,6 +34,7 @@ try {
deleteOldActivityLogs();
deleteSyncsData();
reconcileTemporalSchedules();
timeoutLogsOperations();

// handle SIGTERM
process.on('SIGTERM', () => {
Expand Down
26 changes: 26 additions & 0 deletions packages/jobs/lib/crons/timeoutLogsOperations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import * as cron from 'node-cron';
import { errorManager, ErrorSourceEnum } from '@nangohq/shared';
import tracer from 'dd-trace';
import { envs, model } from '@nangohq/logs';
import { getLogger } from '@nangohq/utils';

const logger = getLogger('Jobs.TimeoutLogsOperations');

export function timeoutLogsOperations(): void {
if (!envs.NANGO_LOGS_ENABLED) {
return;
}

cron.schedule(
'*/10 * * * *',
// eslint-disable-next-line @typescript-eslint/no-misused-promises
async () => {
try {
logger.info(`Timeouting old operations...`);
await model.setTimeoutForAll();
} catch (err) {
errorManager.report(err, { source: ErrorSourceEnum.PLATFORM }, tracer);
}
}
);
}
4 changes: 3 additions & 1 deletion packages/logs/lib/client.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { getOperation, listMessages, listOperations } from './models/messages.js
import type { OperationRowInsert } from '@nangohq/types';
import { afterEach } from 'node:test';
import { logContextGetter } from './models/logContextGetter.js';
import { indexMessages } from './es/schema.js';

const account = { id: 1234, name: 'test' };
const environment = { id: 5678, name: 'dev' };
Expand All @@ -20,7 +21,7 @@ const operationPayload: OperationRowInsert = {

describe('client', () => {
beforeAll(async () => {
await deleteIndex();
await deleteIndex({ prefix: indexMessages.index });
await migrateMapping();
});
afterEach(() => {
Expand Down Expand Up @@ -61,6 +62,7 @@ describe('client', () => {
environmentId: 5678,
environmentName: 'dev',
error: null,
expiresAt: expect.toBeIsoDate(),
id: ctx.id,
jobId: null,
level: 'info',
Expand Down
9 changes: 8 additions & 1 deletion packages/logs/lib/env.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { parseEnvs, ENVS } from '@nangohq/utils';
import { parseEnvs, ENVS, MAX_ACTION_DURATION, MAX_SYNC_DURATION, MAX_WEBHOOK_DURATION } from '@nangohq/utils';

// Do not require in community and enterprise right now
const required = process.env['NANGO_LOGS_ENABLED'] === 'true';

export const envs = parseEnvs(required ? ENVS.required({ NANGO_LOGS_ES_URL: true, NANGO_LOGS_ES_USER: true, NANGO_LOGS_ES_PWD: true }) : ENVS);

envs.NANGO_LOGS_ENABLED = Boolean(envs.NANGO_LOGS_ENABLED && envs.NANGO_LOGS_ES_URL);

export const defaultOperationExpiration = {
action: () => new Date(Date.now() + MAX_ACTION_DURATION).toISOString(),
webhook: () => new Date(Date.now() + MAX_WEBHOOK_DURATION).toISOString(),
sync: () => new Date(Date.now() + MAX_SYNC_DURATION).toISOString(),
auth: () => new Date(Date.now() + 300 * 1000).toISOString()
};
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ exports[`mapping > should create one index automatically on log 1`] = `
"enabled": false,
"type": "object",
},
"expiresAt": {
"type": "date",
},
"id": {
"type": "keyword",
},
Expand Down
8 changes: 6 additions & 2 deletions packages/logs/lib/es/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export async function migrateMapping() {
}
}

export async function deleteIndex() {
export async function deleteIndex({ prefix }: { prefix: string }) {
if (!isTest) {
throw new Error('Trying to delete stuff in prod');
}
Expand All @@ -70,7 +70,11 @@ export async function deleteIndex() {
const indices = await client.cat.indices({ format: 'json' });
await Promise.all(
indices.map(async (index) => {
await client.indices.delete({ index: index.index!, ignore_unavailable: true });
if (!index.index?.startsWith(prefix)) {
return;
}

await client.indices.delete({ index: index.index, ignore_unavailable: true });
})
);
} catch (err) {
Expand Down
4 changes: 2 additions & 2 deletions packages/logs/lib/es/index.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ describe('mapping', () => {
const today = new Date().toISOString().split('T')[0];
let fullIndexName: string;
beforeAll(async () => {
indexMessages.index = `messages-${nanoid()}`.toLocaleLowerCase();
indexMessages.index = `index-messages-${nanoid()}`.toLocaleLowerCase();
fullIndexName = `${indexMessages.index}.${today}`;

// Delete before otherwise it's hard to debug
await deleteIndex();
await deleteIndex({ prefix: 'index-messages' });
});

it('should not have an index before migration', async () => {
Expand Down
1 change: 1 addition & 0 deletions packages/logs/lib/es/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const props: Record<keyof MessageRow, estypes.MappingProperty> = {
createdAt: { type: 'date' },
updatedAt: { type: 'date' },
startedAt: { type: 'date' },
expiresAt: { type: 'date' },
endedAt: { type: 'date' }
};

Expand Down
2 changes: 1 addition & 1 deletion packages/logs/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ export * from './client.js';
export * from './models/helpers.js';
export * from './models/logContextGetter.js';
export * as model from './models/messages.js';
export { envs } from './env.js';
export { envs, defaultOperationExpiration } from './env.js';
9 changes: 6 additions & 3 deletions packages/logs/lib/models/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { nanoid } from '@nangohq/utils';
import type { MessageRow } from '@nangohq/types';
import { z } from 'zod';
import type { estypes } from '@elastic/elasticsearch';
import { defaultOperationExpiration } from '../env.js';

export const operationIdRegex = z.string().regex(/([0-9]|[a-zA-Z0-9]{20})/);

Expand All @@ -19,6 +20,7 @@ export function getFormattedMessage(
data: Partial<MessageRow>,
{ account, user, environment, integration, connection, syncConfig, meta }: FormatMessageData = {}
): MessageRow {
const now = new Date();
return {
id: data.id || nanoid(),

Expand Down Expand Up @@ -57,10 +59,11 @@ export function getFormattedMessage(
response: data.response || null,
meta: meta || data.meta || null,

createdAt: data.createdAt || new Date().toISOString(),
updatedAt: data.updatedAt || new Date().toISOString(),
createdAt: data.createdAt || now.toISOString(),
updatedAt: data.updatedAt || now.toISOString(),
startedAt: data.startedAt || null,
endedAt: data.endedAt || null
endedAt: data.endedAt || null,
expiresAt: data.operation ? data.expiresAt || defaultOperationExpiration.sync() : null
};
}

Expand Down
29 changes: 26 additions & 3 deletions packages/logs/lib/models/messages.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import { describe, beforeAll, it, expect, vi } from 'vitest';
import { deleteIndex, migrateMapping } from '../es/helpers.js';
import type { ListMessages, ListOperations } from './messages.js';
import { listMessages, listOperations } from './messages.js';
import type { ListOperations, ListMessages } from './messages.js';
import { getOperation, listOperations, listMessages, setTimeoutForAll } from './messages.js';
import { afterEach } from 'node:test';
import { logContextGetter } from './logContextGetter.js';
import type { OperationRowInsert } from '@nangohq/types';
import { getFormattedMessage } from './helpers.js';
import { indexMessages } from '../es/schema.js';

const account = { id: 1234, name: 'test' };
const environment = { id: 5678, name: 'dev' };
const operationPayload: OperationRowInsert = { operation: { type: 'sync', action: 'run' }, message: '' };

describe('model', () => {
beforeAll(async () => {
await deleteIndex();
await deleteIndex({ prefix: indexMessages.index });
await migrateMapping();
});
afterEach(() => {
Expand Down Expand Up @@ -53,6 +55,27 @@ describe('model', () => {
expect(list3.items).toHaveLength(0);
expect(list3.cursor).toBeNull();
});

it('should timeout old operations', async () => {
const ctx1 = await logContextGetter.create(
getFormattedMessage({ ...operationPayload, expiresAt: new Date(Date.now() - 86400 * 1000).toISOString() }),
{ account, environment },
{ logToConsole: false }
);
const ctx2 = await logContextGetter.create(
getFormattedMessage({ ...operationPayload, expiresAt: new Date(Date.now() + 86400 * 1000).toISOString() }),
{ account, environment },
{ logToConsole: false }
);

await setTimeoutForAll({ wait: true });

const op1 = await getOperation({ id: ctx1.id });
expect(op1.state).toBe('timeout');

const op2 = await getOperation({ id: ctx2.id });
expect(op2.state).toBe('running');
});
});

describe('messages', () => {
Expand Down
18 changes: 18 additions & 0 deletions packages/logs/lib/models/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,21 @@ export async function listFilters(opts: {
items: agg.buckets as any
};
}

export async function setTimeoutForAll(opts: { wait?: boolean } = {}): Promise<void> {
await client.updateByQuery({
index: indexMessages.index,
wait_for_completion: opts.wait === true,
refresh: opts.wait === true,
query: {
bool: {
must: [{ range: { expiresAt: { lt: 'now' } } }],
must_not: { exists: { field: 'parentId' } },
should: [{ term: { state: 'waiting' } }, { term: { state: 'running' } }]
}
},
script: {
source: "ctx._source.state = 'timeout'"
}
});
}
16 changes: 13 additions & 3 deletions packages/server/lib/controllers/apiAuth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
LogActionEnum
} from '@nangohq/shared';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { stringifyError } from '@nangohq/utils';
import type { RequestLocals } from '../utils/express.js';
import {
Expand Down Expand Up @@ -53,7 +53,12 @@ class ApiAuthController {
let logCtx: LogContext | undefined;
try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization API Key' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization API Key',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_API_KEY_AUTH, account.id);
Expand Down Expand Up @@ -291,7 +296,12 @@ class ApiAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization Basic' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization Basic',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_BASIC_API_KEY_AUTH, account.id);
Expand Down
9 changes: 7 additions & 2 deletions packages/server/lib/controllers/appStoreAuth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
LogActionEnum
} from '@nangohq/shared';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { stringifyError } from '@nangohq/utils';
import type { RequestLocals } from '../utils/express.js';
import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js';
Expand Down Expand Up @@ -46,7 +46,12 @@ class AppStoreAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization App Store' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization App Store',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_APP_STORE_AUTH, account.id);
Expand Down
16 changes: 13 additions & 3 deletions packages/server/lib/controllers/oauth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import publisher from '../clients/publisher.client.js';
import * as WSErrBuilder from '../utils/web-socket-error.js';
import oAuthSessionService from '../services/oauth-session.service.js';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { errorToObject, stringifyError } from '@nangohq/utils';
import type { RequestLocals } from '../utils/express.js';
import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js';
Expand Down Expand Up @@ -86,7 +86,12 @@ class OAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization OAuth' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization OAuth',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
if (!wsClientId) {
Expand Down Expand Up @@ -405,7 +410,12 @@ class OAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization OAuth2 CC' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization OAuth2 CC',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_OAUTH2_CC_AUTH, account.id);
Expand Down
9 changes: 7 additions & 2 deletions packages/server/lib/controllers/onboarding.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
import type { IncomingPreBuiltFlowConfig } from '@nangohq/shared';
import { getLogger } from '@nangohq/utils';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { records as recordsService } from '@nangohq/records';
import type { GetOnboardingStatus } from '@nangohq/types';
import type { RequestLocals } from '../utils/express.js';
Expand Down Expand Up @@ -432,7 +432,12 @@ class OnboardingController {
}

logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'action' }, message: 'Start action' },
{
id: String(activityLogId),
operation: { type: 'action' },
message: 'Start action',
expiresAt: defaultOperationExpiration.action()
},
{
account,
environment,
Expand Down
Loading

0 comments on commit 856d710

Please sign in to comment.