Skip to content

Commit

Permalink
Correct Lint Errors and Add Documentation on Pre-Commit (#19)
Browse files Browse the repository at this point in the history
* README update

* Fix lint errors
  • Loading branch information
matthewphsmith authored Oct 22, 2019
1 parent 91a82e1 commit c633f76
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 38 deletions.
18 changes: 18 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,21 @@ administering workflow executions. Flyteadmin implements the
`AdminService <https://github.com/lyft/flyteidl/blob/master/protos/flyteidl/service/admin.proto>`_ which
defines a stateless REST/gRPC service for interacting with registered Flyte entities and executions.
Flyteadmin uses a relational style Metadata Store abstracted by `GORM <http://gorm.io/>`_ ORM library.

Before Check-In
~~~~~~~~~~~~~~~

Flyte Admin has a few useful make targets for linting and testing. Please use these before checking in to help suss out
minor bugs and linting errors.

.. code-block:: console
make goimports
.. code-block:: console
make test_unit
.. code-block:: console
make lint
40 changes: 20 additions & 20 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.L
if literalMap == nil {
literalMap = &core.LiteralMap{}
}
inputsUri, err := m.storageClient.ConstructReference(ctx, m.storageClient.GetBaseContainerFQN(ctx), shared.Metadata, identifier.Project, identifier.Domain, identifier.Name, key)
inputsURI, err := m.storageClient.ConstructReference(ctx, m.storageClient.GetBaseContainerFQN(ctx), shared.Metadata, identifier.Project, identifier.Domain, identifier.Name, key)
if err != nil {
return "", err
}
if err := m.storageClient.WriteProtobuf(ctx, inputsUri, storage.Options{}, literalMap); err != nil {
if err := m.storageClient.WriteProtobuf(ctx, inputsURI, storage.Options{}, literalMap); err != nil {
return "", err
}
return inputsUri, nil
return inputsURI, nil
}

func (m *ExecutionManager) launchExecutionAndPrepareModel(
Expand Down Expand Up @@ -231,11 +231,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// Dynamically assign execution queues.
m.populateExecutionQueue(ctx, *workflow.Id, workflow.Closure.CompiledWorkflow)

inputsUri, err := m.offloadInputs(ctx, executionInputs, &workflowExecutionID, shared.Inputs)
inputsURI, err := m.offloadInputs(ctx, executionInputs, &workflowExecutionID, shared.Inputs)
if err != nil {
return nil, err
}
userInputsUri, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs)
userInputsURI, err := m.offloadInputs(ctx, request.Inputs, &workflowExecutionID, shared.UserInputs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -289,8 +289,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
Cluster: execInfo.Cluster,
InputsUri: inputsUri,
UserInputsUri: userInputsUri,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down Expand Up @@ -359,9 +359,9 @@ func (m *ExecutionManager) RelaunchExecution(
executionSpec.Metadata = &admin.ExecutionMetadata{}
}
var inputs *core.LiteralMap
if len(existingExecutionModel.UserInputsUri) > 0 {
if len(existingExecutionModel.UserInputsURI) > 0 {
inputs = &core.LiteralMap{}
if err := m.storageClient.ReadProtobuf(ctx, existingExecutionModel.UserInputsUri, inputs); err != nil {
if err := m.storageClient.ReadProtobuf(ctx, existingExecutionModel.UserInputsURI, inputs); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -428,9 +428,9 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
}

var inputs core.LiteralMap
err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(executionModel.InputsUri), &inputs)
err = m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &inputs)
if err != nil {
logger.Errorf(ctx, "Failed to find inputs for emitting schedule delay event from uri: [%v]", executionModel.InputsUri)
logger.Errorf(ctx, "Failed to find inputs for emitting schedule delay event from uri: [%v]", executionModel.InputsURI)
return
}
scheduledKickoffTimeProto := inputs.Literals[launchPlan.Spec.EntityMetadata.Schedule.KickoffTimeInputArg]
Expand Down Expand Up @@ -633,16 +633,16 @@ func (m *ExecutionManager) GetExecution(
// TO BE DELETED
// TODO: Remove the publishing to deprecated fields (Inputs) after a smooth migration has been completed of our existing users
// For now, publish to deprecated fields thus ensuring old clients don't break when calling GetExecution
if len(executionModel.InputsUri) > 0 {
if len(executionModel.InputsURI) > 0 {
var inputs core.LiteralMap
if err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsUri, &inputs); err != nil {
if err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &inputs); err != nil {
return nil, err
}
execution.Closure.ComputedInputs = &inputs
}
if len(executionModel.UserInputsUri) > 0 {
if len(executionModel.UserInputsURI) > 0 {
var userInputs core.LiteralMap
if err := m.storageClient.ReadProtobuf(ctx, executionModel.UserInputsUri, &userInputs); err != nil {
if err := m.storageClient.ReadProtobuf(ctx, executionModel.UserInputsURI, &userInputs); err != nil {
return nil, err
}
execution.Spec.Inputs = &userInputs
Expand Down Expand Up @@ -672,29 +672,29 @@ func (m *ExecutionManager) GetExecutionData(
}
}
// Prior to flyteidl v0.15.0, Inputs were held in ExecutionClosure and were not offloaded. Ensure we can return the inputs as expected.
if len(executionModel.InputsUri) == 0 {
if len(executionModel.InputsURI) == 0 {
closure := &admin.ExecutionClosure{}
// We must not use the FromExecutionModel method because it empties deprecated fields.
if err := proto.Unmarshal(executionModel.Closure, closure); err != nil {
return nil, err
}
newInputsUri, err := m.offloadInputs(ctx, closure.ComputedInputs, request.Id, shared.Inputs)
newInputsURI, err := m.offloadInputs(ctx, closure.ComputedInputs, request.Id, shared.Inputs)
if err != nil {
return nil, err
}
// Update model so as not to offload again.
executionModel.InputsUri = newInputsUri
executionModel.InputsURI = newInputsURI
if err := m.db.ExecutionRepo().UpdateExecution(ctx, *executionModel); err != nil {
return nil, err
}
}
inputsUrlBlob, err := m.urlData.Get(ctx, executionModel.InputsUri.String())
inputsURLBlob, err := m.urlData.Get(ctx, executionModel.InputsURI.String())
if err != nil {
return nil, err
}
return &admin.WorkflowExecutionGetDataResponse{
Outputs: &signedOutputsURLBlob,
Inputs: &inputsUrlBlob,
Inputs: &inputsURLBlob,
}, nil
}

Expand Down
25 changes: 14 additions & 11 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var mockPublisher notificationMocks.MockPublisher
var mockExecutionRemoteURL = dataMocks.NewMockRemoteURL()
var requestedAt = time.Now()
var testCluster = "C1"
var outputURI = "output uri"

func getLegacySpec() *admin.ExecutionSpec {
executionRequest := testutils.GetExecutionRequest()
Expand Down Expand Up @@ -150,7 +151,7 @@ func getMockStorageForExecTest(ctx context.Context) *storage.DataStore {
_ = proto.Unmarshal(val, msg)
return nil
}
return errors.New(fmt.Sprintf("Could not find value in storage [%v]\n", reference.String()))
return fmt.Errorf("could not find value in storage [%v]", reference.String())
}
mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(
ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error {
Expand All @@ -162,7 +163,9 @@ func getMockStorageForExecTest(ctx context.Context) *storage.DataStore {
return nil
}
workflowClosure := testutils.GetWorkflowClosure()
mockStorage.WriteProtobuf(ctx, storage.DataReference(remoteClosureIdentifier), defaultStorageOptions, workflowClosure)
if err := mockStorage.WriteProtobuf(ctx, storage.DataReference(remoteClosureIdentifier), defaultStorageOptions, workflowClosure); err != nil {
return nil
}
return mockStorage
}

Expand Down Expand Up @@ -475,10 +478,10 @@ func TestCreateExecutionVerifyDbModel(t *testing.T) {
assert.Nil(t, closureValue.ComputedInputs)

var userInputs, inputs core.LiteralMap
if err := storageClient.ReadProtobuf(ctx, storage.DataReference(input.UserInputsUri), &userInputs); err != nil {
if err := storageClient.ReadProtobuf(ctx, input.UserInputsURI, &userInputs); err != nil {
return err
}
if err := storageClient.ReadProtobuf(ctx, storage.DataReference(input.InputsUri), &inputs); err != nil {
if err := storageClient.ReadProtobuf(ctx, input.InputsURI, &inputs); err != nil {
return err
}
fooValue := utils.MustMakeLiteral("foo-value-1")
Expand Down Expand Up @@ -1884,7 +1887,7 @@ func TestGetExecutionData(t *testing.T) {
OutputResult: &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: "output uri",
Uri: outputURI,
},
},
},
Expand All @@ -1904,13 +1907,13 @@ func TestGetExecutionData(t *testing.T) {
LaunchPlanID: uint(1),
WorkflowID: uint(2),
StartedAt: &startedAt,
InputsUri: shared.Inputs,
InputsURI: shared.Inputs,
}, nil
}
mockExecutionRemoteURL := dataMocks.NewMockRemoteURL()
mockExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func(
ctx context.Context, uri string) (admin.UrlBlob, error) {
if uri == "output uri" {
if uri == outputURI {
return admin.UrlBlob{
Url: "outputs",
Bytes: 200,
Expand Down Expand Up @@ -2042,8 +2045,8 @@ func TestGetExecution_LegacyClient_OffloadedData(t *testing.T) {
LaunchPlanID: uint(1),
WorkflowID: uint(2),
StartedAt: &startedAt,
UserInputsUri: shared.UserInputs,
InputsUri: shared.Inputs,
UserInputsURI: shared.UserInputs,
InputsURI: shared.Inputs,
}, nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
Expand All @@ -2069,7 +2072,7 @@ func TestGetExecutionData_LegacyModel(t *testing.T) {
closure.OutputResult = &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: "output uri",
Uri: outputURI,
},
},
}
Expand All @@ -2093,7 +2096,7 @@ func TestGetExecutionData_LegacyModel(t *testing.T) {
mockExecutionRemoteURL := dataMocks.NewMockRemoteURL()
mockExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func(
ctx context.Context, uri string) (admin.UrlBlob, error) {
if uri == "output uri" {
if uri == outputURI {
return admin.UrlBlob{
Url: "outputs",
Bytes: 200,
Expand Down
2 changes: 1 addition & 1 deletion pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var Migrations = []*gormigrate.Migration{
return tx.AutoMigrate(&models.Execution{}).Error
},
Rollback: func(tx *gorm.DB) error {
return tx.Exec("ALTER TABLE executions DROP COLUMN IF EXISTS InputsUri, DROP COLUMN IF EXISTS UserInputsUri").Error
return tx.Exec("ALTER TABLE executions DROP COLUMN IF EXISTS InputsURI, DROP COLUMN IF EXISTS UserInputsURI").Error
},
},
}
4 changes: 2 additions & 2 deletions pkg/repositories/models/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Execution struct {
// Cluster where execution was triggered
Cluster string
// Offloaded location of inputs LiteralMap. These are the inputs evaluated and contain applied defaults.
InputsUri storage.DataReference
InputsURI storage.DataReference
// User specified inputs. This map might be incomplete and not include defaults applied
UserInputsUri storage.DataReference
UserInputsURI storage.DataReference
}
8 changes: 4 additions & 4 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type CreateExecutionModelInput struct {
WorkflowIdentifier *core.Identifier
ParentNodeExecutionID uint
Cluster string
InputsUri storage.DataReference
UserInputsUri storage.DataReference
InputsURI storage.DataReference
UserInputsURI storage.DataReference
}

// Transforms a ExecutionCreateRequest to a Execution model
Expand Down Expand Up @@ -74,8 +74,8 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
ExecutionUpdatedAt: &input.CreatedAt,
ParentNodeExecutionID: input.ParentNodeExecutionID,
Cluster: input.Cluster,
InputsUri: input.InputsUri,
UserInputsUri: input.UserInputsUri,
InputsURI: input.InputsURI,
UserInputsURI: input.UserInputsURI,
}
if input.RequestSpec.Metadata != nil {
executionModel.Mode = int32(input.RequestSpec.Metadata.Mode)
Expand Down

0 comments on commit c633f76

Please sign in to comment.