diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index f0cf88615047d..7d0f6463a4872 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -384,7 +384,7 @@ export interface GetActionErrorLogByIdParams { sort: estypes.Sort; } -interface ScheduleRuleOptions { +interface ScheduleTaskOptions { id: string; consumer: string; ruleTypeId: string; @@ -589,7 +589,7 @@ export class RulesClient { if (data.enabled) { let scheduledTask; try { - scheduledTask = await this.scheduleRule({ + scheduledTask = await this.scheduleTask({ id: createdAlert.id, consumer: data.consumer, ruleTypeId: rawRule.alertTypeId, @@ -2138,7 +2138,24 @@ export class RulesClient { } catch (e) { throw e; } - const scheduledTask = await this.scheduleRule({ + } + + let scheduledTaskIdToCreate: string | null = null; + if (attributes.scheduledTaskId) { + // If scheduledTaskId defined in rule SO, make sure it exists + try { + await this.taskManager.get(attributes.scheduledTaskId); + } catch (err) { + scheduledTaskIdToCreate = id; + } + } else { + // If scheduledTaskId doesn't exist in rule SO, set it to rule ID + scheduledTaskIdToCreate = id; + } + + if (scheduledTaskIdToCreate) { + // Schedule the task if it doesn't exist + const scheduledTask = await this.scheduleTask({ id, consumer: attributes.consumer, ruleTypeId: attributes.alertTypeId, @@ -2148,6 +2165,9 @@ export class RulesClient { await this.unsecuredSavedObjectsClient.update('alert', id, { scheduledTaskId: scheduledTask.id, }); + } else { + // Task exists so set enabled to true + await this.taskManager.bulkEnableDisable([attributes.scheduledTaskId!], true); } } @@ -2282,14 +2302,21 @@ export class RulesClient { this.updateMeta({ ...attributes, enabled: false, - scheduledTaskId: null, + scheduledTaskId: attributes.scheduledTaskId === id ? attributes.scheduledTaskId : null, updatedBy: await this.getUserName(), updatedAt: new Date().toISOString(), }), { version } ); + + // If the scheduledTaskId does not match the rule id, we should + // remove the task, otherwise mark the task as disabled if (attributes.scheduledTaskId) { - await this.taskManager.removeIfExists(attributes.scheduledTaskId); + if (attributes.scheduledTaskId !== id) { + await this.taskManager.removeIfExists(attributes.scheduledTaskId); + } else { + await this.taskManager.bulkEnableDisable([attributes.scheduledTaskId], false); + } } } } @@ -2767,7 +2794,7 @@ export class RulesClient { return this.spaceId; } - private async scheduleRule(opts: ScheduleRuleOptions) { + private async scheduleTask(opts: ScheduleTaskOptions) { const { id, consumer, ruleTypeId, schedule, throwOnConflict } = opts; const taskInstance = { id, // use the same ID for task document as the rule @@ -2784,6 +2811,7 @@ export class RulesClient { alertInstances: {}, }, scope: ['alerting'], + enabled: true, }; try { return await this.taskManager.schedule(taskInstance); diff --git a/x-pack/plugins/alerting/server/rules_client/tests/create.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/create.test.ts index b29d41f183b73..f5192bf6cbe65 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/create.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/create.test.ts @@ -463,6 +463,7 @@ describe('create()', () => { expect(taskManager.schedule.mock.calls[0]).toMatchInlineSnapshot(` Array [ Object { + "enabled": true, "id": "1", "params": Object { "alertId": "1", diff --git a/x-pack/plugins/alerting/server/rules_client/tests/disable.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/disable.test.ts index a193733aff26f..499f1c2e8454d 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/disable.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/disable.test.ts @@ -60,7 +60,7 @@ const rulesClientParams: jest.Mocked = { beforeEach(() => { getBeforeSetup(rulesClientParams, taskManager, ruleTypeRegistry); taskManager.get.mockResolvedValue({ - id: 'task-123', + id: '1', taskType: 'alerting:123', scheduledAt: new Date(), attempts: 1, @@ -81,7 +81,7 @@ setGlobalDate(); describe('disable()', () => { let rulesClient: RulesClient; - const existingAlert = { + const existingRule = { id: '1', type: 'alert', attributes: { @@ -89,7 +89,7 @@ describe('disable()', () => { schedule: { interval: '10s' }, alertTypeId: 'myType', enabled: true, - scheduledTaskId: 'task-123', + scheduledTaskId: '1', actions: [ { group: 'default', @@ -105,10 +105,10 @@ describe('disable()', () => { version: '123', references: [], }; - const existingDecryptedAlert = { - ...existingAlert, + const existingDecryptedRule = { + ...existingRule, attributes: { - ...existingAlert.attributes, + ...existingRule.attributes, apiKey: Buffer.from('123:abc').toString('base64'), apiKeyOwner: 'elastic', }, @@ -118,12 +118,12 @@ describe('disable()', () => { beforeEach(() => { rulesClient = new RulesClient(rulesClientParams); - unsecuredSavedObjectsClient.get.mockResolvedValue(existingAlert); - encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue(existingDecryptedAlert); + unsecuredSavedObjectsClient.get.mockResolvedValue(existingRule); + encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue(existingDecryptedRule); }); describe('authorization', () => { - test('ensures user is authorised to disable this type of alert under the consumer', async () => { + test('ensures user is authorised to disable this type of rule under the consumer', async () => { await rulesClient.disable({ id: '1' }); expect(authorization.ensureAuthorized).toHaveBeenCalledWith({ @@ -134,7 +134,7 @@ describe('disable()', () => { }); }); - test('throws when user is not authorised to disable this type of alert', async () => { + test('throws when user is not authorised to disable this type of rule', async () => { authorization.ensureAuthorized.mockRejectedValue( new Error(`Unauthorized to disable a "myType" alert for "myApp"`) ); @@ -191,7 +191,7 @@ describe('disable()', () => { }); }); - test('disables an alert', async () => { + test('disables an rule', async () => { await rulesClient.disable({ id: '1' }); expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { @@ -208,7 +208,7 @@ describe('disable()', () => { meta: { versionApiKeyLastmodified: 'v7.10.0', }, - scheduledTaskId: null, + scheduledTaskId: '1', apiKey: 'MTIzOmFiYw==', apiKeyOwner: 'elastic', updatedAt: '2019-02-12T21:01:22.479Z', @@ -229,11 +229,12 @@ describe('disable()', () => { version: '123', } ); - expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); }); test('disables the rule with calling event log to "recover" the alert instances from the task state', async () => { - const scheduledTaskId = 'task-123'; + const scheduledTaskId = '1'; taskManager.get.mockResolvedValue({ id: scheduledTaskId, taskType: 'alerting:123', @@ -278,7 +279,7 @@ describe('disable()', () => { meta: { versionApiKeyLastmodified: 'v7.10.0', }, - scheduledTaskId: null, + scheduledTaskId: '1', apiKey: 'MTIzOmFiYw==', apiKeyOwner: 'elastic', updatedAt: '2019-02-12T21:01:22.479Z', @@ -299,7 +300,8 @@ describe('disable()', () => { version: '123', } ); - expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); expect(eventLogger.logEvent).toHaveBeenCalledTimes(1); expect(eventLogger.logEvent.mock.calls[0][0]).toStrictEqual({ @@ -359,7 +361,7 @@ describe('disable()', () => { meta: { versionApiKeyLastmodified: 'v7.10.0', }, - scheduledTaskId: null, + scheduledTaskId: '1', apiKey: 'MTIzOmFiYw==', apiKeyOwner: 'elastic', updatedAt: '2019-02-12T21:01:22.479Z', @@ -380,7 +382,8 @@ describe('disable()', () => { version: '123', } ); - expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); expect(eventLogger.logEvent).toHaveBeenCalledTimes(0); expect(rulesClientParams.logger.warn).toHaveBeenCalledWith( @@ -403,7 +406,7 @@ describe('disable()', () => { schedule: { interval: '10s' }, alertTypeId: 'myType', enabled: false, - scheduledTaskId: null, + scheduledTaskId: '1', updatedAt: '2019-02-12T21:01:22.479Z', updatedBy: 'elastic', actions: [ @@ -422,14 +425,15 @@ describe('disable()', () => { version: '123', } ); - expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); }); - test(`doesn't disable already disabled alerts`, async () => { + test(`doesn't disable already disabled rules`, async () => { encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({ - ...existingDecryptedAlert, + ...existingDecryptedRule, attributes: { - ...existingDecryptedAlert.attributes, + ...existingDecryptedRule.attributes, actions: [], enabled: false, }, @@ -437,7 +441,8 @@ describe('disable()', () => { await rulesClient.disable({ id: '1' }); expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled(); - expect(taskManager.removeIfExists).not.toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); }); test('swallows error when failing to load decrypted saved object', async () => { @@ -445,7 +450,8 @@ describe('disable()', () => { await rulesClient.disable({ id: '1' }); expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled(); - expect(taskManager.removeIfExists).toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).toHaveBeenCalled(); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); expect(rulesClientParams.logger.error).toHaveBeenCalledWith( 'disable(): Failed to load API key of alert 1: Fail' ); @@ -457,13 +463,106 @@ describe('disable()', () => { await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( `"Failed to update"` ); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); }); - test('throws when failing to remove task from task manager', async () => { - taskManager.removeIfExists.mockRejectedValueOnce(new Error('Failed to remove task')); + test('throws when failing to disable task', async () => { + taskManager.bulkEnableDisable.mockRejectedValueOnce(new Error('Failed to disable task')); + await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( + `"Failed to disable task"` + ); + expect(taskManager.removeIfExists).not.toHaveBeenCalledWith(); + }); + + test('removes task document if scheduled task id does not match rule id', async () => { + encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue({ + ...existingRule, + attributes: { + ...existingRule.attributes, + scheduledTaskId: 'task-123', + }, + }); + await rulesClient.disable({ id: '1' }); + expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); + expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { + namespace: 'default', + }); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith( + 'alert', + '1', + { + consumer: 'myApp', + schedule: { interval: '10s' }, + alertTypeId: 'myType', + enabled: false, + scheduledTaskId: null, + updatedAt: '2019-02-12T21:01:22.479Z', + updatedBy: 'elastic', + actions: [ + { + group: 'default', + id: '1', + actionTypeId: '1', + actionRef: '1', + params: { + foo: true, + }, + }, + ], + }, + { + version: '123', + } + ); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); + expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123'); + }); + + test('throws when failing to remove existing task', async () => { + encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValue({ + ...existingRule, + attributes: { + ...existingRule.attributes, + scheduledTaskId: 'task-123', + }, + }); + taskManager.removeIfExists.mockRejectedValueOnce(new Error('Failed to remove task')); await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( `"Failed to remove task"` ); + expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); + expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { + namespace: 'default', + }); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith( + 'alert', + '1', + { + consumer: 'myApp', + schedule: { interval: '10s' }, + alertTypeId: 'myType', + enabled: false, + scheduledTaskId: null, + updatedAt: '2019-02-12T21:01:22.479Z', + updatedBy: 'elastic', + actions: [ + { + group: 'default', + id: '1', + actionTypeId: '1', + actionRef: '1', + params: { + foo: true, + }, + }, + ], + }, + { + version: '123', + } + ); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); }); }); diff --git a/x-pack/plugins/alerting/server/rules_client/tests/enable.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/enable.test.ts index 8923031ab6b87..b8d259bd6a682 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/enable.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/enable.test.ts @@ -63,6 +63,7 @@ describe('enable()', () => { consumer: 'myApp', schedule: { interval: '10s' }, alertTypeId: 'myType', + scheduledTaskId: 'task-123', enabled: false, apiKey: 'MTIzOmFiYw==', apiKeyOwner: 'elastic', @@ -91,7 +92,25 @@ describe('enable()', () => { }, }; + const mockTask = { + id: 'task-123', + taskType: 'alerting:123', + scheduledAt: new Date(), + attempts: 1, + status: TaskStatus.Idle, + runAt: new Date(), + startedAt: null, + retryAt: null, + state: {}, + params: { + alertId: '1', + }, + ownerId: null, + enabled: false, + }; + beforeEach(() => { + jest.resetAllMocks(); getBeforeSetup(rulesClientParams, taskManager, ruleTypeRegistry); (auditLogger.log as jest.Mock).mockClear(); rulesClient = new RulesClient(rulesClientParams); @@ -100,19 +119,7 @@ describe('enable()', () => { rulesClientParams.createAPIKey.mockResolvedValue({ apiKeysEnabled: false, }); - taskManager.schedule.mockResolvedValue({ - id: '1', - scheduledAt: new Date(), - attempts: 0, - status: TaskStatus.Idle, - runAt: new Date(), - state: {}, - params: {}, - taskType: '', - startedAt: null, - retryAt: null, - ownerId: null, - }); + taskManager.get.mockResolvedValue(mockTask); }); describe('authorization', () => { @@ -208,6 +215,7 @@ describe('enable()', () => { updatedBy: 'elastic', apiKey: 'MTIzOmFiYw==', apiKeyOwner: 'elastic', + scheduledTaskId: 'task-123', actions: [ { group: 'default', @@ -231,27 +239,7 @@ describe('enable()', () => { version: '123', } ); - expect(taskManager.schedule).toHaveBeenCalledWith({ - id: '1', - taskType: `alerting:myType`, - params: { - alertId: '1', - spaceId: 'default', - consumer: 'myApp', - }, - schedule: { - interval: '10s', - }, - state: { - alertInstances: {}, - alertTypeState: {}, - previousStartedAt: null, - }, - scope: ['alerting'], - }); - expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', { - scheduledTaskId: '1', - }); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true); }); test('enables a rule that does not have an apiKey', async () => { @@ -283,6 +271,7 @@ describe('enable()', () => { updatedBy: 'elastic', apiKey: 'MTIzOmFiYw==', apiKeyOwner: 'elastic', + scheduledTaskId: 'task-123', actions: [ { group: 'default', @@ -306,9 +295,10 @@ describe('enable()', () => { version: '123', } ); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true); }); - test(`doesn't enable already enabled alerts`, async () => { + test(`doesn't update already enabled alerts but ensures task is enabled`, async () => { encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({ ...existingRuleWithoutApiKey, attributes: { @@ -321,7 +311,7 @@ describe('enable()', () => { expect(rulesClientParams.getUserName).not.toHaveBeenCalled(); expect(rulesClientParams.createAPIKey).not.toHaveBeenCalled(); expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled(); - expect(taskManager.schedule).not.toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true); }); test('sets API key when createAPIKey returns one', async () => { @@ -345,6 +335,7 @@ describe('enable()', () => { }, apiKey: Buffer.from('123:abc').toString('base64'), apiKeyOwner: 'elastic', + scheduledTaskId: 'task-123', updatedBy: 'elastic', updatedAt: '2019-02-12T21:01:22.479Z', actions: [ @@ -370,6 +361,7 @@ describe('enable()', () => { version: '123', } ); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true); }); test('throws an error if API key creation throws', async () => { @@ -381,6 +373,7 @@ describe('enable()', () => { await expect( async () => await rulesClient.enable({ id: '1' }) ).rejects.toThrowErrorMatchingInlineSnapshot(`"Error creating API key for rule: no"`); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); }); test('falls back when failing to getDecryptedAsInternalUser', async () => { @@ -391,6 +384,7 @@ describe('enable()', () => { expect(rulesClientParams.logger.error).toHaveBeenCalledWith( 'enable(): Failed to load API key of alert 1: Fail' ); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true); }); test('throws error when failing to load the saved object using SOC', async () => { @@ -403,10 +397,10 @@ describe('enable()', () => { expect(rulesClientParams.getUserName).not.toHaveBeenCalled(); expect(rulesClientParams.createAPIKey).not.toHaveBeenCalled(); expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled(); - expect(taskManager.schedule).not.toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); }); - test('throws error when failing to update the first time', async () => { + test('throws when unsecuredSavedObjectsClient update fails', async () => { rulesClientParams.createAPIKey.mockResolvedValueOnce({ apiKeysEnabled: true, result: { id: '123', name: '123', api_key: 'abc' }, @@ -419,100 +413,102 @@ describe('enable()', () => { ); expect(rulesClientParams.getUserName).toHaveBeenCalled(); expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(1); - expect(taskManager.schedule).not.toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); }); - test('throws error when failing to update the second time', async () => { - unsecuredSavedObjectsClient.update.mockReset(); - unsecuredSavedObjectsClient.update.mockResolvedValueOnce({ - ...existingRuleWithoutApiKey, - attributes: { - ...existingRuleWithoutApiKey.attributes, - enabled: true, - }, + test('enables task when scheduledTaskId is defined and task exists', async () => { + await rulesClient.enable({ id: '1' }); + expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); + expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { + namespace: 'default', }); - unsecuredSavedObjectsClient.update.mockRejectedValueOnce( - new Error('Fail to update second time') - ); - - await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( - `"Fail to update second time"` - ); - expect(rulesClientParams.getUserName).toHaveBeenCalled(); - expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2); - expect(taskManager.schedule).toHaveBeenCalled(); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true); }); - test('throws error when failing to schedule task', async () => { - taskManager.schedule.mockRejectedValueOnce(new Error('Fail to schedule')); - + test('throws error when enabling task fails', async () => { + taskManager.bulkEnableDisable.mockRejectedValueOnce(new Error('Failed to enable task')); await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( - `"Fail to schedule"` + `"Failed to enable task"` ); - expect(rulesClientParams.getUserName).toHaveBeenCalled(); + expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); + expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { + namespace: 'default', + }); expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled(); }); - test('enables a rule if conflict errors received when scheduling a task', async () => { - unsecuredSavedObjectsClient.create.mockResolvedValueOnce({ - ...existingRuleWithoutApiKey, - attributes: { - ...existingRuleWithoutApiKey.attributes, - enabled: true, - apiKey: null, - apiKeyOwner: null, - updatedBy: 'elastic', - }, + test('schedules task when scheduledTaskId is defined but task with that ID does not', async () => { + taskManager.schedule.mockResolvedValueOnce({ + id: '1', + taskType: 'alerting:123', + scheduledAt: new Date(), + attempts: 1, + status: TaskStatus.Idle, + runAt: new Date(), + startedAt: null, + retryAt: null, + state: {}, + params: {}, + ownerId: null, }); - taskManager.schedule.mockRejectedValueOnce( - Object.assign(new Error('Conflict!'), { statusCode: 409 }) - ); - + taskManager.get.mockRejectedValueOnce(new Error('Failed to get task!')); await rulesClient.enable({ id: '1' }); expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { namespace: 'default', }); - expect(unsecuredSavedObjectsClient.create).not.toBeCalledWith('api_key_pending_invalidation'); - expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith( - 'alert', - '1', - { - name: 'name', - schedule: { interval: '10s' }, - alertTypeId: 'myType', + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); + expect(taskManager.schedule).toHaveBeenCalledWith({ + id: '1', + taskType: `alerting:myType`, + params: { + alertId: '1', + spaceId: 'default', consumer: 'myApp', - enabled: true, - meta: { - versionApiKeyLastmodified: kibanaVersion, - }, - updatedAt: '2019-02-12T21:01:22.479Z', - updatedBy: 'elastic', - apiKey: 'MTIzOmFiYw==', - apiKeyOwner: 'elastic', - actions: [ - { - group: 'default', - id: '1', - actionTypeId: '1', - actionRef: '1', - params: { - foo: true, - }, - }, - ], - executionStatus: { - status: 'pending', - lastDuration: 0, - lastExecutionDate: '2019-02-12T21:01:22.479Z', - error: null, - warning: null, - }, }, - { - version: '123', - } - ); + schedule: { + interval: '10s', + }, + enabled: true, + state: { + alertInstances: {}, + alertTypeState: {}, + previousStartedAt: null, + }, + scope: ['alerting'], + }); + expect(unsecuredSavedObjectsClient.update).toHaveBeenNthCalledWith(2, 'alert', '1', { + scheduledTaskId: '1', + }); + }); + + test('schedules task when scheduledTaskId is not defined', async () => { + encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({ + ...existingRule, + attributes: { ...existingRule.attributes, scheduledTaskId: null }, + }); + taskManager.schedule.mockResolvedValueOnce({ + id: '1', + taskType: 'alerting:123', + scheduledAt: new Date(), + attempts: 1, + status: TaskStatus.Idle, + runAt: new Date(), + startedAt: null, + retryAt: null, + state: {}, + params: {}, + ownerId: null, + }); + await rulesClient.enable({ id: '1' }); + expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); + expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { + namespace: 'default', + }); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); expect(taskManager.schedule).toHaveBeenCalledWith({ id: '1', taskType: `alerting:myType`, @@ -524,6 +520,7 @@ describe('enable()', () => { schedule: { interval: '10s', }, + enabled: true, state: { alertInstances: {}, alertTypeState: {}, @@ -531,7 +528,81 @@ describe('enable()', () => { }, scope: ['alerting'], }); - expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', { + expect(unsecuredSavedObjectsClient.update).toHaveBeenNthCalledWith(2, 'alert', '1', { + scheduledTaskId: '1', + }); + }); + + test('throws error when scheduling task fails', async () => { + encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({ + ...existingRule, + attributes: { ...existingRule.attributes, scheduledTaskId: null }, + }); + taskManager.schedule.mockRejectedValueOnce(new Error('Fail to schedule')); + await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( + `"Fail to schedule"` + ); + expect(rulesClientParams.getUserName).toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); + expect(taskManager.schedule).toHaveBeenCalled(); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(1); + }); + + test('succeeds if conflict errors received when scheduling a task', async () => { + encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({ + ...existingRule, + attributes: { ...existingRule.attributes, scheduledTaskId: null }, + }); + taskManager.schedule.mockRejectedValueOnce( + Object.assign(new Error('Conflict!'), { statusCode: 409 }) + ); + await rulesClient.enable({ id: '1' }); + expect(unsecuredSavedObjectsClient.get).not.toHaveBeenCalled(); + expect(encryptedSavedObjects.getDecryptedAsInternalUser).toHaveBeenCalledWith('alert', '1', { + namespace: 'default', + }); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); + expect(taskManager.schedule).toHaveBeenCalled(); + }); + + test('throws error when update after scheduling task fails', async () => { + encryptedSavedObjects.getDecryptedAsInternalUser.mockResolvedValueOnce({ + ...existingRule, + attributes: { ...existingRule.attributes, scheduledTaskId: null }, + }); + taskManager.schedule.mockResolvedValueOnce({ + id: '1', + taskType: 'alerting:123', + scheduledAt: new Date(), + attempts: 1, + status: TaskStatus.Idle, + runAt: new Date(), + startedAt: null, + retryAt: null, + state: {}, + params: {}, + ownerId: null, + }); + unsecuredSavedObjectsClient.update.mockResolvedValueOnce({ + ...existingRule, + attributes: { + ...existingRule.attributes, + enabled: true, + }, + }); + unsecuredSavedObjectsClient.update.mockRejectedValueOnce( + new Error('Fail to update after scheduling task') + ); + + await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot( + `"Fail to update after scheduling task"` + ); + expect(rulesClientParams.getUserName).toHaveBeenCalled(); + expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2); + expect(taskManager.schedule).toHaveBeenCalled(); + expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled(); + expect(unsecuredSavedObjectsClient.update).toHaveBeenNthCalledWith(2, 'alert', '1', { scheduledTaskId: '1', }); }); diff --git a/x-pack/plugins/alerting/server/rules_client_conflict_retries.test.ts b/x-pack/plugins/alerting/server/rules_client_conflict_retries.test.ts index 2416e95c38f64..b71cc0276dc64 100644 --- a/x-pack/plugins/alerting/server/rules_client_conflict_retries.test.ts +++ b/x-pack/plugins/alerting/server/rules_client_conflict_retries.test.ts @@ -141,9 +141,9 @@ async function enable(success: boolean) { return expectConflict(success, err); } - // a successful enable call makes 2 calls to update, so that's 3 total, - // 1 with conflict + 2 on success - expectSuccess(success, 3); + // a successful enable call makes 1 call to update, so with + // conflict, we would expect 1 on conflict, 1 on success + expectSuccess(success, 2); } async function disable(success: boolean) { diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md index 4a99889ad9cc3..59f836c10c03e 100644 --- a/x-pack/plugins/task_manager/README.md +++ b/x-pack/plugins/task_manager/README.md @@ -328,6 +328,9 @@ The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's runSoon: (taskId: string) => { // ... }, + bulkEnableDisable: (taskIds: string[], enabled: boolean) => { + // ... + }, bulkUpdateSchedules: (taskIds: string[], schedule: IntervalSchedule) => { // ... }, @@ -418,6 +421,33 @@ export class Plugin { } ``` +#### bulkEnableDisable +Using `bulkEnableDisable` you can instruct TaskManger to update the `enabled` status of tasks. + +Example: +```js +export class Plugin { + constructor() { + } + + public setup(core: CoreSetup, plugins: { taskManager }) { + } + + public start(core: CoreStart, plugins: { taskManager }) { + try { + const bulkDisableResults = await taskManager.bulkEnableDisable( + ['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'], + false, + ); + // If no error is thrown, the bulkEnableDisable has completed successfully. + // But some updates of some tasks can be failed, due to OCC 409 conflict for example + } catch(err: Error) { + // if error is caught, means the whole method requested has failed and tasks weren't updated + } + } +} +``` + #### bulkUpdateSchedules Using `bulkUpdatesSchedules` you can instruct TaskManger to update interval of tasks that are in `idle` status (for the tasks which have `running` status, `schedule` and `runAt` will be recalculated after task run finishes). diff --git a/x-pack/plugins/task_manager/server/index.ts b/x-pack/plugins/task_manager/server/index.ts index 0b01d6a05b7e1..4f1d41cad2109 100644 --- a/x-pack/plugins/task_manager/server/index.ts +++ b/x-pack/plugins/task_manager/server/index.ts @@ -30,7 +30,7 @@ export { throwUnrecoverableError, isEphemeralTaskRejectedDueToCapacityError, } from './task_running'; -export type { RunNowResult, BulkUpdateSchedulesResult } from './task_scheduling'; +export type { RunNowResult, BulkUpdateTaskResult } from './task_scheduling'; export { getOldestIdleActionTask } from './queries/oldest_idle_action_task'; export { IdleTaskWithExpiredRunAt, diff --git a/x-pack/plugins/task_manager/server/mocks.ts b/x-pack/plugins/task_manager/server/mocks.ts index 1560c20be5baa..9ce4797e50db9 100644 --- a/x-pack/plugins/task_manager/server/mocks.ts +++ b/x-pack/plugins/task_manager/server/mocks.ts @@ -30,6 +30,7 @@ const createStartMock = () => { supportsEphemeralTasks: jest.fn(), bulkUpdateSchedules: jest.fn(), bulkSchedule: jest.fn(), + bulkEnableDisable: jest.fn(), }; return mock; }; diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 2bb06b3a223be..08b7a2908f2e8 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -53,6 +53,7 @@ export type TaskManagerStartContract = Pick< | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules' + | 'bulkEnableDisable' | 'bulkSchedule' > & Pick & { @@ -251,6 +252,7 @@ export class TaskManagerPlugin bulkSchedule: (...args) => taskScheduling.bulkSchedule(...args), ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args), runSoon: (...args) => taskScheduling.runSoon(...args), + bulkEnableDisable: (...args) => taskScheduling.bulkEnableDisable(...args), bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args), ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task), supportsEphemeralTasks: () => diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts index 086a435a39fa9..4b04672615b7d 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts @@ -13,6 +13,7 @@ import { IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt, SortByRunAtAndRetryAt, + EnabledTask, } from './mark_available_tasks_as_claimed'; import { TaskTypeDictionary } from '../task_type_dictionary'; @@ -53,6 +54,8 @@ describe('mark_available_tasks_as_claimed', () => { expect({ query: mustBeAllOf( + // Task must be enabled + EnabledTask, // Either a task with idle status and runAt <= now or // status running or claiming with a retryAt <= now. shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt) @@ -72,6 +75,17 @@ describe('mark_available_tasks_as_claimed', () => { query: { bool: { must: [ + { + bool: { + must: [ + { + term: { + 'task.enabled': true, + }, + }, + ], + }, + }, // Either a task with idle status and runAt <= now or // status running or claiming with a retryAt <= now. { diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index afdd9c5c18b33..d477c9c643a49 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -72,6 +72,18 @@ export const InactiveTasks: MustNotCondition = { }, }; +export const EnabledTask: MustCondition = { + bool: { + must: [ + { + term: { + 'task.enabled': true, + }, + }, + ], + }, +}; + export const RunningOrClaimingTaskWithExpiredRetryAt: MustCondition = { bool: { must: [ diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts index 67175c86370d7..a23c29a5044f3 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts @@ -312,6 +312,17 @@ describe('TaskClaiming', () => { expect(query).toMatchObject({ bool: { must: [ + { + bool: { + must: [ + { + term: { + 'task.enabled': true, + }, + }, + ], + }, + }, { bool: { should: [ @@ -437,6 +448,17 @@ if (doc['task.runAt'].size()!=0) { organic: { bool: { must: [ + { + bool: { + must: [ + { + term: { + 'task.enabled': true, + }, + }, + ], + }, + }, { bool: { should: [ @@ -929,6 +951,17 @@ if (doc['task.runAt'].size()!=0) { expect(query).toMatchObject({ bool: { must: [ + { + bool: { + must: [ + { + term: { + 'task.enabled': true, + }, + }, + ], + }, + }, { bool: { should: [ diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index f15639661e5d0..7226a55854988 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -43,6 +43,7 @@ import { SortByRunAtAndRetryAt, tasksClaimedByOwner, tasksOfType, + EnabledTask, } from './mark_available_tasks_as_claimed'; import { TaskTypeDictionary } from '../task_type_dictionary'; import { @@ -384,6 +385,8 @@ export class TaskClaiming { : 'taskTypesToSkip' ); const queryForScheduledTasks = mustBeAllOf( + // Task must be enabled + EnabledTask, // Either a task with idle status and runAt <= now or // status running or claiming with a retryAt <= now. shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt) diff --git a/x-pack/plugins/task_manager/server/saved_objects/mappings.json b/x-pack/plugins/task_manager/server/saved_objects/mappings.json index d046a9266cce5..00129ac1bcdd4 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/mappings.json +++ b/x-pack/plugins/task_manager/server/saved_objects/mappings.json @@ -16,6 +16,9 @@ "retryAt": { "type": "date" }, + "enabled": { + "type": "boolean" + }, "schedule": { "properties": { "interval": { diff --git a/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts b/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts index a59fb077dbdeb..fe8cc3f81eced 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts @@ -226,6 +226,47 @@ describe('successful migrations', () => { expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); }); }); + + describe('8.5.0', () => { + test('adds enabled: true to tasks that are running, claiming, or idle', () => { + const migration850 = getMigrations()['8.5.0']; + const activeTasks = [ + getMockData({ + status: 'running', + }), + getMockData({ + status: 'claiming', + }), + getMockData({ + status: 'idle', + }), + ]; + activeTasks.forEach((task) => { + expect(migration850(task, migrationContext)).toEqual({ + ...task, + attributes: { + ...task.attributes, + enabled: true, + }, + }); + }); + }); + + test('does not modify tasks that are failed or unrecognized', () => { + const migration850 = getMigrations()['8.5.0']; + const inactiveTasks = [ + getMockData({ + status: 'failed', + }), + getMockData({ + status: 'unrecognized', + }), + ]; + inactiveTasks.forEach((task) => { + expect(migration850(task, migrationContext)).toEqual(task); + }); + }); + }); }); describe('handles errors during migrations', () => { diff --git a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts index eb4ff6c1b0ecf..a147a6bdabc6b 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts @@ -42,6 +42,7 @@ export function getMigrations(): SavedObjectMigrationMap { pipeMigrations(resetAttemptsAndStatusForTheTasksWithoutSchedule, resetUnrecognizedStatus), '8.2.0' ), + '8.5.0': executeMigrationWithErrorHandling(pipeMigrations(addEnabledField), '8.5.0'), }; } @@ -193,3 +194,20 @@ function resetAttemptsAndStatusForTheTasksWithoutSchedule( return doc; } + +function addEnabledField(doc: SavedObjectUnsanitizedDoc) { + if ( + doc.attributes.status === TaskStatus.Failed || + doc.attributes.status === TaskStatus.Unrecognized + ) { + return doc; + } + + return { + ...doc, + attributes: { + ...doc.attributes, + enabled: true, + }, + }; +} diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 6d12a3f5984ca..ebb957e54699a 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -277,6 +277,11 @@ export interface TaskInstance { * The random uuid of the Kibana instance which claimed ownership of the task last */ ownerId?: string | null; + + /** + * Indicates whether the task is currently enabled. Disabled tasks will not be claimed. + */ + enabled?: boolean; } /** @@ -371,7 +376,10 @@ export interface ConcreteTaskInstance extends TaskInstance { /** * A task instance that has an id and is ready for storage. */ -export type EphemeralTask = Pick; +export type EphemeralTask = Pick< + ConcreteTaskInstance, + 'taskType' | 'params' | 'state' | 'scope' | 'enabled' +>; export type EphemeralTaskInstance = EphemeralTask & Pick; diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index f61bd762de458..a9c58b1302f56 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -179,6 +179,7 @@ describe('TaskManagerRunner', () => { schedule: { interval: `${intervalMinutes}m`, }, + enabled: true, }, definitions: { bar: { @@ -198,6 +199,7 @@ describe('TaskManagerRunner', () => { expect(instance.retryAt!.getTime()).toEqual( instance.startedAt!.getTime() + intervalMinutes * 60 * 1000 ); + expect(instance.enabled).not.toBeDefined(); }); test('calculates retryAt by default timout when it exceeds the schedule of a recurring task', async () => { @@ -211,6 +213,7 @@ describe('TaskManagerRunner', () => { schedule: { interval: `${intervalSeconds}s`, }, + enabled: true, }, definitions: { bar: { @@ -228,6 +231,7 @@ describe('TaskManagerRunner', () => { const instance = store.update.mock.calls[0][0]; expect(instance.retryAt!.getTime()).toEqual(instance.startedAt!.getTime() + 5 * 60 * 1000); + expect(instance.enabled).not.toBeDefined(); }); test('calculates retryAt by timeout if it exceeds the schedule when running a recurring task', async () => { @@ -242,6 +246,7 @@ describe('TaskManagerRunner', () => { schedule: { interval: `${intervalSeconds}s`, }, + enabled: true, }, definitions: { bar: { @@ -262,6 +267,7 @@ describe('TaskManagerRunner', () => { expect(instance.retryAt!.getTime()).toEqual( instance.startedAt!.getTime() + timeoutMinutes * 60 * 1000 ); + expect(instance.enabled).not.toBeDefined(); }); test('sets startedAt, status, attempts and retryAt when claiming a task', async () => { @@ -271,6 +277,7 @@ describe('TaskManagerRunner', () => { const { runner, store } = await pendingStageSetup({ instance: { id, + enabled: true, attempts: initialAttempts, schedule: undefined, }, @@ -296,6 +303,7 @@ describe('TaskManagerRunner', () => { expect(instance.retryAt!.getTime()).toEqual( minutesFromNow((initialAttempts + 1) * 5).getTime() + timeoutMinutes * 60 * 1000 ); + expect(instance.enabled).not.toBeDefined(); }); test('uses getRetry (returning date) to set retryAt when defined', async () => { @@ -309,6 +317,7 @@ describe('TaskManagerRunner', () => { id, attempts: initialAttempts, schedule: undefined, + enabled: true, }, definitions: { bar: { @@ -331,6 +340,7 @@ describe('TaskManagerRunner', () => { expect(instance.retryAt!.getTime()).toEqual( new Date(nextRetry.getTime() + timeoutMinutes * 60 * 1000).getTime() ); + expect(instance.enabled).not.toBeDefined(); }); test('it returns false when markTaskAsRunning fails due to VERSION_CONFLICT_STATUS', async () => { @@ -539,6 +549,7 @@ describe('TaskManagerRunner', () => { id, attempts: initialAttempts, schedule: undefined, + enabled: true, }, definitions: { bar: { @@ -563,6 +574,7 @@ describe('TaskManagerRunner', () => { expect(instance.retryAt!.getTime()).toEqual( new Date(Date.now() + attemptDelay + timeoutDelay).getTime() ); + expect(instance.enabled).not.toBeDefined(); }); test('uses getRetry (returning false) to set retryAt when defined', async () => { @@ -575,6 +587,7 @@ describe('TaskManagerRunner', () => { id, attempts: initialAttempts, schedule: undefined, + enabled: true, }, definitions: { bar: { @@ -596,6 +609,7 @@ describe('TaskManagerRunner', () => { expect(instance.retryAt!).toBeNull(); expect(instance.status).toBe('running'); + expect(instance.enabled).not.toBeDefined(); }); test('bypasses getRetry (returning false) of a recurring task to set retryAt when defined', async () => { @@ -609,6 +623,7 @@ describe('TaskManagerRunner', () => { attempts: initialAttempts, schedule: { interval: '1m' }, startedAt: new Date(), + enabled: true, }, definitions: { bar: { @@ -630,6 +645,7 @@ describe('TaskManagerRunner', () => { const timeoutDelay = timeoutMinutes * 60 * 1000; expect(instance.retryAt!.getTime()).toEqual(new Date(Date.now() + timeoutDelay).getTime()); + expect(instance.enabled).not.toBeDefined(); }); describe('TaskEvents', () => { @@ -781,6 +797,7 @@ describe('TaskManagerRunner', () => { attempts: initialAttempts, params: { a: 'b' }, state: { hey: 'there' }, + enabled: true, }, definitions: { bar: { @@ -803,6 +820,7 @@ describe('TaskManagerRunner', () => { expect(instance.runAt.getTime()).toEqual(minutesFromNow(initialAttempts * 5).getTime()); expect(instance.params).toEqual({ a: 'b' }); expect(instance.state).toEqual({ hey: 'there' }); + expect(instance.enabled).not.toBeDefined(); }); test('reschedules tasks that have an schedule', async () => { @@ -811,6 +829,7 @@ describe('TaskManagerRunner', () => { schedule: { interval: '10m' }, status: TaskStatus.Running, startedAt: new Date(), + enabled: true, }, definitions: { bar: { @@ -831,6 +850,7 @@ describe('TaskManagerRunner', () => { expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime()); expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime()); + expect(instance.enabled).not.toBeDefined(); }); test('expiration returns time after which timeout will have elapsed from start', async () => { @@ -951,6 +971,7 @@ describe('TaskManagerRunner', () => { schedule: { interval: '20m' }, status: TaskStatus.Running, startedAt: new Date(), + enabled: true, }, definitions: { bar: { @@ -968,6 +989,7 @@ describe('TaskManagerRunner', () => { const instance = store.update.mock.calls[0][0]; expect(instance.status).toBe('failed'); + expect(instance.enabled).not.toBeDefined(); expect(onTaskEvent).toHaveBeenCalledWith( withAnyTiming( @@ -1092,6 +1114,7 @@ describe('TaskManagerRunner', () => { instance: { id, attempts: initialAttempts, + enabled: true, }, definitions: { bar: { @@ -1113,6 +1136,7 @@ describe('TaskManagerRunner', () => { const instance = store.update.mock.calls[0][0]; expect(instance.runAt.getTime()).toEqual(nextRetry.getTime()); + expect(instance.enabled).not.toBeDefined(); }); test('uses getRetry function (returning true) on error when defined', async () => { @@ -1124,6 +1148,7 @@ describe('TaskManagerRunner', () => { instance: { id, attempts: initialAttempts, + enabled: true, }, definitions: { bar: { @@ -1146,6 +1171,7 @@ describe('TaskManagerRunner', () => { const expectedRunAt = new Date(Date.now() + initialAttempts * 5 * 60 * 1000); expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime()); + expect(instance.enabled).not.toBeDefined(); }); test('uses getRetry function (returning false) on error when defined', async () => { @@ -1157,6 +1183,7 @@ describe('TaskManagerRunner', () => { instance: { id, attempts: initialAttempts, + enabled: true, }, definitions: { bar: { @@ -1178,6 +1205,7 @@ describe('TaskManagerRunner', () => { const instance = store.update.mock.calls[0][0]; expect(instance.status).toBe('failed'); + expect(instance.enabled).not.toBeDefined(); }); test('bypasses getRetry function (returning false) on error of a recurring task', async () => { @@ -1191,6 +1219,7 @@ describe('TaskManagerRunner', () => { attempts: initialAttempts, schedule: { interval: '1m' }, startedAt: new Date(), + enabled: true, }, definitions: { bar: { @@ -1214,6 +1243,7 @@ describe('TaskManagerRunner', () => { const nextIntervalDelay = 60000; // 1m const expectedRunAt = new Date(Date.now() + nextIntervalDelay); expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime()); + expect(instance.enabled).not.toBeDefined(); }); test('Fails non-recurring task when maxAttempts reached', async () => { @@ -1224,6 +1254,7 @@ describe('TaskManagerRunner', () => { id, attempts: initialAttempts, schedule: undefined, + enabled: true, }, definitions: { bar: { @@ -1246,6 +1277,7 @@ describe('TaskManagerRunner', () => { expect(instance.status).toEqual('failed'); expect(instance.retryAt!).toBeNull(); expect(instance.runAt.getTime()).toBeLessThanOrEqual(Date.now()); + expect(instance.enabled).not.toBeDefined(); }); test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => { @@ -1258,6 +1290,7 @@ describe('TaskManagerRunner', () => { attempts: initialAttempts, schedule: { interval: `${intervalSeconds}s` }, startedAt: new Date(), + enabled: true, }, definitions: { bar: { @@ -1281,6 +1314,7 @@ describe('TaskManagerRunner', () => { expect(instance.runAt.getTime()).toEqual( new Date(Date.now() + intervalSeconds * 1000).getTime() ); + expect(instance.enabled).not.toBeDefined(); }); describe('TaskEvents', () => { @@ -1450,6 +1484,7 @@ describe('TaskManagerRunner', () => { instance: { id, startedAt: new Date(), + enabled: true, }, definitions: { bar: { @@ -1468,6 +1503,7 @@ describe('TaskManagerRunner', () => { const instance = store.update.mock.calls[0][0]; expect(instance.status).toBe('failed'); + expect(instance.enabled).not.toBeDefined(); expect(onTaskEvent).toHaveBeenCalledWith( withAnyTiming( diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 0b735d6b0ede6..a5865abc46bbe 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -14,7 +14,7 @@ import apm from 'elastic-apm-node'; import uuid from 'uuid'; import { withSpan } from '@kbn/apm-utils'; -import { identity, defaults, flow } from 'lodash'; +import { identity, defaults, flow, omit } from 'lodash'; import { Logger, SavedObjectsErrorHelpers, ExecutionContextStart } from '@kbn/core/server'; import { UsageCounter } from '@kbn/usage-collection-plugin/server'; import { Middleware } from '../lib/middleware'; @@ -375,7 +375,7 @@ export class TaskManagerRunner implements TaskRunner { this.instance = asReadyToRun( (await this.bufferedTaskStore.update({ - ...taskInstance, + ...taskWithoutEnabled(taskInstance), status: TaskStatus.Running, startedAt: now, attempts, @@ -456,7 +456,7 @@ export class TaskManagerRunner implements TaskRunner { private async releaseClaimAndIncrementAttempts(): Promise> { return promiseResult( this.bufferedTaskStore.update({ - ...this.instance.task, + ...taskWithoutEnabled(this.instance.task), status: TaskStatus.Idle, attempts: this.instance.task.attempts + 1, startedAt: null, @@ -549,7 +549,7 @@ export class TaskManagerRunner implements TaskRunner { retryAt: null, ownerId: null, }, - this.instance.task + taskWithoutEnabled(this.instance.task) ) ) ); @@ -677,6 +677,12 @@ function howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil: Date | null) return ownershipClaimedUntil ? ownershipClaimedUntil.getTime() - Date.now() : 0; } +// Omits "enabled" field from task updates so we don't overwrite any user +// initiated changes to "enabled" while the task was running +function taskWithoutEnabled(task: ConcreteTaskInstance): ConcreteTaskInstance { + return omit(task, 'enabled'); +} + // A type that extracts the Instance type out of TaskRunningStage // This helps us to better communicate to the developer what the expected "stage" // in a specific place in the code might be diff --git a/x-pack/plugins/task_manager/server/task_scheduling.mock.ts b/x-pack/plugins/task_manager/server/task_scheduling.mock.ts index 15c8dc06c473a..08f36661dde52 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.mock.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.mock.ts @@ -9,6 +9,7 @@ import { TaskScheduling } from './task_scheduling'; const createTaskSchedulingMock = () => { return { + bulkEnableDisable: jest.fn(), ensureScheduled: jest.fn(), schedule: jest.fn(), runSoon: jest.fn(), diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index a94394527a61d..071b0147e19b2 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -70,6 +70,26 @@ describe('TaskScheduling', () => { id: undefined, schedule: undefined, traceparent: 'parent', + enabled: true, + }); + }); + + test('allows scheduling tasks that are disabled', async () => { + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + const task = { + taskType: 'foo', + enabled: false, + params: {}, + state: {}, + }; + await taskScheduling.schedule(task); + expect(mockTaskStore.schedule).toHaveBeenCalled(); + expect(mockTaskStore.schedule).toHaveBeenCalledWith({ + ...task, + id: undefined, + schedule: undefined, + traceparent: 'parent', + enabled: false, }); }); @@ -125,6 +145,133 @@ describe('TaskScheduling', () => { }); }); + describe('bulkEnableDisable', () => { + const id = '01ddff11-e88a-4d13-bc4e-256164e755e2'; + beforeEach(() => { + mockTaskStore.bulkUpdate.mockImplementation(() => + Promise.resolve([{ tag: 'ok', value: mockTask() }]) + ); + }); + + test('should search for tasks by ids enabled = true when disabling', async () => { + mockTaskStore.fetch.mockResolvedValue({ docs: [] }); + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + + await taskScheduling.bulkEnableDisable([id], false); + + expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1); + expect(mockTaskStore.fetch).toHaveBeenCalledWith({ + query: { + bool: { + must: [ + { + terms: { + _id: [`task:${id}`], + }, + }, + { + term: { + 'task.enabled': true, + }, + }, + ], + }, + }, + size: 100, + }); + }); + + test('should search for tasks by ids enabled = false when enabling', async () => { + mockTaskStore.fetch.mockResolvedValue({ docs: [] }); + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + + await taskScheduling.bulkEnableDisable([id], true); + + expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1); + expect(mockTaskStore.fetch).toHaveBeenCalledWith({ + query: { + bool: { + must: [ + { + terms: { + _id: [`task:${id}`], + }, + }, + { + term: { + 'task.enabled': false, + }, + }, + ], + }, + }, + size: 100, + }); + }); + + test('should split search on chunks when input ids array too large', async () => { + mockTaskStore.fetch.mockResolvedValue({ docs: [] }); + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + + await taskScheduling.bulkEnableDisable(Array.from({ length: 1250 }), false); + + expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13); + }); + + test('should transform response into correct format', async () => { + const successfulTask = mockTask({ + id: 'task-1', + enabled: false, + schedule: { interval: '1h' }, + }); + const failedTask = mockTask({ id: 'task-2', enabled: true, schedule: { interval: '1h' } }); + mockTaskStore.bulkUpdate.mockImplementation(() => + Promise.resolve([ + { tag: 'ok', value: successfulTask }, + { tag: 'err', error: { entity: failedTask, error: new Error('fail') } }, + ]) + ); + mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + const result = await taskScheduling.bulkEnableDisable( + [successfulTask.id, failedTask.id], + false + ); + + expect(result).toEqual({ + tasks: [successfulTask], + errors: [{ task: failedTask, error: new Error('fail') }], + }); + }); + + test('should not disable task if it is already disabled', async () => { + const task = mockTask({ id, enabled: false, schedule: { interval: '3h' } }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkEnableDisable([id], false); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload).toHaveLength(0); + }); + + test('should not enable task if it is already enabled', async () => { + const task = mockTask({ id, enabled: true, schedule: { interval: '3h' } }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkEnableDisable([id], true); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload).toHaveLength(0); + }); + }); + describe('bulkUpdateSchedules', () => { const id = '01ddff11-e88a-4d13-bc4e-256164e755e2'; beforeEach(() => { @@ -258,6 +405,7 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now()); }); }); + describe('runSoon', () => { test('resolves when the task update succeeds', async () => { const id = '01ddff11-e88a-4d13-bc4e-256164e755e2'; @@ -513,6 +661,40 @@ describe('TaskScheduling', () => { id: undefined, schedule: undefined, traceparent: 'parent', + enabled: true, + }, + ]); + }); + + test('allows scheduling tasks that are disabled', async () => { + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + const task1 = { + taskType: 'foo', + params: {}, + state: {}, + }; + const task2 = { + taskType: 'foo', + params: {}, + state: {}, + enabled: false, + }; + await taskScheduling.bulkSchedule([task1, task2]); + expect(mockTaskStore.bulkSchedule).toHaveBeenCalled(); + expect(mockTaskStore.bulkSchedule).toHaveBeenCalledWith([ + { + ...task1, + id: undefined, + schedule: undefined, + traceparent: 'parent', + enabled: true, + }, + { + ...task2, + id: undefined, + schedule: undefined, + traceparent: 'parent', + enabled: false, }, ]); }); @@ -546,6 +728,7 @@ function mockTask(overrides: Partial = {}): ConcreteTaskIn taskType: 'foo', schedule: undefined, attempts: 0, + enabled: true, status: TaskStatus.Claiming, params: { hello: 'world' }, state: { baby: 'Henhen' }, diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 9434ba4fba0c3..8cd3330052cf4 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -48,7 +48,7 @@ import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle'; import { EphemeralTaskRejectedDueToCapacityError } from './task_running'; const VERSION_CONFLICT_STATUS = 409; - +const BULK_ACTION_SIZE = 100; export interface TaskSchedulingOpts { logger: Logger; taskStore: TaskStore; @@ -61,7 +61,7 @@ export interface TaskSchedulingOpts { /** * return type of TaskScheduling.bulkUpdateSchedules method */ -export interface BulkUpdateSchedulesResult { +export interface BulkUpdateTaskResult { /** * list of successfully updated tasks */ @@ -126,6 +126,7 @@ export class TaskScheduling { return await this.store.schedule({ ...modifiedTask, traceparent: traceparent || '', + enabled: modifiedTask.enabled ?? true, }); } @@ -149,13 +150,72 @@ export class TaskScheduling { ...options, taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger), }); - return { ...modifiedTask, traceparent: traceparent || '' }; + return { + ...modifiedTask, + traceparent: traceparent || '', + enabled: modifiedTask.enabled ?? true, + }; }) ); return await this.store.bulkSchedule(modifiedTasks); } + public async bulkEnableDisable( + taskIds: string[], + enabled: boolean + ): Promise { + const tasks = await pMap( + chunk(taskIds, BULK_ACTION_SIZE), + async (taskIdsChunk) => + this.store.fetch({ + query: { + bool: { + must: [ + { + terms: { + _id: taskIdsChunk.map((taskId) => `task:${taskId}`), + }, + }, + { + term: { + 'task.enabled': !enabled, + }, + }, + ], + }, + }, + size: BULK_ACTION_SIZE, + }), + { concurrency: 10 } + ); + + const updatedTasks = tasks + .flatMap(({ docs }) => docs) + .reduce((acc, task) => { + // if task is not enabled, no need to update it + if (enabled === task.enabled) { + return acc; + } + + acc.push({ ...task, enabled }); + return acc; + }, []); + + return (await this.store.bulkUpdate(updatedTasks)).reduce( + (acc, task) => { + if (task.tag === 'ok') { + acc.tasks.push(task.value); + } else { + acc.errors.push({ error: task.error.error, task: task.error.entity }); + } + + return acc; + }, + { tasks: [], errors: [] } + ); + } + /** * Bulk updates schedules for tasks by ids. * Only tasks with `idle` status will be updated, as for the tasks which have `running` status, @@ -163,14 +223,14 @@ export class TaskScheduling { * * @param {string[]} taskIds - list of task ids * @param {IntervalSchedule} schedule - new schedule - * @returns {Promise} + * @returns {Promise} */ public async bulkUpdateSchedules( taskIds: string[], schedule: IntervalSchedule - ): Promise { + ): Promise { const tasks = await pMap( - chunk(taskIds, 100), + chunk(taskIds, BULK_ACTION_SIZE), async (taskIdsChunk) => this.store.fetch({ query: mustBeAllOf( @@ -185,7 +245,7 @@ export class TaskScheduling { }, } ), - size: 100, + size: BULK_ACTION_SIZE, }), { concurrency: 10 } ); @@ -211,7 +271,7 @@ export class TaskScheduling { return acc; }, []); - return (await this.store.bulkUpdate(updatedTasks)).reduce( + return (await this.store.bulkUpdate(updatedTasks)).reduce( (acc, task) => { if (task.tag === 'ok') { acc.tasks.push(task.value); @@ -226,7 +286,7 @@ export class TaskScheduling { } /** - * Run task. + * Run task. * * @param taskId - The task being scheduled. * @returns {Promise} diff --git a/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts b/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts index d8b1397edbd75..19d62bdd0682a 100644 --- a/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts +++ b/x-pack/test/alerting_api_integration/common/lib/task_manager_utils.ts @@ -20,6 +20,48 @@ export class TaskManagerUtils { this.retry = retry; } + async waitForDisabled(id: string, taskRunAtFilter: Date) { + return await this.retry.try(async () => { + const searchResult = await this.es.search({ + index: '.kibana_task_manager', + body: { + query: { + bool: { + must: [ + { + term: { + 'task.id': `task:${id}`, + }, + }, + { + terms: { + 'task.scope': ['actions', 'alerting'], + }, + }, + { + range: { + 'task.scheduledAt': { + gte: taskRunAtFilter.getTime().toString(), + }, + }, + }, + { + term: { + 'task.enabled': true, + }, + }, + ], + }, + }, + }, + }); + // @ts-expect-error + if (searchResult.hits.total.value) { + // @ts-expect-error + throw new Error(`Expected 0 tasks but received ${searchResult.hits.total.value}`); + } + }); + } async waitForEmpty(taskRunAtFilter: Date) { return await this.retry.try(async () => { const searchResult = await this.es.search({ diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/create.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/create.ts index e601c6ee15ec7..f775b3607fade 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/create.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/create.ts @@ -137,6 +137,7 @@ export default function createAlertTests({ getService }: FtrProviderContext) { spaceId: space.id, consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); // Ensure AAD isn't broken await checkAAD({ supertest, diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/disable.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/disable.ts index 842a00366945a..2f452a54927b9 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/disable.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/disable.ts @@ -16,6 +16,7 @@ import { ObjectRemover, getConsumerUnauthorizedErrorMessage, getProducerUnauthorizedErrorMessage, + TaskManagerDoc, } from '../../../../common/lib'; // eslint-disable-next-line import/no-default-export @@ -30,11 +31,12 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte after(() => objectRemover.removeAll()); - async function getScheduledTask(id: string) { - return await es.get({ + async function getScheduledTask(id: string): Promise { + const scheduledTask = await es.get({ id: `task:${id}`, index: '.kibana_task_manager', }); + return scheduledTask._source!; } for (const scenario of UserAtSpaceScenarios) { @@ -88,8 +90,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte ), statusCode: 403, }); - // Ensure task still exists - await getScheduledTask(createdAlert.scheduled_task_id); + // Ensure task still exists and is still enabled + const taskRecord1 = await getScheduledTask(createdAlert.scheduled_task_id); + expect(taskRecord1.type).to.eql('task'); + expect(taskRecord1.task.taskType).to.eql('alerting:test.noop'); + expect(JSON.parse(taskRecord1.task.params)).to.eql({ + alertId: createdAlert.id, + spaceId: space.id, + consumer: 'alertsFixture', + }); + expect(taskRecord1.task.enabled).to.eql(true); break; case 'space_1_all_alerts_none_actions at space1': case 'superuser at space1': @@ -97,12 +107,17 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte case 'space_1_all_with_restricted_fixture at space1': expect(response.statusCode).to.eql(204); expect(response.body).to.eql(''); - try { - await getScheduledTask(createdAlert.scheduled_task_id); - throw new Error('Should have removed scheduled task'); - } catch (e) { - expect(e.meta.statusCode).to.eql(404); - } + + // task should still exist but be disabled + const taskRecord2 = await getScheduledTask(createdAlert.scheduled_task_id); + expect(taskRecord2.type).to.eql('task'); + expect(taskRecord2.task.taskType).to.eql('alerting:test.noop'); + expect(JSON.parse(taskRecord2.task.params)).to.eql({ + alertId: createdAlert.id, + spaceId: space.id, + consumer: 'alertsFixture', + }); + expect(taskRecord2.task.enabled).to.eql(false); // Ensure AAD isn't broken await checkAAD({ supertest, @@ -153,12 +168,17 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte case 'space_1_all_with_restricted_fixture at space1': expect(response.statusCode).to.eql(204); expect(response.body).to.eql(''); - try { - await getScheduledTask(createdAlert.scheduled_task_id); - throw new Error('Should have removed scheduled task'); - } catch (e) { - expect(e.meta.statusCode).to.eql(404); - } + + // task should still exist but be disabled + const taskRecord = await getScheduledTask(createdAlert.scheduled_task_id); + expect(taskRecord.type).to.eql('task'); + expect(taskRecord.task.taskType).to.eql('alerting:test.restricted-noop'); + expect(JSON.parse(taskRecord.task.params)).to.eql({ + alertId: createdAlert.id, + spaceId: space.id, + consumer: 'alertsRestrictedFixture', + }); + expect(taskRecord.task.enabled).to.eql(false); break; default: throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); @@ -213,12 +233,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte case 'space_1_all_with_restricted_fixture at space1': expect(response.statusCode).to.eql(204); expect(response.body).to.eql(''); - try { - await getScheduledTask(createdAlert.scheduled_task_id); - throw new Error('Should have removed scheduled task'); - } catch (e) { - expect(e.meta.statusCode).to.eql(404); - } + // task should still exist but be disabled + const taskRecord = await getScheduledTask(createdAlert.scheduled_task_id); + expect(taskRecord.type).to.eql('task'); + expect(taskRecord.task.taskType).to.eql('alerting:test.unrestricted-noop'); + expect(JSON.parse(taskRecord.task.params)).to.eql({ + alertId: createdAlert.id, + spaceId: space.id, + consumer: 'alertsFixture', + }); + expect(taskRecord.task.enabled).to.eql(false); break; default: throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); @@ -269,12 +293,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte case 'space_1_all_with_restricted_fixture at space1': expect(response.statusCode).to.eql(204); expect(response.body).to.eql(''); - try { - await getScheduledTask(createdAlert.scheduled_task_id); - throw new Error('Should have removed scheduled task'); - } catch (e) { - expect(e.meta.statusCode).to.eql(404); - } + // task should still exist but be disabled + const taskRecord = await getScheduledTask(createdAlert.scheduled_task_id); + expect(taskRecord.type).to.eql('task'); + expect(taskRecord.task.taskType).to.eql('alerting:test.noop'); + expect(JSON.parse(taskRecord.task.params)).to.eql({ + alertId: createdAlert.id, + spaceId: space.id, + consumer: 'alerts', + }); + expect(taskRecord.task.enabled).to.eql(false); break; default: throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); @@ -319,8 +347,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte ), statusCode: 403, }); - // Ensure task still exists - await getScheduledTask(createdAlert.scheduled_task_id); + // Ensure task still exists and is still enabled + const taskRecord1 = await getScheduledTask(createdAlert.scheduled_task_id); + expect(taskRecord1.type).to.eql('task'); + expect(taskRecord1.task.taskType).to.eql('alerting:test.noop'); + expect(JSON.parse(taskRecord1.task.params)).to.eql({ + alertId: createdAlert.id, + spaceId: space.id, + consumer: 'alertsFixture', + }); + expect(taskRecord1.task.enabled).to.eql(true); break; case 'superuser at space1': case 'space_1_all at space1': @@ -328,12 +364,16 @@ export default function createDisableAlertTests({ getService }: FtrProviderConte case 'space_1_all_with_restricted_fixture at space1': expect(response.statusCode).to.eql(204); expect(response.body).to.eql(''); - try { - await getScheduledTask(createdAlert.scheduled_task_id); - throw new Error('Should have removed scheduled task'); - } catch (e) { - expect(e.meta.statusCode).to.eql(404); - } + // task should still exist but be disabled + const taskRecord2 = await getScheduledTask(createdAlert.scheduled_task_id); + expect(taskRecord2.type).to.eql('task'); + expect(taskRecord2.task.taskType).to.eql('alerting:test.noop'); + expect(JSON.parse(taskRecord2.task.params)).to.eql({ + alertId: createdAlert.id, + spaceId: space.id, + consumer: 'alertsFixture', + }); + expect(taskRecord2.task.enabled).to.eql(false); break; default: throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/enable.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/enable.ts index 0aba468174cff..73842073a542b 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/enable.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/enable.ts @@ -129,6 +129,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex spaceId: space.id, consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); // Ensure AAD isn't broken await checkAAD({ supertest, @@ -360,6 +361,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex spaceId: space.id, consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); // Ensure AAD isn't broken await checkAAD({ supertest, diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group2/tests/alerting/alerts.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group2/tests/alerting/alerts.ts index 19c10bddbd7b1..c20e41067b010 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group2/tests/alerting/alerts.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group2/tests/alerting/alerts.ts @@ -34,7 +34,7 @@ export default function alertTests({ getService }: FtrProviderContext) { const esTestIndexTool = new ESTestIndexTool(es, retry); const taskManagerUtils = new TaskManagerUtils(es, retry); - describe('alerts', () => { + describe('alerts test me', () => { const authorizationIndex = '.kibana-test-authorization'; const objectRemover = new ObjectRemover(supertest); @@ -122,7 +122,7 @@ export default function alertTests({ getService }: FtrProviderContext) { const alertId = response.body.id; await alertUtils.disable(alertId); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(alertId, testStart); // Ensure only 1 alert executed with proper params const alertSearchResult = await esTestIndexTool.search( @@ -274,7 +274,7 @@ instanceStateValue: true const alertId = response.body.id; await alertUtils.disable(alertId); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(alertId, testStart); // Ensure only 1 alert executed with proper params const alertSearchResult = await esTestIndexTool.search( @@ -634,7 +634,7 @@ instanceStateValue: true // Wait for test.authorization to index a document before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('alert:test.authorization', reference); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure only 1 document exists with proper params searchResult = await esTestIndexTool.search('alert:test.authorization', reference); @@ -665,7 +665,7 @@ instanceStateValue: true // Wait for test.authorization to index a document before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('alert:test.authorization', reference); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure only 1 document exists with proper params searchResult = await esTestIndexTool.search('alert:test.authorization', reference); @@ -751,7 +751,7 @@ instanceStateValue: true // Ensure test.authorization indexed 1 document before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('action:test.authorization', reference); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure only 1 document with proper params exists searchResult = await esTestIndexTool.search('action:test.authorization', reference); @@ -790,7 +790,7 @@ instanceStateValue: true // Ensure test.authorization indexed 1 document before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('action:test.authorization', reference); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure only 1 document with proper params exists searchResult = await esTestIndexTool.search('action:test.authorization', reference); @@ -853,7 +853,7 @@ instanceStateValue: true // Wait until alerts scheduled actions 3 times before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('alert:test.always-firing', reference, 3); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure actions only executed once const searchResult = await esTestIndexTool.search( @@ -933,7 +933,7 @@ instanceStateValue: true // Wait for actions to execute twice before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('action:test.index-record', reference, 2); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure only 2 actions with proper params exists const searchResult = await esTestIndexTool.search( @@ -1009,7 +1009,7 @@ instanceStateValue: true // Wait for actions to execute twice before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('action:test.index-record', reference, 2); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure only 2 actions with proper params exists const searchResult = await esTestIndexTool.search( @@ -1074,7 +1074,7 @@ instanceStateValue: true // Actions should execute twice before widning things down await esTestIndexTool.waitForDocs('action:test.index-record', reference, 2); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Ensure only 2 actions are executed const searchResult = await esTestIndexTool.search( @@ -1133,7 +1133,7 @@ instanceStateValue: true // execution once before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('alert:test.always-firing', reference, 2); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Should not have executed any action const executedActionsResult = await esTestIndexTool.search( @@ -1192,7 +1192,7 @@ instanceStateValue: true // once before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('alert:test.always-firing', reference, 2); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Should not have executed any action const executedActionsResult = await esTestIndexTool.search( @@ -1252,7 +1252,7 @@ instanceStateValue: true // Ensure actions are executed once before disabling the alert and waiting for tasks to finish await esTestIndexTool.waitForDocs('action:test.index-record', reference, 1); await alertUtils.disable(response.body.id); - await taskManagerUtils.waitForEmpty(testStart); + await taskManagerUtils.waitForDisabled(response.body.id, testStart); // Should have one document indexed by the action const searchResult = await esTestIndexTool.search( diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/create.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/create.ts index 143d845d074c4..7860bf15dc8e5 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/create.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/create.ts @@ -108,6 +108,7 @@ export default function createAlertTests({ getService }: FtrProviderContext) { spaceId: Spaces.space1.id, consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); // Ensure AAD isn't broken await checkAAD({ supertest, @@ -498,6 +499,7 @@ export default function createAlertTests({ getService }: FtrProviderContext) { spaceId: Spaces.space1.id, consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); // Ensure AAD isn't broken await checkAAD({ supertest, diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/disable.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/disable.ts index 6df7f4b3f6de8..feec6431ee3cf 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/disable.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/disable.ts @@ -15,6 +15,7 @@ import { getTestRuleData, ObjectRemover, getEventLog, + TaskManagerDoc, } from '../../../common/lib'; import { validateEvent } from './event_log'; @@ -31,11 +32,12 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex after(() => objectRemover.removeAll()); - async function getScheduledTask(id: string) { - return await es.get({ + async function getScheduledTask(id: string): Promise { + const scheduledTask = await es.get({ id: `task:${id}`, index: '.kibana_task_manager', }); + return scheduledTask._source!; } it('should handle disable rule request appropriately', async () => { @@ -48,12 +50,16 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex await ruleUtils.disable(createdRule.id); - try { - await getScheduledTask(createdRule.scheduled_task_id); - throw new Error('Should have removed scheduled task'); - } catch (e) { - expect(e.meta.statusCode).to.eql(404); - } + // task doc should still exist but be disabled + const taskRecord = await getScheduledTask(createdRule.scheduled_task_id); + expect(taskRecord.type).to.eql('task'); + expect(taskRecord.task.taskType).to.eql('alerting:test.noop'); + expect(JSON.parse(taskRecord.task.params)).to.eql({ + alertId: createdRule.id, + spaceId: Spaces.space1.id, + consumer: 'alertsFixture', + }); + expect(taskRecord.task.enabled).to.eql(false); // Ensure AAD isn't broken await checkAAD({ @@ -188,12 +194,16 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex .set('kbn-xsrf', 'foo') .expect(204); - try { - await getScheduledTask(createdRule.scheduled_task_id); - throw new Error('Should have removed scheduled task'); - } catch (e) { - expect(e.meta.statusCode).to.eql(404); - } + // task doc should still exist but be disabled + const taskRecord = await getScheduledTask(createdRule.scheduled_task_id); + expect(taskRecord.type).to.eql('task'); + expect(taskRecord.task.taskType).to.eql('alerting:test.noop'); + expect(JSON.parse(taskRecord.task.params)).to.eql({ + alertId: createdRule.id, + spaceId: Spaces.space1.id, + consumer: 'alertsFixture', + }); + expect(taskRecord.task.enabled).to.eql(false); // Ensure AAD isn't broken await checkAAD({ diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/enable.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/enable.ts index 59ae5efcba191..d8dec2a486298 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/enable.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/enable.ts @@ -59,6 +59,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex spaceId: Spaces.space1.id, consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); // Ensure AAD isn't broken await checkAAD({ @@ -111,6 +112,7 @@ export default function createEnableAlertTests({ getService }: FtrProviderContex spaceId: Spaces.space1.id, consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); // Ensure AAD isn't broken await checkAAD({ diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/scheduled_task_id.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/scheduled_task_id.ts index 607166203e35f..d008421381b14 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/scheduled_task_id.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/scheduled_task_id.ts @@ -109,6 +109,7 @@ export default function createScheduledTaskIdTests({ getService }: FtrProviderCo spaceId: 'default', consumer: 'alertsFixture', }); + expect(taskRecord.task.enabled).to.eql(true); }); }); } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts index 04f34ab50a133..232db750d5425 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts @@ -136,5 +136,59 @@ export default function createGetTests({ getService }: FtrProviderContext) { expect(response.body._source?.task.taskType).to.eql(`sampleTaskRemovedType`); expect(response.body._source?.task.status).to.eql(`unrecognized`); }); + + it('8.5.0 migrates active tasks to set enabled to true', async () => { + const response = await es.search<{ task: ConcreteTaskInstance }>( + { + index: '.kibana_task_manager', + size: 100, + body: { + query: { + match_all: {}, + }, + }, + }, + { + meta: true, + } + ); + expect(response.statusCode).to.eql(200); + const tasks = response.body.hits.hits; + tasks + .filter( + (task) => + task._source?.task.status !== 'failed' && task._source?.task.status !== 'unrecognized' + ) + .forEach((task) => { + expect(task._source?.task.enabled).to.eql(true); + }); + }); + + it('8.5.0 does not migrates failed and unrecognized', async () => { + const response = await es.search<{ task: ConcreteTaskInstance }>( + { + index: '.kibana_task_manager', + size: 100, + body: { + query: { + match_all: {}, + }, + }, + }, + { + meta: true, + } + ); + expect(response.statusCode).to.eql(200); + const tasks = response.body.hits.hits; + tasks + .filter( + (task) => + task._source?.task.status === 'failed' || task._source?.task.status === 'unrecognized' + ) + .forEach((task) => { + expect(task._source?.task.enabled).to.be(undefined); + }); + }); }); } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index cf720be74143a..56477f0e6edf0 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -11,7 +11,7 @@ import expect from '@kbn/expect'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json'; import { DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config'; -import { ConcreteTaskInstance, BulkUpdateSchedulesResult } from '@kbn/task-manager-plugin/server'; +import { ConcreteTaskInstance, BulkUpdateTaskResult } from '@kbn/task-manager-plugin/server'; import { FtrProviderContext } from '../../ftr_provider_context'; const { @@ -184,7 +184,7 @@ export default function ({ getService }: FtrProviderContext) { .set('kbn-xsrf', 'xxx') .send({ taskIds, schedule }) .expect(200) - .then((response: { body: BulkUpdateSchedulesResult }) => response.body); + .then((response: { body: BulkUpdateTaskResult }) => response.body); } // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139