Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Nov 11, 2024
1 parent ea01eef commit 9bb0899
Showing 1 changed file with 70 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import { Logger } from '@kbn/logging';
import { CoreStart } from '@kbn/core-lifecycle-server';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { SearchHit } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { TaskScheduling } from '../task_scheduling';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { ConcreteTaskInstance, TaskManagerStartContract } from '..';
Expand Down Expand Up @@ -60,60 +62,10 @@ export function taskRunner(
const [{ elasticsearch }] = await coreStartServices();
const esClient = elasticsearch.client.asInternalUser;

const result = await esClient.search<ConcreteTaskInstance>({
index: TASK_MANAGER_INDEX,
body: {
size: 100,
query: {
bool: {
must: [
{
terms: {
'task.taskType': REMOVED_TYPES,
},
},
],
},
},
},
});

logger.info(`markRemovedTasks results ${JSON.stringify(result)}`);
const removedTasks = await queryForRemovedTasks(esClient);

if (result.hits.hits.length > 0) {
const bulkBody = [];
for (const hit of result.hits.hits) {
bulkBody.push({ update: { _id: hit._id } });
bulkBody.push({ doc: { task: { status: TaskStatus.Unrecognized } } });
}

let removedCount = 0;
try {
const removeResults = await esClient.bulk({
index: TASK_MANAGER_INDEX,
refresh: false,
body: bulkBody,
});
for (const removeResult of removeResults.items) {
if (!removeResult.update || !removeResult.update._id) {
logger.warn(
`Error updating task with unknown to mark as unrecognized - malformed response`
);
} else if (removeResult.update?.error) {
logger.warn(
`Error updating task ${
removeResult.update._id
} to mark as unrecognized - ${JSON.stringify(removeResult.update.error)}`
);
} else {
removedCount++;
}
}
logger.debug(`Marked ${removedCount} removed tasks as unrecognized`);
} catch (err) {
// don't worry too much about errors, we'll try again next time
logger.warn(`Error updating tasks to mark as unrecognized: ${err}`);
}
if (removedTasks.length > 0) {
await updateTasksToBeUnrecognized(esClient, logger, removedTasks);
}

return {
Expand All @@ -131,3 +83,68 @@ export function taskRunner(
};
};
}

async function queryForRemovedTasks(
esClient: ElasticsearchClient
): Promise<Array<SearchHit<ConcreteTaskInstance>>> {
const result = await esClient.search<ConcreteTaskInstance>({
index: TASK_MANAGER_INDEX,
body: {
size: 100,
_source: false,
query: {
bool: {
must: [
{
terms: {
'task.taskType': REMOVED_TYPES,
},
},
],
},
},
},
});

return result.hits.hits;
}

async function updateTasksToBeUnrecognized(
esClient: ElasticsearchClient,
logger: Logger,
removedTasks: Array<SearchHit<ConcreteTaskInstance>>
) {
const bulkBody = [];
for (const task of removedTasks) {
bulkBody.push({ update: { _id: task._id } });
bulkBody.push({ doc: { task: { status: TaskStatus.Unrecognized } } });
}

let removedCount = 0;
try {
const removeResults = await esClient.bulk({
index: TASK_MANAGER_INDEX,
refresh: false,
body: bulkBody,
});
for (const removeResult of removeResults.items) {
if (!removeResult.update || !removeResult.update._id) {
logger.warn(
`Error updating task with unknown to mark as unrecognized - malformed response`
);
} else if (removeResult.update?.error) {
logger.warn(
`Error updating task ${
removeResult.update._id
} to mark as unrecognized - ${JSON.stringify(removeResult.update.error)}`
);
} else {
removedCount++;
}
}
logger.debug(`Marked ${removedCount} removed tasks as unrecognized`);
} catch (err) {
// don't worry too much about errors, we'll try again next time
logger.warn(`Error updating tasks to mark as unrecognized: ${err}`);
}
}

0 comments on commit 9bb0899

Please sign in to comment.