Skip to content

Commit

Permalink
Using features in telemetry (#1047)
Browse files Browse the repository at this point in the history
- Moving root flow metrics to features
- Switching actions to use newTrace, which starts a trace if it is not currently inside something.
- Improving instrumentation to always assign a feature name
  • Loading branch information
maxl0rd authored Oct 15, 2024
1 parent 8c5d2dd commit 5eacd6d
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 230 deletions.
8 changes: 3 additions & 5 deletions js/core/src/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
import { parseSchema } from './schema.js';
import {
SPAN_TYPE_ATTR,
runInNewSpan,
newTrace,
setCustomMetadataAttributes,
} from './tracing.js';

Expand Down Expand Up @@ -129,11 +129,9 @@ export function action<
schema: config.inputSchema,
jsonSchema: config.inputJsonSchema,
});
let output = await runInNewSpan(
let output = await newTrace(
{
metadata: {
name: actionName,
},
name: actionName,
labels: {
[SPAN_TYPE_ATTR]: 'action',
},
Expand Down
6 changes: 2 additions & 4 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ export async function newTrace<T>(
fn: (metadata: SpanMetadata, rootSpan: ApiSpan) => Promise<T>
) {
ensureBasicTelemetryInstrumentation();
const traceMetadata = traceMetadataAls.getStore() || {
const traceMetadata: TraceMetadata = traceMetadataAls.getStore() || {
paths: new Set<PathMetadata>(),
timestamp: performance.now(),
featureName: opts.name,
};
if (opts.labels && opts.labels[SPAN_TYPE_ATTR] === 'flow') {
traceMetadata.flowName = opts.name;
}
return await traceMetadataAls.run(traceMetadata, () =>
runInNewSpan(
{
Expand Down
2 changes: 1 addition & 1 deletion js/core/src/tracing/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const PathMetadataSchema = z.object({
export type PathMetadata = z.infer<typeof PathMetadataSchema>;

export const TraceMetadataSchema = z.object({
flowName: z.string().optional(),
featureName: z.string().optional(),
paths: z.set(PathMetadataSchema).optional(),
timestamp: z.number(),
});
Expand Down
7 changes: 7 additions & 0 deletions js/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ export function isDevEnv(): boolean {
export function flowMetadataPrefix(name: string) {
return `flow:${name}`;
}

/**
* Adds flow-specific prefix for OpenTelemetry span attributes.
*/
export function featureMetadataPrefix(name: string) {
return `feature:${name}`;
}
37 changes: 24 additions & 13 deletions js/plugins/google-cloud/src/gcpOpenTelemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ import { PathMetadata } from 'genkit/tracing';
import { actionTelemetry } from './telemetry/action.js';
import { engagementTelemetry } from './telemetry/engagement.js';
import { featuresTelemetry } from './telemetry/feature.js';
import { flowsTelemetry } from './telemetry/flow.js';
import { generateTelemetry } from './telemetry/generate.js';
import { pathsTelemetry } from './telemetry/path.js';
import { GcpTelemetryConfig } from './types';
import { extractErrorName } from './utils';

Expand Down Expand Up @@ -241,7 +241,7 @@ class AdjustingTraceExporter implements SpanExporter {
return spans.map((span) => {
this.tickTelemetry(span, allLeafPaths);

span = this.redactPii(span);
span = this.redactInputOutput(span);
span = this.markErrorSpanAsError(span);
span = this.markFailedAction(span);
span = this.markGenkitFeature(span);
Expand All @@ -260,22 +260,33 @@ class AdjustingTraceExporter implements SpanExporter {
const type = attributes['genkit:type'] as string;
const subtype = attributes['genkit:metadata:subtype'] as string;
const isRoot = !!span.attributes['genkit:isRoot'];
const unused: Set<PathMetadata> = new Set();

if (type === 'flow') {
flowsTelemetry.tick(span, paths, this.logIO, this.projectId);
} else if (type === 'action' && subtype === 'model') {
generateTelemetry.tick(span, paths, this.logIO, this.projectId);
} else if (type === 'action' || type == 'flowStep') {
actionTelemetry.tick(span, paths, this.logIO, this.projectId);
} else if (type === 'userEngagement') {
engagementTelemetry.tick(span, paths, this.logIO, this.projectId);
}
if (isRoot) {
featuresTelemetry.tick(span, paths, this.logIO, this.projectId);
// Report top level feature request and latency only for root spans
// Log input to and output from to the feature
featuresTelemetry.tick(span, unused, this.logIO, this.projectId);
// Report executions and latency for all flow paths only on the root span
pathsTelemetry.tick(span, paths, this.logIO, this.projectId);
}
if (type === 'action' && subtype === 'model') {
// Report generate metrics () for all model actions
generateTelemetry.tick(span, unused, this.logIO, this.projectId);
}
if (type === 'action' && subtype === 'tool') {
// TODO: Report input and output for tool actions
}
if (type === 'action' || type === 'flow' || type == 'flowStep') {
// Report request and latency metrics for all actions
actionTelemetry.tick(span, unused, this.logIO, this.projectId);
}
if (type === 'userEngagement') {
// Report user acceptance and feedback metrics
engagementTelemetry.tick(span, unused, this.logIO, this.projectId);
}
}

private redactPii(span: ReadableSpan): ReadableSpan {
private redactInputOutput(span: ReadableSpan): ReadableSpan {
const hasInput = 'genkit:input' in span.attributes;
const hasOutput = 'genkit:output' in span.attributes;

Expand Down
22 changes: 12 additions & 10 deletions js/plugins/google-cloud/src/telemetry/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
Telemetry,
internalMetricNamespaceWrap,
} from '../metrics.js';
import { extractErrorName, extractOuterFlowNameFromPath } from '../utils';
import { extractErrorName, extractOuterFeatureNameFromPath } from '../utils';

class ActionTelemetry implements Telemetry {
/**
Expand Down Expand Up @@ -55,35 +55,37 @@ class ActionTelemetry implements Telemetry {

const actionName = (attributes['genkit:name'] as string) || '<unknown>';
const path = (attributes['genkit:path'] as string) || '<unknown>';
const flowName =
(attributes['genkit:metadata:flow:name'] as string) ||
extractOuterFlowNameFromPath(path);
let featureName = (attributes['genkit:metadata:flow:name'] ||
extractOuterFeatureNameFromPath(path)) as string;
if (!featureName || featureName === '<unknown>') {
featureName = actionName;
}
const state = attributes['genkit:state'] || 'success';
const latencyMs = hrTimeToMilliseconds(
hrTimeDuration(span.startTime, span.endTime)
);
const errorName = extractErrorName(span.events);

if (state === 'success') {
this.writeSuccess(actionName, flowName, path, latencyMs);
this.writeSuccess(actionName, featureName, path, latencyMs);
return;
}
if (state === 'error') {
this.writeFailure(actionName, flowName, path, latencyMs, errorName);
this.writeFailure(actionName, featureName, path, latencyMs, errorName);
}

logger.warn(`Unknown action state; ${state}`);
}

private writeSuccess(
actionName: string,
flowName: string,
featureName: string,
path: string,
latencyMs: number
) {
const dimensions = {
name: actionName,
flowName,
featureName,
path,
status: 'success',
source: 'ts',
Expand All @@ -95,14 +97,14 @@ class ActionTelemetry implements Telemetry {

private writeFailure(
actionName: string,
flowName: string,
featureName: string,
path: string,
latencyMs: number,
errorName?: string
) {
const dimensions = {
name: actionName,
flowName,
featureName,
path,
source: 'ts',
sourceVersion: GENKIT_VERSION,
Expand Down
45 changes: 37 additions & 8 deletions js/plugins/google-cloud/src/telemetry/feature.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import { hrTimeDuration, hrTimeToMilliseconds } from '@opentelemetry/core';
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import { GENKIT_VERSION, GenkitError } from 'genkit';
import { logger } from 'genkit/logging';
import { PathMetadata } from 'genkit/tracing';
import { PathMetadata, toDisplayPath } from 'genkit/tracing';
import {
MetricCounter,
MetricHistogram,
Telemetry,
internalMetricNamespaceWrap,
} from '../metrics';
import { extractErrorName } from '../utils';
import { createCommonLogAttributes, extractErrorName } from '../utils';

class FeaturesTelemetry implements Telemetry {
/**
Expand All @@ -53,6 +53,7 @@ class FeaturesTelemetry implements Telemetry {
): void {
const attributes = span.attributes;
const name = attributes['genkit:name'] as string;
const path = attributes['genkit:path'] as string;
const latencyMs = hrTimeToMilliseconds(
hrTimeDuration(span.startTime, span.endTime)
);
Expand All @@ -67,17 +68,24 @@ class FeaturesTelemetry implements Telemetry {

if (state === 'success') {
this.writeFeatureSuccess(name, latencyMs);
return;
}

if (state === 'error') {
} else if (state === 'error') {
const errorName = extractErrorName(span.events) || '<unknown>';

this.writeFeatureFailure(name, latencyMs, errorName);
} else {
logger.warn(`Unknown state; ${state}`);
return;
}

logger.warn(`Unknown state; ${state}`);
if (logIO) {
const input = attributes['genkit:input'] as string;
const output = attributes['genkit:output'] as string;
if (input) {
this.recordIO(span, 'Input', name, path, input, projectId);
}
if (output) {
this.recordIO(span, 'Output', name, path, output, projectId);
}
}
}

private writeFeatureSuccess(featureName: string, latencyMs: number) {
Expand Down Expand Up @@ -106,6 +114,27 @@ class FeaturesTelemetry implements Telemetry {
this.featureCounter.add(1, dimensions);
this.featureLatencies.record(latencyMs, dimensions);
}

private recordIO(
span: ReadableSpan,
tag: string,
featureName: string,
qualifiedPath: string,
input: string,
projectId?: string
) {
const path = toDisplayPath(qualifiedPath);
const sharedMetadata = {
...createCommonLogAttributes(span, projectId),
path,
qualifiedPath,
featureName,
};
logger.logStructured(`${tag}[${path}, ${featureName}]`, {
...sharedMetadata,
content: input,
});
}
}

const featuresTelemetry = new FeaturesTelemetry();
Expand Down
Loading

0 comments on commit 5eacd6d

Please sign in to comment.