diff --git a/cmd/cli/flavor.go b/cmd/cli/flavor.go index 42d407cd2..d86528995 100644 --- a/cmd/cli/flavor.go +++ b/cmd/cli/flavor.go @@ -34,7 +34,7 @@ func flavorPluginCommand(plugins func() discovery.Plugins) *cobra.Command { return err } - flavorPlugin = flavor_plugin.NewClient(endpoint.Address) + flavorPlugin = flavor_plugin.NewClient(plugin.Name(*name), endpoint.Address) return nil } diff --git a/cmd/group/main.go b/cmd/group/main.go index 96ce2d142..1bf3320e7 100644 --- a/cmd/group/main.go +++ b/cmd/group/main.go @@ -41,7 +41,7 @@ func main() { if err != nil { return nil, err } - return instance_client.NewClient(plugin.Name(n), endpoint.Address), nil + return instance_client.NewClient(n, endpoint.Address), nil } flavorPluginLookup := func(n plugin.Name) (flavor.Plugin, error) { @@ -49,7 +49,7 @@ func main() { if err != nil { return nil, err } - return flavor_client.NewClient(endpoint.Address), nil + return flavor_client.NewClient(n, endpoint.Address), nil } cli.RunPlugin(*name, group_server.PluginServer( diff --git a/pkg/example/flavor/combo/main.go b/pkg/example/flavor/combo/main.go index 40c4dd780..aeecaa5c4 100644 --- a/pkg/example/flavor/combo/main.go +++ b/pkg/example/flavor/combo/main.go @@ -33,7 +33,7 @@ func main() { if err != nil { return nil, err } - return flavor_rpc.NewClient(endpoint.Address), nil + return flavor_rpc.NewClient(n, endpoint.Address), nil } cli.SetLogLevel(*logLevel) diff --git a/pkg/example/flavor/swarm/flavor.go b/pkg/example/flavor/swarm/flavor.go index 23f8179a6..416df324d 100644 --- a/pkg/example/flavor/swarm/flavor.go +++ b/pkg/example/flavor/swarm/flavor.go @@ -11,33 +11,22 @@ import ( "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/docker/infrakit/pkg/plugin/group/types" - "github.com/docker/infrakit/pkg/plugin/group/util" "github.com/docker/infrakit/pkg/spi/flavor" "github.com/docker/infrakit/pkg/spi/instance" "github.com/docker/infrakit/pkg/template" "golang.org/x/net/context" ) -type nodeType string - const ( - worker nodeType = "worker" - manager nodeType = "manager" - ebsAttachment string = "ebs" + ebsAttachment string = "ebs" ) -// NewSwarmFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm. -func NewSwarmFlavor(dockerClient client.APIClient, templ *template.Template) flavor.Plugin { - return &swarmFlavor{client: dockerClient, initScript: templ} -} - type swarmFlavor struct { client client.APIClient initScript *template.Template } type schema struct { - Type nodeType Attachments map[instance.LogicalID][]instance.Attachment DockerRestartCommand string } @@ -48,7 +37,9 @@ func parseProperties(flavorProperties json.RawMessage) (schema, error) { return s, err } -func validateIDsAndAttachments(logicalIDs []instance.LogicalID, attachments map[instance.LogicalID][]instance.Attachment) error { +func validateIDsAndAttachments(logicalIDs []instance.LogicalID, + attachments map[instance.LogicalID][]instance.Attachment) error { + // Each attachment association must be represented by a logical ID. idsMap := map[instance.LogicalID]bool{} for _, id := range logicalIDs { @@ -97,49 +88,14 @@ func validateIDsAndAttachments(logicalIDs []instance.LogicalID, attachments map[ return nil } -func (s swarmFlavor) Validate(flavorProperties json.RawMessage, allocation types.AllocationMethod) error { - properties, err := parseProperties(flavorProperties) - if err != nil { - return err - } - - if properties.DockerRestartCommand == "" { - return errors.New("DockerRestartCommand must be specified") - } - - switch properties.Type { - case manager: - numIDs := len(allocation.LogicalIDs) - if numIDs != 1 && numIDs != 3 && numIDs != 5 { - return errors.New("Must have 1, 3, or 5 manager logical IDs") - } - case worker: - - default: - return errors.New("Unrecognized node Type") - } - - if properties.Type == manager { - for _, id := range allocation.LogicalIDs { - if att, exists := properties.Attachments[id]; !exists || len(att) == 0 { - log.Warnf("LogicalID %s has no attachments, which is needed for durability", id) - } - } - } - - if err := validateIDsAndAttachments(allocation.LogicalIDs, properties.Attachments); err != nil { - return err - } - - return nil -} - const ( // associationTag is a machine tag added to associate machines with Swarm nodes. associationTag = "swarm-association-id" ) -func generateInitScript(templ *template.Template, joinIP, joinToken, associationID, restartCommand string) (string, error) { +func generateInitScript(templ *template.Template, + joinIP, joinToken, associationID, restartCommand string) (string, error) { + var buffer bytes.Buffer err := templ.Execute(&buffer, map[string]string{ "MY_IP": joinIP, @@ -153,9 +109,25 @@ func generateInitScript(templ *template.Template, joinIP, joinToken, association return buffer.String(), nil } +func (s swarmFlavor) Validate(flavorProperties json.RawMessage, allocation types.AllocationMethod) error { + properties, err := parseProperties(flavorProperties) + if err != nil { + return err + } + if properties.DockerRestartCommand == "" { + return errors.New("DockerRestartCommand must be specified") + } + if err := validateIDsAndAttachments(allocation.LogicalIDs, properties.Attachments); err != nil { + return err + } + return nil +} + // Healthy determines whether an instance is healthy. This is determined by whether it has successfully joined the // Swarm. -func (s swarmFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) { +func healthy(client client.APIClient, + flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) { + associationID, exists := inst.Tags[associationTag] if !exists { log.Info("Reporting unhealthy for instance without an association tag", inst.ID) @@ -165,7 +137,7 @@ func (s swarmFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Des filter := filters.NewArgs() filter.Add("label", fmt.Sprintf("%s=%s", associationTag, associationID)) - nodes, err := s.client.NodeList(context.Background(), docker_types.NodeListOptions{Filters: filter}) + nodes, err := client.NodeList(context.Background(), docker_types.NodeListOptions{Filters: filter}) if err != nil { return flavor.Unknown, err } @@ -183,128 +155,3 @@ func (s swarmFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Des return flavor.Healthy, nil } } - -func (s swarmFlavor) Drain(flavorProperties json.RawMessage, inst instance.Description) error { - properties, err := parseProperties(flavorProperties) - if err != nil { - return err - } - - // Only explicitly remove worker nodes, not manager nodes. Manager nodes are assumed to have an - // attached volume for state, and fixed IP addresses. This allows them to rejoin as the same node. - if properties.Type != worker { - return nil - } - - associationID, exists := inst.Tags[associationTag] - if !exists { - return fmt.Errorf("Unable to drain %s without an association tag", inst.ID) - } - - filter := filters.NewArgs() - filter.Add("label", fmt.Sprintf("%s=%s", associationTag, associationID)) - - nodes, err := s.client.NodeList(context.Background(), docker_types.NodeListOptions{Filters: filter}) - if err != nil { - return err - } - - switch { - case len(nodes) == 0: - return fmt.Errorf("Unable to drain %s, not found in swarm", inst.ID) - - case len(nodes) == 1: - err := s.client.NodeRemove( - context.Background(), - nodes[0].ID, - docker_types.NodeRemoveOptions{Force: true}) - if err != nil { - return err - } - - return nil - - default: - return fmt.Errorf("Expected at most one node with label %s, but found %s", associationID, nodes) - } -} - -func (s *swarmFlavor) Prepare( - flavorProperties json.RawMessage, - spec instance.Spec, - allocation types.AllocationMethod) (instance.Spec, error) { - - properties, err := parseProperties(flavorProperties) - if err != nil { - return spec, err - } - - swarmStatus, err := s.client.SwarmInspect(context.Background()) - if err != nil { - return spec, fmt.Errorf("Failed to fetch Swarm join tokens: %s", err) - } - - nodeInfo, err := s.client.Info(context.Background()) - if err != nil { - return spec, fmt.Errorf("Failed to fetch node self info: %s", err) - } - - self, _, err := s.client.NodeInspectWithRaw(context.Background(), nodeInfo.Swarm.NodeID) - if err != nil { - return spec, fmt.Errorf("Failed to fetch Swarm node status: %s", err) - } - - if self.ManagerStatus == nil { - return spec, errors.New( - "Swarm node status did not include manager status. Need to run 'docker swarm init`?") - } - - associationID := util.RandomAlphaNumericString(8) - spec.Tags[associationTag] = associationID - - switch properties.Type { - case worker: - initScript, err := - generateInitScript( - s.initScript, - self.ManagerStatus.Addr, - swarmStatus.JoinTokens.Worker, - associationID, - properties.DockerRestartCommand) - if err != nil { - return spec, err - } - spec.Init = initScript - - case manager: - if spec.LogicalID == nil { - return spec, errors.New("Manager nodes require a LogicalID, " + - "which will be used as an assigned private IP address") - } - - initScript, err := generateInitScript( - s.initScript, - self.ManagerStatus.Addr, - swarmStatus.JoinTokens.Manager, - associationID, - properties.DockerRestartCommand) - if err != nil { - return spec, err - } - spec.Init = initScript - default: - return spec, errors.New("Unsupported node type") - } - - if spec.LogicalID != nil { - if attachments, exists := properties.Attachments[*spec.LogicalID]; exists { - spec.Attachments = append(spec.Attachments, attachments...) - } - } - - // TODO(wfarner): Use the cluster UUID to scope instances for this swarm separately from instances in another - // swarm. This will require plumbing back to Scaled (membership tags). - spec.Tags["swarm-id"] = swarmStatus.ID - - return spec, nil -} diff --git a/pkg/example/flavor/swarm/flavor_test.go b/pkg/example/flavor/swarm/flavor_test.go index 0fa9b08f6..2fc3ae620 100644 --- a/pkg/example/flavor/swarm/flavor_test.go +++ b/pkg/example/flavor/swarm/flavor_test.go @@ -29,36 +29,33 @@ func TestValidate(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - swarmFlavor := NewSwarmFlavor(mock_client.NewMockAPIClient(ctrl), templ()) + managerFlavor := NewManagerFlavor(mock_client.NewMockAPIClient(ctrl), templ()) + workerFlavor := NewWorkerFlavor(mock_client.NewMockAPIClient(ctrl), templ()) - require.NoError(t, swarmFlavor.Validate( - json.RawMessage(`{"Type": "worker", "DockerRestartCommand": "systemctl restart docker"}`), + require.NoError(t, workerFlavor.Validate( + json.RawMessage(`{"DockerRestartCommand": "systemctl restart docker"}`), types.AllocationMethod{Size: 5})) - require.NoError(t, swarmFlavor.Validate( - json.RawMessage(`{"Type": "manager", "DockerRestartCommand": "systemctl restart docker"}`), + require.NoError(t, managerFlavor.Validate( + json.RawMessage(`{"DockerRestartCommand": "systemctl restart docker"}`), types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}})) // Logical ID with multiple attachments is allowed. - require.NoError(t, swarmFlavor.Validate( + require.NoError(t, managerFlavor.Validate( json.RawMessage(`{ - "Type": "manager", "DockerRestartCommand": "systemctl restart docker", "Attachments": {"127.0.0.1": [{"ID": "a", "Type": "ebs"}, {"ID": "b", "Type": "ebs"}]}}`), types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}})) - require.Error(t, swarmFlavor.Validate(json.RawMessage(`{"type": "other"}`), types.AllocationMethod{Size: 5})) - // Logical ID used more than once. - err := swarmFlavor.Validate( - json.RawMessage(`{"Type": "manager", "DockerRestartCommand": "systemctl restart docker"}`), + err := managerFlavor.Validate( + json.RawMessage(`{"DockerRestartCommand": "systemctl restart docker"}`), types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1", "127.0.0.1", "127.0.0.2"}}) require.Error(t, err) require.Equal(t, "LogicalID 127.0.0.1 specified more than once", err.Error()) // Attachment cannot be associated with multiple Logical IDs. - err = swarmFlavor.Validate( + err = managerFlavor.Validate( json.RawMessage(`{ - "Type": "manager", "DockerRestartCommand": "systemctl restart docker", "Attachments": {"127.0.0.1": [{"ID": "a", "Type": "ebs"}], "127.0.0.2": [{"ID": "a", "Type": "ebs"}]}}`), types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1", "127.0.0.2", "127.0.0.3"}}) @@ -66,9 +63,8 @@ func TestValidate(t *testing.T) { require.Equal(t, "Attachment a specified more than once", err.Error()) // Unsupported Attachment Type. - err = swarmFlavor.Validate( + err = managerFlavor.Validate( json.RawMessage(`{ - "Type": "manager", "DockerRestartCommand": "systemctl restart docker", "Attachments": {"127.0.0.1": [{"ID": "a", "Type": "keyboard"}]}}`), types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}}) @@ -82,7 +78,7 @@ func TestWorker(t *testing.T) { client := mock_client.NewMockAPIClient(ctrl) - flavorImpl := NewSwarmFlavor(client, templ()) + flavorImpl := NewWorkerFlavor(client, templ()) swarmInfo := swarm.Swarm{ ClusterInfo: swarm.ClusterInfo{ID: "ClusterUUID"}, @@ -99,7 +95,7 @@ func TestWorker(t *testing.T) { client.EXPECT().NodeInspectWithRaw(gomock.Any(), nodeID).Return(nodeInfo, nil, nil) details, err := flavorImpl.Prepare( - json.RawMessage(`{"Type": "worker"}`), + json.RawMessage(`{}`), instance.Spec{Tags: map[string]string{"a": "b"}}, types.AllocationMethod{Size: 5}) require.NoError(t, err) @@ -144,7 +140,7 @@ func TestManager(t *testing.T) { client := mock_client.NewMockAPIClient(ctrl) - flavorImpl := NewSwarmFlavor(client, templ()) + flavorImpl := NewManagerFlavor(client, templ()) swarmInfo := swarm.Swarm{ ClusterInfo: swarm.ClusterInfo{ID: "ClusterUUID"}, @@ -162,7 +158,7 @@ func TestManager(t *testing.T) { id := instance.LogicalID("127.0.0.1") details, err := flavorImpl.Prepare( - json.RawMessage(`{"Type": "manager", "Attachments": {"127.0.0.1": [{"ID": "a", "Type": "gpu"}]}}`), + json.RawMessage(`{"Attachments": {"127.0.0.1": [{"ID": "a", "Type": "gpu"}]}}`), instance.Spec{Tags: map[string]string{"a": "b"}, LogicalID: &id}, types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}}) require.NoError(t, err) diff --git a/pkg/example/flavor/swarm/main.go b/pkg/example/flavor/swarm/main.go index daa610f0a..92479c6c7 100644 --- a/pkg/example/flavor/swarm/main.go +++ b/pkg/example/flavor/swarm/main.go @@ -8,6 +8,7 @@ import ( "github.com/docker/infrakit/pkg/cli" "github.com/docker/infrakit/pkg/discovery" flavor_plugin "github.com/docker/infrakit/pkg/rpc/flavor" + "github.com/docker/infrakit/pkg/spi/flavor" "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/util/docker" "github.com/spf13/cobra" @@ -70,7 +71,11 @@ func main() { templ = t } - cli.RunPlugin(*name, flavor_plugin.PluginServer(NewSwarmFlavor(dockerClient, templ))) + cli.RunPlugin(*name, flavor_plugin.PluginServerWithTypes( + map[string]flavor.Plugin{ + "manager": NewManagerFlavor(dockerClient, templ), + "worker": NewWorkerFlavor(dockerClient, templ), + })) return nil } diff --git a/pkg/example/flavor/swarm/manager.go b/pkg/example/flavor/swarm/manager.go new file mode 100644 index 000000000..3a588b4ac --- /dev/null +++ b/pkg/example/flavor/swarm/manager.go @@ -0,0 +1,125 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + + log "github.com/Sirupsen/logrus" + "github.com/docker/docker/client" + "github.com/docker/infrakit/pkg/plugin/group/types" + "github.com/docker/infrakit/pkg/plugin/group/util" + "github.com/docker/infrakit/pkg/spi/flavor" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" + "golang.org/x/net/context" +) + +// NewManagerFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm. +func NewManagerFlavor(dockerClient client.APIClient, templ *template.Template) flavor.Plugin { + return &managerFlavor{client: dockerClient, initScript: templ} +} + +type managerFlavor struct { + client client.APIClient + initScript *template.Template +} + +func (s *managerFlavor) Validate(flavorProperties json.RawMessage, allocation types.AllocationMethod) error { + properties, err := parseProperties(flavorProperties) + if err != nil { + return err + } + + if properties.DockerRestartCommand == "" { + return errors.New("DockerRestartCommand must be specified") + } + + numIDs := len(allocation.LogicalIDs) + if numIDs != 1 && numIDs != 3 && numIDs != 5 { + return errors.New("Must have 1, 3, or 5 manager logical IDs") + } + + for _, id := range allocation.LogicalIDs { + if att, exists := properties.Attachments[id]; !exists || len(att) == 0 { + log.Warnf("LogicalID %s has no attachments, which is needed for durability", id) + } + } + + if err := validateIDsAndAttachments(allocation.LogicalIDs, properties.Attachments); err != nil { + return err + } + return nil +} + +// Healthy determines whether an instance is healthy. This is determined by whether it has successfully joined the +// Swarm. +func (s *managerFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) { + return healthy(s.client, flavorProperties, inst) +} + +func (s *managerFlavor) Prepare(flavorProperties json.RawMessage, + spec instance.Spec, allocation types.AllocationMethod) (instance.Spec, error) { + + properties, err := parseProperties(flavorProperties) + if err != nil { + return spec, err + } + + swarmStatus, err := s.client.SwarmInspect(context.Background()) + if err != nil { + return spec, fmt.Errorf("Failed to fetch Swarm join tokens: %s", err) + } + + nodeInfo, err := s.client.Info(context.Background()) + if err != nil { + return spec, fmt.Errorf("Failed to fetch node self info: %s", err) + } + + self, _, err := s.client.NodeInspectWithRaw(context.Background(), nodeInfo.Swarm.NodeID) + if err != nil { + return spec, fmt.Errorf("Failed to fetch Swarm node status: %s", err) + } + + if self.ManagerStatus == nil { + return spec, errors.New( + "Swarm node status did not include manager status. Need to run 'docker swarm init`?") + } + + associationID := util.RandomAlphaNumericString(8) + spec.Tags[associationTag] = associationID + + if spec.LogicalID == nil { + return spec, errors.New("Manager nodes require a LogicalID, " + + "which will be used as an assigned private IP address") + } + + initScript, err := generateInitScript( + s.initScript, + self.ManagerStatus.Addr, + swarmStatus.JoinTokens.Manager, + associationID, + properties.DockerRestartCommand) + if err != nil { + return spec, err + } + spec.Init = initScript + + if spec.LogicalID != nil { + if attachments, exists := properties.Attachments[*spec.LogicalID]; exists { + spec.Attachments = append(spec.Attachments, attachments...) + } + } + + // TODO(wfarner): Use the cluster UUID to scope instances for this swarm separately from instances in another + // swarm. This will require plumbing back to Scaled (membership tags). + spec.Tags["swarm-id"] = swarmStatus.ID + + return spec, nil +} + +// Drain only explicitly remove worker nodes, not manager nodes. Manager nodes are assumed to have an +// attached volume for state, and fixed IP addresses. This allows them to rejoin as the same node. +func (s *managerFlavor) Drain(flavorProperties json.RawMessage, inst instance.Description) error { + return nil +} diff --git a/pkg/example/flavor/swarm/swarm-vagrant-workers.json b/pkg/example/flavor/swarm/swarm-vagrant-workers.json index 9892ffe74..8fd84d974 100644 --- a/pkg/example/flavor/swarm/swarm-vagrant-workers.json +++ b/pkg/example/flavor/swarm/swarm-vagrant-workers.json @@ -11,9 +11,8 @@ } }, "Flavor": { - "Plugin": "flavor-swarm", + "Plugin": "flavor-swarm/worker", "Properties": { - "Type": "worker", "DockerRestartCommand": "service docker restart" } } diff --git a/pkg/example/flavor/swarm/worker.go b/pkg/example/flavor/swarm/worker.go new file mode 100644 index 000000000..578af2b05 --- /dev/null +++ b/pkg/example/flavor/swarm/worker.go @@ -0,0 +1,141 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + + docker_types "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/client" + "github.com/docker/infrakit/pkg/plugin/group/types" + "github.com/docker/infrakit/pkg/plugin/group/util" + "github.com/docker/infrakit/pkg/spi/flavor" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" + "golang.org/x/net/context" +) + +// NewWorkerFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm. +func NewWorkerFlavor(dockerClient client.APIClient, templ *template.Template) flavor.Plugin { + return &workerFlavor{client: dockerClient, initScript: templ} +} + +type workerFlavor struct { + client client.APIClient + initScript *template.Template +} + +func (s *workerFlavor) Validate(flavorProperties json.RawMessage, allocation types.AllocationMethod) error { + properties, err := parseProperties(flavorProperties) + if err != nil { + return err + } + + if properties.DockerRestartCommand == "" { + return errors.New("DockerRestartCommand must be specified") + } + + if err := validateIDsAndAttachments(allocation.LogicalIDs, properties.Attachments); err != nil { + return err + } + + return nil +} + +// Healthy determines whether an instance is healthy. This is determined by whether it has successfully joined the +// Swarm. +func (s *workerFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) { + return healthy(s.client, flavorProperties, inst) +} + +func (s *workerFlavor) Prepare(flavorProperties json.RawMessage, spec instance.Spec, + allocation types.AllocationMethod) (instance.Spec, error) { + + properties, err := parseProperties(flavorProperties) + if err != nil { + return spec, err + } + + swarmStatus, err := s.client.SwarmInspect(context.Background()) + if err != nil { + return spec, fmt.Errorf("Failed to fetch Swarm join tokens: %s", err) + } + + nodeInfo, err := s.client.Info(context.Background()) + if err != nil { + return spec, fmt.Errorf("Failed to fetch node self info: %s", err) + } + + self, _, err := s.client.NodeInspectWithRaw(context.Background(), nodeInfo.Swarm.NodeID) + if err != nil { + return spec, fmt.Errorf("Failed to fetch Swarm node status: %s", err) + } + + if self.ManagerStatus == nil { + return spec, errors.New( + "Swarm node status did not include manager status. Need to run 'docker swarm init`?") + } + + associationID := util.RandomAlphaNumericString(8) + spec.Tags[associationTag] = associationID + + initScript, err := + generateInitScript( + s.initScript, + self.ManagerStatus.Addr, + swarmStatus.JoinTokens.Worker, + associationID, + properties.DockerRestartCommand) + if err != nil { + return spec, err + } + spec.Init = initScript + + if spec.LogicalID != nil { + if attachments, exists := properties.Attachments[*spec.LogicalID]; exists { + spec.Attachments = append(spec.Attachments, attachments...) + } + } + + // TODO(wfarner): Use the cluster UUID to scope instances for this swarm separately from instances in another + // swarm. This will require plumbing back to Scaled (membership tags). + spec.Tags["swarm-id"] = swarmStatus.ID + + return spec, nil +} + +func (s *workerFlavor) Drain(flavorProperties json.RawMessage, inst instance.Description) error { + + associationID, exists := inst.Tags[associationTag] + if !exists { + return fmt.Errorf("Unable to drain %s without an association tag", inst.ID) + } + + filter := filters.NewArgs() + filter.Add("label", fmt.Sprintf("%s=%s", associationTag, associationID)) + + nodes, err := s.client.NodeList(context.Background(), docker_types.NodeListOptions{Filters: filter}) + if err != nil { + return err + } + + switch { + case len(nodes) == 0: + return fmt.Errorf("Unable to drain %s, not found in swarm", inst.ID) + + case len(nodes) == 1: + err := s.client.NodeRemove( + context.Background(), + nodes[0].ID, + docker_types.NodeRemoveOptions{Force: true}) + if err != nil { + return err + } + + return nil + + default: + return fmt.Errorf("Expected at most one node with label %s, but found %s", associationID, nodes) + } +} diff --git a/pkg/rpc/flavor/client.go b/pkg/rpc/flavor/client.go index 540c4b1f4..cfa5c47b8 100644 --- a/pkg/rpc/flavor/client.go +++ b/pkg/rpc/flavor/client.go @@ -2,6 +2,8 @@ package flavor import ( "encoding/json" + + "github.com/docker/infrakit/pkg/plugin" "github.com/docker/infrakit/pkg/plugin/group/types" rpc_client "github.com/docker/infrakit/pkg/rpc/client" "github.com/docker/infrakit/pkg/spi/flavor" @@ -9,17 +11,19 @@ import ( ) // NewClient returns a plugin interface implementation connected to a remote plugin -func NewClient(socketPath string) flavor.Plugin { - return &client{client: rpc_client.New(socketPath, flavor.InterfaceSpec)} +func NewClient(name plugin.Name, socketPath string) flavor.Plugin { + return &client{name: name, client: rpc_client.New(socketPath, flavor.InterfaceSpec)} } type client struct { + name plugin.Name client rpc_client.Client } // Validate checks whether the helper can support a configuration. func (c client) Validate(flavorProperties json.RawMessage, allocation types.AllocationMethod) error { - req := ValidateRequest{Properties: &flavorProperties, Allocation: allocation} + _, flavorType := c.name.GetLookupAndType() + req := ValidateRequest{Type: flavorType, Properties: &flavorProperties, Allocation: allocation} resp := ValidateResponse{} return c.client.Call("Flavor.Validate", req, &resp) } @@ -28,7 +32,8 @@ func (c client) Validate(flavorProperties json.RawMessage, allocation types.Allo // helper could be used to place additional tags on the machine, or generate a specialized Init command based on // the flavor configuration. func (c client) Prepare(flavorProperties json.RawMessage, spec instance.Spec, allocation types.AllocationMethod) (instance.Spec, error) { - req := PrepareRequest{Properties: &flavorProperties, Spec: spec, Allocation: allocation} + _, flavorType := c.name.GetLookupAndType() + req := PrepareRequest{Type: flavorType, Properties: &flavorProperties, Spec: spec, Allocation: allocation} resp := PrepareResponse{} err := c.client.Call("Flavor.Prepare", req, &resp) if err != nil { @@ -39,7 +44,8 @@ func (c client) Prepare(flavorProperties json.RawMessage, spec instance.Spec, al // Healthy determines the Health of this Flavor on an instance. func (c client) Healthy(flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) { - req := HealthyRequest{Properties: &flavorProperties, Instance: inst} + _, flavorType := c.name.GetLookupAndType() + req := HealthyRequest{Type: flavorType, Properties: &flavorProperties, Instance: inst} resp := HealthyResponse{} err := c.client.Call("Flavor.Healthy", req, &resp) return resp.Health, err @@ -47,7 +53,8 @@ func (c client) Healthy(flavorProperties json.RawMessage, inst instance.Descript // Drain allows the flavor to perform a best-effort cleanup operation before the instance is destroyed. func (c client) Drain(flavorProperties json.RawMessage, inst instance.Description) error { - req := DrainRequest{Properties: &flavorProperties, Instance: inst} + _, flavorType := c.name.GetLookupAndType() + req := DrainRequest{Type: flavorType, Properties: &flavorProperties, Instance: inst} resp := DrainResponse{} err := c.client.Call("Flavor.Drain", req, &resp) if err != nil { diff --git a/pkg/rpc/flavor/rpc_multi_test.go b/pkg/rpc/flavor/rpc_multi_test.go new file mode 100644 index 000000000..28caa4ea7 --- /dev/null +++ b/pkg/rpc/flavor/rpc_multi_test.go @@ -0,0 +1,238 @@ +package flavor + +import ( + "encoding/json" + "errors" + "path/filepath" + "testing" + + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/plugin/group/types" + rpc_server "github.com/docker/infrakit/pkg/rpc/server" + "github.com/docker/infrakit/pkg/spi/flavor" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/stretchr/testify/require" +) + +func TestFlavorMultiPluginValidate(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + inputFlavorPropertiesActual1 := make(chan json.RawMessage, 1) + inputFlavorProperties1 := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"leader"}`)) + + inputFlavorPropertiesActual2 := make(chan json.RawMessage, 1) + inputFlavorProperties2 := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"follower"}`)) + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]flavor.Plugin{ + "type1": &testPlugin{ + DoValidate: func(flavorProperties json.RawMessage, allocation types.AllocationMethod) error { + inputFlavorPropertiesActual1 <- flavorProperties + return nil + }, + }, + "type2": &testPlugin{ + DoValidate: func(flavorProperties json.RawMessage, allocation types.AllocationMethod) error { + inputFlavorPropertiesActual2 <- flavorProperties + return errors.New("something-went-wrong") + }, + }, + })) + require.NoError(t, err) + + require.NoError(t, NewClient(plugin.Name(name+"/type1"), socketPath).Validate(inputFlavorProperties1, allocation)) + + err = NewClient(plugin.Name(name+"/type2"), socketPath).Validate(inputFlavorProperties2, allocation) + require.Error(t, err) + require.Equal(t, "something-went-wrong", err.Error()) + + server.Stop() + + require.Equal(t, inputFlavorProperties1, <-inputFlavorPropertiesActual1) + require.Equal(t, inputFlavorProperties2, <-inputFlavorPropertiesActual2) +} + +func TestFlavorMultiPluginPrepare(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + inputFlavorPropertiesActual1 := make(chan json.RawMessage, 1) + inputFlavorProperties1 := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"leader"}`)) + inputInstanceSpecActual1 := make(chan instance.Spec, 1) + inputInstanceSpec1 := instance.Spec{ + Properties: &inputFlavorProperties1, + Tags: map[string]string{"foo": "bar1"}, + } + + inputFlavorPropertiesActual2 := make(chan json.RawMessage, 1) + inputFlavorProperties2 := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"follower"}`)) + inputInstanceSpecActual2 := make(chan instance.Spec, 1) + inputInstanceSpec2 := instance.Spec{ + Properties: &inputFlavorProperties2, + Tags: map[string]string{"foo": "bar2"}, + } + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]flavor.Plugin{ + "type1": &testPlugin{ + DoPrepare: func( + flavorProperties json.RawMessage, + instanceSpec instance.Spec, + allocation types.AllocationMethod) (instance.Spec, error) { + + inputFlavorPropertiesActual1 <- flavorProperties + inputInstanceSpecActual1 <- instanceSpec + + return instanceSpec, nil + }, + }, + "type2": &testPlugin{ + DoPrepare: func( + flavorProperties json.RawMessage, + instanceSpec instance.Spec, + allocation types.AllocationMethod) (instance.Spec, error) { + + inputFlavorPropertiesActual2 <- flavorProperties + inputInstanceSpecActual2 <- instanceSpec + + return instanceSpec, errors.New("bad-thing-happened") + }, + }, + }, + )) + require.NoError(t, err) + + spec, err := NewClient(plugin.Name(name+"/type1"), socketPath).Prepare( + inputFlavorProperties1, + inputInstanceSpec1, + allocation) + require.NoError(t, err) + require.Equal(t, inputInstanceSpec1, spec) + + _, err = NewClient(plugin.Name(name+"/type2"), socketPath).Prepare( + inputFlavorProperties2, + inputInstanceSpec2, + allocation) + require.Error(t, err) + require.Equal(t, "bad-thing-happened", err.Error()) + + server.Stop() + + require.Equal(t, inputFlavorProperties1, <-inputFlavorPropertiesActual1) + require.Equal(t, inputInstanceSpec1, <-inputInstanceSpecActual1) + + require.Equal(t, inputFlavorProperties2, <-inputFlavorPropertiesActual2) + require.Equal(t, inputInstanceSpec2, <-inputInstanceSpecActual2) +} + +func TestFlavorMultiPluginHealthy(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + inputPropertiesActual1 := make(chan json.RawMessage, 1) + inputInstanceActual1 := make(chan instance.Description, 1) + inputProperties1 := json.RawMessage("{}") + inputInstance1 := instance.Description{ + ID: instance.ID("foo1"), + Tags: map[string]string{"foo": "bar1"}, + } + + inputPropertiesActual2 := make(chan json.RawMessage, 1) + inputInstanceActual2 := make(chan instance.Description, 1) + inputProperties2 := json.RawMessage("{}") + inputInstance2 := instance.Description{ + ID: instance.ID("foo2"), + Tags: map[string]string{"foo": "bar2"}, + } + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]flavor.Plugin{ + "type1": &testPlugin{ + DoHealthy: func(properties json.RawMessage, inst instance.Description) (flavor.Health, error) { + inputPropertiesActual1 <- properties + inputInstanceActual1 <- inst + return flavor.Healthy, nil + }, + }, + "type2": &testPlugin{ + DoHealthy: func(flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) { + inputPropertiesActual2 <- flavorProperties + inputInstanceActual2 <- inst + return flavor.Unknown, errors.New("oh-noes") + }, + }, + })) + require.NoError(t, err) + + health, err := NewClient(plugin.Name(name+"/type1"), socketPath).Healthy(inputProperties1, inputInstance1) + require.NoError(t, err) + require.Equal(t, flavor.Healthy, health) + + _, err = NewClient(plugin.Name(name+"/type2"), socketPath).Healthy(inputProperties2, inputInstance2) + require.Error(t, err) + require.Equal(t, "oh-noes", err.Error()) + + require.Equal(t, inputProperties1, <-inputPropertiesActual1) + require.Equal(t, inputInstance1, <-inputInstanceActual1) + + require.Equal(t, inputProperties2, <-inputPropertiesActual2) + require.Equal(t, inputInstance2, <-inputInstanceActual2) + + server.Stop() +} + +func TestFlavorMultiPluginDrain(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + inputPropertiesActual1 := make(chan json.RawMessage, 1) + inputInstanceActual1 := make(chan instance.Description, 1) + inputProperties1 := json.RawMessage("{}") + inputInstance1 := instance.Description{ + ID: instance.ID("foo1"), + Tags: map[string]string{"foo": "bar1"}, + } + + inputPropertiesActual2 := make(chan json.RawMessage, 1) + inputInstanceActual2 := make(chan instance.Description, 1) + inputProperties2 := json.RawMessage("{}") + inputInstance2 := instance.Description{ + ID: instance.ID("foo2"), + Tags: map[string]string{"foo": "bar2"}, + } + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]flavor.Plugin{ + "type1": &testPlugin{ + DoDrain: func(properties json.RawMessage, inst instance.Description) error { + inputPropertiesActual1 <- properties + inputInstanceActual1 <- inst + return nil + }, + }, + "type2": &testPlugin{ + DoDrain: func(flavorProperties json.RawMessage, inst instance.Description) error { + inputPropertiesActual2 <- flavorProperties + inputInstanceActual2 <- inst + return errors.New("oh-noes") + }, + }, + }, + )) + require.NoError(t, err) + + require.NoError(t, NewClient(plugin.Name(name+"/type1"), socketPath).Drain(inputProperties1, inputInstance1)) + + require.Equal(t, inputProperties1, <-inputPropertiesActual1) + require.Equal(t, inputInstance1, <-inputInstanceActual1) + + err = NewClient(plugin.Name(name+"/type2"), socketPath).Drain(inputProperties2, inputInstance2) + require.Error(t, err) + require.Equal(t, "oh-noes", err.Error()) + + require.Equal(t, inputProperties2, <-inputPropertiesActual2) + require.Equal(t, inputInstance2, <-inputInstanceActual2) + + server.Stop() +} diff --git a/pkg/rpc/flavor/rpc_test.go b/pkg/rpc/flavor/rpc_test.go index becadb52e..c15c34109 100644 --- a/pkg/rpc/flavor/rpc_test.go +++ b/pkg/rpc/flavor/rpc_test.go @@ -3,8 +3,10 @@ package flavor import ( "encoding/json" "errors" + "path/filepath" "testing" + "github.com/docker/infrakit/pkg/plugin" "github.com/docker/infrakit/pkg/plugin/group/types" rpc_server "github.com/docker/infrakit/pkg/rpc/server" "github.com/docker/infrakit/pkg/spi/flavor" @@ -57,6 +59,7 @@ func (t *testPlugin) Drain(flavorProperties json.RawMessage, inst instance.Descr func TestFlavorPluginValidate(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputFlavorPropertiesActual := make(chan json.RawMessage, 1) inputFlavorProperties := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"leader"}`)) @@ -69,7 +72,7 @@ func TestFlavorPluginValidate(t *testing.T) { })) require.NoError(t, err) - require.NoError(t, NewClient(socketPath).Validate(inputFlavorProperties, allocation)) + require.NoError(t, NewClient(plugin.Name(name), socketPath).Validate(inputFlavorProperties, allocation)) server.Stop() @@ -78,6 +81,7 @@ func TestFlavorPluginValidate(t *testing.T) { func TestFlavorPluginValidateError(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputFlavorPropertiesActual := make(chan json.RawMessage, 1) inputFlavorProperties := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"leader"}`)) @@ -90,7 +94,7 @@ func TestFlavorPluginValidateError(t *testing.T) { })) require.NoError(t, err) - err = NewClient(socketPath).Validate(inputFlavorProperties, allocation) + err = NewClient(plugin.Name(name), socketPath).Validate(inputFlavorProperties, allocation) require.Error(t, err) require.Equal(t, "something-went-wrong", err.Error()) @@ -100,6 +104,7 @@ func TestFlavorPluginValidateError(t *testing.T) { func TestFlavorPluginPrepare(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputFlavorPropertiesActual := make(chan json.RawMessage, 1) inputFlavorProperties := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"leader"}`)) @@ -123,7 +128,7 @@ func TestFlavorPluginPrepare(t *testing.T) { })) require.NoError(t, err) - spec, err := NewClient(socketPath).Prepare( + spec, err := NewClient(plugin.Name(name), socketPath).Prepare( inputFlavorProperties, inputInstanceSpec, allocation) @@ -138,6 +143,7 @@ func TestFlavorPluginPrepare(t *testing.T) { func TestFlavorPluginPrepareError(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputFlavorPropertiesActual := make(chan json.RawMessage, 1) inputFlavorProperties := json.RawMessage([]byte(`{"flavor":"zookeeper","role":"leader"}`)) @@ -161,7 +167,7 @@ func TestFlavorPluginPrepareError(t *testing.T) { })) require.NoError(t, err) - _, err = NewClient(socketPath).Prepare( + _, err = NewClient(plugin.Name(name), socketPath).Prepare( inputFlavorProperties, inputInstanceSpec, allocation) @@ -176,6 +182,7 @@ func TestFlavorPluginPrepareError(t *testing.T) { func TestFlavorPluginHealthy(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputPropertiesActual := make(chan json.RawMessage, 1) inputInstanceActual := make(chan instance.Description, 1) @@ -193,7 +200,7 @@ func TestFlavorPluginHealthy(t *testing.T) { })) require.NoError(t, err) - health, err := NewClient(socketPath).Healthy(inputProperties, inputInstance) + health, err := NewClient(plugin.Name(name), socketPath).Healthy(inputProperties, inputInstance) require.NoError(t, err) require.Equal(t, flavor.Healthy, health) @@ -204,6 +211,7 @@ func TestFlavorPluginHealthy(t *testing.T) { func TestFlavorPluginHealthyError(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputPropertiesActual := make(chan json.RawMessage, 1) inputInstanceActual := make(chan instance.Description, 1) @@ -221,7 +229,7 @@ func TestFlavorPluginHealthyError(t *testing.T) { })) require.NoError(t, err) - _, err = NewClient(socketPath).Healthy(inputProperties, inputInstance) + _, err = NewClient(plugin.Name(name), socketPath).Healthy(inputProperties, inputInstance) require.Error(t, err) require.Equal(t, "oh-noes", err.Error()) @@ -232,6 +240,7 @@ func TestFlavorPluginHealthyError(t *testing.T) { func TestFlavorPluginDrain(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputPropertiesActual := make(chan json.RawMessage, 1) inputInstanceActual := make(chan instance.Description, 1) @@ -249,7 +258,7 @@ func TestFlavorPluginDrain(t *testing.T) { })) require.NoError(t, err) - require.NoError(t, NewClient(socketPath).Drain(inputProperties, inputInstance)) + require.NoError(t, NewClient(plugin.Name(name), socketPath).Drain(inputProperties, inputInstance)) require.Equal(t, inputProperties, <-inputPropertiesActual) require.Equal(t, inputInstance, <-inputInstanceActual) @@ -258,6 +267,7 @@ func TestFlavorPluginDrain(t *testing.T) { func TestFlavorPluginDrainError(t *testing.T) { socketPath := tempSocket() + name := filepath.Base(socketPath) inputPropertiesActual := make(chan json.RawMessage, 1) inputInstanceActual := make(chan instance.Description, 1) @@ -275,7 +285,7 @@ func TestFlavorPluginDrainError(t *testing.T) { })) require.NoError(t, err) - err = NewClient(socketPath).Drain(inputProperties, inputInstance) + err = NewClient(plugin.Name(name), socketPath).Drain(inputProperties, inputInstance) require.Error(t, err) require.Equal(t, "oh-noes", err.Error()) diff --git a/pkg/rpc/flavor/service.go b/pkg/rpc/flavor/service.go index f1ac7b09e..1977592d9 100644 --- a/pkg/rpc/flavor/service.go +++ b/pkg/rpc/flavor/service.go @@ -2,6 +2,7 @@ package flavor import ( "encoding/json" + "fmt" "net/http" "github.com/docker/infrakit/pkg/spi" @@ -13,13 +14,25 @@ func PluginServer(p flavor.Plugin) *Flavor { return &Flavor{plugin: p} } +// PluginServerWithTypes which supports multiple types of flavor plugins. The de-multiplexing +// is done by the server's RPC method implementations. +func PluginServerWithTypes(typed map[string]flavor.Plugin) *Flavor { + return &Flavor{typedPlugins: typed} +} + // Flavor the exported type needed to conform to json-rpc call convention type Flavor struct { - plugin flavor.Plugin + plugin flavor.Plugin + typedPlugins map[string]flavor.Plugin // by type, as qualified in the name of the plugin } // VendorInfo returns a metadata object about the plugin, if the plugin implements it. See spi.Vendor func (p *Flavor) VendorInfo() *spi.VendorInfo { + // TODO(chungers) - support typed plugins + if p.plugin == nil { + return nil + } + if m, is := p.plugin.(spi.Vendor); is { return m.VendorInfo() } @@ -28,6 +41,11 @@ func (p *Flavor) VendorInfo() *spi.VendorInfo { // SetExampleProperties sets the rpc request with any example properties/ custom type func (p *Flavor) SetExampleProperties(request interface{}) { + // TODO(chungers) - support typed plugins + if p.plugin == nil { + return + } + i, is := p.plugin.(spi.InputExample) if !is { return @@ -60,9 +78,29 @@ func (p *Flavor) ImplementedInterface() spi.InterfaceSpec { return flavor.InterfaceSpec } +func (p *Flavor) getPlugin(flavorType string) flavor.Plugin { + if flavorType == "" { + return p.plugin + } + if p, has := p.typedPlugins[flavorType]; has { + return p + } + return nil +} + // Validate checks whether the helper can support a configuration. func (p *Flavor) Validate(_ *http.Request, req *ValidateRequest, resp *ValidateResponse) error { - err := p.plugin.Validate(*req.Properties, req.Allocation) + var raw json.RawMessage + if req.Properties != nil { + raw = *req.Properties + } + + resp.Type = req.Type + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + err := c.Validate(raw, req.Allocation) if err != nil { return err } @@ -74,7 +112,17 @@ func (p *Flavor) Validate(_ *http.Request, req *ValidateRequest, resp *ValidateR // helper could be used to place additional tags on the machine, or generate a specialized Init command based on // the flavor configuration. func (p *Flavor) Prepare(_ *http.Request, req *PrepareRequest, resp *PrepareResponse) error { - spec, err := p.plugin.Prepare(*req.Properties, req.Spec, req.Allocation) + var raw json.RawMessage + if req.Properties != nil { + raw = *req.Properties + } + + resp.Type = req.Type + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + spec, err := c.Prepare(raw, req.Spec, req.Allocation) if err != nil { return err } @@ -84,7 +132,12 @@ func (p *Flavor) Prepare(_ *http.Request, req *PrepareRequest, resp *PrepareResp // Healthy determines whether an instance is healthy. func (p *Flavor) Healthy(_ *http.Request, req *HealthyRequest, resp *HealthyResponse) error { - health, err := p.plugin.Healthy(*req.Properties, req.Instance) + resp.Type = req.Type + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + health, err := c.Healthy(*req.Properties, req.Instance) if err != nil { return err } @@ -94,7 +147,12 @@ func (p *Flavor) Healthy(_ *http.Request, req *HealthyRequest, resp *HealthyResp // Drain drains the instance. It's the inverse of prepare before provision and happens before destroy. func (p *Flavor) Drain(_ *http.Request, req *DrainRequest, resp *DrainResponse) error { - err := p.plugin.Drain(*req.Properties, req.Instance) + resp.Type = req.Type + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + err := c.Drain(*req.Properties, req.Instance) if err != nil { return err } diff --git a/pkg/rpc/flavor/types.go b/pkg/rpc/flavor/types.go index fa719e147..1c46efc76 100644 --- a/pkg/rpc/flavor/types.go +++ b/pkg/rpc/flavor/types.go @@ -10,17 +10,20 @@ import ( // ValidateRequest is the rpc wrapper for request parameters to Validate type ValidateRequest struct { + Type string Properties *json.RawMessage Allocation types.AllocationMethod } // ValidateResponse is the rpc wrapper for the results of Validate type ValidateResponse struct { - OK bool + Type string + OK bool } // PrepareRequest is the rpc wrapper of the params to Prepare type PrepareRequest struct { + Type string Properties *json.RawMessage Spec instance.Spec Allocation types.AllocationMethod @@ -28,27 +31,32 @@ type PrepareRequest struct { // PrepareResponse is the rpc wrapper of the result of Prepare type PrepareResponse struct { + Type string Spec instance.Spec } // HealthyRequest is the rpc wrapper of the params to Healthy type HealthyRequest struct { + Type string Properties *json.RawMessage Instance instance.Description } // HealthyResponse is the rpc wrapper of the result of Healthy type HealthyResponse struct { + Type string Health flavor.Health } // DrainRequest is the rpc wrapper of the params to Drain type DrainRequest struct { + Type string Properties *json.RawMessage Instance instance.Description } // DrainResponse is the rpc wrapper of the result of Drain type DrainResponse struct { - OK bool + Type string + OK bool } diff --git a/pkg/rpc/instance/service.go b/pkg/rpc/instance/service.go index 1dcadfa8d..4ef812c31 100644 --- a/pkg/rpc/instance/service.go +++ b/pkg/rpc/instance/service.go @@ -1,7 +1,7 @@ package instance import ( - "errors" + "encoding/json" "fmt" "net/http" @@ -81,8 +81,9 @@ func (p *Instance) getPlugin(instanceType string) instance.Plugin { // Validate performs local validation on a provision request. func (p *Instance) Validate(_ *http.Request, req *ValidateRequest, resp *ValidateResponse) error { - if req.Properties == nil { - return errors.New("Request Properties must be set") + var raw json.RawMessage + if req.Properties != nil { + raw = *req.Properties } c := p.getPlugin(req.Type) @@ -90,7 +91,7 @@ func (p *Instance) Validate(_ *http.Request, req *ValidateRequest, resp *Validat return fmt.Errorf("no-plugin:%s", req.Type) } resp.Type = req.Type - err := c.Validate(*req.Properties) + err := c.Validate(raw) if err != nil { return err } diff --git a/scripts/e2e-swarm.sh b/scripts/e2e-swarm.sh new file mode 100755 index 000000000..cc13501f5 --- /dev/null +++ b/scripts/e2e-swarm.sh @@ -0,0 +1,136 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset + +HERE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd "$HERE/.." + +export PATH=$PWD/build:$PATH + +E2E_CLEANUP=${E2E_CLEANUP:-true} + +starterpid="" # pid of the cli plugin starter +cleanup() { + if [ "$E2E_CLEANUP" = "true" ]; then + pgid=$(ps -o pgid= -p $starterpid) + echo "Stopping plugin starter utility - $starterpid , pgid=$pgid" + kill -TERM -$pgid + echo "Stopping other jobs" + kill $(jobs -p) + rm -rf tutorial + fi +} +trap cleanup EXIT + +# infrakit directories +plugins=~/.infrakit/plugins +mkdir -p $plugins +rm -rf $plugins/* + +configstore=~/.infrakit/configs +mkdir -p $configstore +rm -rf $configstore/* + +# set the leader -- for os / file based leader detection for manager +leaderfile=~/.infrakit/leader +echo group > $leaderfile + +# start up multiple instances of manager -- typically we want multiple SETS of plugins and managers +# but here for simplicity just start up with multiple managers and one set of plugins +infrakit-manager --name group --proxy-for-group group-stateless os --leader-file $leaderfile --store-dir $configstore & +infrakit-manager --name group1 --proxy-for-group group-stateless os --leader-file $leaderfile --store-dir $configstore & +infrakit-manager --name group2 --proxy-for-group group-stateless os --leader-file $leaderfile --store-dir $configstore & + +sleep 5 # manager needs to detect leadership + +# location of logfiles when plugins are started by the plugin cli +# the config json below expects LOG_DIR as an environment variable +LOG_DIR=~/.infrakit/logs +mkdir -p $LOG_DIR + +# see the config josn 'e2e-test-plugins.json' for reference of environment variable E2E_SWARM_DIR +E2E_SWARM_DIR=~/.infrakit/e2e_swarm +mkdir -p $E2E_SWARM_DIR +rm -rf $E2E_SWARM_DIR/* + +export LOG_DIR=$LOG_DIR +export E2E_SWARM_DIR=$E2E_SWARM_DIR +export SWARM_MANAGER="tcp://192.168.2.200:4243" + +# note -- on exit, this won't clean up the plugins started by the cli since they will be in a separate process group +infrakit plugin start --wait --config-url file:///$PWD/scripts/e2e-test-plugins.json --os group-default instance-vagrant flavor-swarm flavor-vanilla & + +starterpid=$! +echo "plugin start pid=$starterpid" + +sleep 5 + +expect_exact_output() { + message=$1 + cmd=$2 + expected_output=$3 + + actual_output="$($2)" + echo -n "--> $message: " + if [ "$actual_output" = "$3" ] + then + echo 'PASS' + else + echo 'FAIL' + echo "Expected output: $expected_output" + echo "Actual output: $actual_output" + exit 1 + fi +} + +expect_output_lines() { + message=$1 + cmd=$2 + expected_lines=$3 + + actual_line_count=$($2 | wc -l) + echo -n "--> $message: " + if [ "$actual_line_count" -eq "$3" ] + then + echo 'PASS' + else + echo 'FAIL' + echo "Expected line count: $expected_lines" + echo "Actual line count: $actual_line_count" + exit 1 + fi +} + +expect_output_lines "7 plugins should be discoverable" "infrakit plugin ls -q" "7" +expect_output_lines "0 instances should exist" "infrakit instance describe -q --name instance-vagrant" "0" + +echo "Commiting manager" +infrakit group commit pkg/example/flavor/swarm/swarm-vagrant-manager.json + +echo 'Waiting for group to be provisioned' +sleep 60 + +expect_exact_output "Should be watching one group" "infrakit group ls -q" "swarm-managers" +expect_output_lines "1 instances should exist in group" "infrakit group describe swarm-managers -q" "1" +expect_output_lines "1 instances should exist" "infrakit instance describe -q --name instance-vagrant" "1" + +echo "Commiting workers" +infrakit group commit pkg/example/flavor/swarm/swarm-vagrant-workers.json + +echo 'Waiting for group to be provisioned' +sleep 120 + +expect_output_lines "Should be watching two groups" "infrakit group ls -q" "2" +expect_output_lines "2 instances should exist in group" "infrakit group describe swarm-workers -q" "2" +expect_output_lines "3 instances should exist" "infrakit instance describe -q --name instance-vagrant" "3" + +echo "Destroying workers" +infrakit group destroy swarm-workers +expect_output_lines "1 instances should exist" "infrakit instance describe -q --name instance-vagrant" "1" + +echo "Destroying managers" +infrakit group destroy swarm-managers +expect_output_lines "0 instances should exist" "infrakit instance describe -q --name instance-vagrant" "0" + +echo 'ALL TESTS PASSED' diff --git a/scripts/e2e-test-plugins.json b/scripts/e2e-test-plugins.json index 1f954f112..b6c1e79fd 100644 --- a/scripts/e2e-test-plugins.json +++ b/scripts/e2e-test-plugins.json @@ -21,6 +21,17 @@ } } , + { + "Plugin" : "instance-vagrant", + "Launch" : { + "Exec" : "os", + "Properties" : { + "Cmd" : "infrakit-instance-vagrant --log 5 > {{env "LOG_DIR"}}/instance-vagrant-{{unixtime}}.log 2>&1", + "SamePgID" : true + } + } + } + , { "Plugin" : "flavor-vanilla", "Launch" : { @@ -31,4 +42,15 @@ } } } + , + { + "Plugin" : "flavor-swarm", + "Launch" : { + "Exec" : "os", + "Properties" : { + "Cmd" : "infrakit-flavor-swarm --host {{env "SWARM_MANAGER"}} --log 5 > {{env "LOG_DIR"}}/flavor-swarm-{{unixtime}}.log 2>&1", + "SamePgID" : true + } + } + } ]