Feature/gh 77 scale a container on gke #190

Merged: 7 commits, Oct 26, 2018
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
### ENHANCEMENTS

* Concurrent workflows and custom commands executions are now allowed except when a deployment/undeployment/scaling operation is in progress ([GH-182](https://github.com/ystia/yorc/issues/182))
* Enable scaling of Kubernetes deployments ([GH-77](https://github.com/ystia/yorc/issues/77))

## 3.1.0-M4 (October 08, 2018)

15 changes: 15 additions & 0 deletions data/tosca/yorc-kubernetes-types.yml
@@ -20,7 +20,22 @@ artifact_types:
node_types:
yorc.nodes.kubernetes.api.types.DeploymentResource:
derived_from: org.alien4cloud.kubernetes.api.types.DeploymentResource
attributes:
replicas:
type: integer
description: >
Current number of replicas for this deployment
interfaces:
org.alien4cloud.management.ClusterControl:
scale:
inputs:
EXPECTED_INSTANCES:
type: integer
INSTANCES_DELTA:
type: integer
implementation:
file: "embedded"
type: yorc.artifacts.Deployment.Kubernetes
Standard:
create:
implementation:
35 changes: 34 additions & 1 deletion prov/kubernetes/execution.go
@@ -21,6 +21,7 @@ import (
"fmt"
"net/url"
"path"
"strconv"
"strings"
"time"

@@ -54,6 +55,7 @@ type k8sResourceOperation int
const (
k8sCreateOperation k8sResourceOperation = iota
k8sDeleteOperation
k8sScaleOperation
)

type execution interface {
@@ -151,6 +153,8 @@ func (e *executionCommon) execute(ctx context.Context, clientset *kubernetes.Cli
return e.uninstallNode(ctx, clientset)
case "standard.delete":
return e.manageKubernetesResource(ctx, clientset, generator, k8sDeleteOperation)
case "org.alien4cloud.management.clustercontrol.scale":
return e.manageKubernetesResource(ctx, clientset, generator, k8sScaleOperation)
default:
return errors.Errorf("Unsupported operation %q", e.Operation.Name)
}
@@ -248,7 +252,10 @@ func (e *executionCommon) manageDeploymentResource(ctx context.Context, clientse
if err != nil {
return err
}

err = deployments.SetAttributeForAllInstances(e.kv, e.deploymentID, e.NodeName, "replicas", fmt.Sprint(*deployment.Spec.Replicas))
if err != nil {
return err
}
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelDEBUG, e.deploymentID).Registerf("k8s Deployment %s created in namespace %s", deployment.Name, namespace)
case k8sDeleteOperation:
// Delete Deployment k8s resource
@@ -286,7 +293,33 @@ func (e *executionCommon) manageDeploymentResource(ctx context.Context, clientse
return err
}
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, e.deploymentID).Registerf("k8s Namespace %s deleted", namespace)
case k8sScaleOperation:
expectedInstances, err := tasks.GetTaskInput(e.kv, e.taskID, "EXPECTED_INSTANCES")
if err != nil {
return err
}
r, err := strconv.ParseInt(expectedInstances, 10, 32)
if err != nil {
return errors.Wrapf(err, "failed to parse EXPECTED_INSTANCES parameter %q as integer", expectedInstances)
}
replicas := int32(r)
deploymentRepr.Spec.Replicas = &replicas

deployment, err := clientset.ExtensionsV1beta1().Deployments(namespace).Update(&deploymentRepr)
if err != nil {
return errors.Wrap(err, "failed to update kubernetes deployment for scaling")
}
streamDeploymentLogs(ctx, e.deploymentID, clientset, deployment)

err = waitForDeploymentCompletion(ctx, e.deploymentID, clientset, deployment)
if err != nil {
return err
}
err = deployments.SetAttributeForAllInstances(e.kv, e.deploymentID, e.NodeName, "replicas", expectedInstances)
if err != nil {
return err
}
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelDEBUG, e.deploymentID).Registerf("k8s Deployment %s scaled to %s instances in namespace %s", deployment.Name, expectedInstances, namespace)
default:
return errors.Errorf("Unsupported operation on k8s resource")
}
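
The scale branch added above reduces to one client-go pattern: parse the requested instance count, set `Spec.Replicas` on the existing Deployment object, and push it back with `Update`. A minimal standalone sketch of that pattern, assuming the same `ExtensionsV1beta1` client-go API this PR targets (function and package names here are illustrative, not Yorc helpers):

```go
// Package k8sutil is a hypothetical helper package used only for illustration.
package k8sutil

import (
	"fmt"
	"strconv"

	"k8s.io/api/extensions/v1beta1"
	"k8s.io/client-go/kubernetes"
)

// ScaleDeployment sets spec.replicas on an existing Deployment and submits the
// update; Kubernetes then performs the rolling scale-up or scale-down.
// Callers are expected to wait for the Deployment to converge afterwards
// (the PR uses waitForDeploymentCompletion for that).
func ScaleDeployment(clientset *kubernetes.Clientset, namespace string, dep *v1beta1.Deployment, expectedInstances string) (*v1beta1.Deployment, error) {
	r, err := strconv.ParseInt(expectedInstances, 10, 32)
	if err != nil {
		return nil, fmt.Errorf("failed to parse expected instances %q as integer: %v", expectedInstances, err)
	}
	replicas := int32(r)
	dep.Spec.Replicas = &replicas
	return clientset.ExtensionsV1beta1().Deployments(namespace).Update(dep)
}
```
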
52 changes: 20 additions & 32 deletions rest/dep_custom.go
@@ -15,18 +15,19 @@
package rest

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"path"
"strconv"

"github.com/ystia/yorc/prov/operations"

"strings"

"github.com/julienschmidt/httprouter"
"github.com/ystia/yorc/deployments"
"github.com/ystia/yorc/helper/consulutil"
"github.com/ystia/yorc/log"
"github.com/ystia/yorc/tasks"
)
@@ -51,28 +52,30 @@ func (s *Server) newCustomCommandHandler(w http.ResponseWriter, r *http.Request)
log.Panic(err)
}

var inputMap CustomCommandRequest
if err = json.Unmarshal(body, &inputMap); err != nil {
var ccRequest CustomCommandRequest
if err = json.Unmarshal(body, &ccRequest); err != nil {
log.Panic(err)
}
ccRequest.InterfaceName = strings.ToLower(ccRequest.InterfaceName)

inputsName, err := s.getInputNameFromCustom(id, inputMap.NodeName, inputMap.CustomCommandName)
inputsName, err := s.getInputNameFromCustom(id, ccRequest.NodeName, ccRequest.InterfaceName, ccRequest.CustomCommandName)
if err != nil {
log.Panic(err)
}

data := make(map[string]string)

// For now custom commands are for all instances
instances, err := deployments.GetNodeInstancesIds(s.consulClient.KV(), id, inputMap.NodeName)
data[path.Join("nodes", inputMap.NodeName)] = strings.Join(instances, ",")
data["commandName"] = inputMap.CustomCommandName
instances, err := deployments.GetNodeInstancesIds(s.consulClient.KV(), id, ccRequest.NodeName)
data[path.Join("nodes", ccRequest.NodeName)] = strings.Join(instances, ",")
data["commandName"] = ccRequest.CustomCommandName
data["interfaceName"] = ccRequest.InterfaceName

for _, name := range inputsName {
if err != nil {
log.Panic(err)
}
data[path.Join("inputs", name)] = inputMap.Inputs[name].String()
data[path.Join("inputs", name)] = ccRequest.Inputs[name].String()
}

taskID, err := s.tasksCollector.RegisterTaskWithData(id, tasks.TaskTypeCustomCommand, data)
@@ -88,41 +91,26 @@ func (s *Server) newCustomCommandHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusAccepted)
}

func (s *Server) getInputNameFromCustom(deploymentID, nodeName, customCName string) ([]string, error) {
nodeType, err := deployments.GetNodeType(s.consulClient.KV(), deploymentID, nodeName)

func (s *Server) getInputNameFromCustom(deploymentID, nodeName, interfaceName, customCName string) ([]string, error) {
kv := s.consulClient.KV()
op, err := operations.GetOperation(context.Background(), kv, deploymentID, nodeName, interfaceName+"."+customCName, "", "")
if err != nil {
return nil, err
}

kv := s.consulClient.KV()
kvp, _, err := kv.Keys(path.Join(consulutil.DeploymentKVPrefix, deploymentID, "topology/types", nodeType, "interfaces/custom", customCName, "inputs")+"/", "/", nil)
inputs, err := deployments.GetOperationInputs(kv, deploymentID, op.ImplementedInNodeTemplate, op.ImplementedInType, op.Name)
if err != nil {
log.Panic(err)
}

var result []string

for _, key := range kvp {
res, _, err := kv.Get(path.Join(key, "is_property_definition"), nil)

result := inputs[:0]
for _, inputName := range inputs {
isPropDef, err := deployments.IsOperationInputAPropertyDefinition(kv, deploymentID, op.ImplementedInNodeTemplate, op.ImplementedInType, op.Name, inputName)
if err != nil {
return nil, err
}

if res == nil {
continue
}

isPropDef, err := strconv.ParseBool(string(res.Value))
if err != nil {
return nil, err
}

if isPropDef {
result = append(result, path.Base(key))
result = append(result, inputName)
}
}

return result, nil
}
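
The rewritten `getInputNameFromCustom` above filters the operation inputs in place with the `inputs[:0]` re-slicing idiom, keeping only the inputs declared as property definitions (the values a caller must supply). A small self-contained sketch of that idiom, with generic names rather than Yorc APIs:

```go
package main

import "fmt"

// keepIf filters items in place: re-slicing to zero length reuses the original
// backing array, so the kept elements need no extra allocation.
func keepIf(items []string, pred func(string) bool) []string {
	kept := items[:0]
	for _, it := range items {
		if pred(it) {
			kept = append(kept, it)
		}
	}
	return kept
}

func main() {
	inputs := []string{"EXPECTED_INSTANCES", "INSTANCES_DELTA", "NODE"}
	// Keep only the inputs a user is expected to provide (illustrative predicate).
	userInputs := keepIf(inputs, func(s string) bool { return s != "NODE" })
	fmt.Println(userInputs) // [EXPECTED_INSTANCES INSTANCES_DELTA]
}
```
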
3 changes: 3 additions & 0 deletions rest/http_api.md
@@ -581,13 +581,16 @@ Request body:
{
"node": "NodeName",
"name": "Custom_Command_Name",
"interface": "fully.qualified.interface.name",
"inputs": {
"index":"",
"nb_replicas":"2"
}
}
```

If omitted, `interface` defaults to `custom`.
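
For example, scaling the Kubernetes deployment introduced in this PR could be requested through this endpoint with a body like the one below; the node name and input values are illustrative, while the interface and input names come from `yorc-kubernetes-types.yml` above:

```json
{
  "node": "KubeDeployment",
  "name": "scale",
  "interface": "org.alien4cloud.management.ClusterControl",
  "inputs": {
    "EXPECTED_INSTANCES": "3",
    "INSTANCES_DELTA": "1"
  }
}
```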

**Response**:

```HTTP
1 change: 1 addition & 0 deletions rest/structs.go
@@ -168,6 +168,7 @@ type Attribute struct {
type CustomCommandRequest struct {
NodeName string `json:"node"`
CustomCommandName string `json:"name"`
InterfaceName string `json:"interface,omitempty"`
Inputs map[string]*tosca.ValueAssignment `json:"inputs"`
}

1 change: 1 addition & 0 deletions tasks/workflow/task_execution.go
@@ -93,6 +93,7 @@ func (t *taskExecution) setTaskStatus(ctx context.Context, status tasks.TaskStat
log.Debugf("[WARNING] Failed to set task status to:%q for taskID:%q as last index has been changed before. Retry it", status.String(), t.taskID)
return t.checkAndSetTaskStatus(ctx, status)
}
tasks.EmitTaskEventWithContextualLogs(ctx, t.kv, t.targetID, t.taskID, t.taskType, status.String())
if status == tasks.TaskStatusFAILED {
return t.addTaskErrorFlag(ctx)
}
12 changes: 11 additions & 1 deletion tasks/workflow/worker.go
@@ -342,6 +342,12 @@ func (w *worker) runCustomCommand(ctx context.Context, t *taskExecution) {
t.checkAndSetTaskStatus(ctx, tasks.TaskStatusFAILED)
return
}
interfaceNameKv, _, err := kv.Get(path.Join(consulutil.TasksPrefix, t.taskID, "interfaceName"), nil)
if err != nil {
log.Printf("Deployment id: %q, Task id: %q, Failed to get Custom command name: %+v", t.targetID, t.taskID, err)
t.checkAndSetTaskStatus(ctx, tasks.TaskStatusFAILED)
return
}
nodes, err := tasks.GetTaskRelatedNodes(kv, t.taskID)
if err != nil {
log.Printf("Deployment id: %q, Task id: %q, Failed to get Custom command node: %+v", t.targetID, t.taskID, err)
@@ -354,14 +360,18 @@ func (w *worker) runCustomCommand(ctx context.Context, t *taskExecution) {
return
}
nodeName := nodes[0]
interfaceName := "custom"
if interfaceNameKv != nil && len(interfaceNameKv.Value) != 0 {
interfaceName = string(interfaceNameKv.Value)
}
commandName := string(commandNameKv.Value)
nodeType, err := deployments.GetNodeType(w.consulClient.KV(), t.targetID, nodeName)
if err != nil {
log.Printf("Deployment id: %q, Task id: %q, Failed to get Custom command node type: %+v", t.targetID, t.taskID, err)
t.checkAndSetTaskStatus(ctx, tasks.TaskStatusFAILED)
return
}
op, err := operations.GetOperation(ctx, kv, t.targetID, nodeName, "custom."+commandName, "", "")
op, err := operations.GetOperation(ctx, kv, t.targetID, nodeName, interfaceName+"."+commandName, "", "")
if err != nil {
log.Printf("Deployment id: %q, Task id: %q, Command TaskExecution failed for node %q: %+v", t.targetID, t.taskID, nodeName, err)
err = setNodeStatus(ctx, t.kv, t.taskID, t.targetID, nodeName, tosca.NodeStateError.String())