Skip to content

Commit

Permalink
refactor(migration): simplify preference query params
Browse files Browse the repository at this point in the history
  • Loading branch information
rifont committed Nov 21, 2024
1 parent b9adce3 commit 36820b3
Showing 1 changed file with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ process.on('SIGINT', () => {
* -> preferences with workflow-resource type
* -> preferences with user-workflow type
*/
export async function preferenceCentralization(startCreatedAtWorkflow?: string, startCreatedAtSubscriber?: string) {
export async function preferenceCentralization(startWorkflowId?: string, startSubscriberId?: string) {
const app = await NestFactory.create(AppModule, {
logger: false,
});
Expand All @@ -92,9 +92,9 @@ export async function preferenceCentralization(startCreatedAtWorkflow?: string,
}
}, 1000); // 10 seconds

await migrateWorkflowPreferences(workflowPreferenceRepository, upsertPreferences, startCreatedAtWorkflow);
await migrateWorkflowPreferences(workflowPreferenceRepository, upsertPreferences, startWorkflowId);
console.log({ counter });
await migrateSubscriberPreferences(subscriberPreferenceRepository, upsertPreferences, startCreatedAtSubscriber);
await migrateSubscriberPreferences(subscriberPreferenceRepository, upsertPreferences, startSubscriberId);

// Clear the logging interval once migration is complete
clearInterval(logInterval);
Expand Down Expand Up @@ -155,26 +155,18 @@ async function processWorkflowBatch(
async function migrateWorkflowPreferences(
workflowPreferenceRepository: NotificationTemplateRepository,
upsertPreferences: UpsertPreferences,
startCreatedAtWorkflow?: string
startWorkflowId?: string
) {
console.log('start workflow preference migration');
let query = {};
if (startCreatedAtWorkflow) {
console.log(`Starting from workflow preference ID: ${startCreatedAtWorkflow}`);
query = { createdAt: { $gt: startCreatedAtWorkflow } };
if (startWorkflowId) {
console.log(`Starting from workflow preference ID: ${startWorkflowId}`);
query = { _id: { $gt: startWorkflowId } };
}
const workflowPreferenceCursor = await workflowPreferenceRepository._model
.find()
.select({
_id: 1,
_environmentId: 1,
_organizationId: 1,
_creatorId: 1,
critical: 1,
preferenceSettings: 1,
createdAt: 1,
})
.sort({ createdAt: 1 })
.find(query)
.select({ _id: 1, _environmentId: 1, _organizationId: 1, _creatorId: 1, critical: 1, preferenceSettings: 1 })
.sort({ _id: 1 })
.read('secondaryPreferred')
.cursor({ batchSize: BATCH_SIZE });

Expand Down Expand Up @@ -258,18 +250,18 @@ async function processSubscriberBatch(batch: SubscriberPreferenceEntity[], upser
async function migrateSubscriberPreferences(
subscriberPreferenceRepository: SubscriberPreferenceRepository,
upsertPreferences: UpsertPreferences,
startCreatedAtSubscriber?: string
startSubscriberId?: string
) {
console.log('start subscriber preference migration');
let query = {};
if (startCreatedAtSubscriber) {
console.log(`Starting from subscriber preference ID: ${startCreatedAtSubscriber}`);
query = { createdAt: { $gt: startCreatedAtSubscriber } };
if (startSubscriberId) {
console.log(`Starting from subscriber preference ID: ${startSubscriberId}`);
query = { _id: { $gt: startSubscriberId } };
}
const subscriberPreferenceCursor = await subscriberPreferenceRepository._model
.find(query)
.select({ _id: 1, _environmentId: 1, _organizationId: 1, _subscriberId: 1, level: 1, channels: 1, createdAt: 1 })
.sort({ createdAt: 1 })
.select({ _id: 1, _environmentId: 1, _organizationId: 1, _subscriberId: 1, _templateId: 1, level: 1, channels: 1 })
.sort({ _id: 1 })
.read('secondaryPreferred')
.cursor({ batchSize: BATCH_SIZE });

Expand Down

0 comments on commit 36820b3

Please sign in to comment.