From 7855c426148b761ea5120f93cb30c091ca8f6c23 Mon Sep 17 00:00:00 2001 From: Shaung Cheng Date: Sat, 7 Mar 2020 22:36:38 +0000 Subject: [PATCH] deals with #34, also initialize queues in sandbox process --- cicd/microservice.tf | 2 +- src/GdOrgReviewRenewal/controllers.ts | 2 +- src/GdOrgReviewRenewal/s3OrgsJob/process.ts | 185 ++++++++---------- .../supervisorJob/process.ts | 184 +++++++++-------- src/index.ts | 10 +- src/services/jobQueue/JobQueueManager.ts | 8 +- src/services/jobQueue/index.ts | 28 ++- 7 files changed, 214 insertions(+), 205 deletions(-) diff --git a/cicd/microservice.tf b/cicd/microservice.tf index 1615259..b3c6bd6 100644 --- a/cicd/microservice.tf +++ b/cicd/microservice.tf @@ -6,7 +6,7 @@ variable "app_container_image_tag" {} module "slack_middleware_service" { source = "rivernews/kubernetes-microservice/digitalocean" - version = "v0.0.9" + version = "v0.1.1" aws_region = var.aws_region aws_access_key = var.aws_access_key diff --git a/src/GdOrgReviewRenewal/controllers.ts b/src/GdOrgReviewRenewal/controllers.ts index 7bfc513..d4876c8 100644 --- a/src/GdOrgReviewRenewal/controllers.ts +++ b/src/GdOrgReviewRenewal/controllers.ts @@ -87,7 +87,7 @@ export const singleOrgJobController = async ( '```' ); process.env.NODE_ENV === RuntimeEnvironment.DEVELOPMENT && - console.debug('Slack res', slackRes); + console.debug('Slack res', slackRes.status); return res.json(supervisorJob); } catch (error) { diff --git a/src/GdOrgReviewRenewal/s3OrgsJob/process.ts b/src/GdOrgReviewRenewal/s3OrgsJob/process.ts index 225ad9d..a9e0b77 100644 --- a/src/GdOrgReviewRenewal/s3OrgsJob/process.ts +++ b/src/GdOrgReviewRenewal/s3OrgsJob/process.ts @@ -4,30 +4,17 @@ import { supervisorJobQueueManager } from '../supervisorJob/queue'; import { s3OrgsJobQueueManager } from './queue'; import { ProgressBarManager } from '../../services/jobQueue/ProgressBar'; import { JobQueueName } from '../../services/jobQueue/jobQueueName'; -import { SUPERVISOR_JOB_CONCURRENCY } from '../../services/jobQueue'; +import { + SUPERVISOR_JOB_CONCURRENCY, + cleanupJobQueuesAndRedisClients +} from '../../services/jobQueue'; import { ServerError } from '../../utilities/serverExceptions'; -const getOrgListFromS3 = async () => { - return s3ArchiveManager.asyncGetOverviewPageUrls(); - - // return [ - // 'https://www.glassdoor.com/Overview/Working-at-Palo-Alto-Networks-EI_IE115142.11,29.htm' - // ]; - - // return [ - // 'healthcrowd', - // 'https://www.glassdoor.com/Overview/Working-at-Pinterest-EI_IE503467.11,20.htm', - // ]; - // return ['"Palo Alto Network"']; - // return []; - // return ['healthcrowd']; -}; - module.exports = function (s3OrgsJob: Bull.Job) { console.log(`s3OrgsJob ${s3OrgsJob.id} started`, s3OrgsJob); + supervisorJobQueueManager.initialize(); const supervisorJobQueueManagerQueue = supervisorJobQueueManager.queue; - if (!supervisorJobQueueManagerQueue) { throw new ServerError( `supervisorJobQueueManager queue not initialized yet` @@ -39,94 +26,90 @@ module.exports = function (s3OrgsJob: Bull.Job) { s3OrgsJob ); - return ( - // check supervisor job is clean, no existing job - s3OrgsJobQueueManager - .checkConcurrency( - // since s3 org job could provision lots of supervisor jobs and scraper jobs - // it could be chaotic to mix s3 job with existing single org job - // better to limit s3 job to launch only if no supervisor job exists - 1, - supervisorJobQueueManagerQueue, - s3OrgsJob - ) - .then(supervisorJobsPresentCount => { - // dispatch supervisors into parallel groups - const VACANCY_BUFFER = 1; - const 
supervisorJobVacancy = - SUPERVISOR_JOB_CONCURRENCY - - supervisorJobsPresentCount - - VACANCY_BUFFER; - return ( - getOrgListFromS3() - // increment progress after s3 org list fetched - .then(orgInfoList => - progressBar - .increment() - // we have to use `orgInfoList` so need to nest callbacks in then() instead of chaining them - .then(() => { - const orgInfoListBucket: Array> = []; + return s3OrgsJobQueueManager + .checkConcurrency( + // check supervisor job is clean, no existing job + // since s3 org job could provision lots of supervisor jobs and scraper jobs + // it could be chaotic to mix s3 job with existing single org job + // better to limit s3 job to launch only if no supervisor job exists + 1, + supervisorJobQueueManagerQueue, + s3OrgsJob + ) + .then(supervisorJobsPresentCount => { + // dispatch supervisors into parallel groups + const VACANCY_BUFFER = 1; + const supervisorJobVacancy = + SUPERVISOR_JOB_CONCURRENCY - + supervisorJobsPresentCount - + VACANCY_BUFFER; + return ( + s3ArchiveManager + .asyncGetOverviewPageUrls() + // increment progress after s3 org list fetched + .then(orgInfoList => + progressBar + .increment() + // we have to use `orgInfoList` so need to nest callbacks in then() instead of chaining them + .then(() => { + const orgInfoListBucket: Array> = []; - // safe in terms of ensuring `orgInfoListBucket.length` not exceeding concurrency vacancy - const chunkSizeSafeUpperBound = Math.ceil( - orgInfoList.length / - supervisorJobVacancy + // safe in terms of ensuring `orgInfoListBucket.length` not exceeding concurrency vacancy + const chunkSizeSafeUpperBound = Math.ceil( + orgInfoList.length / supervisorJobVacancy + ); + for ( + let index = 0; + index < orgInfoList.length; + index += chunkSizeSafeUpperBound + ) { + orgInfoListBucket.push( + orgInfoList.slice( + index, + index + chunkSizeSafeUpperBound + ) ); - for ( - let index = 0; - index < orgInfoList.length; - index += chunkSizeSafeUpperBound - ) { - orgInfoListBucket.push( - orgInfoList.slice( - index, - index + chunkSizeSafeUpperBound - ) - ); - } + } - // TODO: remove - console.debug( - 'orgInfoListBucket', - orgInfoListBucket - ); + // TODO: remove + console.debug( + 'orgInfoListBucket', + orgInfoListBucket + ); - progressBar.setRelativePercentage( - 0, - supervisorJobVacancy - ); + progressBar.setRelativePercentage( + 0, + supervisorJobVacancy + ); - return Promise.all( - orgInfoListBucket.map( - bucketedOrgInfoList => - supervisorJobQueueManagerQueue.add( - { - orgInfoList: bucketedOrgInfoList - } - ) - ) - ); - }) - ) - .then(supervisorJobList => - Promise.all( - supervisorJobList.map(supervisorJob => - supervisorJob - .finished() - // increment progress after job finished, then propogate back job result - .then(result => - progressBar - .increment() - .then(() => result) - ) - ) + return Promise.all( + orgInfoListBucket.map(bucketedOrgInfoList => + supervisorJobQueueManagerQueue.add({ + orgInfoList: bucketedOrgInfoList + }) + ) + ); + }) + ) + .then(supervisorJobList => + Promise.all( + supervisorJobList.map(supervisorJob => + supervisorJob + .finished() + // increment progress after job finished, then propogate back job result + .then(result => + progressBar + .increment() + .then(() => result) + ) ) ) - ); - }) - .then(resultList => Promise.resolve(resultList)) - .catch(error => Promise.reject(error)) - ); + ) + ); + }) + .then(resultList => Promise.resolve(resultList)) + .catch(error => Promise.reject(error)) + .finally(() => cleanupJobQueuesAndRedisClients()); }; diff --git 
a/src/GdOrgReviewRenewal/supervisorJob/process.ts b/src/GdOrgReviewRenewal/supervisorJob/process.ts index 660c765..3a3d131 100644 --- a/src/GdOrgReviewRenewal/supervisorJob/process.ts +++ b/src/GdOrgReviewRenewal/supervisorJob/process.ts @@ -7,7 +7,10 @@ import { ScraperJobRequestData } from '../../services/jobQueue/types'; import { ServerError } from '../../utilities/serverExceptions'; -import { SUPERVISOR_JOB_CONCURRENCY } from '../../services/jobQueue'; +import { + SUPERVISOR_JOB_CONCURRENCY, + cleanupJobQueuesAndRedisClients +} from '../../services/jobQueue'; import { asyncSendSlackMessage } from '../../services/slack'; import { supervisorJobQueueManager } from './queue'; import { ProgressBarManager } from '../../services/jobQueue/ProgressBar'; @@ -19,7 +22,7 @@ const processRenewalJob = async ( ) => { if (!gdOrgReviewScraperJobQueueManager.queue) { throw new ServerError( - `gdOrgReviewScraperJobQueueManager queue not yet initialized` + `In supervisorJob processor subroutines: gdOrgReviewScraperJobQueueManager queue not yet initialized` ); } @@ -83,6 +86,15 @@ module.exports = function (supervisorJob: Bull.Job) { supervisorJob.data ); + // supervisorJob will need the following queues to be initialized: + // supervisorJobQueue + // gdOrgReviewScraperJobQueue + // + // even if you've already initialized them in master process (the one w/ express server), this sandbox process is a completely separate process + // and does not share any object or resources with master process, thus having to initialize here again (some cross-process functionalities like job.progress() + // are handled by bull, using nodejs process.send/on('message') to communicate via serialized data) + gdOrgReviewScraperJobQueueManager.initialize(); + supervisorJobQueueManager.initialize(); if (!gdOrgReviewScraperJobQueueManager.queue) { throw new ServerError( `gdOrgReviewScraperJobQueueManager queue not yet initialized` @@ -103,94 +115,110 @@ module.exports = function (supervisorJob: Bull.Job) { supervisorJob.data.crossRequestData ? 
1 : orgInfoList.length ); - return supervisorJobQueueManager - .checkConcurrency(SUPERVISOR_JOB_CONCURRENCY, undefined, supervisorJob) - .then(() => { - return progressBar.setAbsolutePercentage(1); - }) - .then(async () => { - // start dispatching job - resume scraping - if (supervisorJob.data.crossRequestData) { - if ( - !ScraperCrossRequest.isScraperCrossRequestData( - supervisorJob.data.crossRequestData - ) - ) { - throw new ServerError( - `Illegal crossRequestData for supervisorJobData: ${JSON.stringify( + return ( + supervisorJobQueueManager + .checkConcurrency( + SUPERVISOR_JOB_CONCURRENCY, + undefined, + supervisorJob + ) + .then(() => { + return progressBar.setAbsolutePercentage(1); + }) + .then(async () => { + // start dispatching job - resume scraping + if (supervisorJob.data.crossRequestData) { + if ( + !ScraperCrossRequest.isScraperCrossRequestData( supervisorJob.data.crossRequestData - )}` + ) + ) { + throw new ServerError( + `Illegal crossRequestData for supervisorJobData: ${JSON.stringify( + supervisorJob.data.crossRequestData + )}` + ); + } + return await processRenewalJob( + supervisorJob.data.crossRequestData ); } - return await processRenewalJob( - supervisorJob.data.crossRequestData - ); - } - - // start dispatching job - scrape from beginning - if (!orgInfoList.length) { - console.log('org list empty, will do nothing'); - return Promise.resolve('empty orgList'); - } - console.log('supervisorJob will dispatch scraper jobs'); - for (processed = 0; processed < orgInfoList.length; processed++) { - const orgInfo = orgInfoList[processed]; - const orgFirstJob = await gdOrgReviewScraperJobQueueManagerQueue.add( - { - orgInfo - } - ); - console.log( - `supervisorJob added scraper job ${orgFirstJob.id}` - ); - - const orgFirstJobReturnData: ScraperJobReturnData = await orgFirstJob.finished(); - console.log(`supervisorJob: job ${orgFirstJob.id} finished`); - await asyncSendSlackMessage( - `supervisorJob: job ${ - orgFirstJob.id - } finished, return data:\n\`\`\`${JSON.stringify( - orgFirstJobReturnData - )}\`\`\`` - ); - if ( - typeof orgFirstJobReturnData !== 'string' && - !ScraperCrossRequest.isScraperCrossRequestData( - orgFirstJobReturnData - ) + // start dispatching job - scrape from beginning + if (!orgInfoList.length) { + console.log('org list empty, will do nothing'); + return Promise.resolve('empty orgList'); + } + console.log('supervisorJob will dispatch scraper jobs'); + for ( + processed = 0; + processed < orgInfoList.length; + processed++ ) { - throw new ServerError( + const orgInfo = orgInfoList[processed]; + const orgFirstJob = await gdOrgReviewScraperJobQueueManagerQueue.add( + { + orgInfo + } + ); + console.log( + `supervisorJob added scraper job ${orgFirstJob.id}` + ); + + const orgFirstJobReturnData: ScraperJobReturnData = await orgFirstJob.finished(); + console.log( + `supervisorJob: job ${orgFirstJob.id} finished` + ); + await asyncSendSlackMessage( `supervisorJob: job ${ orgFirstJob.id - } returned illegal result data: ${JSON.stringify( + } finished, return data:\n\`\`\`${JSON.stringify( orgFirstJobReturnData - )}` + )}\`\`\`` ); - } - // process renewal jobs if necessary - await processRenewalJob(orgFirstJobReturnData, orgFirstJob); + if ( + typeof orgFirstJobReturnData !== 'string' && + !ScraperCrossRequest.isScraperCrossRequestData( + orgFirstJobReturnData + ) + ) { + throw new ServerError( + `supervisorJob: job ${ + orgFirstJob.id + } returned illegal result data: ${JSON.stringify( + orgFirstJobReturnData + )}` + ); + } - console.log('supervisorJob: 
proceeding to next org'); + // process renewal jobs if necessary + await processRenewalJob(orgFirstJobReturnData, orgFirstJob); - await progressBar.increment(); - } + console.log('supervisorJob: proceeding to next org'); - console.log( - 'supervisorJob finish dispatching & waiting all jobs done' - ); + await progressBar.increment(); + } - return Promise.resolve('supervisorJob complete successfully'); - }) - .catch(async error => { - console.log( - 'supervisorJob interrupted due to error\n', - error, - 'remaining orgList not yet finished (including failed one):', - orgInfoList.slice(processed, orgInfoList.length) - ); - await gdOrgReviewScraperJobQueueManagerQueue.empty(); - return Promise.reject(error); - }); + console.log( + 'supervisorJob finish dispatching & waiting all jobs done' + ); + + return Promise.resolve('supervisorJob complete successfully'); + }) + .catch(async error => { + console.log( + 'supervisorJob interrupted due to error\n', + error, + 'remaining orgList not yet finished (including failed one):', + orgInfoList.slice(processed, orgInfoList.length) + ); + + await gdOrgReviewScraperJobQueueManagerQueue.empty(); + + return Promise.reject(error); + }) + // clean up job queue resources created in this sandbox process + .finally(() => cleanupJobQueuesAndRedisClients()) + ); }; diff --git a/src/index.ts b/src/index.ts index 58a1b57..59f404e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,10 @@ import express from 'express'; import { ErrorResponse } from './utilities/serverExceptions'; import { createTerminus } from '@godaddy/terminus'; -import { startJobQueues, cleanupJobQueues } from './services/jobQueue'; +import { + startJobQueues, + cleanupJobQueuesAndRedisClients +} from './services/jobQueue'; import { RuntimeEnvironment, RUNTIME_CI_ENVIRONMENT @@ -103,10 +106,7 @@ export const cleanUpExpressServer = async () => { console.log('cleaning up...'); RUNTIME_CI_ENVIRONMENT != RuntimeEnvironment.TESTING && - (await cleanupJobQueues()); - - // last check for all redis connection closed - await redisManager.asyncCloseAllClients(); + (await cleanupJobQueuesAndRedisClients()); return; }; diff --git a/src/services/jobQueue/JobQueueManager.ts b/src/services/jobQueue/JobQueueManager.ts index cb8858b..59483f0 100644 --- a/src/services/jobQueue/JobQueueManager.ts +++ b/src/services/jobQueue/JobQueueManager.ts @@ -48,14 +48,17 @@ export class JobQueueManager { } public initialize () { + if (this.queue) { + console.log('already initialized queue', this.queueName); + return; + } + if (!JobQueueManager.jobQueueSharedRedisClientsSingleton) { JobQueueManager.jobQueueSharedRedisClientsSingleton = JobQueueSharedRedisClientsSingleton.singleton; JobQueueManager.jobQueueSharedRedisClientsSingleton.intialize(); } - console.log(`initializing job queue for ${this.queueName}`); - this.queue = new Bull(this.queueName, { redis: redisManager.config, defaultJobOptions: this.defaultJobOptions, @@ -89,6 +92,7 @@ export class JobQueueManager { } } }); + console.log(`initialized job queue for ${this.queueName}`); this.queue.process(JobQueueManager.CONCURRENCY, this._processFileName); diff --git a/src/services/jobQueue/index.ts b/src/services/jobQueue/index.ts index e091ae6..29a2b61 100644 --- a/src/services/jobQueue/index.ts +++ b/src/services/jobQueue/index.ts @@ -78,28 +78,18 @@ export const startJobQueues = () => { } }; -export const cleanupJobQueues = async () => { - if ( - !( - supervisorJobQueueManager.queue && - gdOrgReviewScraperJobQueueManager.queue && - s3OrgsJobQueueManager.queue - ) - ) { - 
throw new ServerError( - `Failed to clean up job queues, at least one of the queues is not initialized` - ); - } - +export const cleanupJobQueuesAndRedisClients = async () => { // Queue.close // https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueclose try { - await supervisorJobQueueManager.queue.close(); + supervisorJobQueueManager.queue && + (await supervisorJobQueueManager.queue.close()); } catch (error) { console.warn('supervisorJobQueueManager queue fail to close', error); } try { - await gdOrgReviewScraperJobQueueManager.queue.close(); + gdOrgReviewScraperJobQueueManager.queue && + (await gdOrgReviewScraperJobQueueManager.queue.close()); } catch (error) { console.warn( 'gdOrgReviewScraperJobQueueManager queue fail to close', @@ -107,14 +97,18 @@ export const cleanupJobQueues = async () => { ); } try { - await s3OrgsJobQueueManager.queue.close(); + s3OrgsJobQueueManager.queue && + (await s3OrgsJobQueueManager.queue.close()); } catch (error) { console.warn('s3OrgsJobQueueManager queue fail to close', error); } + // Add more queue clean up here ... + console.log('all job queues closed'); - // Add more clean up here ... + // last check for all redis connection closed + await redisManager.asyncCloseAllClients(); return; };
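
A minimal sketch of the sandboxed-processor pattern this patch introduces: initialize queue connections inside the child process, then close them in finally(). This is an illustration only, not part of the patch — the queue name 'child-job-queue', the inline redis settings, and the job payload are hypothetical placeholders standing in for the repository's JobQueueManager.initialize() and cleanupJobQueuesAndRedisClients(); the Bull calls used (new Bull, queue.add, job.finished, queue.close) are standard Bull v3 API.

import Bull from 'bull';

// Placeholder connection settings; the project reads these from redisManager.config instead.
const redisConfig = { host: '127.0.0.1', port: 6379 };

module.exports = function (parentJob: Bull.Job) {
    // A sandboxed processor runs in a separate child process and shares no objects
    // with the master (express) process, so it must create its own queue connection.
    const childQueue = new Bull('child-job-queue', { redis: redisConfig });

    return childQueue
        .add({ triggeredBy: parentJob.id })
        .then(childJob => childJob.finished())
        // Close the connections this child process opened, on success or failure,
        // mirroring the .finally(() => cleanupJobQueuesAndRedisClients()) calls above.
        .finally(() => childQueue.close());
};

Creating and closing the queue within the processor keeps each sandboxed run self-contained, which is what the idempotent initialize() plus the cleanup in finally() achieve for the real queue managers in s3OrgsJob/process.ts and supervisorJob/process.ts.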