Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhooks): [nan 1064] webhook on sync error #2281

Merged
merged 5 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions packages/jobs/lib/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import { getLogger, stringifyError, errorToObject } from '@nangohq/utils';
import integrationService from './integration.service.js';
import type { LogContext } from '@nangohq/logs';
import { logContextGetter } from '@nangohq/logs';
import { sendSync } from '@nangohq/webhooks';
import { bigQueryClient, slackService } from './clients.js';

const logger = getLogger('Jobs');
Expand Down Expand Up @@ -90,6 +91,7 @@ export async function runAction(args: ActionArgs): Promise<ServiceResponse> {
slackService,
writeToDb: true,
logCtx: await logContextGetter.get({ id: String(activityLogId) }),
sendSyncWebhook: sendSync,
nangoConnection,
syncName: actionName,
isAction: true,
Expand Down Expand Up @@ -321,6 +323,7 @@ export async function syncProvider({
integrationService,
recordsService,
slackService,
sendSyncWebhook: sendSync,
writeToDb: true,
syncId,
syncJobId,
Expand Down Expand Up @@ -418,6 +421,7 @@ export async function runWebhook(args: WebhookArgs): Promise<boolean> {
slackService,
writeToDb: true,
nangoConnection,
sendSyncWebhook: sendSync,
syncJobId: syncJobId?.id as number,
syncName: parentSyncName,
isAction: false,
Expand Down Expand Up @@ -455,6 +459,7 @@ export async function runPostConnectionScript(args: PostConnectionScriptArgs): P
writeToDb: true,
nangoConnection,
syncName: name,
sendSyncWebhook: sendSync,
isAction: false,
isPostConnectionScript: true,
syncType: SyncType.POST_CONNECTION_SCRIPT,
Expand Down
3 changes: 3 additions & 0 deletions packages/jobs/lib/processor/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { JsonValue } from 'type-fest';
import { Err, Ok } from '@nangohq/utils';
import type { Result } from '@nangohq/utils';
import { configService, createSyncJob, getSyncByIdAndName, syncRunService, SyncStatus, SyncType } from '@nangohq/shared';
import { sendSync } from '@nangohq/webhooks';
import { logContextGetter } from '@nangohq/logs';
import { records as recordsService } from '@nangohq/records';
import integrationService from '../integration.service.js';
Expand Down Expand Up @@ -49,6 +50,7 @@ async function action(task: TaskAction): Promise<Result<JsonValue>> {
recordsService,
slackService,
writeToDb: true,
sendSyncWebhook: sendSync,
logCtx: await logContextGetter.get({ id: String(task.activityLogId) }),
nangoConnection: task.connection,
syncName: task.actionName,
Expand Down Expand Up @@ -90,6 +92,7 @@ async function webhook(task: TaskWebhook): Promise<Result<JsonValue>> {
recordsService,
slackService,
writeToDb: true,
sendSyncWebhook: sendSync,
nangoConnection: task.connection,
syncJobId: syncJobId?.id as number,
syncName: task.parentSyncName,
Expand Down
1 change: 1 addition & 0 deletions packages/jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"@nangohq/shared": "file:../shared",
"@nangohq/records": "file:../records",
"@nangohq/utils": "file:../utils",
"@nangohq/webhooks": "file:../webhooks",
"@temporalio/activity": "^1.9.1",
"@temporalio/client": "^1.9.1",
"@temporalio/worker": "^1.9.1",
Expand Down
125 changes: 2 additions & 123 deletions packages/shared/lib/services/notification/webhook.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc.js';
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what still live in this file? Is everything moved over to the webhooks package?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

webhook forwarding logic which I'll bring over in another PR.

import { backOff } from 'exponential-backoff';
import crypto from 'crypto';
import { SyncType } from '../../models/Sync.js';
import type { NangoConnection } from '../../models/Connection.js';
import type { Account, Config, Environment, SyncResult } from '../../models/index.js';
import type { Account, Config, Environment } from '../../models/index.js';
import type { LogLevel } from '../../models/Activity.js';
import { LogActionEnum } from '../../models/Activity.js';
import type { NangoSyncWebhookBody, NangoForwardWebhookBody } from '../../models/Webhook.js';
import type { NangoForwardWebhookBody } from '../../models/Webhook.js';
import { WebhookType } from '../../models/Webhook.js';
import { createActivityLog, createActivityLogMessage, createActivityLogMessageAndEnd } from '../activity/activity.service.js';
import type { LogContext, LogContextGetter } from '@nangohq/logs';
Expand Down Expand Up @@ -108,125 +106,6 @@ class WebhookService {
return true;
}

async sendSyncUpdate(
nangoConnection: NangoConnection,
syncName: string,
model: string,
responseResults: SyncResult,
syncType: SyncType,
now: Date | undefined,
activityLogId: number,
logCtx: LogContext,
environment: Environment
) {
if (!this.shouldSendWebhook(environment)) {
return;
}

const { webhook_url: webhookUrl, webhook_url_secondary: webhookUrlSecondary, always_send_webhook: alwaysSendWebhook } = environment;

const noChanges =
responseResults.added === 0 && responseResults.updated === 0 && (responseResults.deleted === 0 || responseResults.deleted === undefined);

if (!alwaysSendWebhook && noChanges) {
await createActivityLogMessage({
level: 'info',
environment_id: environment.id,
activity_log_id: activityLogId,
content: `There were no added, updated, or deleted results. No webhook sent, as per your environment settings.`,
timestamp: Date.now()
});
await logCtx.info('There were no added, updated, or deleted results. No webhook sent, as per your environment settings');

return;
}

const body: NangoSyncWebhookBody = {
from: 'nango',
type: WebhookType.SYNC,
connectionId: nangoConnection.connection_id,
providerConfigKey: nangoConnection.provider_config_key,
syncName,
model,
responseResults: {
added: responseResults.added,
updated: responseResults.updated,
deleted: 0
},
syncType,
modifiedAfter: dayjs(now).toDate().toISOString(),
queryTimeStamp: now as unknown as string
};

if (syncType === SyncType.INITIAL) {
body.queryTimeStamp = null;
}

if (responseResults.deleted && responseResults.deleted > 0) {
body.responseResults.deleted = responseResults.deleted;
}

const endingMessage = noChanges
? 'with no data changes as per your environment settings.'
: `with the following data: ${JSON.stringify(body, null, 2)}`;

const webhookUrls: { url: string; type: string }[] = [
{ url: webhookUrl, type: 'webhookUrl' },
{ url: webhookUrlSecondary, type: 'webhookUrlSecondary' }
].filter((webhook) => webhook.url) as { url: string; type: string }[];

for (const webhookUrl of webhookUrls) {
const { url, type } = webhookUrl;
try {
const headers = this.getSignatureHeader(environment.secret_key, body);

const response = await backOff(
() => {
return axios.post(url, body, { headers });
},
{ numOfAttempts: RETRY_ATTEMPTS, retry: this.retry.bind(this, activityLogId, environment.id, logCtx) }
);

if (response.status >= 200 && response.status < 300) {
await createActivityLogMessage({
level: 'info',
environment_id: environment.id,
activity_log_id: activityLogId,
content: `Sync webhook sent successfully ${type === 'webhookUrlSecondary' ? 'to the secondary webhook URL' : ''} and received with a ${response.status} response code to ${url} ${endingMessage}`,
timestamp: Date.now()
});
await logCtx.info(
`Sync webhook sent successfully ${type === 'webhookUrlSecondary' ? 'to the secondary webhook URL' : ''} and received with a ${response.status} response code to ${url} ${endingMessage}`
);
} else {
await createActivityLogMessage({
level: 'error',
environment_id: environment.id,
activity_log_id: activityLogId,
content: `Sync webhook sent successfully ${type === 'webhookUrlSecondary' ? 'to the secondary webhook URL' : ''} to ${url} ${endingMessage} but received a ${response.status} response code. Please send a 2xx on successful receipt.`,
timestamp: Date.now()
});
await logCtx.error(
`Sync webhook sent successfully ${type === 'webhookUrlSecondary' ? 'to the secondary webhook URL' : ''} to ${url} ${endingMessage} but received a ${response.status} response code. Please send a 2xx on successful receipt.`
);
}
} catch (e) {
const errorMessage = stringifyError(e, { pretty: true });

await createActivityLogMessage({
level: 'error',
environment_id: environment.id,
activity_log_id: activityLogId,
content: `Sync webhook failed to send ${type === 'webhookUrlSecondary' ? 'to the secondary webhook URL' : ''} to ${url}. The error was: ${errorMessage}`,
timestamp: Date.now()
});
await logCtx.error(`Sync webhook failed to send ${type === 'webhookUrlSecondary' ? 'to the secondary webhook URL' : ''} to ${url}`, {
error: e
});
}
}
}

async forward({
integration,
account,
Expand Down
Loading
Loading