Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flytekit metrics note #1

Open
Yicheng-Lu-llll opened this issue May 10, 2023 · 1 comment
Open

flytekit metrics note #1

Yicheng-Lu-llll opened this issue May 10, 2023 · 1 comment

Comments

@Yicheng-Lu-llll
Copy link
Owner

Yicheng-Lu-llll commented May 10, 2023

Current framework:
flyteorg/flyte#3272

propeller:
https://github.com/flyteorg/flytepropeller/blob/f4cadb0062320086096306c8d6e5a1e412090ef8/events/admin_eventsink.go#LL43C1-L43C1

func (s *adminEventSink) Sink(ctx context.Context, message proto.Message) error {

flyteidl:
https://github.com/flyteorg/flyteidl/pull/367/files

  // Fetches runtime metrics for a :ref:`ref_flyteidl.admin.Execution`.
  // Exposed over HTTP as a GET whose path binds the request's id.project, id.domain and id.name fields.
  // depth is not part of the path template; per the google.api.http rules, unbound fields of a GET request
  // become query parameters (flyteconsole sends it under `params`).
  rpc GetExecutionMetrics (flyteidl.admin.WorkflowExecutionGetMetricsRequest) returns (flyteidl.admin.WorkflowExecutionGetMetricsResponse) {
    option (google.api.http) = {
      get: "/api/v1/metrics/executions/{id.project}/{id.domain}/{id.name}"
    };
    // The swagger operation annotation below is currently disabled (commented out).
    // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = {
    //   description: "Retrieve metrics from an existing workflow execution."
    // };
  };
}
// WorkflowExecutionGetMetricsRequest represents a request to retrieve metrics for the specified workflow execution.
message WorkflowExecutionGetMetricsRequest {
    // id defines the workflow execution to query for.
    core.WorkflowExecutionIdentifier id = 1;

    // depth defines the number of Flyte entity levels to traverse when breaking down execution details.
    // NOTE(review): this is a signed int32; how zero or negative values are treated is decided by the
    // server-side traversal (flyteadmin passes int(request.Depth) to its span parser) — confirm before relying on it.
    int32 depth = 2;
}
// WorkflowExecutionGetMetricsResponse represents the response containing metrics for the specified workflow execution.
message WorkflowExecutionGetMetricsResponse {
    // Span defines the top-level breakdown of the workflow's execution. More precise information is nested in a
    // hierarchical structure using Flyte entity references (see Span.spans).
    core.Span span = 1;
}
// Span represents a duration trace of Flyte execution. The id field denotes a Flyte execution entity or an operation
// which uniquely identifies the Span. The spans attribute allows this Span to be further broken down into more
// precise definitions.
message Span {
    // start_time defines the instant this span began.
    google.protobuf.Timestamp start_time = 1;

    // end_time defines the instant this span completed.
    google.protobuf.Timestamp end_time = 2;

    // id identifies what this Span measures. Because it is a oneof, at most one of the fields below is set.
    oneof id {
        // workflow_id is the id of the workflow execution this Span represents.
        flyteidl.core.WorkflowExecutionIdentifier workflow_id = 3;

        // node_id is the id of the node execution this Span represents.
        flyteidl.core.NodeExecutionIdentifier node_id = 4;

        // task_id is the id of the task execution this Span represents.
        flyteidl.core.TaskExecutionIdentifier task_id = 5;

        // operation_id is the id of a unique operation that this Span represents.
        string operation_id = 6;
    }

    // spans defines a collection of child Spans that break down this execution.
    repeated Span spans = 7;
}

