Skip to content

Commit

Permalink
refactor(core): Streamline flows in multi-main mode (no-changelog) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Feb 5, 2024
1 parent da1fe44 commit dc5ec8f
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 282 deletions.
97 changes: 60 additions & 37 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ export class ActiveWorkflowRunner {
}

/**
* Clear workflow-defined webhooks from the `webhook_entity` table.
* Remove all webhooks of a workflow from the database, and
* deregister those webhooks from external services.
*/
async clearWebhooks(workflowId: string) {
const workflowData = await this.workflowRepository.findOne({
Expand Down Expand Up @@ -418,9 +419,10 @@ export class ActiveWorkflowRunner {
}

/**
* Register as active in memory all workflows stored as `active`.
* Register as active in memory all workflows stored as `active`,
* only on instance init or (in multi-main setup) on leadership change.
*/
async addActiveWorkflows(activationMode: WorkflowActivateMode) {
async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') {
const dbWorkflows = await this.workflowRepository.getAllActive();

if (dbWorkflows.length === 0) return;
Expand All @@ -433,7 +435,9 @@ export class ActiveWorkflowRunner {

for (const dbWorkflow of dbWorkflows) {
try {
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow);
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, {
shouldPublish: false,
});

if (wasActivated) {
this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, {
Expand Down Expand Up @@ -471,15 +475,21 @@ export class ActiveWorkflowRunner {
}

async clearAllActivationErrors() {
this.logger.debug('Clearing all activation errors');

await this.activationErrorsService.clearAll();
}

async addAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('Adding all trigger- and poller-based workflows');

await this.addActiveWorkflows('leadershipChange');
}

@OnShutdown()
async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('Removing all trigger- and poller-based workflows');

await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}

Expand All @@ -506,35 +516,24 @@ export class ActiveWorkflowRunner {
workflowId: string,
activationMode: WorkflowActivateMode,
existingWorkflow?: WorkflowEntity,
{ shouldPublish } = { shouldPublish: true },
) {
let workflow: Workflow;
if (this.orchestrationService.isMultiMainSetupEnabled && shouldPublish) {
await this.orchestrationService.publish('add-webhooks-triggers-and-pollers', {
workflowId,
});

let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;

/**
* In a multi-main scenario, webhooks are stored in the database, while triggers
* and pollers are run only by the leader main instance.
*
* - During a regular workflow activation (i.e. not leadership change), only the
* leader should add webhooks to prevent duplicate insertions, and only the leader
* should handle triggers and pollers to prevent duplicate work.
*
* - During a leadership change, webhooks remain in storage and so need not be added
* again, and the new leader should take over the triggers and pollers that stopped
* running when the former leader became unresponsive.
*/
if (this.orchestrationService.isMultiMainSetupEnabled) {
if (activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.orchestrationService.isLeader;
shouldAddTriggersAndPollers = this.orchestrationService.isLeader;
} else {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = this.orchestrationService.isLeader;
}
return;
}

const shouldActivate = shouldAddWebhooks || shouldAddTriggersAndPollers;
let workflow: Workflow;

const shouldAddWebhooks = this.orchestrationService.shouldAddWebhooks(activationMode);
const shouldAddTriggersAndPollers = this.orchestrationService.shouldAddTriggersAndPollers();

const shouldDisplayActivationMessage =
(shouldAddWebhooks || shouldAddTriggersAndPollers) &&
['init', 'leadershipChange'].includes(activationMode);

try {
const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
Expand All @@ -543,7 +542,7 @@ export class ActiveWorkflowRunner {
throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`);
}

if (shouldActivate) {
if (shouldDisplayActivationMessage) {
this.logger.info(` - ${dbWorkflow.display()}`);
this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, {
workflowName: dbWorkflow.name,
Expand Down Expand Up @@ -608,7 +607,7 @@ export class ActiveWorkflowRunner {
// id of them in the static data. So make sure that data gets persisted.
await this.workflowStaticDataService.saveStaticData(workflow);

return shouldActivate;
return shouldDisplayActivationMessage;
}

/**
Expand Down Expand Up @@ -709,7 +708,21 @@ export class ActiveWorkflowRunner {
*/
// TODO: this should happen in a transaction
async remove(workflowId: string) {
// Remove all the webhooks of the workflow
if (this.orchestrationService.isMultiMainSetupEnabled) {
try {
await this.clearWebhooks(workflowId);
} catch (error) {
ErrorReporter.error(error);
this.logger.error(
`Could not remove webhooks of workflow "${workflowId}" because of error: "${error.message}"`,
);
}

await this.orchestrationService.publish('remove-triggers-and-pollers', { workflowId });

return;
}

try {
await this.clearWebhooks(workflowId);
} catch (error) {
Expand All @@ -727,11 +740,21 @@ export class ActiveWorkflowRunner {

// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if (this.activeWorkflows.isActive(workflowId)) {
const removalSuccess = await this.activeWorkflows.remove(workflowId);
if (removalSuccess) {
this.logger.verbose(`Successfully deactivated workflow "${workflowId}"`, { workflowId });
}
await this.removeWorkflowTriggersAndPollers(workflowId);
}

/**
* Stop running active triggers and pollers for a workflow.
*/
async removeWorkflowTriggersAndPollers(workflowId: string) {
if (!this.activeWorkflows.isActive(workflowId)) return;

const wasRemoved = await this.activeWorkflows.remove(workflowId);

if (wasRemoved) {
this.logger.warn(`Removed triggers and pollers for workflow "${workflowId}"`, {
workflowId,
});
}
}

Expand Down
40 changes: 6 additions & 34 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,31 +226,11 @@ export class Start extends BaseCommand {
if (!orchestrationService.isMultiMainSetupEnabled) return;

orchestrationService.multiMainSetup
.addListener('leadershipChange', async () => {
if (orchestrationService.isLeader) {
this.logger.debug('[Leadership change] Clearing all activation errors...');

await this.activeWorkflowRunner.clearAllActivationErrors();

this.logger.debug(
'[Leadership change] Adding all trigger- and poller-based workflows...',
);

await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
} else {
this.logger.debug(
'[Leadership change] Removing all trigger- and poller-based workflows...',
);

await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
}
})
.addListener('leadershipVacant', async () => {
this.logger.debug(
'[Leadership vacant] Removing all trigger- and poller-based workflows...',
);

.on('leader-stepdown', async () => {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
})
.on('leader-takeover', async () => {
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
});
}

Expand Down Expand Up @@ -370,16 +350,8 @@ export class Start extends BaseCommand {
if (!orchestrationService.isMultiMainSetupEnabled) return;

orchestrationService.multiMainSetup
.addListener('leadershipChange', async () => {
if (orchestrationService.isLeader) {
this.pruningService.startPruning();
} else {
this.pruningService.stopPruning();
}
})
.addListener('leadershipVacant', () => {
this.pruningService.stopPruning();
});
.on('leader-stepdown', () => this.pruningService.stopPruning())
.on('leader-takeover', () => this.pruningService.startPruning());
}

async catch(error: Error) {
Expand Down
26 changes: 26 additions & 0 deletions packages/cli/src/services/orchestration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/Redis

import { RedisService } from './redis.service';
import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee';
import type { WorkflowActivateMode } from 'n8n-workflow';

@Service()
export class OrchestrationService {
Expand Down Expand Up @@ -118,4 +119,29 @@ export class OrchestrationService {

await this.redisPublisher.publishToCommandChannel({ command });
}

// ----------------------------------
// activations
// ----------------------------------

/**
* Whether this instance may add webhooks to the `webhook_entity` table.
*/
shouldAddWebhooks(activationMode: WorkflowActivateMode) {
if (activationMode === 'init') return false;

if (activationMode === 'leadershipChange') return false;

return this.isLeader; // 'update' or 'activate'
}

/**
* Whether this instance may add triggers and pollers to memory.
*
* In both single- and multi-main setup, only the leader is allowed to manage
* triggers and pollers in memory, to ensure they are not duplicated.
*/
shouldAddTriggersAndPollers() {
return this.isLeader;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,9 @@ export class MultiMainSetup extends EventEmitter {
if (config.getEnv('multiMainSetup.instanceType') === 'leader') {
config.set('multiMainSetup.instanceType', 'follower');

this.emit('leadershipChange'); // stop triggers, pollers, pruning
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning

EventReporter.report('[Multi-main setup] Leader failed to renew leader key', {
level: 'info',
});
EventReporter.info('[Multi-main setup] Leader failed to renew leader key');
}

return;
Expand All @@ -79,7 +77,7 @@ export class MultiMainSetup extends EventEmitter {

config.set('multiMainSetup.instanceType', 'follower');

this.emit('leadershipVacant'); // stop triggers, pollers, pruning
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning

await this.tryBecomeLeader();
}
Expand All @@ -99,7 +97,7 @@ export class MultiMainSetup extends EventEmitter {

await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);

this.emit('leadershipChange'); // start triggers, pollers, pruning
this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning
} else {
config.set('multiMainSetup.instanceType', 'follower');
}
Expand Down
Loading

0 comments on commit dc5ec8f

Please sign in to comment.