Skip to content

Commit

Permalink
Add nil checks and avoid nil returns (flyteorg#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
EngHabu authored Nov 24, 2021
1 parent 6f1ea7d commit e79dbf7
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
17 changes: 14 additions & 3 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,15 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
if err := validation.ValidateNodeExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "can't get node execution data with invalid identifier [%+v]: %v", request.Id, err)
}

ctx = getNodeExecutionContext(ctx, request.Id)
nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Id)
if err != nil {
logger.Debugf(ctx, "Failed to get node execution with id [%+v] with err %v",
request.Id, err)
return nil, err
}

nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution model [%+v] when fetching data: %v", request.Id, err)
Expand All @@ -450,11 +452,13 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
if err != nil {
return nil, err
}

outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, nodeExecution.Closure)
if err != nil {
return nil, err
}

response := &admin.NodeExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
Expand All @@ -469,9 +473,16 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, errors.NewFlyteAdminErrorf(codes.Internal,
"Unable to read WorkflowClosure from location %s : %v", nodeExecutionModel.DynamicWorkflowRemoteClosureReference, err)
}
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,

if wf := closure.Primary; wf == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow definition in loaded dynamic workflow model.")
} else if template := wf.Template; template == nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Empty primary workflow template in loaded dynamic workflow model.")
} else {
response.DynamicWorkflow = &admin.DynamicWorkflowNodeMetadata{
Id: closure.Primary.Template.Id,
CompiledWorkflow: closure,
}
}
}

Expand Down
26 changes: 15 additions & 11 deletions pkg/manager/impl/util/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ func shouldFetchOutputData(config *runtimeInterfaces.RemoteDataConfig, urlBlob a
return len(outputURI) > 0 && shouldFetchData(config, urlBlob)
}

// Returns an inputs URL blob and if config settings permit, inline inputs data for an execution.
// GetInputs returns an inputs URL blob and if config settings permit, inline inputs data for an execution.
func GetInputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, inputURI string) (
*core.LiteralMap, *admin.UrlBlob, error) {
var inputsURLBlob admin.UrlBlob
var fullInputs core.LiteralMap

if len(inputURI) == 0 {
return nil, nil, nil
return &fullInputs, &inputsURLBlob, nil
}
var inputsURLBlob admin.UrlBlob

var err error
if remoteDataConfig.SignedURL.Enabled {
inputsURLBlob, err = urlData.Get(ctx, inputURI)
Expand All @@ -38,7 +41,6 @@ func GetInputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
}
}

var fullInputs core.LiteralMap
if shouldFetchData(remoteDataConfig, inputsURLBlob) {
err = storageClient.ReadProtobuf(ctx, storage.DataReference(inputURI), &fullInputs)
if err != nil {
Expand All @@ -50,7 +52,7 @@ func GetInputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
return &fullInputs, &inputsURLBlob, nil
}

// Defines common methods in NodeExecutionClosure and TaskExecutionClosure used to return output data.
// ExecutionClosure defines common methods in NodeExecutionClosure and TaskExecutionClosure used to return output data.
type ExecutionClosure interface {
GetOutputUri() string //nolint
GetOutputData() *core.LiteralMap
Expand Down Expand Up @@ -78,22 +80,24 @@ func (c workflowExecutionClosure) GetOutputData() *core.LiteralMap {
return c.ExecutionClosure.GetOutputData()
}

// Converts a workflow execution closure to an implementation of the ExecutionClosure interface
// for use in producing execution output data.
// ToExecutionClosureInterface converts a workflow execution closure to an implementation of the ExecutionClosure
// interface for use in producing execution output data.
func ToExecutionClosureInterface(closure *admin.ExecutionClosure) ExecutionClosure {
return &workflowExecutionClosure{
ExecutionClosure: closure,
}
}

// Returns an outputs URL blob and if config settings permit, inline outputs data for an execution.
// GetOutputs returns an outputs URL blob and if config settings permit, inline outputs data for an execution.
func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, closure ExecutionClosure) (
*core.LiteralMap, *admin.UrlBlob, error) {
var outputsURLBlob admin.UrlBlob
var fullOutputs = &core.LiteralMap{}
if closure == nil {
return nil, nil, nil
return fullOutputs, &outputsURLBlob, nil
}
var outputsURLBlob admin.UrlBlob

if len(closure.GetOutputUri()) > 0 && remoteDataConfig.SignedURL.Enabled {
var err error
outputsURLBlob, err = urlData.Get(ctx, closure.GetOutputUri())
Expand All @@ -102,7 +106,6 @@ func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
}
}

var fullOutputs = &core.LiteralMap{}
if closure.GetOutputData() != nil {
if int64(proto.Size(closure.GetOutputData())) < remoteDataConfig.MaxSizeInBytes {
fullOutputs = closure.GetOutputData()
Expand All @@ -117,5 +120,6 @@ func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", closure.GetOutputUri(), err)
}
}

return fullOutputs, &outputsURLBlob, nil
}
2 changes: 1 addition & 1 deletion pkg/rpc/adminservice/util/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type responseCodeMetrics struct {
responseCodeCounters map[codes.Code]prometheus.Counter
}

// Per-endpoint request metrics.
// RequestMetrics per-endpoint request metrics.
type RequestMetrics struct {
scope promutils.Scope

Expand Down

0 comments on commit e79dbf7

Please sign in to comment.