Skip to content

Commit

Permalink
deals with #34, also initialize queues in sandbox process
Browse files Browse the repository at this point in the history
  • Loading branch information
rivernews committed Mar 7, 2020
1 parent 064fa91 commit 7855c42
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 205 deletions.
2 changes: 1 addition & 1 deletion cicd/microservice.tf
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ variable "app_container_image_tag" {}

module "slack_middleware_service" {
source = "rivernews/kubernetes-microservice/digitalocean"
version = "v0.0.9"
version = "v0.1.1"

aws_region = var.aws_region
aws_access_key = var.aws_access_key
Expand Down
2 changes: 1 addition & 1 deletion src/GdOrgReviewRenewal/controllers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export const singleOrgJobController = async (
'```'
);
process.env.NODE_ENV === RuntimeEnvironment.DEVELOPMENT &&
console.debug('Slack res', slackRes);
console.debug('Slack res', slackRes.status);

return res.json(supervisorJob);
} catch (error) {
Expand Down
185 changes: 84 additions & 101 deletions src/GdOrgReviewRenewal/s3OrgsJob/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,17 @@ import { supervisorJobQueueManager } from '../supervisorJob/queue';
import { s3OrgsJobQueueManager } from './queue';
import { ProgressBarManager } from '../../services/jobQueue/ProgressBar';
import { JobQueueName } from '../../services/jobQueue/jobQueueName';
import { SUPERVISOR_JOB_CONCURRENCY } from '../../services/jobQueue';
import {
SUPERVISOR_JOB_CONCURRENCY,
cleanupJobQueuesAndRedisClients
} from '../../services/jobQueue';
import { ServerError } from '../../utilities/serverExceptions';

const getOrgListFromS3 = async () => {
return s3ArchiveManager.asyncGetOverviewPageUrls();

// return [
// 'https://www.glassdoor.com/Overview/Working-at-Palo-Alto-Networks-EI_IE115142.11,29.htm'
// ];

// return [
// 'healthcrowd',
// 'https://www.glassdoor.com/Overview/Working-at-Pinterest-EI_IE503467.11,20.htm',
// ];
// return ['"Palo Alto Network"'];
// return [];
// return ['healthcrowd'];
};

module.exports = function (s3OrgsJob: Bull.Job<null>) {
console.log(`s3OrgsJob ${s3OrgsJob.id} started`, s3OrgsJob);

supervisorJobQueueManager.initialize();
const supervisorJobQueueManagerQueue = supervisorJobQueueManager.queue;

if (!supervisorJobQueueManagerQueue) {
throw new ServerError(
`supervisorJobQueueManager queue not initialized yet`
Expand All @@ -39,94 +26,90 @@ module.exports = function (s3OrgsJob: Bull.Job<null>) {
s3OrgsJob
);

return (
// check supervisor job is clean, no existing job
s3OrgsJobQueueManager
.checkConcurrency(
// since s3 org job could provision lots of supervisor jobs and scraper jobs
// it could be chaotic to mix s3 job with existing single org job
// better to limit s3 job to launch only if no supervisor job exists
1,
supervisorJobQueueManagerQueue,
s3OrgsJob
)
.then(supervisorJobsPresentCount => {
// dispatch supervisors into parallel groups
const VACANCY_BUFFER = 1;
const supervisorJobVacancy =
SUPERVISOR_JOB_CONCURRENCY -
supervisorJobsPresentCount -
VACANCY_BUFFER;
return (
getOrgListFromS3()
// increment progress after s3 org list fetched
.then(orgInfoList =>
progressBar
.increment()
// we have to use `orgInfoList` so need to nest callbacks in then() instead of chaining them
.then(() => {
const orgInfoListBucket: Array<Array<
string
>> = [];
return s3OrgsJobQueueManager
.checkConcurrency(
// check supervisor job is clean, no existing job
// since s3 org job could provision lots of supervisor jobs and scraper jobs
// it could be chaotic to mix s3 job with existing single org job
// better to limit s3 job to launch only if no supervisor job exists
1,
supervisorJobQueueManagerQueue,
s3OrgsJob
)
.then(supervisorJobsPresentCount => {
// dispatch supervisors into parallel groups
const VACANCY_BUFFER = 1;
const supervisorJobVacancy =
SUPERVISOR_JOB_CONCURRENCY -
supervisorJobsPresentCount -
VACANCY_BUFFER;
return (
s3ArchiveManager
.asyncGetOverviewPageUrls()
// increment progress after s3 org list fetched
.then(orgInfoList =>
progressBar
.increment()
// we have to use `orgInfoList` so need to nest callbacks in then() instead of chaining them
.then(() => {
const orgInfoListBucket: Array<Array<
string
>> = [];

// safe in terms of ensuring `orgInfoListBucket.length` not exceeding concurrency vacancy
const chunkSizeSafeUpperBound = Math.ceil(
orgInfoList.length /
supervisorJobVacancy
// safe in terms of ensuring `orgInfoListBucket.length` not exceeding concurrency vacancy
const chunkSizeSafeUpperBound = Math.ceil(
orgInfoList.length / supervisorJobVacancy
);
for (
let index = 0;
index < orgInfoList.length;
index += chunkSizeSafeUpperBound
) {
orgInfoListBucket.push(
orgInfoList.slice(
index,
index + chunkSizeSafeUpperBound
)
);
for (
let index = 0;
index < orgInfoList.length;
index += chunkSizeSafeUpperBound
) {
orgInfoListBucket.push(
orgInfoList.slice(
index,
index + chunkSizeSafeUpperBound
)
);
}
}

// TODO: remove
console.debug(
'orgInfoListBucket',
orgInfoListBucket
);
// TODO: remove
console.debug(
'orgInfoListBucket',
orgInfoListBucket
);

progressBar.setRelativePercentage(
0,
supervisorJobVacancy
);
progressBar.setRelativePercentage(
0,
supervisorJobVacancy
);

return Promise.all(
orgInfoListBucket.map(
bucketedOrgInfoList =>
supervisorJobQueueManagerQueue.add(
{
orgInfoList: bucketedOrgInfoList
}
)
)
);
})
)
.then(supervisorJobList =>
Promise.all(
supervisorJobList.map(supervisorJob =>
supervisorJob
.finished()
// increment progress after job finished, then propogate back job result
.then(result =>
progressBar
.increment()
.then(() => result)
)
)
return Promise.all(
orgInfoListBucket.map(bucketedOrgInfoList =>
supervisorJobQueueManagerQueue.add({
orgInfoList: bucketedOrgInfoList
})
)
);
})
)
.then(supervisorJobList =>
Promise.all(
supervisorJobList.map(supervisorJob =>
supervisorJob
.finished()
// increment progress after job finished, then propogate back job result
.then(result =>
progressBar
.increment()
.then(() => result)
)
)
)
);
})
.then(resultList => Promise.resolve(resultList))
.catch(error => Promise.reject(error))
);
)
);
})
.then(resultList => Promise.resolve(resultList))
.catch(error => Promise.reject(error))
.finally(() => cleanupJobQueuesAndRedisClients());
};
Loading

0 comments on commit 7855c42

Please sign in to comment.