Skip to content

Commit

Permalink
[Response Ops][Alerting] Ensuring retries on all esClient calls whe…
Browse files Browse the repository at this point in the history
…n installing FAAD resources (#153650)

## Summary

Audited all the calls to ES during FAAD resource installation and
ensured that each call is wrapped within a `retryTransientEsErrors`
call. This does not handle retrying resource installation during rule
execution if initial installation fails. That will be covered in another
issue.

---------

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
ymao1 and kibanamachine authored Mar 28, 2023
1 parent 2cc12ce commit f266a0c
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,37 @@ describe('createConcreteWriteIndex', () => {
expect(logger.error).toHaveBeenCalledWith(`Error creating concrete write index - fail`);
});

it(`should retry getting index on transient ES error`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
const error = new Error(`fail`) as EsError;
error.meta = {
body: {
error: {
type: 'resource_already_exists_exception',
},
},
};
clusterClient.indices.create.mockRejectedValueOnce(error);
clusterClient.indices.get
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockImplementationOnce(async () => ({
'.internal.alerts-test.alerts-default-000001': {
aliases: { '.alerts-test.alerts-default': { is_write_index: true } },
},
}));

await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});

expect(clusterClient.indices.get).toHaveBeenCalledTimes(3);
expect(logger.error).toHaveBeenCalledWith(`Error creating concrete write index - fail`);
});

it(`should log and throw error if ES throws resource_already_exists_exception error and existing index is not the write index`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => ({}));
const error = new Error(`fail`) as EsError;
Expand Down Expand Up @@ -265,6 +296,42 @@ describe('createConcreteWriteIndex', () => {
expect(clusterClient.indices.putMapping).toHaveBeenCalledTimes(2);
});

it(`should retry simulateIndexTemplate on transient ES errors`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockImplementation(async () => SimulateTemplateResponse);

await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});

expect(clusterClient.indices.simulateIndexTemplate).toHaveBeenCalledTimes(4);
});

it(`should retry getting alias on transient ES errors`, async () => {
clusterClient.indices.getAlias
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
async () => SimulateTemplateResponse
);

await createConcreteWriteIndex({
logger,
esClient: clusterClient,
indexPatterns: IndexPatterns,
totalFieldsLimit: 2500,
});

expect(clusterClient.indices.getAlias).toHaveBeenCalledTimes(3);
});

