Skip to content

Commit

Permalink
Merge pull request flyteorg#42 from lyft/namespaced-resource-mgr
Browse files Browse the repository at this point in the history
Adding support for cluster-namespaced resource management
  • Loading branch information
bnsblue authored Jan 2, 2020
2 parents 84df181 + 53b2d16 commit 490451f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
2 changes: 2 additions & 0 deletions flyteplugins/go/tasks/plugins/hive/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
Limit: 200,
LruCacheSize: 2000,
Workers: 15,
ClusterLabels: []string{"default"},
}

quboleConfigSection = pluginsConfig.MustRegisterSubSection(quboleConfigSectionKey, &defaultConfig)
Expand All @@ -48,6 +49,7 @@ type Config struct {
Limit int `json:"quboleLimit" pflag:",Global limit for concurrent Qubole queries"`
LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
ClusterLabels []string `json:"clusterLabels" pflag:",List of labels of service clusters"`
}

// Retrieves the current config value or default.
Expand Down
24 changes: 22 additions & 2 deletions flyteplugins/go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,24 @@ func ConstructTaskInfo(e ExecutionState) *core.TaskInfo {
return nil
}

// composeResourceNamespaceWithClusterLabel narrows resourceNamespace to the
// cluster the task's query is destined for. The cluster label is taken from
// the second value returned by GetQueryInfo and appended to resourceNamespace
// via CreateSubNamespace.
//
// If GetQueryInfo fails, the incoming resourceNamespace is returned unchanged
// together with that error.
func composeResourceNamespaceWithClusterLabel(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace) (core.ResourceNamespace, error) {
	_, label, _, _, queryErr := GetQueryInfo(ctx, tCtx)
	if queryErr != nil {
		return resourceNamespace, queryErr
	}

	// The label is a plain string; convert it so it can serve as a sub-namespace.
	clusterNamespace := core.ResourceNamespace(label)
	return resourceNamespace.CreateSubNamespace(clusterNamespace), nil
}

func GetAllocationToken(ctx context.Context, resourceNamespace core.ResourceNamespace, tCtx core.TaskExecutionContext) (ExecutionState, error) {
newState := ExecutionState{}
uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespace, uniqueId)

resourceNamespaceWithClusterLabel, err := composeResourceNamespaceWithClusterLabel(ctx, tCtx, resourceNamespace)
if err != nil {
return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId)
}

allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespaceWithClusterLabel, uniqueId)
if err != nil {
logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err)
Expand Down Expand Up @@ -299,7 +313,13 @@ func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState Exe
func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace, _ ExecutionState) error {
// Release allocation token
uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
err := tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespace, uniqueId)
resourceNamespaceWithClusterLabel, err := composeResourceNamespaceWithClusterLabel(ctx, tCtx, resourceNamespace)
if err != nil {
return errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when releasing allocation token %s", uniqueId)
}

err = tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespaceWithClusterLabel, uniqueId)

if err != nil {
logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueId, err)
return err
Expand Down
7 changes: 5 additions & 2 deletions flyteplugins/go/tasks/plugins/hive/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core
return nil, err
}

if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, quboleResourceNamespace, cfg.Limit); err != nil {
return nil, err
for _, cluster := range cfg.ClusterLabels {
clusteredResourceNamespacePrefix := quboleResourceNamespace.CreateSubNamespace(core.ResourceNamespace(cluster))
if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, clusteredResourceNamespacePrefix, cfg.Limit); err != nil {
return nil, err
}
}

return q, nil
Expand Down

0 comments on commit 490451f

Please sign in to comment.