Skip to content

Commit

Permalink
[eem] add default lookback (#189395)
Browse files Browse the repository at this point in the history
Closes #187348

This change adds an optional `history.settings.lookbackPeriod` property
that will default to `1h` if none is provided. The main point is to
prevent accidental processing of the entire dataset when creating a
definition.

I took the opportunity to do some refactoring:
- `durationSchema` was transforming a literal duration (e.g. `1h`) into a
`moment.Duration` with an overridden `toJSON` property. Since we don't use
any of the `moment` functionality in consuming code, the schema now
returns the raw string after regex validation
- split `generateHistoryTransform` into `generateHistoryTransform` and
`generateBackfillHistoryTransform`
  • Loading branch information
klacabane authored Jul 31, 2024
1 parent 22de72d commit 1e23b6d
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 69 deletions.
58 changes: 45 additions & 13 deletions x-pack/packages/kbn-entities-schema/src/schema/common.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
* 2.0.
*/

import { SafeParseSuccess } from 'zod';
import { durationSchema, metadataSchema, semVerSchema } from './common';
import moment from 'moment';
import { durationSchema, metadataSchema, semVerSchema, historySettingsSchema } from './common';

describe('schemas', () => {
describe('metadataSchema', () => {
Expand Down Expand Up @@ -60,38 +58,46 @@ describe('schemas', () => {
expect(result).toMatchSnapshot();
});
});

describe('durationSchema', () => {
it('should work with 1m', () => {
const result = durationSchema.safeParse('1m');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('1m');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(60);
expect(result.data).toBe('1m');
});
it('should work with 10s', () => {
const result = durationSchema.safeParse('10s');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('10s');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(10);
expect(result.data).toBe('10s');
});
it('should work with 999h', () => {
const result = durationSchema.safeParse('999h');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('999h');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(999 * 60 * 60);
expect(result.data).toBe('999h');
});
it('should work with 90d', () => {
const result = durationSchema.safeParse('90d');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('90d');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(
90 * 24 * 60 * 60
);
expect(result.data).toBe('90d');
});
it('should not work with 1ms', () => {
const result = durationSchema.safeParse('1ms');
expect(result.success).toBeFalsy();
});
it('should not work with invalid values', () => {
let result = durationSchema.safeParse('PT1H');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse('1H');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse('1f');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse('foo');
expect(result.success).toBeFalsy();
result = durationSchema.safeParse(' 1h ');
expect(result.success).toBeFalsy();
});
});

describe('semVerSchema', () => {
it('should validate with 999.999.999', () => {
const result = semVerSchema.safeParse('999.999.999');
Expand All @@ -103,4 +109,30 @@ describe('schemas', () => {
expect(result).toMatchSnapshot();
});
});

// Covers the defaulting behaviour of `historySettingsSchema`: `lookbackPeriod`
// must fall back to '1h' unless the caller provides one.
describe('historySettingsSchema', () => {
  it('should return default values when not defined', () => {
    // No settings at all: only the defaulted lookback is returned.
    const fromUndefined = historySettingsSchema.safeParse(undefined);
    expect(fromUndefined.success).toBeTruthy();
    expect(fromUndefined.data).toEqual({ lookbackPeriod: '1h' });

    // Partial settings: provided values are kept and the lookback is defaulted.
    const fromPartial = historySettingsSchema.safeParse({ syncDelay: '1m' });
    expect(fromPartial.success).toBeTruthy();
    expect(fromPartial.data).toEqual({ syncDelay: '1m', lookbackPeriod: '1h' });
  });

  it('should return user defined values when defined', () => {
    const parsed = historySettingsSchema.safeParse({
      lookbackPeriod: '30m',
      syncField: 'event.ingested',
      syncDelay: '5m',
    });

    // An explicit lookback must not be overwritten by the default.
    expect(parsed.success).toBeTruthy();
    expect(parsed.data).toEqual({
      lookbackPeriod: '30m',
      syncField: 'event.ingested',
      syncDelay: '5m',
    });
  });
});
});
48 changes: 34 additions & 14 deletions x-pack/packages/kbn-entities-schema/src/schema/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,21 @@ export const docCountMetricSchema = z.object({
filter: filterSchema,
});

export const durationSchema = z
.string()
.regex(/^\d+[m|d|s|h]$/)
.transform((val: string) => {
const parts = val.match(/(\d+)([m|s|h|d])/);
if (parts === null) {
throw new Error('Unable to parse duration');
}
const value = parseInt(parts[1], 10);
const unit = parts[2] as 'm' | 's' | 'h' | 'd';
const duration = moment.duration(value, unit);
duration.toJSON = () => val;
return duration;
});
/**
 * Validates a literal duration string: one or more digits followed by exactly
 * one unit suffix (`m`, `d`, `s` or `h`), e.g. `1h` or `90d`.
 *
 * The parsed value is the raw string; no conversion is applied.
 *
 * Note: the unit list is a plain character class. Writing it as `[m|d|s|h]`
 * would also accept a literal `|` as a unit (e.g. `1|`), because `|` is not an
 * alternation operator inside brackets.
 */
export const durationSchema = z.string().regex(/^\d+[mdsh]$/);

/**
 * Builds a duration schema that additionally enforces a lower bound.
 *
 * @param minimumMinutes - Smallest accepted duration, expressed in minutes.
 * @returns `durationSchema` refined so that shorter durations are rejected
 * with the message `can not be less than <minimumMinutes>m`.
 */
export const durationSchemaWithMinimum = (minimumMinutes: number) =>
  durationSchema.refine(
    (val: string) => {
      // Anchored, and with a plain character class so `|` is not accepted as a unit.
      const parts = val.match(/^(\d+)([mdsh])$/);
      if (parts === null) {
        // Fail validation instead of throwing, so callers using `safeParse`
        // get a regular validation issue rather than an exception.
        // NOTE(review): zod v3 appears to still run refinements on values that
        // already failed the regex check — confirm against the zod version in use.
        return false;
      }
      const value = parseInt(parts[1], 10);
      const unit = parts[2] as 'm' | 's' | 'h' | 'd';
      return moment.duration(value, unit).asMinutes() >= minimumMinutes;
    },
    { message: `can not be less than ${minimumMinutes}m` }
  );

export const percentileMetricSchema = z.object({
name: metricNameSchema,
Expand Down Expand Up @@ -131,3 +132,22 @@ export const semVerSchema = z.string().refine((maybeSemVer) => semVerRegex.test(
message:
'The string does use the Semantic Versioning (Semver) format of {major}.{minor}.{patch} (e.g., 1.0.0), ensure each part contains only digits.',
});

/**
 * Schema for the optional `history.settings` object of an entity definition.
 *
 * Every field is optional. `lookbackPeriod` always ends up defined in the
 * parsed output: it defaults to `1h` both when the settings object omits it
 * and when no settings object is provided at all (handled by the transform).
 */
export const historySettingsSchema = z
  .optional(
    z.object({
      syncField: z.optional(z.string()),
      syncDelay: z.optional(durationSchema),
      lookbackPeriod: z.optional(durationSchema).default('1h'),
      frequency: z.optional(durationSchema),
      backfillSyncDelay: z.optional(durationSchema),
      backfillLookbackPeriod: z.optional(durationSchema),
      backfillFrequency: z.optional(durationSchema),
    })
  )
  .transform((settings) => {
    // `settings` is undefined when the whole object was omitted, in which case
    // the field-level default above never ran; apply the same default here.
    const lookbackPeriod = settings?.lookbackPeriod || durationSchema.parse('1h');
    return { ...settings, lookbackPeriod };
  });
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
durationSchema,
identityFieldsSchema,
semVerSchema,
historySettingsSchema,
durationSchemaWithMinimum,
} from './common';

export const entityDefinitionSchema = z.object({
Expand All @@ -32,27 +34,16 @@ export const entityDefinitionSchema = z.object({
managed: z.optional(z.boolean()).default(false),
history: z.object({
timestampField: z.string(),
interval: durationSchema.refine((val) => val.asMinutes() >= 1, {
message: 'The history.interval can not be less than 1m',
}),
settings: z.optional(
z.object({
syncField: z.optional(z.string()),
syncDelay: z.optional(z.string()),
frequency: z.optional(z.string()),
backfillSyncDelay: z.optional(z.string()),
backfillLookbackPeriod: z.optional(durationSchema),
backfillFrequency: z.optional(z.string()),
})
),
interval: durationSchemaWithMinimum(1),
settings: historySettingsSchema,
}),
latest: z.optional(
z.object({
settings: z.optional(
z.object({
syncField: z.optional(z.string()),
syncDelay: z.optional(z.string()),
frequency: z.optional(z.string()),
syncDelay: z.optional(durationSchema),
frequency: z.optional(durationSchema),
})
),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ export const builtInServicesFromLogsEntityDefinition: EntityDefinition =
'This definition extracts service entities from common data streams by looking for the ECS field service.name',
type: 'service',
managed: true,
filter: '@timestamp >= now-10m',
indexPatterns: ['logs-*', 'filebeat*', 'metrics-apm.service_transaction.1m*'],
history: {
timestampField: '@timestamp',
interval: '1m',
settings: {
lookbackPeriod: '10m',
frequency: '2m',
syncDelay: '2m',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { EntityDefinition } from '@kbn/entities-schema';
import { retryTransientEsErrors } from './helpers/retry';
import { generateLatestTransform } from './transform/generate_latest_transform';
import { generateHistoryTransform } from './transform/generate_history_transform';
import {
generateBackfillHistoryTransform,
generateHistoryTransform,
} from './transform/generate_history_transform';

export async function createAndInstallHistoryTransform(
esClient: ElasticsearchClient,
Expand All @@ -33,7 +36,7 @@ export async function createAndInstallHistoryBackfillTransform(
logger: Logger
) {
try {
const historyTransform = generateHistoryTransform(definition, true);
const historyTransform = generateBackfillHistoryTransform(definition);
await retryTransientEsErrors(() => esClient.transform.putTransform(historyTransform), {
logger,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
import { EntityDefinition } from '@kbn/entities-schema';

export function isBackfillEnabled(definition: EntityDefinition) {
return definition.history.settings?.backfillSyncDelay != null;
return definition.history.settings.backfillSyncDelay != null;
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

import { entityDefinition } from '../helpers/fixtures/entity_definition';
import { entityDefinitionWithBackfill } from '../helpers/fixtures/entity_definition_with_backfill';
import { generateHistoryTransform } from './generate_history_transform';
import {
generateBackfillHistoryTransform,
generateHistoryTransform,
} from './generate_history_transform';

describe('generateHistoryTransform(definition)', () => {
it('should generate a valid history transform', () => {
const transform = generateHistoryTransform(entityDefinition);
expect(transform).toMatchSnapshot();
});
it('should generate a valid history backfill transform', () => {
const transform = generateHistoryTransform(entityDefinitionWithBackfill, true);
const transform = generateBackfillHistoryTransform(entityDefinitionWithBackfill);
expect(transform).toMatchSnapshot();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,37 @@ import {
import { isBackfillEnabled } from '../helpers/is_backfill_enabled';

export function generateHistoryTransform(
definition: EntityDefinition,
backfill = false
definition: EntityDefinition
): TransformPutTransformRequest {
if (backfill && !isBackfillEnabled(definition)) {
const filter: QueryDslQueryContainer[] = [];

if (definition.filter) {
filter.push(getElasticsearchQueryOrThrow(definition.filter));
}

filter.push({
range: {
[definition.history.timestampField]: {
gte: `now-${definition.history.settings.lookbackPeriod}`,
},
},
});

return generateTransformPutRequest({
definition,
filter,
transformId: generateHistoryTransformId(definition),
frequency: definition.history.settings.frequency,
syncDelay: definition.history.settings.syncDelay,
});
}

export function generateBackfillHistoryTransform(
definition: EntityDefinition
): TransformPutTransformRequest {
if (!isBackfillEnabled(definition)) {
throw new Error(
'This function was called with backfill=true without history.settings.backfillSyncDelay'
'generateBackfillHistoryTransform called without history.settings.backfillSyncDelay set'
);
}

Expand All @@ -41,28 +66,38 @@ export function generateHistoryTransform(
filter.push(getElasticsearchQueryOrThrow(definition.filter));
}

if (backfill && definition.history.settings?.backfillLookbackPeriod) {
if (definition.history.settings.backfillLookbackPeriod) {
filter.push({
range: {
[definition.history.timestampField]: {
gte: `now-${definition.history.settings?.backfillLookbackPeriod.toJSON()}`,
gte: `now-${definition.history.settings.backfillLookbackPeriod}`,
},
},
});
}

const syncDelay = backfill
? definition.history.settings?.backfillSyncDelay
: definition.history.settings?.syncDelay;

const transformId = backfill
? generateHistoryBackfillTransformId(definition)
: generateHistoryTransformId(definition);

const frequency = backfill
? definition.history.settings?.backfillFrequency
: definition.history.settings?.frequency;
return generateTransformPutRequest({
definition,
filter,
transformId: generateHistoryBackfillTransformId(definition),
frequency: definition.history.settings.backfillFrequency,
syncDelay: definition.history.settings.backfillSyncDelay,
});
}

const generateTransformPutRequest = ({
definition,
filter,
transformId,
frequency,
syncDelay,
}: {
definition: EntityDefinition;
transformId: string;
filter: QueryDslQueryContainer[];
frequency?: string;
syncDelay?: string;
}) => {
return {
transform_id: transformId,
_meta: {
Expand All @@ -87,7 +122,7 @@ export function generateHistoryTransform(
frequency: frequency || ENTITY_DEFAULT_HISTORY_FREQUENCY,
sync: {
time: {
field: definition.history.settings?.syncField ?? definition.history.timestampField,
field: definition.history.settings.syncField || definition.history.timestampField,
delay: syncDelay || ENTITY_DEFAULT_HISTORY_SYNC_DELAY,
},
},
Expand All @@ -109,7 +144,7 @@ export function generateHistoryTransform(
['@timestamp']: {
date_histogram: {
field: definition.history.timestampField,
fixed_interval: definition.history.interval.toJSON(),
fixed_interval: definition.history.interval,
},
},
},
Expand All @@ -124,4 +159,4 @@ export function generateHistoryTransform(
},
},
};
}
};
Loading

0 comments on commit 1e23b6d

Please sign in to comment.