Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logs): timeout old operations #2220

Merged
merged 21 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/jobs/lib/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { deleteOldActivityLogs } from './crons/deleteOldActivities.js';
import { deleteSyncsData } from './crons/deleteSyncsData.js';
import { reconcileTemporalSchedules } from './crons/reconcileTemporalSchedules.js';
import { getLogger, stringifyError } from '@nangohq/utils';
import { timeoutLogsOperations } from './crons/timeoutLogsOperations.js';
import db from '@nangohq/database';
import { envs } from './env.js';

Expand All @@ -33,6 +34,7 @@ try {
deleteOldActivityLogs();
deleteSyncsData();
reconcileTemporalSchedules();
timeoutLogsOperations();

// handle SIGTERM
process.on('SIGTERM', () => {
Expand Down
26 changes: 26 additions & 0 deletions packages/jobs/lib/crons/timeoutLogsOperations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import * as cron from 'node-cron';
import { errorManager, ErrorSourceEnum } from '@nangohq/shared';
import tracer from 'dd-trace';
import { envs, model } from '@nangohq/logs';
import { getLogger } from '@nangohq/utils';

const logger = getLogger('Jobs.TimeoutLogsOperations');

export function timeoutLogsOperations(): void {
if (!envs.NANGO_LOGS_ENABLED) {
return;
}

cron.schedule(
'*/10 * * * *',
// eslint-disable-next-line @typescript-eslint/no-misused-promises
async () => {
try {
logger.info(`Timeouting old operations...`);
await model.setTimeoutForAll();
} catch (err) {
errorManager.report(err, { source: ErrorSourceEnum.PLATFORM }, tracer);
}
}
);
}
4 changes: 3 additions & 1 deletion packages/logs/lib/client.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { getOperation, listMessages, listOperations } from './models/messages.js
import type { OperationRowInsert } from '@nangohq/types';
import { afterEach } from 'node:test';
import { logContextGetter } from './models/logContextGetter.js';
import { indexMessages } from './es/schema.js';

const account = { id: 1234, name: 'test' };
const environment = { id: 5678, name: 'dev' };
Expand All @@ -20,7 +21,7 @@ const operationPayload: OperationRowInsert = {

describe('client', () => {
beforeAll(async () => {
await deleteIndex();
await deleteIndex({ prefix: indexMessages.index });
await migrateMapping();
});
afterEach(() => {
Expand Down Expand Up @@ -61,6 +62,7 @@ describe('client', () => {
environmentId: 5678,
environmentName: 'dev',
error: null,
expiresAt: expect.toBeIsoDate(),
id: ctx.id,
jobId: null,
level: 'info',
Expand Down
9 changes: 8 additions & 1 deletion packages/logs/lib/env.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { parseEnvs, ENVS } from '@nangohq/utils';
import { parseEnvs, ENVS, MAX_ACTION_DURATION, MAX_SYNC_DURATION, MAX_WEBHOOK_DURATION } from '@nangohq/utils';

// Do not require in community and enterprise right now
const required = process.env['NANGO_LOGS_ENABLED'] === 'true';

export const envs = parseEnvs(required ? ENVS.required({ NANGO_LOGS_ES_URL: true, NANGO_LOGS_ES_USER: true, NANGO_LOGS_ES_PWD: true }) : ENVS);

envs.NANGO_LOGS_ENABLED = Boolean(envs.NANGO_LOGS_ENABLED && envs.NANGO_LOGS_ES_URL);

export const defaultOperationExpiration = {
action: () => new Date(Date.now() + MAX_ACTION_DURATION).toISOString(),
webhook: () => new Date(Date.now() + MAX_WEBHOOK_DURATION).toISOString(),
sync: () => new Date(Date.now() + MAX_SYNC_DURATION).toISOString(),
auth: () => new Date(Date.now() + 300 * 1000).toISOString()
};
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ exports[`mapping > should create one index automatically on log 1`] = `
"enabled": false,
"type": "object",
},
"expiresAt": {
"type": "date",
},
"id": {
"type": "keyword",
},
Expand Down
8 changes: 6 additions & 2 deletions packages/logs/lib/es/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export async function migrateMapping() {
}
}

export async function deleteIndex() {
export async function deleteIndex({ prefix }: { prefix: string }) {
if (!isTest) {
throw new Error('Trying to delete stuff in prod');
}
Expand All @@ -70,7 +70,11 @@ export async function deleteIndex() {
const indices = await client.cat.indices({ format: 'json' });
await Promise.all(
indices.map(async (index) => {
await client.indices.delete({ index: index.index!, ignore_unavailable: true });
if (!index.index?.startsWith(prefix)) {
return;
}

await client.indices.delete({ index: index.index, ignore_unavailable: true });
})
);
} catch (err) {
Expand Down
4 changes: 2 additions & 2 deletions packages/logs/lib/es/index.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ describe('mapping', () => {
const today = new Date().toISOString().split('T')[0];
let fullIndexName: string;
beforeAll(async () => {
indexMessages.index = `messages-${nanoid()}`.toLocaleLowerCase();
indexMessages.index = `index-messages-${nanoid()}`.toLocaleLowerCase();
fullIndexName = `${indexMessages.index}.${today}`;

// Delete before otherwise it's hard to debug
await deleteIndex();
await deleteIndex({ prefix: 'index-messages' });
});

it('should not have an index before migration', async () => {
Expand Down
1 change: 1 addition & 0 deletions packages/logs/lib/es/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const props: Record<keyof MessageRow, estypes.MappingProperty> = {
createdAt: { type: 'date' },
updatedAt: { type: 'date' },
startedAt: { type: 'date' },
expiresAt: { type: 'date' },
endedAt: { type: 'date' }
};

Expand Down
2 changes: 1 addition & 1 deletion packages/logs/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ export * from './client.js';
export * from './models/helpers.js';
export * from './models/logContextGetter.js';
export * as model from './models/messages.js';
export { envs } from './env.js';
export { envs, defaultOperationExpiration } from './env.js';
9 changes: 6 additions & 3 deletions packages/logs/lib/models/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { nanoid } from '@nangohq/utils';
import type { MessageRow } from '@nangohq/types';
import { z } from 'zod';
import type { estypes } from '@elastic/elasticsearch';
import { defaultOperationExpiration } from '../env.js';

export const operationIdRegex = z.string().regex(/([0-9]|[a-zA-Z0-9]{20})/);

Expand All @@ -19,6 +20,7 @@ export function getFormattedMessage(
data: Partial<MessageRow>,
{ account, user, environment, integration, connection, syncConfig, meta }: FormatMessageData = {}
): MessageRow {
const now = new Date();
return {
id: data.id || nanoid(),

Expand Down Expand Up @@ -57,10 +59,11 @@ export function getFormattedMessage(
response: data.response || null,
meta: meta || data.meta || null,

createdAt: data.createdAt || new Date().toISOString(),
updatedAt: data.updatedAt || new Date().toISOString(),
createdAt: data.createdAt || now.toISOString(),
updatedAt: data.updatedAt || now.toISOString(),
startedAt: data.startedAt || null,
endedAt: data.endedAt || null
endedAt: data.endedAt || null,
expiresAt: data.operation ? data.expiresAt || defaultOperationExpiration.sync() : null
};
}

Expand Down
29 changes: 26 additions & 3 deletions packages/logs/lib/models/messages.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import { describe, beforeAll, it, expect, vi } from 'vitest';
import { deleteIndex, migrateMapping } from '../es/helpers.js';
import type { ListMessages, ListOperations } from './messages.js';
import { listMessages, listOperations } from './messages.js';
import type { ListOperations, ListMessages } from './messages.js';
import { getOperation, listOperations, listMessages, setTimeoutForAll } from './messages.js';
import { afterEach } from 'node:test';
import { logContextGetter } from './logContextGetter.js';
import type { OperationRowInsert } from '@nangohq/types';
import { getFormattedMessage } from './helpers.js';
import { indexMessages } from '../es/schema.js';

const account = { id: 1234, name: 'test' };
const environment = { id: 5678, name: 'dev' };
const operationPayload: OperationRowInsert = { operation: { type: 'sync', action: 'run' }, message: '' };

describe('model', () => {
beforeAll(async () => {
await deleteIndex();
await deleteIndex({ prefix: indexMessages.index });
await migrateMapping();
});
afterEach(() => {
Expand Down Expand Up @@ -53,6 +55,27 @@ describe('model', () => {
expect(list3.items).toHaveLength(0);
expect(list3.cursor).toBeNull();
});

it('should timeout old operations', async () => {
const ctx1 = await logContextGetter.create(
getFormattedMessage({ ...operationPayload, expiresAt: new Date(Date.now() - 86400 * 1000).toISOString() }),
{ account, environment },
{ logToConsole: false }
);
const ctx2 = await logContextGetter.create(
getFormattedMessage({ ...operationPayload, expiresAt: new Date(Date.now() + 86400 * 1000).toISOString() }),
{ account, environment },
{ logToConsole: false }
);

await setTimeoutForAll({ wait: true });

const op1 = await getOperation({ id: ctx1.id });
expect(op1.state).toBe('timeout');

const op2 = await getOperation({ id: ctx2.id });
expect(op2.state).toBe('running');
});
});

describe('messages', () => {
Expand Down
18 changes: 18 additions & 0 deletions packages/logs/lib/models/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,21 @@ export async function listFilters(opts: {
items: agg.buckets as any
};
}

export async function setTimeoutForAll(opts: { wait?: boolean } = {}): Promise<void> {
await client.updateByQuery({
index: indexMessages.index,
wait_for_completion: opts.wait === true,
refresh: opts.wait === true,
query: {
bool: {
must: [{ range: { expiresAt: { lt: 'now' } } }],
must_not: { exists: { field: 'parentId' } },
should: [{ term: { state: 'waiting' } }, { term: { state: 'running' } }]
}
},
script: {
source: "ctx._source.state = 'timeout'"
}
});
}
16 changes: 13 additions & 3 deletions packages/server/lib/controllers/apiAuth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
LogActionEnum
} from '@nangohq/shared';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { stringifyError } from '@nangohq/utils';
import type { RequestLocals } from '../utils/express.js';
import {
Expand Down Expand Up @@ -53,7 +53,12 @@ class ApiAuthController {
let logCtx: LogContext | undefined;
try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization API Key' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization API Key',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_API_KEY_AUTH, account.id);
Expand Down Expand Up @@ -291,7 +296,12 @@ class ApiAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization Basic' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization Basic',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_BASIC_API_KEY_AUTH, account.id);
Expand Down
9 changes: 7 additions & 2 deletions packages/server/lib/controllers/appStoreAuth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
LogActionEnum
} from '@nangohq/shared';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { stringifyError } from '@nangohq/utils';
import type { RequestLocals } from '../utils/express.js';
import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js';
Expand Down Expand Up @@ -46,7 +46,12 @@ class AppStoreAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization App Store' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization App Store',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_APP_STORE_AUTH, account.id);
Expand Down
16 changes: 13 additions & 3 deletions packages/server/lib/controllers/oauth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import publisher from '../clients/publisher.client.js';
import * as WSErrBuilder from '../utils/web-socket-error.js';
import oAuthSessionService from '../services/oauth-session.service.js';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { errorToObject, stringifyError } from '@nangohq/utils';
import type { RequestLocals } from '../utils/express.js';
import { connectionCreated as connectionCreatedHook, connectionCreationFailed as connectionCreationFailedHook } from '../hooks/hooks.js';
Expand Down Expand Up @@ -86,7 +86,12 @@ class OAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization OAuth' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization OAuth',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
if (!wsClientId) {
Expand Down Expand Up @@ -405,7 +410,12 @@ class OAuthController {

try {
logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'auth', action: 'create_connection' }, message: 'Authorization OAuth2 CC' },
{
id: String(activityLogId),
operation: { type: 'auth', action: 'create_connection' },
message: 'Authorization OAuth2 CC',
expiresAt: defaultOperationExpiration.auth()
},
{ account, environment }
);
void analytics.track(AnalyticsTypes.PRE_OAUTH2_CC_AUTH, account.id);
Expand Down
9 changes: 7 additions & 2 deletions packages/server/lib/controllers/onboarding.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
import type { IncomingPreBuiltFlowConfig } from '@nangohq/shared';
import { getLogger } from '@nangohq/utils';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs';
import { records as recordsService } from '@nangohq/records';
import type { GetOnboardingStatus } from '@nangohq/types';
import type { RequestLocals } from '../utils/express.js';
Expand Down Expand Up @@ -432,7 +432,12 @@ class OnboardingController {
}

logCtx = await logContextGetter.create(
{ id: String(activityLogId), operation: { type: 'action' }, message: 'Start action' },
{
id: String(activityLogId),
operation: { type: 'action' },
message: 'Start action',
expiresAt: defaultOperationExpiration.action()
},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 day for an action that's pretty generous

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just for the heartbeat, unfortunately I have no way to have something more precise. But if everything works well everything should be handled by temporal/v2 :D

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to abstract out these times to one place in case we need to adjust

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good point, actually I wasn't aware of the constant you shared on previous message so maybe I can expose them globally somehow

{
account,
environment,
Expand Down
Loading
Loading