Merge pull request #423 from techmatters/CHI-1751-batch_Import_recording
CHI-1751: batch import recording
stephenhand authored Aug 7, 2023
2 parents 1c636fd + d6e1dc6 commit 3ba1e8d
Showing 9 changed files with 887 additions and 2,583 deletions.
3,002 changes: 465 additions & 2,537 deletions package-lock.json

Large diffs are not rendered by default.

@@ -0,0 +1,70 @@
/**
* Copyright (C) 2021-2023 Technology Matters
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

module.exports = {
  up: async queryInterface => {
    await queryInterface.sequelize.query(`
      CREATE TABLE IF NOT EXISTS resources."ImportBatches"
      (
        "timestamp" timestamp with time zone NOT NULL DEFAULT now(),
        "batchId" text NOT NULL,
        "accountSid" text NOT NULL,
        "successCount" integer NOT NULL DEFAULT 0,
        "failureCount" integer NOT NULL DEFAULT 0,
        "remainingCount" integer NOT NULL DEFAULT 0,
        "batchContext" JSONB,
        CONSTRAINT "ImportBatches_pkey" PRIMARY KEY ("batchId", "accountSid")
      )
    `);
    console.log('Table "ImportBatches" created');

    await queryInterface.sequelize.query(`
      ALTER TABLE IF EXISTS resources."ImportBatches"
      OWNER to resources;
    `);
    console.log('Table "ImportBatches" now owned by resources');

    await queryInterface.sequelize.query(`
      CREATE TABLE IF NOT EXISTS resources."ImportErrors"
      (
        id serial NOT NULL,
        "accountSid" text NOT NULL,
        "timestamp" timestamp with time zone NOT NULL DEFAULT now(),
        "batchId" text NOT NULL,
        "resourceId" text,
        "error" JSONB,
        "rejectedBatch" JSONB,
        CONSTRAINT "ImportErrors_pkey" PRIMARY KEY ("id", "accountSid")
      )
    `);
    console.log('Table "ImportErrors" created');

    await queryInterface.sequelize.query(`
      ALTER TABLE IF EXISTS resources."ImportErrors"
      OWNER to resources;
    `);
    console.log('Table "ImportErrors" now owned by resources');
  },

  down: async queryInterface => {
    await queryInterface.sequelize.query(
      `DROP TABLE IF EXISTS resources."ImportBatches"`,
    );
    console.log('Table "ImportBatches" dropped');
    await queryInterface.sequelize.query(`DROP TABLE IF EXISTS resources."ImportErrors"`);
    console.log('Table "ImportErrors" dropped');
  },
};
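
The migration above creates the two bookkeeping tables the rest of this PR writes to: ImportBatches accumulates per-batch success, failure and remaining counts, and ImportErrors stores individual failures together with the serialized error and the rejected batch payload. As a rough illustration of how ImportBatches could be inspected once populated, here is a hypothetical pg-promise query against the service's existing connection pool; the helper name and its placement are illustrative, not part of this commit.

import { db } from '../connection-pool';

// Hypothetical monitoring helper (not in this PR): latest batch record for an account.
export const getLatestImportBatch = async (accountSid: string) =>
  db.oneOrNone(
    `SELECT "batchId", "timestamp", "successCount", "failureCount", "remainingCount", "batchContext"
     FROM resources."ImportBatches"
     WHERE "accountSid" = $<accountSid>
     ORDER BY "timestamp" DESC
     LIMIT 1`,
    { accountSid },
  );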
6 changes: 3 additions & 3 deletions resources-domain/resources-service/package.json
@@ -33,7 +33,7 @@
"test:service": "cross-env POSTGRES_PORT=5433 RDS_USERNAME=hrm RDS_PASSWORD=postgres RESOURCES_PASSWORD=resources-password run-s -c docker:compose:test:up db:create:schema test:service:ci:migrate test:service:ci:run docker:compose:test:down",
"test:service:ci": "RDS_USERNAME=rdsadmin RDS_PASSWORD=postgres RESOURCES_PASSWORD=resources-password run-s db:create:schema test:service:ci:migrate test:service:ci:run",
"test:service:ci:migrate": "node ./db-migrate",
"test:service:ci:run": "cross-env AWS_REGION=us-east-1 CI=true TWILIO_ACCOUNT_SID=ACxxx TWILIO_AUTH_TOKEN=xxxxxx SSM_ENDPOINT=http://mock-ssm/ jest --verbose --maxWorkers=1 --forceExit tests/service/",
"test:service:ci:run": "cross-env AWS_REGION=us-east-1 CI=true TWILIO_ACCOUNT_SID=ACxxx TWILIO_AUTH_TOKEN=xxxxxx SSM_ENDPOINT=http://mock-ssm/ jest --verbose --maxWorkers=1 --forceExit tests/service",
"test:coverage": "run-s docker:compose:test:up test:service:migrate test:coverage:run docker:compose:test:down",
"test:coverage:run": "cross-env POSTGRES_PORT=5433 AWS_REGION=us-east-1 jest --verbose --maxWorkers=1 --coverage",
"test:migrate": "run-s test:service:migrate",
@@ -59,7 +59,6 @@
},
"homepage": "https://github.com/techmatters/hrm#readme",
"dependencies": {
"@aws-sdk/client-cloudsearch-domain": "^3.287.0",
"@tech-matters/elasticsearch-client": "^1.0.0",
"@tech-matters/resources-search-config": "^1.0.0",
"@tech-matters/sns-client": "^1.0.0",
@@ -73,6 +72,7 @@
"express-async-errors": "^3.1.1",
"http-errors": "^1.7.3",
"pg-promise": "^10.15.4",
"pg-query-stream": "^4.5.0"
"pg-query-stream": "^4.5.0",
"serialize-error": "8.1.0"
}
}
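
Aside from removing the unused @aws-sdk/client-cloudsearch-domain dependency, the functional change here is the addition of serialize-error, which turns an Error (message, name, stack, plus any custom enumerable properties) into a plain object that can be stored in the new ImportErrors "error" JSONB column. A quick sketch of that behaviour; the extra property and the abbreviated output are illustrative:

const { serializeError } = require('serialize-error');

const err = new Error('Error inserting resource into database.');
(err as any).resource = { id: 'example-resource' }; // illustrative extra property
console.log(JSON.stringify(serializeError(err)));
// Roughly: {"name":"Error","message":"Error inserting resource into database.","stack":"...","resource":{"id":"example-resource"}}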
71 changes: 65 additions & 6 deletions resources-domain/resources-service/src/import/importDataAccess.ts
@@ -14,15 +14,25 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

import { AccountSID, FlatResource, ImportProgress } from '@tech-matters/types';
import {
AccountSID,
FlatResource,
ImportBatch,
ImportProgress,
} from '@tech-matters/types';
import {
generateInsertImportErrorSql,
generateUpdateImportBatchRecordSql,
generateUpdateImportProgressSql,
generateUpsertSqlFromImportResource,
SELECT_IMPORT_PROGRESS_SQL,
} from './sql';
import { ITask } from 'pg-promise';
import { db } from '../connection-pool';

const getBatchId = (batch: ImportBatch): string =>
`${batch.fromSequence}-${batch.toSequence}/${batch.remaining}`;

const txIfNotInOne = async <T>(
task: ITask<{}> | undefined,
work: (y: ITask<{}>) => Promise<T>,
@@ -45,17 +55,66 @@ export const upsertImportedResource =
accountSid: AccountSID,
resource: FlatResource,
): Promise<UpsertImportedResourceResult> => {
return txIfNotInOne(task, async tx => {
await tx.none(generateUpsertSqlFromImportResource(accountSid, resource));
return { id: resource.id, success: true };
});
try {
return await txIfNotInOne(task, async tx => {
await tx.none(generateUpsertSqlFromImportResource(accountSid, resource));
return { id: resource.id, success: true };
});
} catch (error) {
return { id: resource.id, success: false, error: error as Error };
}
};

export const updateImportProgress =
(task?: ITask<{}>) =>
async (accountSid: AccountSID, progress: ImportProgress): Promise<void> => {
async (
accountSid: AccountSID,
progress: ImportProgress,
processed: number,
): Promise<void> => {
await txIfNotInOne(task, async tx => {
await tx.none(generateUpdateImportProgressSql(accountSid, progress));
await tx.none(
generateUpdateImportBatchRecordSql(
accountSid,
getBatchId(progress),
progress,
processed,
0,
),
);
});
};

export const insertImportError =
(task?: ITask<{}>) =>
async (
accountSid: AccountSID,
resourceId: string,
batch: ImportBatch,
error: any,
rejectedBatch: FlatResource[],
) => {
await txIfNotInOne(task, async tx => {
const batchId = getBatchId(batch);
await tx.none(
generateInsertImportErrorSql(
accountSid,
resourceId,
batchId,
error,
rejectedBatch,
),
);
await tx.none(
generateUpdateImportBatchRecordSql(
accountSid,
batchId,
batch,
0,
rejectedBatch.length,
),
);
});
};

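For readers without the full file open: txIfNotInOne, declared just after getBatchId, appears in this hunk only as a signature. Below is a minimal sketch of the pattern that signature implies, running the work inside the caller's pg-promise task when one is supplied and opening a new transaction otherwise; it is an approximation, not the exact body from the repository.

import { ITask } from 'pg-promise';
import { db } from '../connection-pool';

const txIfNotInOne = async <T>(
  task: ITask<{}> | undefined,
  work: (y: ITask<{}>) => Promise<T>,
): Promise<T> => {
  if (task) {
    // Already inside a task/transaction: run the work on the same connection.
    return task.txIf(work);
  }
  // No ambient task: open a fresh transaction on the pool.
  return db.tx(work);
};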
43 changes: 33 additions & 10 deletions resources-domain/resources-service/src/import/importService.ts
@@ -24,11 +24,13 @@ import {
import { db } from '../connection-pool';
import {
getImportState,
insertImportError,
updateImportProgress,
upsertImportedResource,
UpsertImportedResourceResult,
} from './importDataAccess';
import { publishSearchIndexJob } from '../resource-jobs/client-sqs';
const { serializeError } = require('serialize-error');

export type ValidationFailure = {
reason: 'missing field';
@@ -70,20 +72,24 @@ const importService = () => {
if (missingFields.length) {
// Unfortunately I can't see a way to roll back a transaction other than throwing / rejecting
// Hence the messy throw & catch
const err = new Error();
(err as any).validationFailure = {
const err = new Error() as any;
err.validationFailure = {
reason: 'missing field',
fields: missingFields,
resource,
};
err.resource = resource;
throw err;
}
console.debug(`Upserting ${accountSid}/${resource.id}`);
const result = await upsert(accountSid, resource);
if (!result.success) {
throw result.error;
const dbErr = new Error('Error inserting resource into database.') as any;
dbErr.resource = resource;
dbErr.cause = result.error;
throw dbErr;
}
results.push(result);

try {
await publishSearchIndexJob(resource.accountSid, resource);
} catch (e) {
@@ -98,18 +104,35 @@
b.importSequenceId || `${parseISO(b.lastUpdated).valueOf()}-0`,
),
)[resources.length - 1];
await updateImportProgress(t)(accountSid, {
...batch,
lastProcessedDate: lastUpdated,
lastProcessedId: id,
});
await updateImportProgress(t)(
accountSid,
{
...batch,
lastProcessedDate: lastUpdated,
lastProcessedId: id,
},
resources.length,
);

return results;
});
} catch (e) {
const error = e as any;
console.error(
`Failed to upsert ${accountSid}/${
error.resource?.id ?? 'unknown'
} - rolling back upserts in this message.`,
error,
);
await insertImportError()(
accountSid,
error.resource?.id,
batch,
serializeError(error),
resources,
);
if (error.validationFailure) {
return error.validationFailure;
return { ...error.validationFailure, resource: error.resource };
}
throw error;
}
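A note on how the two files above fit together: every ImportBatches and ImportErrors row is keyed by getBatchId from importDataAccess.ts, which encodes the batch's sequence range and remaining count. A small illustration with made-up values, where the sequence strings mirror the epoch-millis importSequenceId fallback used in importService.ts:

// Illustrative only: fromSequence/toSequence follow the '<epoch millis>-<suffix>' pattern seen above.
const batch = {
  fromSequence: '1691000000000-0',
  toSequence: '1691000500000-0',
  remaining: 250,
};

// Same derivation as getBatchId in importDataAccess.ts:
const batchId = `${batch.fromSequence}-${batch.toSequence}/${batch.remaining}`;
// batchId === '1691000000000-0-1691000500000-0/250'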
49 changes: 48 additions & 1 deletion resources-domain/resources-service/src/import/sql.ts
@@ -15,7 +15,12 @@
*/

import { pgp } from '../connection-pool';
import { AccountSID, ImportProgress, FlatResource } from '@tech-matters/types';
import {
AccountSID,
ImportProgress,
FlatResource,
ImportBatch,
} from '@tech-matters/types';

const DELETE_RESOURCE_ATTRIBUTES_SQL = `DELETE FROM resources."ResourceStringAttributes" WHERE "resourceId" = $<resourceId> AND "accountSid" = $<accountSid>;
DELETE FROM resources."ResourceNumberAttributes" WHERE "resourceId" = $<resourceId> AND "accountSid" = $<accountSid>;
@@ -129,6 +134,48 @@ export const generateUpdateImportProgressSql = (
DO UPDATE SET "importState" = EXCLUDED."importState"
`;

export const generateUpdateImportBatchRecordSql = (
  accountSid: AccountSID,
  batchId: string,
  batchContext: ImportBatch,
  successCount: number,
  failureCount: number,
) =>
  `
    ${pgp.helpers.insert(
      {
        accountSid,
        batchId,
        successCount,
        failureCount,
        batchContext,
      },
      ['accountSid', 'batchId', 'successCount', 'failureCount', 'batchContext'],
      { schema: 'resources', table: 'ImportBatches' },
    )}
    ON CONFLICT ON CONSTRAINT "ImportBatches_pkey"
    DO UPDATE SET "failureCount" = EXCLUDED."failureCount" + "ImportBatches"."failureCount", "successCount" = EXCLUDED."successCount" + "ImportBatches"."successCount"
  `;

export const generateInsertImportErrorSql = (
  accountSid: AccountSID,
  resourceId: string,
  batchId: string,
  error: any,
  rejectedBatch: FlatResource[],
) =>
  pgp.helpers.insert(
    {
      accountSid,
      batchId,
      resourceId,
      error,
      rejectedBatch,
    },
    ['accountSid', 'batchId', 'resourceId', 'error', 'rejectedBatch:json'],
    { schema: 'resources', table: 'ImportErrors' },
  );

export const SELECT_IMPORT_PROGRESS_SQL = `
SELECT "importState" FROM resources."Accounts" WHERE "accountSid" = $<accountSid>
`;
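
One behaviour worth noting in generateUpdateImportBatchRecordSql: the ON CONFLICT clause adds the new counts to the stored ones rather than overwriting them, so updateImportProgress and insertImportError can each report their share of a batch independently. The sketch below illustrates that accumulation; the values and the type assertions standing in for real AccountSID/ImportBatch instances are purely illustrative.

import { db } from '../connection-pool';
import { generateUpdateImportBatchRecordSql } from './sql';
import { AccountSID, ImportBatch } from '@tech-matters/types';

// Illustrative stand-ins; real values come from the import pipeline.
const accountSid = 'ACxxxxxxxx' as AccountSID;
const batch = { fromSequence: '0-0', toSequence: '100-0', remaining: 0 } as unknown as ImportBatch;
const batchId = '0-0-100-0/0';

export const recordBatchOutcomeExample = async (): Promise<void> =>
  db.tx(async tx => {
    // First call for this (accountSid, batchId): inserts the row with successCount 50, failureCount 0.
    await tx.none(generateUpdateImportBatchRecordSql(accountSid, batchId, batch, 50, 0));
    // Second call hits ON CONFLICT and adds to the stored counts; the row now reads 50 successes, 3 failures.
    await tx.none(generateUpdateImportBatchRecordSql(accountSid, batchId, batch, 0, 3));
  });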