Skip to content

Commit

Permalink
[gh-#791] handle dupe syncs by forcing user to report the error
Browse files Browse the repository at this point in the history
  • Loading branch information
khaliqgant committed Jul 18, 2023
1 parent c9d16e4 commit 723cf70
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 76 deletions.
25 changes: 20 additions & 5 deletions packages/server/lib/controllers/sync.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,38 @@ import {
IncomingSyncConfig,
getProviderConfigBySyncAndAccount,
SyncCommand,
CommandToActivityLog
CommandToActivityLog,
environmentService,
errorManager
} from '@nangohq/shared';

class SyncController {
public async deploySync(req: Request, res: Response, next: NextFunction) {
try {
const { syncs, reconcile, debug }: { syncs: IncomingSyncConfig[]; reconcile: boolean; debug: boolean } = req.body;
const environmentId = getEnvironmentId(res);
let reconcileSuccess = true;

const result = await createSyncConfig(environmentId, syncs, debug);
const syncConfigDeployResult = await createSyncConfig(environmentId, syncs, debug);

if (reconcile) {
await getAndReconcileSyncDifferences(environmentId, syncs, reconcile, debug);
const success = await getAndReconcileSyncDifferences(environmentId, syncs, reconcile, syncConfigDeployResult?.activityLogId as number, debug);
if (!success) {
reconcileSuccess = false;
}
}

res.send(result);
if (!reconcileSuccess) {
res.status(500).send({ message: 'There was an error deploying syncs, please check the activity tab and report this issue to support' });
}

res.send(syncConfigDeployResult?.result);
} catch (e) {
const environmentId = getEnvironmentId(res);
const accountId = (await environmentService.getAccountIdFromEnvironment(environmentId)) as number;
errorManager.report(new Error('error_creating_sync_config'), {
accountId
});
next(e);
}
}
Expand All @@ -48,7 +63,7 @@ class SyncController {
const { syncs, debug }: { syncs: IncomingSyncConfig[]; reconcile: boolean; debug: boolean } = req.body;
const environmentId = getEnvironmentId(res);

const result = await getAndReconcileSyncDifferences(environmentId, syncs, false, debug);
const result = await getAndReconcileSyncDifferences(environmentId, syncs, false, null, debug);

res.send(result);
} catch (e) {
Expand Down
5 changes: 5 additions & 0 deletions packages/shared/lib/models/Sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ export interface SlimSync {

export type SyncDeploymentResult = Pick<SyncConfig, 'id' | 'version' | 'sync_name'>;

export interface SyncConfigResult {
result: SyncDeploymentResult[];
activityLogId: number | null;
}

export interface SyncDifferences {
newSyncs: SlimSync[];
deletedSyncs: SlimSync[];
Expand Down
8 changes: 4 additions & 4 deletions packages/shared/lib/services/sync/config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
} from '../activity/activity.service.js';
import { getSyncsByProviderConfigAndSyncName } from './sync.service.js';
import type { LogLevel, LogAction } from '../../models/Activity.js';
import type { SyncModelSchema, SyncConfigWithProvider, IncomingSyncConfig, SyncConfig, SlimSync, SyncDeploymentResult } from '../../models/Sync.js';
import type { SyncModelSchema, SyncConfigWithProvider, IncomingSyncConfig, SyncConfig, SlimSync, SyncConfigResult } from '../../models/Sync.js';
import type { NangoConnection } from '../../models/Connection.js';
import type { Config as ProviderConfig } from '../../models/Provider.js';
import type { NangoConfig } from '../../integrations/index.js';
Expand All @@ -23,7 +23,7 @@ import { getEnv } from '../../utils/utils.js';

const TABLE = dbNamespace + 'sync_configs';

export async function createSyncConfig(environment_id: number, syncs: IncomingSyncConfig[], debug = false): Promise<SyncDeploymentResult[] | null> {
export async function createSyncConfig(environment_id: number, syncs: IncomingSyncConfig[], debug = false): Promise<SyncConfigResult | null> {
const insertData = [];

const providers = syncs.map((sync) => sync.providerConfigKey);
Expand Down Expand Up @@ -157,7 +157,7 @@ export async function createSyncConfig(environment_id: number, syncs: IncomingSy
}
await updateSuccessActivityLog(activityLogId as number, true);

return [] as unknown as SyncDeploymentResult[];
return { result: [], activityLogId };
}

try {
Expand All @@ -176,7 +176,7 @@ export async function createSyncConfig(environment_id: number, syncs: IncomingSy
content: `Successfully deployed the syncs (${JSON.stringify(syncsWithVersions, null, 2)}).`
});

return result;
return { result, activityLogId };
} catch (e) {
await updateSuccessActivityLog(activityLogId as number, false);

Expand Down
74 changes: 47 additions & 27 deletions packages/shared/lib/services/sync/orchestrator.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,63 @@ export class Orchestrator {
sync: IncomingSyncConfig,
debug = false,
activityLogId?: number
) {
const syncConfig = await configService.getProviderConfig(providerConfigKey, environmentId);
if (debug && activityLogId) {
await createActivityLogMessage({
level: 'debug',
activity_log_id: activityLogId as number,
timestamp: Date.now(),
content: `Beginning iteration of starting syncs for ${syncName} with ${connections.length} connections`
});
}
for (const connection of connections) {
const createdSync = await createSync(connection.id as number, syncName);
const syncClient = await SyncClient.getInstance();
await syncClient?.startContinuous(
connection,
createdSync as Sync,
syncConfig as ProviderConfig,
syncName,
{ ...sync, returns: sync.models },
debug
);
}
if (debug && activityLogId) {
): Promise<boolean> {
try {
const syncConfig = await configService.getProviderConfig(providerConfigKey, environmentId);
if (debug && activityLogId) {
await createActivityLogMessage({
level: 'debug',
activity_log_id: activityLogId as number,
timestamp: Date.now(),
content: `Beginning iteration of starting syncs for ${syncName} with ${connections.length} connections`
});
}
for (const connection of connections) {
const createdSync = await createSync(connection.id as number, syncName);
const syncClient = await SyncClient.getInstance();
await syncClient?.startContinuous(
connection,
createdSync as Sync,
syncConfig as ProviderConfig,
syncName,
{ ...sync, returns: sync.models },
debug
);
}
if (debug && activityLogId) {
await createActivityLogMessage({
level: 'debug',
activity_log_id: activityLogId as number,
timestamp: Date.now(),
content: `Finished iteration of starting syncs for ${syncName} with ${connections.length} connections`
});
}

return true;
} catch (e) {
const prettyError = JSON.stringify(e, ['message', 'name'], 2);
await createActivityLogMessage({
level: 'debug',
level: 'error',
activity_log_id: activityLogId as number,
timestamp: Date.now(),
content: `Finished iteration of starting syncs for ${syncName} with ${connections.length} connections`
content: `Error starting syncs for ${syncName} with ${connections.length} connections: ${prettyError}`
});

return false;
}
}

public async createSyncs(syncArgs: CreateSyncArgs[], debug = false, activityLogId?: number) {
public async createSyncs(syncArgs: CreateSyncArgs[], debug = false, activityLogId?: number): Promise<boolean> {
let success = true;
for (const syncToCreate of syncArgs) {
const { connections, providerConfigKey, environmentId, sync, syncName } = syncToCreate;
await this.create(connections, syncName, providerConfigKey, environmentId, sync, debug, activityLogId);
const result = await this.create(connections, syncName, providerConfigKey, environmentId, sync, debug, activityLogId);
if (!result) {
success = false;
}
}

return success;
}

/**
Expand Down
62 changes: 22 additions & 40 deletions packages/shared/lib/services/sync/sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@ import db, { schema, dbNamespace } from '../../db/database.js';
import { IncomingSyncConfig, SyncDifferences, Sync, Job as SyncJob, SyncStatus, SyncWithSchedule, SlimSync } from '../../models/Sync.js';
import type { Connection, NangoConnection } from '../../models/Connection.js';
import SyncClient from '../../clients/sync.client.js';
import type { LogLevel, LogAction } from '../../models/Activity.js';
import {
updateSuccess as updateSuccessActivityLog,
createActivityLog,
createActivityLogMessage,
createActivityLogMessageAndEnd
} from '../activity/activity.service.js';
import { updateSuccess as updateSuccessActivityLog, createActivityLogMessage, createActivityLogMessageAndEnd } from '../activity/activity.service.js';
import { markAllAsStopped } from './schedule.service.js';
import { getActiveSyncConfigsByEnvironmentId, getSyncConfigsByProviderConfigKey } from './config.service.js';
import syncOrchestrator from './orchestrator.service.js';
Expand Down Expand Up @@ -46,6 +40,12 @@ export const getById = async (id: string): Promise<Sync | null> => {
};

export const createSync = async (nangoConnectionId: number, name: string): Promise<Sync | null> => {
const existingSync = await getSyncByIdAndName(nangoConnectionId, name);

if (existingSync) {
throw new Error(`Sync with name ${name} already exists. Please reach out to support to report this issue.`);
}

const sync: Sync = {
id: uuidv4(),
nango_connection_id: nangoConnectionId,
Expand Down Expand Up @@ -271,33 +271,9 @@ export const getAndReconcileSyncDifferences = async (
environmentId: number,
syncs: IncomingSyncConfig[],
performAction: boolean,
activityLogId: number | null,
debug = false
): Promise<SyncDifferences> => {
const providers = syncs.map((sync) => sync.providerConfigKey);
const providerConfigKeys = [...new Set(providers)];

const log = {
level: 'info' as LogLevel,
success: null,
action: 'sync deploy' as LogAction,
start: Date.now(),
end: Date.now(),
timestamp: Date.now(),
connection_id: null,
provider: null,
provider_config_key: `${syncs.length} sync${syncs.length === 1 ? '' : 's'} from ${providerConfigKeys.length} integration${
providerConfigKeys.length === 1 ? '' : 's'
}`,
environment_id: environmentId,
operation_name: 'sync.deploy'
};

let activityLogId = null;

if (debug && performAction) {
activityLogId = await createActivityLog(log);
}

): Promise<SyncDifferences | null> => {
const newSyncs: SlimSync[] = [];
const syncsToCreate = [];

Expand Down Expand Up @@ -330,7 +306,7 @@ export const getAndReconcileSyncDifferences = async (
if (!exists || isNew) {
newSyncs.push({ name: syncName, providerConfigKey, connections: existingConnectionsByProviderConfig[providerConfigKey]?.length as number });
if (performAction) {
if (activityLogId) {
if (debug && activityLogId) {
await createActivityLogMessage({
level: 'debug',
activity_log_id: activityLogId as number,
Expand All @@ -344,7 +320,7 @@ export const getAndReconcileSyncDifferences = async (
}

if (syncsToCreate.length > 0) {
if (debug) {
if (debug && activityLogId) {
const syncNames = syncsToCreate.map((sync) => sync.syncName);
await createActivityLogMessage({
level: 'debug',
Expand All @@ -354,7 +330,14 @@ export const getAndReconcileSyncDifferences = async (
});
}
// this is taken out of the loop to ensure it awaits all the calls properly
await syncOrchestrator.createSyncs(syncsToCreate, debug, activityLogId as number);
const result = await syncOrchestrator.createSyncs(syncsToCreate, debug, activityLogId as number);

if (!result) {
if (activityLogId) {
await updateSuccessActivityLog(activityLogId as number, false);
}
return null;
}
}

const existingSyncs = await getActiveSyncConfigsByEnvironmentId(environmentId);
Expand All @@ -372,7 +355,7 @@ export const getAndReconcileSyncDifferences = async (
});

if (performAction) {
if (activityLogId) {
if (debug && activityLogId) {
await createActivityLogMessage({
level: 'debug',
activity_log_id: activityLogId as number,
Expand All @@ -391,14 +374,13 @@ export const getAndReconcileSyncDifferences = async (
}
}

if (activityLogId) {
if (debug && activityLogId) {
await createActivityLogMessageAndEnd({
level: 'debug',
activity_log_id: activityLogId,
activity_log_id: activityLogId as number,
timestamp: Date.now(),
content: 'Sync deploy diff in debug mode process complete successfully.'
});
await updateSuccessActivityLog(activityLogId, true);
}

return {
Expand Down

0 comments on commit 723cf70

Please sign in to comment.