flyteadmin:
https://github.com/flyteorg/flyteadmin/pull/524/files
Receives events (note each event's `reportedAt` timestamp).

// GetExecutionMetrics handles the GetExecutionMetrics RPC. A nil request is rejected with InvalidArgument.
// Otherwise the call is delegated to MetricsManager.GetExecutionMetrics inside the getMetrics endpoint timer.
// A failure is converted via util.TransformAndRecordError; a success is recorded with Success() before the
// response is returned.
func (m *AdminService) GetExecutionMetrics(
	ctx context.Context, request *admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) {
	// Deferred so it runs on every exit path (presumably panic recovery/reporting — see interceptPanic).
	defer m.interceptPanic(ctx, request)
	if request == nil {
		return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed")
	}

	endpointMetrics := &m.Metrics.executionEndpointMetrics.getMetrics
	var (
		resp    *admin.WorkflowExecutionGetMetricsResponse
		callErr error
	)
	endpointMetrics.Time(func() {
		resp, callErr = m.MetricsManager.GetExecutionMetrics(ctx, *request)
	})
	if callErr != nil {
		return nil, util.TransformAndRecordError(callErr, endpointMetrics)
	}

	endpointMetrics.Success()
	return resp, nil
}

https://github.com/flyteorg/flyteadmin/blob/master/pkg/manager/impl/metrics_manager.go

// GetExecutionMetrics looks up the requested workflow execution and converts it into a Span that hierarchically
// breaks the execution down into a collection of Categorical and Reference Spans, limited by request.Depth.
// Errors from the execution lookup or from span parsing are returned unchanged so callers can transform them.
func (m *MetricsManager) GetExecutionMetrics(ctx context.Context,
	request admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) {

	execution, err := m.executionManager.GetExecution(ctx, admin.WorkflowExecutionGetRequest{Id: request.Id})
	if err != nil {
		return nil, err
	}

	depth := int(request.Depth)
	rootSpan, err := m.parseExecution(ctx, execution, depth)
	if err != nil {
		return nil, err
	}

	response := &admin.WorkflowExecutionGetMetricsResponse{Span: rootSpan}
	return response, nil
}

flyteconsole:
https://github.com/flyteorg/flyteconsole/pull/747/files

/**
 * Fetches the metrics Span tree for a workflow execution through the API context.
 *
 * @param id - the workflow execution to query
 * @param depth - forwarded as the `depth` request parameter
 * @param apiContext - supplies the `getExecutionMetrics` client call
 * @returns whatever `getExecutionMetrics` resolves to
 */
export const fetchExecutionMetrics = async (
  id: WorkflowExecutionIdentifier,
  depth: number,
  apiContext: APIContextValue,
) => {
  // Destructured (not called as a method) to keep the original call's `this` binding.
  const { getExecutionMetrics } = apiContext;
  const requestConfig = { params: { depth } };
  return await getExecutionMetrics(id, requestConfig);
};
@Yicheng-Lu-llll
Copy link
Owner Author

Yicheng-Lu-llll commented May 10, 2023

What are WorkflowExecutionIdentifier, NodeExecutionIdentifier, and TaskExecutionIdentifier?

// Encapsulation of fields that uniquely identify a Flyte workflow execution.
// NOTE(review): field numbers jump from 2 to 4 — number 3 is unused here, presumably removed upstream;
// do not reuse it without checking the upstream definition (it may be reserved).
message WorkflowExecutionIdentifier {
    // Name of the project the resource belongs to.
    string project              = 1;

    // Name of the domain the resource belongs to.
    // A domain can be considered as a subset within a specific project.
    string domain               = 2;

    // User or system provided value for the resource.
    string name                 = 4;
}

// Encapsulation of fields that identify a Flyte node execution entity.
message NodeExecutionIdentifier {
    // node_id names the node within the workflow execution given by execution_id
    // (presumably the node's id in the workflow graph — confirm against flyteidl docs).
    string node_id           = 1;

    // execution_id is the workflow execution this node execution belongs to.
    WorkflowExecutionIdentifier execution_id        = 2;
}

// Encapsulation of fields that identify a Flyte task execution entity.
message TaskExecutionIdentifier {
    // task_id is a full resource Identifier (project/domain/name/version) — presumably the task definition
    // that was executed; confirm against flyteidl docs.
    core.Identifier task_id           = 1;

    // node_execution_id is the node execution that ran this task.
    core.NodeExecutionIdentifier node_execution_id = 2;

    // retry_attempt distinguishes repeated executions of the same task within one node execution
    // (NOTE(review): whether counting starts at 0 or 1 is not shown here — confirm).
    uint32 retry_attempt     = 3;
}
// Encapsulation of fields that uniquely identify a Flyte resource.
message Identifier {
    // Identifies the specific type of resource that this identifier corresponds to.
    core.ResourceType resource_type  = 1;

    // Name of the project the resource belongs to.
    string project              = 2;

    // Name of the domain the resource belongs to.
    // A domain can be considered as a subset within a specific project.
    string domain               = 3;

    // User provided value for the resource.
    string name                 = 4;

    // Specific version of the resource.
    string version              = 5;
}

@Yicheng-Lu-llll Yicheng-Lu-llll changed the title flytekit metrics explosion flytekit metrics note May 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant