Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics support(MAPCO-5641) #34

Merged
merged 5 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
511 changes: 511 additions & 0 deletions dashboards/dashboards.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ spec:
checksum/configmap: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }}
{{- end }}
{{- if $metrics.enabled }}
prometheus.io/port: {{ $metrics.prometheus.port | quote }}
prometheus.io/port: {{ .Values.env.targetPort | quote }}
prometheus.io/scrape: {{ $metrics.prometheus.scrape | quote }}
{{- end }}
{{- if .Values.podAnnotations }}
Expand Down
4,949 changes: 3,125 additions & 1,824 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"@map-colonies/mc-priority-queue": "^8.1.1",
"@map-colonies/mc-utils": "^3.1.0",
"@map-colonies/read-pkg": "0.0.1",
"@map-colonies/telemetry": "^6.0.0",
"@map-colonies/telemetry": "^7.0.1",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/api-metrics": "0.23.0",
"@turf/turf": "^7.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const SERVICES = {
LOGGER: Symbol('Logger'),
CONFIG: Symbol('Config'),
TRACER: Symbol('Tracer'),
METER: Symbol('Meter'),
METRICS: Symbol('METRICS'),
QUEUE_CLIENT: Symbol('QueueClient'),
TILE_RANGER: Symbol('TileRanger'),
} satisfies Record<string, symbol>;
Expand Down
7 changes: 5 additions & 2 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
} from '@map-colonies/mc-model-types';
import { TilesMimeFormat } from '@map-colonies/types';
import { BBox, Feature, MultiPolygon, Polygon } from 'geojson';
import { Footprint, ITileRange } from '@map-colonies/mc-utils';
import { ITileRange } from '@map-colonies/mc-utils';
import { LayerCacheType, SeedMode } from './constants';

//#region config interfaces
Expand Down Expand Up @@ -75,7 +75,7 @@ export interface TilesSeedingTaskConfig {
//#region job/task interfaces
export interface IJobHandler {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobInit: (job: IJobResponse<any, any>, taskId: string) => Promise<void>;
handleJobInit: (job: IJobResponse<any, any>, task: ITaskResponse<any>) => Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobFinalize: (job: IJobResponse<any, any>, task: ITaskResponse<any>) => Promise<void>;
}
Expand Down Expand Up @@ -114,6 +114,9 @@ export interface IBBox {
maxX: number;
maxY: number;
}

export type Footprint = Polygon | MultiPolygon | Feature<Polygon | MultiPolygon>;

export interface PartSourceContext {
fileName: string;
tilesPath: string;
Expand Down
24 changes: 13 additions & 11 deletions src/common/tracing.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { Tracing } from '@map-colonies/telemetry';
import { IGNORED_INCOMING_TRACE_ROUTES, IGNORED_OUTGOING_TRACE_ROUTES } from './constants';

const tracing = new Tracing(undefined, {
// eslint-disable-next-line @typescript-eslint/naming-convention
'@opentelemetry/instrumentation-http': {
ignoreIncomingRequestHook: (request): boolean =>
IGNORED_INCOMING_TRACE_ROUTES.some((route) => request.url !== undefined && route.test(request.url)),
ignoreOutgoingRequestHook: (request): boolean =>
IGNORED_OUTGOING_TRACE_ROUTES.some((route) => typeof request.path === 'string' && route.test(request.path)),
},
// eslint-disable-next-line @typescript-eslint/naming-convention
'@opentelemetry/instrumentation-fs': {
requireParentSpan: true,
const tracing = new Tracing({
autoInstrumentationsConfigMap: {
// eslint-disable-next-line @typescript-eslint/naming-convention
'@opentelemetry/instrumentation-http': {
ignoreIncomingRequestHook: (request): boolean =>
IGNORED_INCOMING_TRACE_ROUTES.some((route) => request.url !== undefined && route.test(request.url)),
ignoreOutgoingRequestHook: (request): boolean =>
IGNORED_OUTGOING_TRACE_ROUTES.some((route) => typeof request.path === 'string' && route.test(request.path)),
},
// eslint-disable-next-line @typescript-eslint/naming-convention
'@opentelemetry/instrumentation-fs': {
requireParentSpan: true,
},
},
});

Expand Down
28 changes: 20 additions & 8 deletions src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import config from 'config';
import { getOtelMixin } from '@map-colonies/telemetry';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { IHttpRetryConfig, TileRanger } from '@map-colonies/mc-utils';
import { trace, metrics as OtelMetrics } from '@opentelemetry/api';
import { instancePerContainerCachingFactory } from 'tsyringe';
import { trace } from '@opentelemetry/api';
import { Registry } from 'prom-client';
import { instanceCachingFactory, instancePerContainerCachingFactory } from 'tsyringe';
import { DependencyContainer } from 'tsyringe/dist/typings/types';
import jsLogger, { Logger, LoggerOptions } from '@map-colonies/js-logger';
import { Metrics } from '@map-colonies/telemetry';
import { INJECTION_VALUES, SERVICES, SERVICE_NAME } from './common/constants';
import { tracing } from './common/tracing';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
Expand Down Expand Up @@ -47,9 +47,7 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon
const loggerConfig = config.get<LoggerOptions>('telemetry.logger');
const logger = jsLogger({ ...loggerConfig, prettyPrint: loggerConfig.prettyPrint, mixin: getOtelMixin(), pinoCaller: loggerConfig.pinoCaller });

const metrics = new Metrics();
metrics.start();

const metricsRegistry = new Registry();
const tracer = trace.getTracer(SERVICE_NAME);

const ingestionConfig = config.get<IngestionPollingJobs>('jobManagement.ingestion.pollingJobs');
Expand All @@ -61,19 +59,33 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon
{ token: SERVICES.LOGGER, provider: { useValue: logger } },
{ token: SERVICES.TRACER, provider: { useValue: tracer } },
{ token: SERVICES.QUEUE_CLIENT, provider: { useFactory: instancePerContainerCachingFactory(queueClientFactory) } },
{ token: SERVICES.METER, provider: { useValue: OtelMetrics.getMeterProvider().getMeter(SERVICE_NAME) } },
{ token: JOB_HANDLER_FACTORY_SYMBOL, provider: { useFactory: instancePerContainerCachingFactory(jobHandlerFactory) } },
{ token: handlersTokens.Ingestion_New, provider: { useClass: NewJobHandler } },
{ token: handlersTokens.Ingestion_Update, provider: { useClass: UpdateJobHandler } },
{ token: handlersTokens.Ingestion_Swap_Update, provider: { useClass: SwapJobHandler } },
{ token: SERVICES.TILE_RANGER, provider: { useClass: TileRanger } },
{ token: INJECTION_VALUES.ingestionJobTypes, provider: { useValue: handlersTokens } },
{
token: SERVICES.METRICS,
provider: {
useFactory: instanceCachingFactory((container) => {
const config = container.resolve<IConfig>(SERVICES.CONFIG);

if (config.get<boolean>('telemetry.metrics.enabled')) {
metricsRegistry.setDefaultLabels({
app: SERVICE_NAME,
});
return metricsRegistry;
}
}),
},
},
{
token: 'onSignal',
provider: {
useValue: {
useValue: async (): Promise<void> => {
await Promise.all([tracing.stop(), metrics.stop()]);
await Promise.all([tracing.stop()]);
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion src/job/models/jobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class JobProcessor {

switch (task.type) {
case taskTypes.init:
await jobHandler.handleJobInit(job, task.id);
await jobHandler.handleJobInit(job, task);
break;
case taskTypes.finalize:
await jobHandler.handleJobFinalize(job, task);
Expand Down
23 changes: 16 additions & 7 deletions src/job/models/newJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { IngestionNewFinalizeTaskParams, IngestionNewJobParams, NewRasterLayerMe
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { newAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { Grid, IJobHandler, MergeTilesTaskParams, ExtendedRasterLayerMetadata, ExtendedNewRasterLayer } from '../../common/interfaces';
import { TaskMetrics } from '../../utils/metrics/taskMetrics';
import { SERVICES } from '../../common/constants';
import { getTileOutputFormat } from '../../utils/imageFormatUtil';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';
Expand All @@ -23,15 +24,18 @@ export class NewJobHandler extends JobHandler implements IJobHandler {
@inject(SERVICES.QUEUE_CLIENT) queueClient: QueueClient,
@inject(CatalogClient) private readonly catalogClient: CatalogClient,
@inject(MapproxyApiClient) private readonly mapproxyClient: MapproxyApiClient,
@inject(GeoserverClient) private readonly geoserverClient: GeoserverClient
@inject(GeoserverClient) private readonly geoserverClient: GeoserverClient,
private readonly taskMetrics: TaskMetrics
) {
super(logger, queueClient);
}

public async handleJobInit(job: IJobResponse<IngestionNewJobParams, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, jobType: job.type, taskId });
public async handleJobInit(job: IJobResponse<IngestionNewJobParams, unknown>, task: ITaskResponse<unknown>): Promise<void> {
const logger = this.logger.child({ jobId: job.id, jobType: job.type, taskId: task.id });
const taskProcessTracking = this.taskMetrics.trackTaskProcessing(job.type, task.type);

try {
logger.info({ msg: `handling ${job.type} job with "init" task` });
logger.info({ msg: `handling ${job.type} job with ${task.type} task` });

const { inputFiles, metadata, partsData, additionalParams } = job.parameters;
const validAdditionalParams = this.validateAdditionalParams(additionalParams, newAdditionalParamsSchema);
Expand All @@ -52,7 +56,7 @@ export class NewJobHandler extends JobHandler implements IJobHandler {
const mergeTasks = this.taskBuilder.buildTasks(taskBuildParams);

logger.info({ msg: 'pushing tasks' });
await this.taskBuilder.pushTasks(job.id, mergeTasks);
await this.taskBuilder.pushTasks(job.id, job.type, mergeTasks);

logger.info({ msg: 'Updating job with new metadata', ...metadata, extendedLayerMetadata });
await this.queueClient.jobManagerClient.updateJob(job.id, {
Expand All @@ -61,13 +65,15 @@ export class NewJobHandler extends JobHandler implements IJobHandler {
});

logger.info({ msg: 'Acking task' });
await this.queueClient.ack(job.id, taskId);
await this.queueClient.ack(job.id, task.id);
taskProcessTracking?.success();

logger.info({ msg: 'Job init completed successfully' });
} catch (err) {
if (err instanceof Error) {
logger.error({ msg: 'Failed to handle job init', error: err });
await this.queueClient.reject(job.id, taskId, true, err.message);
await this.queueClient.reject(job.id, task.id, true, err.message);
taskProcessTracking?.failure(err.name);
}
}
}
Expand All @@ -77,6 +83,7 @@ export class NewJobHandler extends JobHandler implements IJobHandler {
task: ITaskResponse<IngestionNewFinalizeTaskParams>
): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id });
const taskProcessTracking = this.taskMetrics.trackTaskProcessing(job.type, task.type);

try {
logger.info({ msg: `handling ${job.type} job with "finalize"` });
Expand Down Expand Up @@ -109,12 +116,14 @@ export class NewJobHandler extends JobHandler implements IJobHandler {
if (this.isAllStepsCompleted(finalizeTaskParams)) {
logger.info({ msg: 'All finalize steps completed successfully', ...finalizeTaskParams });
await this.completeTaskAndJob(job, task);
taskProcessTracking?.success();
}
} catch (err) {
if (err instanceof Error) {
const errorMsg = `Failed to handle job finalize: ${err.message}`;
logger.error({ msg: errorMsg, error: err });
await this.queueClient.reject(job.id, task.id, true, err.message);
taskProcessTracking?.failure(err.name);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/job/models/seedingJobCreator.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Logger } from '@map-colonies/js-logger';
import { IngestionUpdateJobParams, PolygonPart } from '@map-colonies/mc-model-types';
import { ICreateJobBody, IJobResponse, OperationStatus, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { Footprint, getUTCDate } from '@map-colonies/mc-utils';
import { getUTCDate } from '@map-colonies/mc-utils';
import { feature, featureCollection, union } from '@turf/turf';
import { Feature, MultiPolygon, Polygon } from 'geojson';
import { inject, injectable } from 'tsyringe';
import { LayerCacheType, SeedMode, SERVICES } from '../../common/constants';
import { IConfig, SeedJobParams, SeedTaskOptions, SeedTaskParams, TilesSeedingTaskConfig } from '../../common/interfaces';
import { Footprint, IConfig, SeedJobParams, SeedTaskOptions, SeedTaskParams, TilesSeedingTaskConfig } from '../../common/interfaces';
import { MapproxyApiClient } from '../../httpClients/mapproxyClient';
import { internalIdSchema, swapUpdateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';

Expand Down
24 changes: 16 additions & 8 deletions src/job/models/swapJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Grid, IJobHandler, MergeTilesTaskParams } from '../../common/interfaces
import { MapproxyApiClient } from '../../httpClients/mapproxyClient';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';
import { CatalogClient } from '../../httpClients/catalogClient';
import { TaskMetrics } from '../../utils/metrics/taskMetrics';
import { swapUpdateAdditionalParamsSchema, updateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { SeedMode, SERVICES } from '../../common/constants';
import { JobHandler } from './jobHandler';
Expand All @@ -21,16 +22,18 @@ export class SwapJobHandler extends JobHandler implements IJobHandler {
@inject(TileMergeTaskManager) private readonly taskBuilder: TileMergeTaskManager,
@inject(MapproxyApiClient) private readonly mapproxyClient: MapproxyApiClient,
@inject(CatalogClient) private readonly catalogClient: CatalogClient,
@inject(SeedingJobCreator) private readonly seedingJobCreator: SeedingJobCreator
@inject(SeedingJobCreator) private readonly seedingJobCreator: SeedingJobCreator,
private readonly taskMetrics: TaskMetrics
) {
super(logger, queueClient);
}

public async handleJobInit(job: IJobResponse<IngestionUpdateJobParams, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, jobType: job.type });
public async handleJobInit(job: IJobResponse<IngestionUpdateJobParams, unknown>, task: ITaskResponse<unknown>): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id, jobType: job.type });
const taskProcessTracking = this.taskMetrics.trackTaskProcessing(job.type, task.type);

try {
logger.info({ msg: `handling ${job.type} job with "init" task` });
logger.info({ msg: `handling ${job.type} job with ${task.type} task` });
const { inputFiles, partsData, additionalParams } = job.parameters;

const validAdditionalParams = this.validateAdditionalParams(additionalParams, swapUpdateAdditionalParamsSchema);
Expand All @@ -51,22 +54,24 @@ export class SwapJobHandler extends JobHandler implements IJobHandler {
const mergeTasks = this.taskBuilder.buildTasks(taskBuildParams);

logger.info({ msg: 'pushing tasks' });
await this.taskBuilder.pushTasks(job.id, mergeTasks);
await this.taskBuilder.pushTasks(job.id, job.type, mergeTasks);

logger.info({ msg: 'Acking task' });
await this.queueClient.ack(job.id, taskId);
await this.queueClient.ack(job.id, task.id);
taskProcessTracking?.success();

await this.updateJobAdditionalParams(job, validAdditionalParams, displayPath);
} catch (err) {
taskProcessTracking?.failure((err as Error).name);
if (err instanceof ZodError) {
const errorMsg = `Failed to validate additionalParams: ${err.message}`;
logger.error({ msg: errorMsg, err });
await this.queueClient.reject(job.id, taskId, false, err.message);
await this.queueClient.reject(job.id, task.id, false, err.message);
return await this.queueClient.jobManagerClient.updateJob(job.id, { status: OperationStatus.FAILED, reason: errorMsg });
}
if (err instanceof Error) {
logger.error({ msg: 'Failed to handle job init', error: err });
await this.queueClient.reject(job.id, taskId, true, err.message);
await this.queueClient.reject(job.id, task.id, true, err.message);
}
}
}
Expand All @@ -76,6 +81,7 @@ export class SwapJobHandler extends JobHandler implements IJobHandler {
task: ITaskResponse<IngestionSwapUpdateFinalizeTaskParams>
): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id, jobType: job.type, taskType: task.type });
const taskProcessTracking = this.taskMetrics.trackTaskProcessing(job.type, task.type);

try {
logger.info({ msg: `handling ${job.type} job with ${task.type} task` });
Expand All @@ -101,9 +107,11 @@ export class SwapJobHandler extends JobHandler implements IJobHandler {
if (this.isAllStepsCompleted(finalizeTaskParams)) {
logger.info({ msg: 'All finalize steps completed successfully', ...finalizeTaskParams });
await this.completeTaskAndJob(job, task);
taskProcessTracking?.success();
await this.seedingJobCreator.create({ mode: SeedMode.CLEAN, layerName, ingestionJob: job });
}
} catch (err) {
taskProcessTracking?.failure((err as Error).name);
if (err instanceof ZodError) {
const errorMsg = `Failed to validate additionalParams: ${err.message}`;
logger.error({ msg: errorMsg, err });
Expand Down
Loading
Loading