diff --git a/x-pack/plugins/ingest_manager/common/constants/agent.ts b/x-pack/plugins/ingest_manager/common/constants/agent.ts index f3990ba78c539..e9226fa684925 100644 --- a/x-pack/plugins/ingest_manager/common/constants/agent.ts +++ b/x-pack/plugins/ingest_manager/common/constants/agent.ts @@ -14,3 +14,5 @@ export const AGENT_TYPE_TEMPORARY = 'TEMPORARY'; export const AGENT_POLLING_THRESHOLD_MS = 30000; export const AGENT_POLLING_INTERVAL = 1000; +export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000; +export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000; diff --git a/x-pack/plugins/ingest_manager/common/types/index.ts b/x-pack/plugins/ingest_manager/common/types/index.ts index 800e77a6a2742..7f81b04f5e84a 100644 --- a/x-pack/plugins/ingest_manager/common/types/index.ts +++ b/x-pack/plugins/ingest_manager/common/types/index.ts @@ -15,6 +15,7 @@ export interface IngestManagerConfigType { fleet: { enabled: boolean; tlsCheckDisabled: boolean; + pollingRequestTimeout: number; kibana: { host?: string; ca_sha256?: string; diff --git a/x-pack/plugins/ingest_manager/package.json b/x-pack/plugins/ingest_manager/package.json index 052d3d0b42c7e..8826ed57ab106 100644 --- a/x-pack/plugins/ingest_manager/package.json +++ b/x-pack/plugins/ingest_manager/package.json @@ -3,5 +3,8 @@ "name": "ingest-manager", "version": "8.0.0", "private": true, - "license": "Elastic-License" + "license": "Elastic-License", + "dependencies": { + "abort-controller": "^3.0.0" + } } diff --git a/x-pack/plugins/ingest_manager/server/constants/index.ts b/x-pack/plugins/ingest_manager/server/constants/index.ts index 6e633c04ed816..4d60b9031414e 100644 --- a/x-pack/plugins/ingest_manager/server/constants/index.ts +++ b/x-pack/plugins/ingest_manager/server/constants/index.ts @@ -9,6 +9,8 @@ export { AGENT_TYPE_TEMPORARY, AGENT_POLLING_THRESHOLD_MS, AGENT_POLLING_INTERVAL, + AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS, + AGENT_UPDATE_ACTIONS_INTERVAL_MS, INDEX_PATTERN_PLACEHOLDER_SUFFIX, // Routes PLUGIN_ID, diff --git a/x-pack/plugins/ingest_manager/server/index.ts b/x-pack/plugins/ingest_manager/server/index.ts index 84408b60d3edb..f6b2d7ccc6d48 100644 --- a/x-pack/plugins/ingest_manager/server/index.ts +++ b/x-pack/plugins/ingest_manager/server/index.ts @@ -27,6 +27,7 @@ export const config = { fleet: schema.object({ enabled: schema.boolean({ defaultValue: true }), tlsCheckDisabled: schema.boolean({ defaultValue: false }), + pollingRequestTimeout: schema.number({ defaultValue: 60000 }), kibana: schema.object({ host: schema.maybe(schema.string()), ca_sha256: schema.maybe(schema.string()), diff --git a/x-pack/plugins/ingest_manager/server/plugin.ts b/x-pack/plugins/ingest_manager/server/plugin.ts index 0d53092a0a8ff..13301df471c53 100644 --- a/x-pack/plugins/ingest_manager/server/plugin.ts +++ b/x-pack/plugins/ingest_manager/server/plugin.ts @@ -55,6 +55,7 @@ import { } from './services'; import { getAgentStatusById } from './services/agents'; import { CloudSetup } from '../../cloud/server'; +import { agentCheckinState } from './services/agents/checkin/state'; export interface IngestManagerSetupDeps { licensing: LicensingPluginSetup; @@ -229,6 +230,8 @@ export class IngestManagerPlugin logger: this.logger, }); licenseService.start(this.licensing$); + agentCheckinState.start(); + return { esIndexPatternService: new ESIndexPatternSavedObjectService(), agentService: { @@ -240,5 +243,6 @@ export class IngestManagerPlugin public async stop() { appContextService.stop(); licenseService.stop(); + agentCheckinState.stop(); } } diff --git a/x-pack/plugins/ingest_manager/server/routes/agent/handlers.ts b/x-pack/plugins/ingest_manager/server/routes/agent/handlers.ts index ae833b55137cc..0d1c77b8d697f 100644 --- a/x-pack/plugins/ingest_manager/server/routes/agent/handlers.ts +++ b/x-pack/plugins/ingest_manager/server/routes/agent/handlers.ts @@ -4,8 +4,9 @@ * you may not use this file except in compliance with the Elastic License. */ -import { RequestHandler, KibanaRequest } from 'src/core/server'; +import { RequestHandler } from 'src/core/server'; import { TypeOf } from '@kbn/config-schema'; +import { AbortController } from 'abort-controller'; import { GetAgentsResponse, GetOneAgentResponse, @@ -32,13 +33,6 @@ import * as AgentService from '../../services/agents'; import * as APIKeyService from '../../services/api_keys'; import { appContextService } from '../../services/app_context'; -export function getInternalUserSOClient(request: KibanaRequest) { - // soClient as kibana internal users, be carefull on how you use it, security is not enabled - return appContextService.getSavedObjects().getScopedClient(request, { - excludedWrappers: ['security'], - }); -} - export const getAgentHandler: RequestHandler> = async (context, request, response) => { @@ -176,14 +170,20 @@ export const postAgentCheckinHandler: RequestHandler< TypeOf > = async (context, request, response) => { try { - const soClient = getInternalUserSOClient(request); + const soClient = appContextService.getInternalUserSOClient(request); const res = APIKeyService.parseApiKeyFromHeaders(request.headers); const agent = await AgentService.getAgentByAccessAPIKeyId(soClient, res.apiKeyId); + const abortController = new AbortController(); + request.events.aborted$.subscribe(() => { + abortController.abort(); + }); + const signal = abortController.signal; const { actions } = await AgentService.agentCheckin( soClient, agent, request.body.events || [], - request.body.local_metadata + request.body.local_metadata, + { signal } ); const body: PostAgentCheckinResponse = { action: 'checkin', @@ -198,16 +198,24 @@ export const postAgentCheckinHandler: RequestHandler< }; return response.ok({ body }); - } catch (e) { - if (e.isBoom && e.output.statusCode === 404) { - return response.notFound({ - body: { message: `Agent ${request.params.agentId} not found` }, + } catch (err) { + const logger = appContextService.getLogger(); + if (err.isBoom) { + if (err.output.statusCode >= 500) { + logger.error(err); + } + + return response.customError({ + statusCode: err.output.statusCode, + body: { message: err.output.payload.message }, }); } + logger.error(err); + return response.customError({ statusCode: 500, - body: { message: e.message }, + body: { message: err.message }, }); } }; @@ -218,7 +226,7 @@ export const postAgentEnrollHandler: RequestHandler< TypeOf > = async (context, request, response) => { try { - const soClient = getInternalUserSOClient(request); + const soClient = appContextService.getInternalUserSOClient(request); const { apiKeyId } = APIKeyService.parseApiKeyFromHeaders(request.headers); const enrollmentAPIKey = await APIKeyService.getEnrollmentAPIKeyById(soClient, apiKeyId); diff --git a/x-pack/plugins/ingest_manager/server/routes/agent/index.ts b/x-pack/plugins/ingest_manager/server/routes/agent/index.ts index 78bb178dce402..87eee4622c80b 100644 --- a/x-pack/plugins/ingest_manager/server/routes/agent/index.ts +++ b/x-pack/plugins/ingest_manager/server/routes/agent/index.ts @@ -35,12 +35,12 @@ import { postAgentEnrollHandler, postAgentsUnenrollHandler, getAgentStatusForConfigHandler, - getInternalUserSOClient, putAgentsReassignHandler, } from './handlers'; import { postAgentAcksHandlerBuilder } from './acks_handlers'; import * as AgentService from '../../services/agents'; import { postNewAgentActionHandlerBuilder } from './actions_handlers'; +import { appContextService } from '../../services'; export const registerRoutes = (router: IRouter) => { // Get one @@ -110,7 +110,9 @@ export const registerRoutes = (router: IRouter) => { postAgentAcksHandlerBuilder({ acknowledgeAgentActions: AgentService.acknowledgeAgentActions, getAgentByAccessAPIKeyId: AgentService.getAgentByAccessAPIKeyId, - getSavedObjectsClientContract: getInternalUserSOClient, + getSavedObjectsClientContract: appContextService.getInternalUserSOClient.bind( + appContextService + ), saveAgentEvents: AgentService.saveAgentEvents, }) ); diff --git a/x-pack/plugins/ingest_manager/server/services/agent_config.ts b/x-pack/plugins/ingest_manager/server/services/agent_config.ts index 9e0386de74763..9c27e9b7a3a7a 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_config.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_config.ts @@ -65,8 +65,6 @@ class AgentConfigService { updated_by: user ? user.username : 'system', }); - await this.triggerAgentConfigUpdatedEvent(soClient, 'updated', id); - return (await this.get(soClient, id)) as AgentConfig; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index 236ad7df466b4..8d1b320c89ae6 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -77,6 +77,15 @@ export async function getAgentActionByIds( ); } +export async function getNewActionsSince(soClient: SavedObjectsClientContract, timestamp: string) { + const res = await soClient.find({ + type: AGENT_ACTION_SAVED_OBJECT_TYPE, + filter: `not ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.sent_at: * AND ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.created_at >= "${timestamp}"`, + }); + + return res.saved_objects.map(savedObjectToAgentAction); +} + export interface ActionsService { getAgent: (soClient: SavedObjectsClientContract, agentId: string) => Promise; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin.test.ts deleted file mode 100644 index 72a86d7c8158e..0000000000000 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin.test.ts +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { shouldCreateConfigAction } from './checkin'; -import { Agent } from '../../types'; - -function getAgent(data: Partial) { - return { actions: [], ...data } as Agent; -} - -describe('Agent checkin service', () => { - describe('shouldCreateConfigAction', () => { - it('should return false if the agent do not have an assigned config', () => { - const res = shouldCreateConfigAction(getAgent({}), []); - - expect(res).toBeFalsy(); - }); - - it('should return true if this is agent first checkin', () => { - const res = shouldCreateConfigAction(getAgent({ config_id: 'config1' }), []); - - expect(res).toBeTruthy(); - }); - - it('should return false agent is already running latest revision', () => { - const res = shouldCreateConfigAction( - getAgent({ - config_id: 'config1', - last_checkin: '2018-01-02T00:00:00', - config_revision: 1, - config_newest_revision: 1, - }), - [] - ); - - expect(res).toBeFalsy(); - }); - - it('should return false agent has already latest revision config change action', () => { - const res = shouldCreateConfigAction( - getAgent({ - config_id: 'config1', - last_checkin: '2018-01-02T00:00:00', - config_revision: 1, - config_newest_revision: 2, - }), - [ - { - id: 'action1', - agent_id: 'agent1', - type: 'CONFIG_CHANGE', - created_at: new Date().toISOString(), - data: { - config: { - id: 'config1', - revision: 2, - }, - }, - }, - ] - ); - - expect(res).toBeFalsy(); - }); - - it('should return true agent has unrelated config change actions', () => { - const res = shouldCreateConfigAction( - getAgent({ - config_id: 'config1', - last_checkin: '2018-01-02T00:00:00', - config_revision: 1, - config_newest_revision: 2, - }), - [ - { - id: 'action1', - agent_id: 'agent1', - type: 'CONFIG_CHANGE', - created_at: new Date().toISOString(), - data: { - config: { - id: 'config2', - revision: 2, - }, - }, - }, - { - id: 'action1', - agent_id: 'agent1', - type: 'CONFIG_CHANGE', - created_at: new Date().toISOString(), - data: { - config: { - id: 'config1', - revision: 1, - }, - }, - }, - ] - ); - - expect(res).toBeTruthy(); - }); - - it('should return true if this agent has a new revision', () => { - const res = shouldCreateConfigAction( - getAgent({ - config_id: 'config1', - last_checkin: '2018-01-02T00:00:00', - config_revision: 1, - config_newest_revision: 2, - }), - [] - ); - - expect(res).toBeTruthy(); - }); - - it('should return true if this agent has no revision currently set', () => { - const res = shouldCreateConfigAction( - getAgent({ - config_id: 'config1', - last_checkin: '2018-01-02T00:00:00', - config_revision: null, - config_newest_revision: 2, - }), - [] - ); - - expect(res).toBeTruthy(); - }); - }); -}); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin.ts deleted file mode 100644 index 5bbc376051122..0000000000000 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin.ts +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { SavedObjectsClientContract, SavedObjectsBulkCreateObject } from 'src/core/server'; -import { - Agent, - NewAgentEvent, - AgentEvent, - AgentAction, - AgentSOAttributes, - AgentEventSOAttributes, - AgentMetadata, -} from '../../types'; - -import { agentConfigService } from '../agent_config'; -import * as APIKeysService from '../api_keys'; -import { AGENT_SAVED_OBJECT_TYPE, AGENT_EVENT_SAVED_OBJECT_TYPE } from '../../constants'; -import { getAgentActionsForCheckin, createAgentAction } from './actions'; -import { appContextService } from '../app_context'; - -export async function agentCheckin( - soClient: SavedObjectsClientContract, - agent: Agent, - events: NewAgentEvent[], - localMetadata?: any -) { - const updateData: { - last_checkin: string; - default_api_key?: string; - default_api_key_id?: string; - local_metadata?: AgentMetadata; - current_error_events?: string; - } = { - last_checkin: new Date().toISOString(), - }; - - const actions = await getAgentActionsForCheckin(soClient, agent.id); - - // Generate new agent config if config is updated - if (agent.config_id && shouldCreateConfigAction(agent, actions)) { - const { - attributes: { default_api_key: defaultApiKey }, - } = await appContextService - .getEncryptedSavedObjects() - .getDecryptedAsInternalUser(AGENT_SAVED_OBJECT_TYPE, agent.id); - - const config = await agentConfigService.getFullConfig(soClient, agent.config_id); - if (config) { - // Assign output API keys - // We currently only support default ouput - if (!defaultApiKey) { - const outputAPIKey = await APIKeysService.generateOutputApiKey( - soClient, - 'default', - agent.id - ); - updateData.default_api_key = outputAPIKey.key; - updateData.default_api_key_id = outputAPIKey.id; - } - // Mutate the config to set the api token for this agent - config.outputs.default.api_key = defaultApiKey || updateData.default_api_key; - - const configChangeAction = await createAgentAction(soClient, { - agent_id: agent.id, - type: 'CONFIG_CHANGE', - data: { config } as any, - created_at: new Date().toISOString(), - sent_at: undefined, - }); - actions.push(configChangeAction); - } - } - - const { updatedErrorEvents } = await processEventsForCheckin(soClient, agent, events); - - // Persist changes - if (updatedErrorEvents) { - updateData.current_error_events = JSON.stringify(updatedErrorEvents); - } - - await soClient.update(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData); - - return { actions }; -} - -async function processEventsForCheckin( - soClient: SavedObjectsClientContract, - agent: Agent, - events: NewAgentEvent[] -) { - const acknowledgedActionIds: string[] = []; - const updatedErrorEvents: Array = [...agent.current_error_events]; - for (const event of events) { - // @ts-ignore - event.config_id = agent.config_id; - - if (isActionEvent(event)) { - acknowledgedActionIds.push(event.action_id as string); - } - - if (isErrorOrState(event)) { - // Remove any global or specific to a stream event - const existingEventIndex = updatedErrorEvents.findIndex( - (e) => e.stream_id === event.stream_id - ); - if (existingEventIndex >= 0) { - updatedErrorEvents.splice(existingEventIndex, 1); - } - if (event.type === 'ERROR') { - updatedErrorEvents.push(event); - } - } - } - - if (events.length > 0) { - await createEventsForAgent(soClient, agent.id, events); - } - - return { - acknowledgedActionIds, - updatedErrorEvents, - }; -} - -async function createEventsForAgent( - soClient: SavedObjectsClientContract, - agentId: string, - events: NewAgentEvent[] -) { - const objects: Array> = events.map( - (eventData) => { - return { - attributes: { - ...eventData, - payload: eventData.payload ? JSON.stringify(eventData.payload) : undefined, - }, - type: AGENT_EVENT_SAVED_OBJECT_TYPE, - }; - } - ); - - return soClient.bulkCreate(objects); -} - -function isErrorOrState(event: AgentEvent | NewAgentEvent) { - return event.type === 'STATE' || event.type === 'ERROR'; -} - -function isActionEvent(event: AgentEvent | NewAgentEvent) { - return ( - event.type === 'ACTION' && (event.subtype === 'ACKNOWLEDGED' || event.subtype === 'UNKNOWN') - ); -} - -export function shouldCreateConfigAction(agent: Agent, actions: AgentAction[]): boolean { - if (!agent.config_id) { - return false; - } - - const isFirstCheckin = !agent.last_checkin; - if (isFirstCheckin) { - return true; - } - - const isAgentConfigOutdated = - // Config reassignment - (!agent.config_revision && agent.config_newest_revision) || - // new revision of a config - (agent.config_revision && - agent.config_newest_revision && - agent.config_revision < agent.config_newest_revision); - - if (!isAgentConfigOutdated) { - return false; - } - - const isActionAlreadyGenerated = !!actions.find((action) => { - if (!action.data || action.type !== 'CONFIG_CHANGE') { - return false; - } - - const { data } = action; - - return ( - data.config.id === agent.config_id && data.config.revision === agent.config_newest_revision - ); - }); - - return !isActionAlreadyGenerated; -} diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts new file mode 100644 index 0000000000000..7c6641bbb5faa --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { SavedObjectsClientContract, SavedObjectsBulkCreateObject } from 'src/core/server'; +import { + Agent, + NewAgentEvent, + AgentEvent, + AgentSOAttributes, + AgentEventSOAttributes, + AgentMetadata, +} from '../../../types'; + +import { AGENT_SAVED_OBJECT_TYPE, AGENT_EVENT_SAVED_OBJECT_TYPE } from '../../../constants'; +import { agentCheckinState } from './state'; +import { getAgentActionsForCheckin } from '../actions'; + +export async function agentCheckin( + soClient: SavedObjectsClientContract, + agent: Agent, + events: NewAgentEvent[], + localMetadata?: any, + options?: { signal: AbortSignal } +) { + const updateData: { + local_metadata?: AgentMetadata; + current_error_events?: string; + } = {}; + const { updatedErrorEvents } = await processEventsForCheckin(soClient, agent, events); + if (updatedErrorEvents) { + updateData.current_error_events = JSON.stringify(updatedErrorEvents); + } + if (localMetadata) { + updateData.local_metadata = localMetadata; + } + if (Object.keys(updateData).length > 0) { + await soClient.update(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData); + } + + // Check if some actions are not acknowledged + let actions = await getAgentActionsForCheckin(soClient, agent.id); + if (actions.length > 0) { + return { actions }; + } + + // Wait for new actions + actions = await agentCheckinState.subscribeToNewActions(soClient, agent, options); + + return { actions }; +} + +async function processEventsForCheckin( + soClient: SavedObjectsClientContract, + agent: Agent, + events: NewAgentEvent[] +) { + const updatedErrorEvents: Array = [...agent.current_error_events]; + for (const event of events) { + // @ts-ignore + event.config_id = agent.config_id; + + if (isErrorOrState(event)) { + // Remove any global or specific to a stream event + const existingEventIndex = updatedErrorEvents.findIndex( + (e) => e.stream_id === event.stream_id + ); + if (existingEventIndex >= 0) { + updatedErrorEvents.splice(existingEventIndex, 1); + } + if (event.type === 'ERROR') { + updatedErrorEvents.push(event); + } + } + } + + if (events.length > 0) { + await createEventsForAgent(soClient, agent.id, events); + } + + return { + updatedErrorEvents, + }; +} + +async function createEventsForAgent( + soClient: SavedObjectsClientContract, + agentId: string, + events: NewAgentEvent[] +) { + const objects: Array> = events.map( + (eventData) => { + return { + attributes: { + ...eventData, + payload: eventData.payload ? JSON.stringify(eventData.payload) : undefined, + }, + type: AGENT_EVENT_SAVED_OBJECT_TYPE, + }; + } + ); + + return soClient.bulkCreate(objects); +} + +function isErrorOrState(event: AgentEvent | NewAgentEvent) { + return event.type === 'STATE' || event.type === 'ERROR'; +} diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts new file mode 100644 index 0000000000000..1f9bba8b12be4 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { Observable } from 'rxjs'; + +export class AbortError extends Error {} + +export const toPromiseAbortable = ( + observable: Observable, + signal?: AbortSignal +): Promise => + new Promise((resolve, reject) => { + if (signal && signal.aborted) { + reject(new AbortError('Aborted')); + return; + } + + const listener = () => { + subscription.unsubscribe(); + reject(new AbortError('Aborted')); + }; + const cleanup = () => { + if (signal) { + signal.removeEventListener('abort', listener); + } + }; + const subscription = observable.subscribe( + (data) => { + cleanup(); + resolve(data); + }, + (err) => { + cleanup(); + reject(err); + } + ); + + if (signal) { + signal.addEventListener('abort', listener, { once: true }); + } + }); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts new file mode 100644 index 0000000000000..69d61171b21fc --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state.ts @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { SavedObjectsClientContract } from 'src/core/server'; +import { Agent } from '../../../types'; +import { appContextService } from '../../app_context'; +import { agentCheckinStateConnectedAgentsFactory } from './state_connected_agents'; +import { agentCheckinStateNewActionsFactory } from './state_new_actions'; +import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants'; + +function agentCheckinStateFactory() { + const agentConnected = agentCheckinStateConnectedAgentsFactory(); + const newActions = agentCheckinStateNewActionsFactory(); + let interval: NodeJS.Timeout; + function start() { + interval = setInterval(async () => { + try { + await agentConnected.updateLastCheckinAt(); + } catch (err) { + appContextService.getLogger().error(err); + } + }, AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS); + } + + function stop() { + if (interval) { + clearInterval(interval); + } + } + return { + subscribeToNewActions: ( + soClient: SavedObjectsClientContract, + agent: Agent, + options?: { signal: AbortSignal } + ) => + agentConnected.wrapPromise( + agent.id, + newActions.subscribeToNewActions(soClient, agent, options) + ), + start, + stop, + }; +} + +export const agentCheckinState = agentCheckinStateFactory(); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_connected_agents.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_connected_agents.ts new file mode 100644 index 0000000000000..96e006b78f00f --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_connected_agents.ts @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { KibanaRequest, SavedObjectsBulkUpdateObject } from 'src/core/server'; +import { appContextService } from '../../app_context'; +import { AgentSOAttributes } from '../../../types'; +import { AGENT_SAVED_OBJECT_TYPE } from '../../../constants'; + +function getInternalUserSOClient() { + const fakeRequest = ({ + headers: {}, + getBasePath: () => '', + path: '/', + route: { settings: {} }, + url: { + href: '/', + }, + raw: { + req: { + url: '/', + }, + }, + } as unknown) as KibanaRequest; + + return appContextService.getInternalUserSOClient(fakeRequest); +} +export function agentCheckinStateConnectedAgentsFactory() { + const connectedAgentsIds = new Set(); + let agentToUpdate = new Set(); + + function addAgent(agentId: string) { + connectedAgentsIds.add(agentId); + agentToUpdate.add(agentId); + } + + function removeAgent(agentId: string) { + connectedAgentsIds.delete(agentId); + } + + async function wrapPromise(agentId: string, p: Promise): Promise { + try { + addAgent(agentId); + const res = await p; + removeAgent(agentId); + return res; + } catch (err) { + removeAgent(agentId); + throw err; + } + } + + async function updateLastCheckinAt() { + if (agentToUpdate.size === 0) { + return; + } + const internalSOClient = getInternalUserSOClient(); + const now = new Date().toISOString(); + const updates: Array> = [ + ...connectedAgentsIds.values(), + ].map((agentId) => ({ + type: AGENT_SAVED_OBJECT_TYPE, + id: agentId, + attributes: { + last_checkin: now, + }, + })); + + agentToUpdate = new Set([...connectedAgentsIds.values()]); + await internalSOClient.bulkUpdate(updates, { refresh: false }); + } + + return { + wrapPromise, + updateLastCheckinAt, + }; +} diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts new file mode 100644 index 0000000000000..0f30ab409f381 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { timer, from, Observable, TimeoutError } from 'rxjs'; +import { + shareReplay, + distinctUntilKeyChanged, + switchMap, + mergeMap, + merge, + filter, + timeout, + take, +} from 'rxjs/operators'; +import { SavedObjectsClientContract, KibanaRequest } from 'src/core/server'; +import { + Agent, + AgentAction, + AgentSOAttributes, + AgentConfig, + FullAgentConfig, +} from '../../../types'; +import { agentConfigService } from '../../agent_config'; +import * as APIKeysService from '../../api_keys'; +import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants'; +import { createAgentAction, getNewActionsSince } from '../actions'; +import { appContextService } from '../../app_context'; +import { toPromiseAbortable, AbortError } from './rxjs_utils'; + +function getInternalUserSOClient() { + const fakeRequest = ({ + headers: {}, + getBasePath: () => '', + path: '/', + route: { settings: {} }, + url: { + href: '/', + }, + raw: { + req: { + url: '/', + }, + }, + } as unknown) as KibanaRequest; + + return appContextService.getInternalUserSOClient(fakeRequest); +} + +function createAgentConfigSharedObservable(configId: string) { + const internalSOClient = getInternalUserSOClient(); + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( + switchMap(() => + from(agentConfigService.get(internalSOClient, configId) as Promise) + ), + distinctUntilKeyChanged('revision'), + switchMap((data) => from(agentConfigService.getFullConfig(internalSOClient, configId))), + shareReplay({ refCount: true, bufferSize: 1 }) + ); +} + +function createNewActionsSharedObservable(): Observable { + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( + switchMap(() => { + const internalSOClient = getInternalUserSOClient(); + + return from(getNewActionsSince(internalSOClient, new Date().toISOString())); + }), + shareReplay({ refCount: true, bufferSize: 1 }) + ); +} + +async function getOrCreateAgentDefaultOutputAPIKey( + soClient: SavedObjectsClientContract, + agent: Agent +): Promise { + const { + attributes: { default_api_key: defaultApiKey }, + } = await appContextService + .getEncryptedSavedObjects() + .getDecryptedAsInternalUser(AGENT_SAVED_OBJECT_TYPE, agent.id); + + if (defaultApiKey) { + return defaultApiKey; + } + + const outputAPIKey = await APIKeysService.generateOutputApiKey(soClient, 'default', agent.id); + await soClient.update(AGENT_SAVED_OBJECT_TYPE, agent.id, { + default_api_key: outputAPIKey.key, + default_api_key_id: outputAPIKey.id, + }); + + return outputAPIKey.key; +} + +async function createAgentActionFromConfigIfOutdated( + soClient: SavedObjectsClientContract, + agent: Agent, + config: FullAgentConfig | null +) { + if (!config || !config.revision) { + return; + } + const isAgentConfigOutdated = !agent.config_revision || agent.config_revision < config.revision; + if (!isAgentConfigOutdated) { + return; + } + + // Deep clone !not supporting Date, and undefined value. + const newConfig = JSON.parse(JSON.stringify(config)); + + // Mutate the config to set the api token for this agent + newConfig.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey(soClient, agent); + + const configChangeAction = await createAgentAction(soClient, { + agent_id: agent.id, + type: 'CONFIG_CHANGE', + data: { config: newConfig } as any, + created_at: new Date().toISOString(), + sent_at: undefined, + }); + + return [configChangeAction]; +} + +export function agentCheckinStateNewActionsFactory() { + // Shared Observables + const agentConfigs$ = new Map>(); + const newActions$ = createNewActionsSharedObservable(); + + async function subscribeToNewActions( + soClient: SavedObjectsClientContract, + agent: Agent, + options?: { signal: AbortSignal } + ): Promise { + if (!agent.config_id) { + throw new Error('Agent do not have a config'); + } + const configId = agent.config_id; + if (!agentConfigs$.has(configId)) { + agentConfigs$.set(configId, createAgentConfigSharedObservable(configId)); + } + const agentConfig$ = agentConfigs$.get(configId); + if (!agentConfig$) { + throw new Error(`Invalid state no observable for config ${configId}`); + } + const stream$ = agentConfig$.pipe( + timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0), + mergeMap((config) => createAgentActionFromConfigIfOutdated(soClient, agent, config)), + merge(newActions$), + mergeMap(async (data) => { + if (!data) { + return; + } + const newActions = data.filter((action) => action.agent_id); + if (newActions.length === 0) { + return; + } + + return newActions; + }), + filter((data) => data !== undefined), + take(1) + ); + try { + const data = await toPromiseAbortable(stream$, options?.signal); + + return data || []; + } catch (err) { + if (err instanceof TimeoutError || err instanceof AbortError) { + return []; + } + + throw err; + } + } + + return { + subscribeToNewActions, + }; +} diff --git a/x-pack/plugins/ingest_manager/server/services/app_context.ts b/x-pack/plugins/ingest_manager/server/services/app_context.ts index 81a16caa8ce9e..5ed6f7c5e54d1 100644 --- a/x-pack/plugins/ingest_manager/server/services/app_context.ts +++ b/x-pack/plugins/ingest_manager/server/services/app_context.ts @@ -5,7 +5,7 @@ */ import { BehaviorSubject, Observable } from 'rxjs'; import { first } from 'rxjs/operators'; -import { SavedObjectsServiceStart, HttpServiceSetup, Logger } from 'src/core/server'; +import { SavedObjectsServiceStart, HttpServiceSetup, Logger, KibanaRequest } from 'src/core/server'; import { EncryptedSavedObjectsClient, EncryptedSavedObjectsPluginSetup, @@ -89,6 +89,13 @@ class AppContextService { return this.savedObjects; } + public getInternalUserSOClient(request: KibanaRequest) { + // soClient as kibana internal users, be carefull on how you use it, security is not enabled + return appContextService.getSavedObjects().getScopedClient(request, { + excludedWrappers: ['security'], + }); + } + public getIsProductionMode() { return this.isProductionMode; } diff --git a/x-pack/plugins/ingest_manager/yarn.lock b/x-pack/plugins/ingest_manager/yarn.lock new file mode 120000 index 0000000000000..6e09764ec763b --- /dev/null +++ b/x-pack/plugins/ingest_manager/yarn.lock @@ -0,0 +1 @@ +../../../yarn.lock \ No newline at end of file diff --git a/x-pack/test/api_integration/apis/fleet/agents/checkin.ts b/x-pack/test/api_integration/apis/fleet/agents/checkin.ts index 47756b02a8d22..d24f7f495a06c 100644 --- a/x-pack/test/api_integration/apis/fleet/agents/checkin.ts +++ b/x-pack/test/api_integration/apis/fleet/agents/checkin.ts @@ -8,7 +8,7 @@ import expect from '@kbn/expect'; import uuid from 'uuid'; import { FtrProviderContext } from '../../../ftr_provider_context'; -import { getSupertestWithoutAuth } from './services'; +import { getSupertestWithoutAuth, setupIngest } from './services'; export default function (providerContext: FtrProviderContext) { const { getService } = providerContext; @@ -44,6 +44,7 @@ export default function (providerContext: FtrProviderContext) { }, }); }); + setupIngest(providerContext); after(async () => { await esArchiver.unload('fleet/agents'); }); diff --git a/x-pack/test/api_integration/config.ts b/x-pack/test/api_integration/config.ts index 71da903d33b29..4a333869a9549 100644 --- a/x-pack/test/api_integration/config.ts +++ b/x-pack/test/api_integration/config.ts @@ -29,6 +29,7 @@ export async function getApiIntegrationConfig({ readConfigFile }: FtrConfigProvi '--optimize.enabled=false', '--telemetry.optIn=true', '--xpack.ingestManager.enabled=true', + '--xpack.ingestManager.fleet.pollingRequestTimeout=5000', // 5 seconds '--xpack.securitySolution.alertResultListDefaultDateRange.from=2018-01-10T00:00:00.000Z', ], }, diff --git a/x-pack/test/functional/es_archives/fleet/agents/data.json b/x-pack/test/functional/es_archives/fleet/agents/data.json index 1739f583b2e87..047f26a3f443c 100644 --- a/x-pack/test/functional/es_archives/fleet/agents/data.json +++ b/x-pack/test/functional/es_archives/fleet/agents/data.json @@ -9,7 +9,7 @@ "access_api_key_id": "api-key-2", "active": true, "shared_id": "agent1_filebeat", - "config_id": "1", + "config_id": "config1", "type": "PERMANENT", "local_metadata": {}, "user_provided_metadata": {} diff --git a/yarn.lock b/yarn.lock index 256c8642a02ae..9a9bf98e06c31 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6410,6 +6410,13 @@ abort-controller@^2.0.3: dependencies: event-target-shim "^5.0.0" +abort-controller@^3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/abort-controller/-/abort-controller-3.0.0.tgz#eaf54d53b62bae4138e809ca225c8439a6efb392" + integrity sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg== + dependencies: + event-target-shim "^5.0.0" + abortcontroller-polyfill@^1.4.0: version "1.4.0" resolved "https://registry.yarnpkg.com/abortcontroller-polyfill/-/abortcontroller-polyfill-1.4.0.tgz#0d5eb58e522a461774af8086414f68e1dda7a6c4"