From 856d710da6f3064fc6fac416f05fa1f97fbbf28f Mon Sep 17 00:00:00 2001 From: Samuel Bodin <1637651+bodinsamuel@users.noreply.github.com> Date: Tue, 4 Jun 2024 10:16:10 +0200 Subject: [PATCH] feat(logs): timeout old operations (#2220) ## Describe your changes Fixes NAN-1036 Since we have only 15days retention and syncs are unlimited it's mostly for cosmetic and some smaller better time scoped operations (action, oauth, webhook). - Add expiration date to all operations - Cron to set `timeout` to all expired operations ## How to test? - Trigger an oauth but don't complete the process and wait 5-10minutes or - Add an operation, change the expiresAt manually, wait for the cron (The query in curl) ```sh #!/usr/bin/env bash curl -X POST "http://localhost:55132/20240528_messages.2024-05-30/_search" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [{ "range": { "expiresAt": { "lt": "now" } } }], "must_not": { "exists": { "field": "parentId" } }, "should": [{ "term": { "state": "waiting" } }, { "term": { "state": "running" } }] } } } ' ``` --- package-lock.json | 10 +++++++ packages/jobs/lib/app.ts | 2 ++ .../jobs/lib/crons/timeoutLogsOperations.ts | 26 +++++++++++++++++ packages/logs/lib/client.integration.test.ts | 4 ++- packages/logs/lib/env.ts | 9 +++++- .../index.integration.test.ts.snap | 3 ++ packages/logs/lib/es/helpers.ts | 8 +++-- .../logs/lib/es/index.integration.test.ts | 4 +-- packages/logs/lib/es/schema.ts | 1 + packages/logs/lib/index.ts | 2 +- packages/logs/lib/models/helpers.ts | 9 ++++-- .../lib/models/messages.integration.test.ts | 29 +++++++++++++++++-- packages/logs/lib/models/messages.ts | 18 ++++++++++++ .../lib/controllers/apiAuth.controller.ts | 16 ++++++++-- .../controllers/appStoreAuth.controller.ts | 9 ++++-- .../lib/controllers/oauth.controller.ts | 16 ++++++++-- .../lib/controllers/onboarding.controller.ts | 9 ++++-- .../server/lib/controllers/sync.controller.ts | 9 ++++-- .../v1/logs/getOperation.integration.test.ts | 1 + .../logs/searchMessages.integration.test.ts | 1 + .../logs/searchOperations.integration.test.ts | 1 + packages/shared/lib/clients/orchestrator.ts | 7 ++++- .../services/notification/webhook.service.ts | 7 ++++- .../shared/lib/services/sync/job.service.ts | 6 ++-- packages/types/lib/logs/messages.ts | 1 + packages/utils/lib/errors.ts | 2 +- packages/utils/lib/index.ts | 1 + packages/utils/lib/workflows.ts | 13 +++++++++ packages/utils/package.json | 1 + 29 files changed, 193 insertions(+), 32 deletions(-) create mode 100644 packages/jobs/lib/crons/timeoutLogsOperations.ts create mode 100644 packages/utils/lib/workflows.ts diff --git a/package-lock.json b/package-lock.json index 6f3bf0e869..82a5a12f96 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36148,10 +36148,20 @@ }, "devDependencies": { "@nangohq/types": "file:../types", + "ms": "3.0.0-canary.1", "rimraf": "3.0.2", "vitest": "0.33.0" } }, + "packages/utils/node_modules/ms": { + "version": "3.0.0-canary.1", + "resolved": "https://registry.npmjs.org/ms/-/ms-3.0.0-canary.1.tgz", + "integrity": "sha512-kh8ARjh8rMN7Du2igDRO9QJnqCb2xYTJxyQYK7vJJS4TvLLmsbyhiKpSW+t+y26gyOyMd0riphX0GeWKU3ky5g==", + "dev": true, + "engines": { + "node": ">=12.13" + } + }, "packages/utils/node_modules/nanoid": { "version": "5.0.7", "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-5.0.7.tgz", diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts index 2497404f8d..e1b2cfbd4f 100644 --- a/packages/jobs/lib/app.ts +++ b/packages/jobs/lib/app.ts @@ -7,6 +7,7 @@ import { deleteOldActivityLogs } from './crons/deleteOldActivities.js'; import { deleteSyncsData } from './crons/deleteSyncsData.js'; import { reconcileTemporalSchedules } from './crons/reconcileTemporalSchedules.js'; import { getLogger, stringifyError } from '@nangohq/utils'; +import { timeoutLogsOperations } from './crons/timeoutLogsOperations.js'; import db from '@nangohq/database'; import { envs } from './env.js'; @@ -33,6 +34,7 @@ try { deleteOldActivityLogs(); deleteSyncsData(); reconcileTemporalSchedules(); + timeoutLogsOperations(); // handle SIGTERM process.on('SIGTERM', () => { diff --git a/packages/jobs/lib/crons/timeoutLogsOperations.ts b/packages/jobs/lib/crons/timeoutLogsOperations.ts new file mode 100644 index 0000000000..34e9ada6bc --- /dev/null +++ b/packages/jobs/lib/crons/timeoutLogsOperations.ts @@ -0,0 +1,26 @@ +import * as cron from 'node-cron'; +import { errorManager, ErrorSourceEnum } from '@nangohq/shared'; +import tracer from 'dd-trace'; +import { envs, model } from '@nangohq/logs'; +import { getLogger } from '@nangohq/utils'; + +const logger = getLogger('Jobs.TimeoutLogsOperations'); + +export function timeoutLogsOperations(): void { + if (!envs.NANGO_LOGS_ENABLED) { + return; + } + + cron.schedule( + '*/10 * * * *', + // eslint-disable-next-line @typescript-eslint/no-misused-promises + async () => { + try { + logger.info(`Timeouting old operations...`); + await model.setTimeoutForAll(); + } catch (err) { + errorManager.report(err, { source: ErrorSourceEnum.PLATFORM }, tracer); + } + } + ); +} diff --git a/packages/logs/lib/client.integration.test.ts b/packages/logs/lib/client.integration.test.ts index 32738c74ec..1c62174d93 100644 --- a/packages/logs/lib/client.integration.test.ts +++ b/packages/logs/lib/client.integration.test.ts @@ -7,6 +7,7 @@ import { getOperation, listMessages, listOperations } from './models/messages.js import type { OperationRowInsert } from '@nangohq/types'; import { afterEach } from 'node:test'; import { logContextGetter } from './models/logContextGetter.js'; +import { indexMessages } from './es/schema.js'; const account = { id: 1234, name: 'test' }; const environment = { id: 5678, name: 'dev' }; @@ -20,7 +21,7 @@ const operationPayload: OperationRowInsert = { describe('client', () => { beforeAll(async () => { - await deleteIndex(); + await deleteIndex({ prefix: indexMessages.index }); await migrateMapping(); }); afterEach(() => { @@ -61,6 +62,7 @@ describe('client', () => { environmentId: 5678, environmentName: 'dev', error: null, + expiresAt: expect.toBeIsoDate(), id: ctx.id, jobId: null, level: 'info', diff --git a/packages/logs/lib/env.ts b/packages/logs/lib/env.ts index 430f1afb61..10b8bc3089 100644 --- a/packages/logs/lib/env.ts +++ b/packages/logs/lib/env.ts @@ -1,4 +1,4 @@ -import { parseEnvs, ENVS } from '@nangohq/utils'; +import { parseEnvs, ENVS, MAX_ACTION_DURATION, MAX_SYNC_DURATION, MAX_WEBHOOK_DURATION } from '@nangohq/utils'; // Do not require in community and enterprise right now const required = process.env['NANGO_LOGS_ENABLED'] === 'true'; @@ -6,3 +6,10 @@ const required = process.env['NANGO_LOGS_ENABLED'] === 'true'; export const envs = parseEnvs(required ? ENVS.required({ NANGO_LOGS_ES_URL: true, NANGO_LOGS_ES_USER: true, NANGO_LOGS_ES_PWD: true }) : ENVS); envs.NANGO_LOGS_ENABLED = Boolean(envs.NANGO_LOGS_ENABLED && envs.NANGO_LOGS_ES_URL); + +export const defaultOperationExpiration = { + action: () => new Date(Date.now() + MAX_ACTION_DURATION).toISOString(), + webhook: () => new Date(Date.now() + MAX_WEBHOOK_DURATION).toISOString(), + sync: () => new Date(Date.now() + MAX_SYNC_DURATION).toISOString(), + auth: () => new Date(Date.now() + 300 * 1000).toISOString() +}; diff --git a/packages/logs/lib/es/__snapshots__/index.integration.test.ts.snap b/packages/logs/lib/es/__snapshots__/index.integration.test.ts.snap index f11b7c6ce5..9edea36ead 100644 --- a/packages/logs/lib/es/__snapshots__/index.integration.test.ts.snap +++ b/packages/logs/lib/es/__snapshots__/index.integration.test.ts.snap @@ -42,6 +42,9 @@ exports[`mapping > should create one index automatically on log 1`] = ` "enabled": false, "type": "object", }, + "expiresAt": { + "type": "date", + }, "id": { "type": "keyword", }, diff --git a/packages/logs/lib/es/helpers.ts b/packages/logs/lib/es/helpers.ts index 68b6df4ec9..090380701b 100644 --- a/packages/logs/lib/es/helpers.ts +++ b/packages/logs/lib/es/helpers.ts @@ -61,7 +61,7 @@ export async function migrateMapping() { } } -export async function deleteIndex() { +export async function deleteIndex({ prefix }: { prefix: string }) { if (!isTest) { throw new Error('Trying to delete stuff in prod'); } @@ -70,7 +70,11 @@ export async function deleteIndex() { const indices = await client.cat.indices({ format: 'json' }); await Promise.all( indices.map(async (index) => { - await client.indices.delete({ index: index.index!, ignore_unavailable: true }); + if (!index.index?.startsWith(prefix)) { + return; + } + + await client.indices.delete({ index: index.index, ignore_unavailable: true }); }) ); } catch (err) { diff --git a/packages/logs/lib/es/index.integration.test.ts b/packages/logs/lib/es/index.integration.test.ts index 4e894178d8..2d9784474f 100644 --- a/packages/logs/lib/es/index.integration.test.ts +++ b/packages/logs/lib/es/index.integration.test.ts @@ -11,11 +11,11 @@ describe('mapping', () => { const today = new Date().toISOString().split('T')[0]; let fullIndexName: string; beforeAll(async () => { - indexMessages.index = `messages-${nanoid()}`.toLocaleLowerCase(); + indexMessages.index = `index-messages-${nanoid()}`.toLocaleLowerCase(); fullIndexName = `${indexMessages.index}.${today}`; // Delete before otherwise it's hard to debug - await deleteIndex(); + await deleteIndex({ prefix: 'index-messages' }); }); it('should not have an index before migration', async () => { diff --git a/packages/logs/lib/es/schema.ts b/packages/logs/lib/es/schema.ts index 4749661f1e..e8cad6b38e 100644 --- a/packages/logs/lib/es/schema.ts +++ b/packages/logs/lib/es/schema.ts @@ -91,6 +91,7 @@ const props: Record = { createdAt: { type: 'date' }, updatedAt: { type: 'date' }, startedAt: { type: 'date' }, + expiresAt: { type: 'date' }, endedAt: { type: 'date' } }; diff --git a/packages/logs/lib/index.ts b/packages/logs/lib/index.ts index 4521f1b9dd..2037b72697 100644 --- a/packages/logs/lib/index.ts +++ b/packages/logs/lib/index.ts @@ -3,4 +3,4 @@ export * from './client.js'; export * from './models/helpers.js'; export * from './models/logContextGetter.js'; export * as model from './models/messages.js'; -export { envs } from './env.js'; +export { envs, defaultOperationExpiration } from './env.js'; diff --git a/packages/logs/lib/models/helpers.ts b/packages/logs/lib/models/helpers.ts index 825f3bb65d..35799a377d 100644 --- a/packages/logs/lib/models/helpers.ts +++ b/packages/logs/lib/models/helpers.ts @@ -2,6 +2,7 @@ import { nanoid } from '@nangohq/utils'; import type { MessageRow } from '@nangohq/types'; import { z } from 'zod'; import type { estypes } from '@elastic/elasticsearch'; +import { defaultOperationExpiration } from '../env.js'; export const operationIdRegex = z.string().regex(/([0-9]|[a-zA-Z0-9]{20})/); @@ -19,6 +20,7 @@ export function getFormattedMessage( data: Partial, { account, user, environment, integration, connection, syncConfig, meta }: FormatMessageData = {} ): MessageRow { + const now = new Date(); return { id: data.id || nanoid(), @@ -57,10 +59,11 @@ export function getFormattedMessage( response: data.response || null, meta: meta || data.meta || null, - createdAt: data.createdAt || new Date().toISOString(), - updatedAt: data.updatedAt || new Date().toISOString(), + createdAt: data.createdAt || now.toISOString(), + updatedAt: data.updatedAt || now.toISOString(), startedAt: data.startedAt || null, - endedAt: data.endedAt || null + endedAt: data.endedAt || null, + expiresAt: data.operation ? data.expiresAt || defaultOperationExpiration.sync() : null }; } diff --git a/packages/logs/lib/models/messages.integration.test.ts b/packages/logs/lib/models/messages.integration.test.ts index e6d49294e6..0995db3ebb 100644 --- a/packages/logs/lib/models/messages.integration.test.ts +++ b/packages/logs/lib/models/messages.integration.test.ts @@ -1,10 +1,12 @@ import { describe, beforeAll, it, expect, vi } from 'vitest'; import { deleteIndex, migrateMapping } from '../es/helpers.js'; -import type { ListMessages, ListOperations } from './messages.js'; -import { listMessages, listOperations } from './messages.js'; +import type { ListOperations, ListMessages } from './messages.js'; +import { getOperation, listOperations, listMessages, setTimeoutForAll } from './messages.js'; import { afterEach } from 'node:test'; import { logContextGetter } from './logContextGetter.js'; import type { OperationRowInsert } from '@nangohq/types'; +import { getFormattedMessage } from './helpers.js'; +import { indexMessages } from '../es/schema.js'; const account = { id: 1234, name: 'test' }; const environment = { id: 5678, name: 'dev' }; @@ -12,7 +14,7 @@ const operationPayload: OperationRowInsert = { operation: { type: 'sync', action describe('model', () => { beforeAll(async () => { - await deleteIndex(); + await deleteIndex({ prefix: indexMessages.index }); await migrateMapping(); }); afterEach(() => { @@ -53,6 +55,27 @@ describe('model', () => { expect(list3.items).toHaveLength(0); expect(list3.cursor).toBeNull(); }); + + it('should timeout old operations', async () => { + const ctx1 = await logContextGetter.create( + getFormattedMessage({ ...operationPayload, expiresAt: new Date(Date.now() - 86400 * 1000).toISOString() }), + { account, environment }, + { logToConsole: false } + ); + const ctx2 = await logContextGetter.create( + getFormattedMessage({ ...operationPayload, expiresAt: new Date(Date.now() + 86400 * 1000).toISOString() }), + { account, environment }, + { logToConsole: false } + ); + + await setTimeoutForAll({ wait: true }); + + const op1 = await getOperation({ id: ctx1.id }); + expect(op1.state).toBe('timeout'); + + const op2 = await getOperation({ id: ctx2.id }); + expect(op2.state).toBe('running'); + }); }); describe('messages', () => { diff --git a/packages/logs/lib/models/messages.ts b/packages/logs/lib/models/messages.ts index abdecfbaa6..71a4abda3d 100644 --- a/packages/logs/lib/models/messages.ts +++ b/packages/logs/lib/models/messages.ts @@ -364,3 +364,21 @@ export async function listFilters(opts: { items: agg.buckets as any }; } + +export async function setTimeoutForAll(opts: { wait?: boolean } = {}): Promise { + await client.updateByQuery({ + index: indexMessages.index, + wait_for_completion: opts.wait === true, + refresh: opts.wait === true, + query: { + bool: { + must: [{ range: { expiresAt: { lt: 'now' } } }], + must_not: { exists: { field: 'parentId' } }, + should: [{ term: { state: 'waiting' } }, { term: { state: 'running' } }] + } + }, + script: { + source: "ctx._source.state = 'timeout'" + } + }); +} diff --git a/packages/server/lib/controllers/apiAuth.controller.ts b/packages/server/lib/controllers/apiAuth.controller.ts index ff6d8ec0ed..b77ca5a0b8 100644 --- a/packages/server/lib/controllers/apiAuth.controller.ts +++ b/packages/server/lib/controllers/apiAuth.controller.ts @@ -20,7 +20,7 @@ import { LogActionEnum } from '@nangohq/shared'; import type { LogContext } from '@nangohq/logs'; -import { logContextGetter } from '@nangohq/logs'; +import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { stringifyError } from '@nangohq/utils'; import type { RequestLocals } from '../utils/express.js'; import { @@ -53,7 +53,12 @@ class ApiAuthController { let logCtx: LogContext | undefined; try { logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization API Key' }, + { + id: String(activityLogId), + operation: { type: 'auth', action: 'create_connection' }, + message: 'Authorization API Key', + expiresAt: defaultOperationExpiration.auth() + }, { account, environment } ); void analytics.track(AnalyticsTypes.PRE_API_KEY_AUTH, account.id); @@ -291,7 +296,12 @@ class ApiAuthController { try { logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization Basic' }, + { + id: String(activityLogId), + operation: { type: 'auth', action: 'create_connection' }, + message: 'Authorization Basic', + expiresAt: defaultOperationExpiration.auth() + }, { account, environment } ); void analytics.track(AnalyticsTypes.PRE_BASIC_API_KEY_AUTH, account.id); diff --git a/packages/server/lib/controllers/appStoreAuth.controller.ts b/packages/server/lib/controllers/appStoreAuth.controller.ts index 651c68cdaa..9528220420 100644 --- a/packages/server/lib/controllers/appStoreAuth.controller.ts +++ b/packages/server/lib/controllers/appStoreAuth.controller.ts @@ -18,7 +18,7 @@ import { LogActionEnum } from '@nangohq/shared'; import type { LogContext } from '@nangohq/logs'; -import { logContextGetter } from '@nangohq/logs'; +import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { stringifyError } from '@nangohq/utils'; import type { RequestLocals } from '../utils/express.js'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js'; @@ -46,7 +46,12 @@ class AppStoreAuthController { try { logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization App Store' }, + { + id: String(activityLogId), + operation: { type: 'auth', action: 'create_connection' }, + message: 'Authorization App Store', + expiresAt: defaultOperationExpiration.auth() + }, { account, environment } ); void analytics.track(AnalyticsTypes.PRE_APP_STORE_AUTH, account.id); diff --git a/packages/server/lib/controllers/oauth.controller.ts b/packages/server/lib/controllers/oauth.controller.ts index d7473f4595..ca401805b1 100644 --- a/packages/server/lib/controllers/oauth.controller.ts +++ b/packages/server/lib/controllers/oauth.controller.ts @@ -54,7 +54,7 @@ import publisher from '../clients/publisher.client.js'; import * as WSErrBuilder from '../utils/web-socket-error.js'; import oAuthSessionService from '../services/oauth-session.service.js'; import type { LogContext } from '@nangohq/logs'; -import { logContextGetter } from '@nangohq/logs'; +import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { errorToObject, stringifyError } from '@nangohq/utils'; import type { RequestLocals } from '../utils/express.js'; import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js'; @@ -86,7 +86,12 @@ class OAuthController { try { logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization OAuth' }, + { + id: String(activityLogId), + operation: { type: 'auth', action: 'create_connection' }, + message: 'Authorization OAuth', + expiresAt: defaultOperationExpiration.auth() + }, { account, environment } ); if (!wsClientId) { @@ -405,7 +410,12 @@ class OAuthController { try { logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization OAuth2 CC' }, + { + id: String(activityLogId), + operation: { type: 'auth', action: 'create_connection' }, + message: 'Authorization OAuth2 CC', + expiresAt: defaultOperationExpiration.auth() + }, { account, environment } ); void analytics.track(AnalyticsTypes.PRE_OAUTH2_CC_AUTH, account.id); diff --git a/packages/server/lib/controllers/onboarding.controller.ts b/packages/server/lib/controllers/onboarding.controller.ts index 7b1521ce0f..a368baa781 100644 --- a/packages/server/lib/controllers/onboarding.controller.ts +++ b/packages/server/lib/controllers/onboarding.controller.ts @@ -33,7 +33,7 @@ import { import type { IncomingPreBuiltFlowConfig } from '@nangohq/shared'; import { getLogger } from '@nangohq/utils'; import type { LogContext } from '@nangohq/logs'; -import { logContextGetter } from '@nangohq/logs'; +import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import { records as recordsService } from '@nangohq/records'; import type { GetOnboardingStatus } from '@nangohq/types'; import type { RequestLocals } from '../utils/express.js'; @@ -432,7 +432,12 @@ class OnboardingController { } logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'action' }, message: 'Start action' }, + { + id: String(activityLogId), + operation: { type: 'action' }, + message: 'Start action', + expiresAt: defaultOperationExpiration.action() + }, { account, environment, diff --git a/packages/server/lib/controllers/sync.controller.ts b/packages/server/lib/controllers/sync.controller.ts index 1548fb718e..295b253e8e 100644 --- a/packages/server/lib/controllers/sync.controller.ts +++ b/packages/server/lib/controllers/sync.controller.ts @@ -45,7 +45,7 @@ import { getSyncConfigRaw } from '@nangohq/shared'; import type { LogContext } from '@nangohq/logs'; -import { logContextGetter } from '@nangohq/logs'; +import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import type { LastAction } from '@nangohq/records'; import { isHosted } from '@nangohq/utils'; import { records as recordsService } from '@nangohq/records'; @@ -442,7 +442,12 @@ class SyncController { } logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'action' }, message: 'Start action' }, + { + id: String(activityLogId), + operation: { type: 'action' }, + message: 'Start action', + expiresAt: defaultOperationExpiration.action() + }, { account, environment, diff --git a/packages/server/lib/controllers/v1/logs/getOperation.integration.test.ts b/packages/server/lib/controllers/v1/logs/getOperation.integration.test.ts index 44df2637e5..e08e65c775 100644 --- a/packages/server/lib/controllers/v1/logs/getOperation.integration.test.ts +++ b/packages/server/lib/controllers/v1/logs/getOperation.integration.test.ts @@ -103,6 +103,7 @@ describe('GET /logs/operations/:operationId', () => { environmentId: env.id, environmentName: 'dev', error: null, + expiresAt: expect.toBeIsoDate(), id: logCtx.id, jobId: null, level: 'info', diff --git a/packages/server/lib/controllers/v1/logs/searchMessages.integration.test.ts b/packages/server/lib/controllers/v1/logs/searchMessages.integration.test.ts index f83aa6e17c..31aa33de88 100644 --- a/packages/server/lib/controllers/v1/logs/searchMessages.integration.test.ts +++ b/packages/server/lib/controllers/v1/logs/searchMessages.integration.test.ts @@ -118,6 +118,7 @@ describe('POST /logs/messages', () => { environmentId: null, environmentName: null, error: null, + expiresAt: null, id: expect.any(String), jobId: null, level: 'info', diff --git a/packages/server/lib/controllers/v1/logs/searchOperations.integration.test.ts b/packages/server/lib/controllers/v1/logs/searchOperations.integration.test.ts index c04fa22cc5..bbc154a6cb 100644 --- a/packages/server/lib/controllers/v1/logs/searchOperations.integration.test.ts +++ b/packages/server/lib/controllers/v1/logs/searchOperations.integration.test.ts @@ -113,6 +113,7 @@ describe('POST /logs/operations', () => { environmentId: env.id, environmentName: 'dev', error: null, + expiresAt: expect.toBeIsoDate(), id: logCtx.id, jobId: null, level: 'info', diff --git a/packages/shared/lib/clients/orchestrator.ts b/packages/shared/lib/clients/orchestrator.ts index acacec88ad..4a29a5fc39 100644 --- a/packages/shared/lib/clients/orchestrator.ts +++ b/packages/shared/lib/clients/orchestrator.ts @@ -299,7 +299,12 @@ export class Orchestrator { const activityLogId = await createActivityLog(log); const logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'webhook', action: 'incoming' }, message: 'Received a webhook' }, + { + id: String(activityLogId), + operation: { type: 'webhook', action: 'incoming' }, + message: 'Received a webhook', + expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() + }, { account, environment, diff --git a/packages/shared/lib/services/notification/webhook.service.ts b/packages/shared/lib/services/notification/webhook.service.ts index 00ac6689ab..e706a01437 100644 --- a/packages/shared/lib/services/notification/webhook.service.ts +++ b/packages/shared/lib/services/notification/webhook.service.ts @@ -395,7 +395,12 @@ class WebhookService { const activityLogId = await createActivityLog(log); const logCtx = await logContextGetter.create( - { id: String(activityLogId), operation: { type: 'webhook', action: 'outgoing' }, message: 'Forwarding Webhook' }, + { + id: String(activityLogId), + operation: { type: 'webhook', action: 'outgoing' }, + message: 'Forwarding Webhook', + expiresAt: new Date(Date.now() + 15 * 60 * 1000).toISOString() + }, { account, environment, integration: { id: integration.id!, name: integration.unique_key, provider: integration.provider } } ); diff --git a/packages/shared/lib/services/sync/job.service.ts b/packages/shared/lib/services/sync/job.service.ts index 00c172214c..fdcbe8c91b 100644 --- a/packages/shared/lib/services/sync/job.service.ts +++ b/packages/shared/lib/services/sync/job.service.ts @@ -4,11 +4,10 @@ import { LogActionEnum } from '../../models/Activity.js'; import type { NangoConnection } from '../../models/Connection.js'; import type { Job as SyncJob, SyncResultByModel } from '../../models/Sync.js'; import { SyncStatus, SyncType } from '../../models/Sync.js'; +import { MAX_SYNC_DURATION } from '@nangohq/utils'; const SYNC_JOB_TABLE = dbNamespace + 'sync_jobs'; -const SYNC_TIMEOUT_HOURS = 25; - export const createSyncJob = async ( sync_id: string, type: SyncType, @@ -174,8 +173,7 @@ export const isInitialSyncStillRunning = async (sync_id: string): Promise