Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Maps] Geo containment latency and concurrent containment fix #86980

Merged
merged 14 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export interface GeoContainmentAlertParams extends AlertTypeParams {
boundaryIndexId: string;
boundaryGeoField: string;
boundaryNameField?: string;
delayOffsetWithUnits?: string;
indexQuery?: Query;
boundaryIndexQuery?: Query;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ export const ParamsSchema = schema.object({
boundaryIndexId: schema.string({ minLength: 1 }),
boundaryGeoField: schema.string({ minLength: 1 }),
boundaryNameField: schema.maybe(schema.string({ minLength: 1 })),
delayOffsetWithUnits: schema.maybe(schema.string({ minLength: 1 })),
indexQuery: schema.maybe(schema.any({})),
boundaryIndexQuery: schema.maybe(schema.any({})),
});
Expand All @@ -114,7 +113,6 @@ export interface GeoContainmentParams extends AlertTypeParams {
boundaryIndexId: string;
boundaryGeoField: string;
boundaryNameField?: string;
delayOffsetWithUnits?: string;
indexQuery?: Query;
boundaryIndexQuery?: Query;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export function transformResults(
results: SearchResponse<unknown> | undefined,
dateField: string,
geoField: string
): Map<string, LatestEntityLocation> {
): Map<string, LatestEntityLocation[]> {
if (!results) {
return new Map();
}
Expand Down Expand Up @@ -64,12 +64,15 @@ export function transformResults(
// Get unique
.reduce(
(
accu: Map<string, LatestEntityLocation>,
accu: Map<string, LatestEntityLocation[]>,
el: LatestEntityLocation & { entityName: string }
) => {
const { entityName, ...locationData } = el;
if (!accu.has(entityName)) {
accu.set(entityName, locationData);
if (entityName) {
if (!accu.has(entityName)) {
accu.set(entityName, []);
}
accu.get(entityName)!.push(locationData);
}
return accu;
},
Expand All @@ -78,26 +81,9 @@ export function transformResults(
return orderedResults;
}

function getOffsetTime(delayOffsetWithUnits: string, oldTime: Date): Date {
const timeUnit = delayOffsetWithUnits.slice(-1);
const time: number = +delayOffsetWithUnits.slice(0, -1);

const adjustedDate = new Date(oldTime.getTime());
if (timeUnit === 's') {
adjustedDate.setSeconds(adjustedDate.getSeconds() - time);
} else if (timeUnit === 'm') {
adjustedDate.setMinutes(adjustedDate.getMinutes() - time);
} else if (timeUnit === 'h') {
adjustedDate.setHours(adjustedDate.getHours() - time);
} else if (timeUnit === 'd') {
adjustedDate.setDate(adjustedDate.getDate() - time);
}
return adjustedDate;
}

export function getActiveEntriesAndGenerateAlerts(
prevLocationMap: Record<string, LatestEntityLocation>,
currLocationMap: Map<string, LatestEntityLocation>,
prevLocationMap: Map<string, LatestEntityLocation[]>,
currLocationMap: Map<string, LatestEntityLocation[]>,
alertInstanceFactory: AlertServices<
GeoContainmentInstanceState,
GeoContainmentInstanceContext,
Expand All @@ -106,32 +92,55 @@ export function getActiveEntriesAndGenerateAlerts(
shapesIdsNamesMap: Record<string, unknown>,
currIntervalEndTime: Date
) {
const allActiveEntriesMap: Map<string, LatestEntityLocation> = new Map([
...Object.entries(prevLocationMap || {}),
const allActiveEntriesMap: Map<string, LatestEntityLocation[]> = new Map([
...prevLocationMap,
...currLocationMap,
]);
allActiveEntriesMap.forEach(({ location, shapeLocationId, dateInShape, docId }, entityName) => {
const containingBoundaryName = shapesIdsNamesMap[shapeLocationId] || shapeLocationId;
const context = {
entityId: entityName,
entityDateTime: dateInShape ? new Date(dateInShape).toISOString() : null,
entityDocumentId: docId,
detectionDateTime: new Date(currIntervalEndTime).toISOString(),
entityLocation: `POINT (${location[0]} ${location[1]})`,
containingBoundaryId: shapeLocationId,
containingBoundaryName,
};
const alertInstanceId = `${entityName}-${containingBoundaryName}`;
if (shapeLocationId === OTHER_CATEGORY) {
allActiveEntriesMap.forEach((locationsArr, entityName) => {
// Generate alerts
locationsArr.forEach(({ location, shapeLocationId, dateInShape, docId }) => {
const context = {
entityId: entityName,
entityDateTime: dateInShape ? new Date(dateInShape).toISOString() : null,
entityDocumentId: docId,
detectionDateTime: new Date(currIntervalEndTime).toISOString(),
entityLocation: `POINT (${location[0]} ${location[1]})`,
containingBoundaryId: shapeLocationId,
containingBoundaryName: shapesIdsNamesMap[shapeLocationId] || shapeLocationId,
};
const alertInstanceId = `${entityName}-${context.containingBoundaryName}`;
if (shapeLocationId !== OTHER_CATEGORY) {
alertInstanceFactory(alertInstanceId).scheduleActions(ActionGroupId, context);
}
});

if (locationsArr[0].shapeLocationId === OTHER_CATEGORY) {
allActiveEntriesMap.delete(entityName);
return;
}

const otherCatIndex = locationsArr.findIndex(
({ shapeLocationId }) => shapeLocationId === OTHER_CATEGORY
);
if (otherCatIndex >= 0) {
const afterOtherLocationsArr = locationsArr.slice(0, otherCatIndex);
allActiveEntriesMap.set(entityName, afterOtherLocationsArr);
} else {
alertInstanceFactory(alertInstanceId).scheduleActions(ActionGroupId, context);
allActiveEntriesMap.set(entityName, locationsArr);
}
});
return allActiveEntriesMap;
}

export const getGeoContainmentExecutor = (log: Logger): GeoContainmentAlertType['executor'] =>
async function ({ previousStartedAt, startedAt, services, params, alertId, state }) {
async function ({
previousStartedAt: currIntervalStartTime,
startedAt: currIntervalEndTime,
services,
params,
alertId,
state,
}) {
const { shapesFilters, shapesIdsNamesMap } = state.shapesFilters
? state
: await getShapesFilters(
Expand All @@ -147,15 +156,6 @@ export const getGeoContainmentExecutor = (log: Logger): GeoContainmentAlertType[

const executeEsQuery = await executeEsQueryFactory(params, services, log, shapesFilters);

let currIntervalStartTime = previousStartedAt;
let currIntervalEndTime = startedAt;
if (params.delayOffsetWithUnits) {
if (currIntervalStartTime) {
currIntervalStartTime = getOffsetTime(params.delayOffsetWithUnits, currIntervalStartTime);
}
currIntervalEndTime = getOffsetTime(params.delayOffsetWithUnits, currIntervalEndTime);
}

// Start collecting data only on the first cycle
let currentIntervalResults: SearchResponse<unknown> | undefined;
if (!currIntervalStartTime) {
Expand All @@ -169,14 +169,17 @@ export const getGeoContainmentExecutor = (log: Logger): GeoContainmentAlertType[
currentIntervalResults = await executeEsQuery(currIntervalStartTime, currIntervalEndTime);
}

const currLocationMap: Map<string, LatestEntityLocation> = transformResults(
const currLocationMap: Map<string, LatestEntityLocation[]> = transformResults(
currentIntervalResults,
params.dateField,
params.geoField
);

const prevLocationMap: Map<string, LatestEntityLocation[]> = new Map([
...Object.entries((state.prevLocationMap as Record<string, LatestEntityLocation[]>) || {}),
]);
const allActiveEntriesMap = getActiveEntriesAndGenerateAlerts(
state.prevLocationMap as Record<string, LatestEntityLocation>,
prevLocationMap,
currLocationMap,
services.alertInstanceFactory,
shapesIdsNamesMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ describe('alertType', () => {
boundaryIndexId: 'testIndex',
boundaryGeoField: 'testField',
boundaryNameField: 'testField',
delayOffsetWithUnits: 'testOffset',
};

expect(alertType.validate?.params?.validate(params)).toBeTruthy();
Expand Down
Loading