it(`should retry settings update on transient ES errors`, async () => {
clusterClient.indices.getAlias.mockImplementation(async () => GetAliasResponse);
clusterClient.indices.simulateIndexTemplate.mockImplementation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ const updateUnderlyingMapping = async ({
const { index, alias } = concreteIndexInfo;
let simulatedIndexMapping: IndicesSimulateIndexTemplateResponse;
try {
simulatedIndexMapping = await esClient.indices.simulateIndexTemplate({
name: index,
});
simulatedIndexMapping = await retryTransientEsErrors(
() => esClient.indices.simulateIndexTemplate({ name: index }),
{ logger }
);
} catch (err) {
logger.error(
`Ignored PUT mappings for alias ${alias}; error generating simulated mappings: ${err.message}`
Expand Down Expand Up @@ -149,10 +150,14 @@ export const createConcreteWriteIndex = async ({
try {
// Specify both the index pattern for the backing indices and their aliases
// The alias prevents the request from finding other namespaces that could match the -* pattern
const response = await esClient.indices.getAlias({
index: indexPatterns.pattern,
name: indexPatterns.basePattern,
});
const response = await retryTransientEsErrors(
() =>
esClient.indices.getAlias({
index: indexPatterns.pattern,
name: indexPatterns.basePattern,
}),
{ logger }
);

concreteIndices = Object.entries(response).flatMap(([index, { aliases }]) =>
Object.entries(aliases).map(([aliasName, aliasProperties]) => ({
Expand Down Expand Up @@ -213,19 +218,18 @@ export const createConcreteWriteIndex = async ({
},
},
}),
{
logger,
}
{ logger }
);
} catch (error) {
logger.error(`Error creating concrete write index - ${error.message}`);
// If the index already exists and it's the write index for the alias,
// something else created it so suppress the error. If it's not the write
// index, that's bad, throw an error.
if (error?.meta?.body?.error?.type === 'resource_already_exists_exception') {
const existingIndices = await esClient.indices.get({
index: indexPatterns.name,
});
const existingIndices = await retryTransientEsErrors(
() => esClient.indices.get({ index: indexPatterns.name }),
{ logger }
);
if (!existingIndices[indexPatterns.name]?.aliases?.[indexPatterns.alias]?.is_write_index) {
throw Error(
`Attempted to create index: ${indexPatterns.name} as the write index for alias: ${indexPatterns.alias}, but the index already exists and is not the write index for the alias`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,82 @@ describe('createOrUpdateComponentTemplate', () => {
},
});
});

it(`should retry getIndexTemplate and putIndexTemplate on transient ES errors`, async () => {
clusterClient.cluster.putComponentTemplate.mockRejectedValueOnce(
new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 400,
body: {
error: {
root_cause: [
{
type: 'illegal_argument_exception',
reason:
'updating component template [.alerts-ecs-mappings] results in invalid composable template [.alerts-security.alerts-default-index-template] after templates are merged',
},
],
type: 'illegal_argument_exception',
reason:
'updating component template [.alerts-ecs-mappings] results in invalid composable template [.alerts-security.alerts-default-index-template] after templates are merged',
caused_by: {
type: 'illegal_argument_exception',
reason:
'composable template [.alerts-security.alerts-default-index-template] template after composition with component templates [.alerts-ecs-mappings, .alerts-security.alerts-mappings, .alerts-technical-mappings] is invalid',
caused_by: {
type: 'illegal_argument_exception',
reason:
'invalid composite mappings for [.alerts-security.alerts-default-index-template]',
caused_by: {
type: 'illegal_argument_exception',
reason: 'Limit of total fields [1900] has been exceeded',
},
},
},
},
},
})
)
);
const existingIndexTemplate = {
name: 'test-template',
index_template: {
index_patterns: ['test*'],
composed_of: ['test-mappings'],
template: {
settings: {
auto_expand_replicas: '0-1',
hidden: true,
'index.lifecycle': {
name: '.alerts-ilm-policy',
rollover_alias: `.alerts-empty-default`,
},
'index.mapping.total_fields.limit': 1800,
},
mappings: {
dynamic: false,
},
},
},
};
clusterClient.indices.getIndexTemplate
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValueOnce({
index_templates: [existingIndexTemplate],
});
clusterClient.indices.putIndexTemplate
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockResolvedValue({ acknowledged: true });
await createOrUpdateComponentTemplate({
logger,
esClient: clusterClient,
template: ComponentTemplate,
totalFieldsLimit: 2500,
});

expect(clusterClient.indices.getIndexTemplate).toHaveBeenCalledTimes(3);
expect(clusterClient.indices.putIndexTemplate).toHaveBeenCalledTimes(3);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,50 @@ interface CreateOrUpdateComponentTemplateOpts {
const getIndexTemplatesUsingComponentTemplate = async (
esClient: ElasticsearchClient,
componentTemplateName: string,
totalFieldsLimit: number
totalFieldsLimit: number,
logger: Logger
) => {
// Get all index templates and filter down to just the ones referencing this component template
const { index_templates: indexTemplates } = await esClient.indices.getIndexTemplate();
const { index_templates: indexTemplates } = await retryTransientEsErrors(
() => esClient.indices.getIndexTemplate(),
{ logger }
);
const indexTemplatesUsingComponentTemplate = (indexTemplates ?? []).filter(
(indexTemplate: IndicesGetIndexTemplateIndexTemplateItem) =>
indexTemplate.index_template.composed_of.includes(componentTemplateName)
);
await asyncForEach(
indexTemplatesUsingComponentTemplate,
async (template: IndicesGetIndexTemplateIndexTemplateItem) => {
await esClient.indices.putIndexTemplate({
name: template.name,
body: {
...template.index_template,
template: {
...template.index_template.template,
settings: {
...template.index_template.template?.settings,
'index.mapping.total_fields.limit': totalFieldsLimit,
await retryTransientEsErrors(
() =>
esClient.indices.putIndexTemplate({
name: template.name,
body: {
...template.index_template,
template: {
...template.index_template.template,
settings: {
...template.index_template.template?.settings,
'index.mapping.total_fields.limit': totalFieldsLimit,
},
},
},
},
},
});
}),
{ logger }
);
}
);
};

const createOrUpdateComponentTemplateHelper = async (
esClient: ElasticsearchClient,
template: ClusterPutComponentTemplateRequest,
totalFieldsLimit: number
totalFieldsLimit: number,
logger: Logger
) => {
try {
await esClient.cluster.putComponentTemplate(template);
await retryTransientEsErrors(() => esClient.cluster.putComponentTemplate(template), { logger });
} catch (error) {
const reason = error?.meta?.body?.error?.caused_by?.caused_by?.caused_by?.reason;
if (reason && reason.match(/Limit of total fields \[\d+\] has been exceeded/) != null) {
Expand All @@ -68,10 +77,17 @@ const createOrUpdateComponentTemplateHelper = async (
// number of new ECS fields pushes the composed mapping above the limit, this error will
// occur. We have to update the field limit inside the index template now otherwise we
// can never update the component template
await getIndexTemplatesUsingComponentTemplate(esClient, template.name, totalFieldsLimit);
await getIndexTemplatesUsingComponentTemplate(
esClient,
template.name,
totalFieldsLimit,
logger
);

// Try to update the component template again
await esClient.cluster.putComponentTemplate(template);
await retryTransientEsErrors(() => esClient.cluster.putComponentTemplate(template), {
logger,
});
} else {
throw error;
}
Expand All @@ -87,10 +103,7 @@ export const createOrUpdateComponentTemplate = async ({
logger.info(`Installing component template ${template.name}`);

try {
await retryTransientEsErrors(
() => createOrUpdateComponentTemplateHelper(esClient, template, totalFieldsLimit),
{ logger }
);
await createOrUpdateComponentTemplateHelper(esClient, template, totalFieldsLimit, logger);
} catch (err) {
logger.error(`Error installing component template ${template.name} - ${err.message}`);
throw err;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,21 @@ describe('createOrUpdateIndexTemplate', () => {
expect(clusterClient.indices.putIndexTemplate).toHaveBeenCalledTimes(3);
});

it(`should retry simulateTemplate on transient ES errors`, async () => {
clusterClient.indices.simulateTemplate
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockRejectedValueOnce(new EsErrors.TimeoutError('timeout'))
.mockImplementation(async () => SimulateTemplateResponse);
clusterClient.indices.putIndexTemplate.mockResolvedValue({ acknowledged: true });
await createOrUpdateIndexTemplate({
logger,
esClient: clusterClient,
template: IndexTemplate,
});

expect(clusterClient.indices.simulateTemplate).toHaveBeenCalledTimes(3);
});

it(`should log and throw error if max retries exceeded`, async () => {
clusterClient.indices.simulateTemplate.mockImplementation(async () => SimulateTemplateResponse);
clusterClient.indices.putIndexTemplate.mockRejectedValue(new EsErrors.ConnectionError('foo'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ export const createOrUpdateIndexTemplate = async ({
let mappings: MappingTypeMapping = {};
try {
// Simulate the index template to proactively identify any issues with the mapping
const simulateResponse = await esClient.indices.simulateTemplate(template);
const simulateResponse = await retryTransientEsErrors(
() => esClient.indices.simulateTemplate(template),
{ logger }
);
mappings = simulateResponse.template.mappings;
} catch (err) {
logger.error(
Expand Down

0 comments on commit f266a0c

Please sign in to comment.