From 2bdd1ae9ebd6cb741a5c238a9b714c7a071065e9 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Fri, 23 Aug 2024 11:07:31 +0100 Subject: [PATCH 1/2] fix: action and webhook logs improvements - Added request/response to action operation payload - Removed duplicates logs. We were indeed logging more or less the same thing at different levels (handler in jobs, trigger function and/or controller). - Log messages for actions and webhooks are now shorter and mostly without data (ex: 'Started action A', 'The action failed') Details like connection, provider, truncated response, error payload are now in the log payload I want to add request/response details for proxied requests made within a script but it requires modifying the persist API which is ingesting the logs coming from the runner. I will do that in next PR --- packages/jobs/lib/execution/action.ts | 6 - packages/jobs/lib/execution/postConnection.ts | 1 - packages/jobs/lib/execution/webhook.ts | 8 -- .../server/lib/controllers/sync.controller.ts | 17 ++- packages/shared/lib/clients/orchestrator.ts | 125 +++++++++++++----- packages/shared/lib/constants.ts | 2 - packages/shared/lib/utils/error.ts | 2 +- packages/types/lib/logs/messages.ts | 2 +- packages/utils/lib/express/headers.ts | 13 ++ packages/utils/lib/index.ts | 1 + 10 files changed, 120 insertions(+), 57 deletions(-) create mode 100644 packages/utils/lib/express/headers.ts diff --git a/packages/jobs/lib/execution/action.ts b/packages/jobs/lib/execution/action.ts index 5644ef6e32..392e07e258 100644 --- a/packages/jobs/lib/execution/action.ts +++ b/packages/jobs/lib/execution/action.ts @@ -105,11 +105,6 @@ export async function startAction(task: TaskAction): Promise> { } } export async function handleActionSuccess({ nangoProps }: { nangoProps: NangoProps }): Promise { - const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) }); - const content = `${nangoProps.syncConfig.sync_name} action was run successfully and results are being sent synchronously.`; - - await logCtx.info(content); - const connection: NangoConnection = { id: nangoProps.nangoConnectionId!, connection_id: nangoProps.connectionId, @@ -219,5 +214,4 @@ async function onFailure({ } }); } - await logCtx.error(error.message, { error }); } diff --git a/packages/jobs/lib/execution/postConnection.ts b/packages/jobs/lib/execution/postConnection.ts index e928463c8b..846f5fd10a 100644 --- a/packages/jobs/lib/execution/postConnection.ts +++ b/packages/jobs/lib/execution/postConnection.ts @@ -121,7 +121,6 @@ export async function handlePostConnectionSuccess({ nangoProps }: { nangoProps: createdAt: Date.now() }); const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) }); - await logCtx.info(content); await logCtx.success(); } diff --git a/packages/jobs/lib/execution/webhook.ts b/packages/jobs/lib/execution/webhook.ts index 85352a3fdc..34fe7d19fd 100644 --- a/packages/jobs/lib/execution/webhook.ts +++ b/packages/jobs/lib/execution/webhook.ts @@ -122,7 +122,6 @@ export async function startWebhook(task: TaskWebhook): Promise> { syncName: task.parentSyncName, syncJobId: syncJob?.id, providerConfigKey: task.connection.provider_config_key, - activityLogId: task.activityLogId, runTime: 0, error, environment: { id: task.connection.environment_id, name: environment?.name || 'unknown' }, @@ -151,8 +150,6 @@ export async function handleWebhookSuccess({ nangoProps }: { nangoProps: NangoPr runTimeInSeconds: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, createdAt: Date.now() }); - const logCtx = await logContextGetter.get({ id: String(nangoProps.activityLogId) }); - await logCtx.info(content); await updateSyncJobStatus(nangoProps.syncJobId!, SyncStatus.SUCCESS); } @@ -169,7 +166,6 @@ export async function handleWebhookError({ nangoProps, error }: { nangoProps: Na syncName: nangoProps.syncConfig.sync_name, syncJobId: nangoProps.syncJobId!, providerConfigKey: nangoProps.providerConfigKey, - activityLogId: nangoProps.activityLogId!, runTime: (new Date().getTime() - nangoProps.startedAt.getTime()) / 1000, error, environment: { id: nangoProps.environmentId, name: nangoProps.environmentName || 'unknown' }, @@ -185,7 +181,6 @@ async function onFailure({ syncName, syncJobId, providerConfigKey, - activityLogId, runTime, error }: { @@ -196,7 +191,6 @@ async function onFailure({ syncJobId?: number | undefined; syncName: string; providerConfigKey: string; - activityLogId: string; runTime: number; error: NangoError; }): Promise { @@ -219,8 +213,6 @@ async function onFailure({ createdAt: Date.now() }); } - const logCtx = await logContextGetter.get({ id: activityLogId }); - await logCtx.error(error.message, { error }); if (syncJobId) { await updateSyncJobStatus(syncJobId, SyncStatus.STOPPED); diff --git a/packages/server/lib/controllers/sync.controller.ts b/packages/server/lib/controllers/sync.controller.ts index b2d9970a8a..08604a580f 100644 --- a/packages/server/lib/controllers/sync.controller.ts +++ b/packages/server/lib/controllers/sync.controller.ts @@ -33,7 +33,7 @@ import { import type { LogContext } from '@nangohq/logs'; import { defaultOperationExpiration, logContextGetter } from '@nangohq/logs'; import type { LastAction } from '@nangohq/records'; -import { isHosted } from '@nangohq/utils'; +import { getHeaders, isHosted } from '@nangohq/utils'; import { records as recordsService } from '@nangohq/records'; import type { RequestLocals } from '../utils/express.js'; import { getOrchestrator } from '../utils/utils.js'; @@ -324,7 +324,6 @@ class SyncController { return; } else { span.setTag('nango.error', actionResponse.error); - await logCtx.error('Failed to run action', { error: actionResponse.error }); await logCtx.failed(); errorManager.errResFromNangoErr(res, actionResponse.error); @@ -340,6 +339,20 @@ class SyncController { } next(err); + } finally { + const reqHeaders = getHeaders(req.headers); + reqHeaders['authorization'] = 'REDACTED'; + await logCtx?.enrichOperation({ + request: { + url: `${req.protocol}://${req.get('host')}${req.originalUrl}`, + method: req.method, + headers: reqHeaders + }, + response: { + code: res.statusCode, + headers: getHeaders(res.getHeaders()) + } + }); } } diff --git a/packages/shared/lib/clients/orchestrator.ts b/packages/shared/lib/clients/orchestrator.ts index ceaf2adb5b..acfad147a3 100644 --- a/packages/shared/lib/clients/orchestrator.ts +++ b/packages/shared/lib/clients/orchestrator.ts @@ -6,7 +6,6 @@ import type { Result } from '@nangohq/utils'; import { NangoError, deserializeNangoError } from '../utils/error.js'; import telemetry, { LogTypes } from '../utils/telemetry.js'; import type { NangoConnection } from '../models/Connection.js'; -import { SYNC_TASK_QUEUE, WEBHOOK_TASK_QUEUE } from '../constants.js'; import { v4 as uuid } from 'uuid'; import errorManager, { ErrorSourceEnum } from '../utils/error.manager.js'; import type { Config as ProviderConfig } from '../models/Provider.js'; @@ -110,9 +109,13 @@ export class Orchestrator { ...(activeSpan ? { childOf: activeSpan } : {}) }); const startTime = Date.now(); - const workflowId = `${SYNC_TASK_QUEUE}.ACTION:${actionName}.${connection.connection_id}.${uuid()}`; try { - await logCtx.info(`Starting action workflow ${workflowId} in the task queue: ${SYNC_TASK_QUEUE}`, { input }); + await logCtx.info(`Starting action '${actionName}'`, { + input, + action: actionName, + connection: connection.connection_id, + integration: connection.provider_config_key + }); let parsedInput = null; try { @@ -144,7 +147,7 @@ export class Orchestrator { const res = actionResult.mapError((err) => { return ( deserializeNangoError(err.payload) || - new NangoError('action_failure', { error: err.message, ...(err.payload ? { payload: err.payload } : {}) }) + new NangoError('action_script_failure', { error: err.message, ...(err.payload ? { payload: err.payload } : {}) }) ); }); @@ -152,16 +155,20 @@ export class Orchestrator { throw res.error; } - const content = `The action workflow ${workflowId} was successfully run. A truncated response is: ${JSON.stringify(res.value, null, 2)?.slice(0, 100)}`; + const content = `The action was successfully run`; - await logCtx.info(content); + await logCtx.info(content, { + action: actionName, + connection: connection.connection_id, + integration: connection.provider_config_key, + truncated_response: JSON.stringify(res.value, null, 2)?.slice(0, 100) + }); await telemetry.log( LogTypes.ACTION_SUCCESS, content, LogActionEnum.ACTION, { - workflowId, input: JSON.stringify(input, null, 2), environmentId: String(connection.environment_id), connectionId: connection.connection_id, @@ -181,9 +188,13 @@ export class Orchestrator { formattedError = new NangoError('action_failure', { error: errorToObject(err) }); } - const content = `The action workflow ${workflowId} failed with error: ${stringifyError(err)}`; - - await logCtx.error(`Failed with error "${formattedError.type}"`, { error: formattedError }); + const content = `The action failed`; + await logCtx.error(content, { + error: formattedError, + action: actionName, + connection: connection.connection_id, + integration: connection.provider_config_key + }); errorManager.report(err, { source: ErrorSourceEnum.PLATFORM, @@ -201,7 +212,7 @@ export class Orchestrator { content, LogActionEnum.ACTION, { - workflowId, + error: stringifyError(err), input: JSON.stringify(input, null, 2), environmentId: String(connection.environment_id), connectionId: connection.connection_id, @@ -270,10 +281,13 @@ export class Orchestrator { } ); - const workflowId = `${WEBHOOK_TASK_QUEUE}.WEBHOOK:${syncConfig.sync_name}:${webhookName}.${connection.connection_id}.${Date.now()}`; - try { - await logCtx.info('Starting webhook workflow', { workflowId, input }); + await logCtx.info('Starting webhook workflow', { + input, + webhook: webhookName, + connection: connection.connection_id, + integration: connection.provider_config_key + }); let parsedInput = null; try { @@ -302,25 +316,44 @@ export class Orchestrator { groupKey, args }); - const res = webhookResult.mapError((e) => new NangoError('webhook_failure', e.payload ?? { error: e.message })); + const res = webhookResult.mapError((err) => { + return ( + deserializeNangoError(err.payload) || + new NangoError('webhook_script_failure', { error: err.message, ...(err.payload ? { payload: err.payload } : {}) }) + ); + }); if (res.isErr()) { throw res.error; } - await logCtx.info('The webhook workflow was successfully run'); + await logCtx.info('The webhook was successfully run', { + action: webhookName, + connection: connection.connection_id, + integration: connection.provider_config_key + }); + await logCtx.success(); metrics.increment(metrics.Types.WEBHOOK_SUCCESS); return res as Result; - } catch (e) { - const errorMessage = stringifyError(e, { pretty: true }); - const error = new NangoError('webhook_script_failure', { errorMessage }); + } catch (err) { + let formattedError: NangoError; + if (err instanceof NangoError) { + formattedError = err; + } else { + formattedError = new NangoError('webhook_failure', { error: errorToObject(err) }); + } - await logCtx.error('The webhook workflow failed', { error: e }); + await logCtx.error('The webhook failed', { + error: err, + webhook: webhookName, + connection: connection.connection_id, + integration: connection.provider_config_key + }); await logCtx.failed(); - errorManager.report(e, { + errorManager.report(err, { source: ErrorSourceEnum.PLATFORM, operation: LogActionEnum.SYNC_CLIENT, environmentId: connection.environment_id, @@ -333,8 +366,8 @@ export class Orchestrator { }); metrics.increment(metrics.Types.WEBHOOK_FAILURE); - span.setTag('error', error); - return Err(error); + span.setTag('error', formattedError); + return Err(formattedError); } finally { span.finish(); } @@ -366,9 +399,12 @@ export class Orchestrator { ...(activeSpan ? { childOf: activeSpan } : {}) }); const startTime = Date.now(); - const workflowId = `${SYNC_TASK_QUEUE}.POST_CONNECTION_SCRIPT:${name}.${connection.connection_id}.${uuid()}`; try { - await logCtx.info(`Starting post connection script workflow ${workflowId} in the task queue: ${SYNC_TASK_QUEUE}`); + await logCtx.info(`Starting post connection script`, { + postConnection: name, + connection: connection.connection_id, + integration: connection.provider_config_key + }); const groupKey: string = 'post-connection-script'; const executionId = `${groupKey}:environment:${connection.environment_id}:connection:${connection.id}:post-connection-script:${name}:at:${new Date().toISOString()}:${uuid()}`; @@ -389,22 +425,31 @@ export class Orchestrator { groupKey, args }); - const res = result.mapError((e) => new NangoError('post_connection_failure', e.payload ?? { error: e.message })); + + const res = result.mapError((err) => { + return ( + deserializeNangoError(err.payload) || + new NangoError('post_connection_script_failure', { error: err.message, ...(err.payload ? { payload: err.payload } : {}) }) + ); + }); if (res.isErr()) { throw res.error; } - const content = `The post connection script workflow ${workflowId} was successfully run. A truncated response is: ${JSON.stringify(res.value, null, 2)?.slice(0, 100)}`; + const content = `The post connection script was successfully run.`; - await logCtx.info(content); + await logCtx.info(content, { + postConnection: name, + connection: connection.connection_id, + integration: connection.provider_config_key + }); await telemetry.log( LogTypes.POST_CONNECTION_SCRIPT_SUCCESS, content, LogActionEnum.POST_CONNECTION_SCRIPT, { - workflowId, environmentId: String(connection.environment_id), connectionId: connection.connection_id, providerConfigKey: connection.provider_config_key, @@ -416,12 +461,21 @@ export class Orchestrator { metrics.increment(metrics.Types.POST_CONNECTION_SCRIPT_SUCCESS); return res as Result; } catch (err) { - const errorMessage = stringifyError(err, { pretty: true }); - const error = new NangoError('post_connection_script_failure', { errorMessage }); + let formattedError: NangoError; + if (err instanceof NangoError) { + formattedError = err; + } else { + formattedError = new NangoError('post_connection_failure', { error: errorToObject(err) }); + } - const content = `The post-connection-script workflow ${workflowId} failed with error: ${err}`; + const content = `The post connection script failed`; - await logCtx.error(content); + await logCtx.error(content, { + error: formattedError, + postConnection: name, + connection: connection.connection_id, + integration: connection.provider_config_key + }); errorManager.report(err, { source: ErrorSourceEnum.PLATFORM, @@ -438,7 +492,6 @@ export class Orchestrator { content, LogActionEnum.POST_CONNECTION_SCRIPT, { - workflowId, environmentId: String(connection.environment_id), connectionId: connection.connection_id, providerConfigKey: connection.provider_config_key, @@ -449,8 +502,8 @@ export class Orchestrator { ); metrics.increment(metrics.Types.POST_CONNECTION_SCRIPT_FAILURE); - span.setTag('error', error); - return Err(error); + span.setTag('error', formattedError); + return Err(formattedError); } finally { const endTime = Date.now(); const totalRunTime = (endTime - startTime) / 1000; diff --git a/packages/shared/lib/constants.ts b/packages/shared/lib/constants.ts index da3202481d..77fc6db393 100644 --- a/packages/shared/lib/constants.ts +++ b/packages/shared/lib/constants.ts @@ -1,5 +1,3 @@ import { isEnterprise } from '@nangohq/utils'; -export const SYNC_TASK_QUEUE = 'nango-syncs'; -export const WEBHOOK_TASK_QUEUE = 'nango-webhooks'; export const CONNECTIONS_WITH_SCRIPTS_CAP_LIMIT = isEnterprise ? Infinity : 3; diff --git a/packages/shared/lib/utils/error.ts b/packages/shared/lib/utils/error.ts index 173828f598..3c9276113d 100644 --- a/packages/shared/lib/utils/error.ts +++ b/packages/shared/lib/utils/error.ts @@ -533,7 +533,7 @@ export class NangoError extends Error { case 'action_script_runtime_error': this.status = 500; - this.message = ''; + this.message = 'The action script failed with a runtime error'; break; case 'script_cancelled': diff --git a/packages/types/lib/logs/messages.ts b/packages/types/lib/logs/messages.ts index 119f398bad..b7b88940e2 100644 --- a/packages/types/lib/logs/messages.ts +++ b/packages/types/lib/logs/messages.ts @@ -140,6 +140,6 @@ export type OperationRow = Merge, { accountId: numb export type MessageRowInsert = Pick & Partial> & { id?: never }; // Buffer logs inside proxy calls -export type LogsBuffer = Pick & Partial>; +export type LogsBuffer = Pick & Partial>; export type MessageOrOperationRow = MessageRow | OperationRow; diff --git a/packages/utils/lib/express/headers.ts b/packages/utils/lib/express/headers.ts new file mode 100644 index 0000000000..9ebd95e775 --- /dev/null +++ b/packages/utils/lib/express/headers.ts @@ -0,0 +1,13 @@ +import type { IncomingHttpHeaders, OutgoingHttpHeaders } from 'http'; + +export function getHeaders(hs: IncomingHttpHeaders | OutgoingHttpHeaders): Record { + const headers: Record = {}; + for (const [key, value] of Object.entries(hs)) { + if (typeof value === 'string') { + headers[key] = value; + } else if (Array.isArray(value)) { + headers[key] = value.join(', '); + } + } + return headers; +} diff --git a/packages/utils/lib/index.ts b/packages/utils/lib/index.ts index c49127cc93..9324513b8d 100644 --- a/packages/utils/lib/index.ts +++ b/packages/utils/lib/index.ts @@ -12,6 +12,7 @@ export * from './string.js'; export * from './express/requestLoggerMiddleware.js'; export * from './express/route.js'; export * from './express/validate.js'; +export * from './express/headers.js'; export * from './workflows.js'; export * from './axios.js'; export * from './auth.js'; From 61438a286d3daa219fd934b997e9f8b9d6096c15 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Mon, 26 Aug 2024 11:36:17 -0400 Subject: [PATCH 2/2] fix: use getHeaders function from utils --- .../server/lib/controllers/proxy.controller.ts | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/packages/server/lib/controllers/proxy.controller.ts b/packages/server/lib/controllers/proxy.controller.ts index a63045722f..226f14c53d 100644 --- a/packages/server/lib/controllers/proxy.controller.ts +++ b/packages/server/lib/controllers/proxy.controller.ts @@ -1,5 +1,5 @@ import type { Request, Response, NextFunction } from 'express'; -import type { OutgoingHttpHeaders, IncomingHttpHeaders } from 'http'; +import type { OutgoingHttpHeaders } from 'http'; import type { TransformCallback } from 'stream'; import type stream from 'stream'; import { Readable, Transform, PassThrough } from 'stream'; @@ -10,7 +10,7 @@ import type { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios'; import { backOff } from 'exponential-backoff'; import type { HTTP_VERB, UserProvidedProxyConfiguration, InternalProxyConfiguration, ApplicationConstructedProxyConfiguration } from '@nangohq/shared'; import { NangoError, LogActionEnum, errorManager, ErrorSourceEnum, proxyService, connectionService, configService, featureFlags } from '@nangohq/shared'; -import { metrics, getLogger, axiosInstance as axios } from '@nangohq/utils'; +import { metrics, getLogger, axiosInstance as axios, getHeaders } from '@nangohq/utils'; import { logContextGetter } from '@nangohq/logs'; import { connectionRefreshFailed as connectionRefreshFailedHook, connectionRefreshSuccess as connectionRefreshSuccessHook } from '../hooks/hooks.js'; import type { LogContext } from '@nangohq/logs'; @@ -162,17 +162,6 @@ class ProxyController { metrics.increment(metrics.Types.PROXY_FAILURE); next(err); } finally { - const getHeaders = (hs: IncomingHttpHeaders | OutgoingHttpHeaders): Record => { - const headers: Record = {}; - for (const [key, value] of Object.entries(hs)) { - if (typeof value === 'string') { - headers[key] = value; - } else if (Array.isArray(value)) { - headers[key] = value.join(', '); - } - } - return headers; - }; const reqHeaders = getHeaders(req.headers); reqHeaders['authorization'] = 'REDACTED'; await logCtx?.enrichOperation({