Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Commit

Permalink
Multiplex flavor plugins by type in a single RPC endpoint. (#364)
Browse files Browse the repository at this point in the history
Signed-off-by: David Chung <[email protected]>
  • Loading branch information
David Chung authored Jan 15, 2017
1 parent 3113b14 commit 89b031a
Show file tree
Hide file tree
Showing 17 changed files with 822 additions and 229 deletions.
2 changes: 1 addition & 1 deletion cmd/cli/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func flavorPluginCommand(plugins func() discovery.Plugins) *cobra.Command {
return err
}

flavorPlugin = flavor_plugin.NewClient(endpoint.Address)
flavorPlugin = flavor_plugin.NewClient(plugin.Name(*name), endpoint.Address)

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/group/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func main() {
if err != nil {
return nil, err
}
return instance_client.NewClient(plugin.Name(n), endpoint.Address), nil
return instance_client.NewClient(n, endpoint.Address), nil
}

flavorPluginLookup := func(n plugin.Name) (flavor.Plugin, error) {
endpoint, err := plugins.Find(n)
if err != nil {
return nil, err
}
return flavor_client.NewClient(endpoint.Address), nil
return flavor_client.NewClient(n, endpoint.Address), nil
}

cli.RunPlugin(*name, group_server.PluginServer(
Expand Down
2 changes: 1 addition & 1 deletion pkg/example/flavor/combo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
if err != nil {
return nil, err
}
return flavor_rpc.NewClient(endpoint.Address), nil
return flavor_rpc.NewClient(n, endpoint.Address), nil
}

cli.SetLogLevel(*logLevel)
Expand Down
203 changes: 25 additions & 178 deletions pkg/example/flavor/swarm/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,22 @@ import (
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/docker/infrakit/pkg/plugin/group/types"
"github.com/docker/infrakit/pkg/plugin/group/util"
"github.com/docker/infrakit/pkg/spi/flavor"
"github.com/docker/infrakit/pkg/spi/instance"
"github.com/docker/infrakit/pkg/template"
"golang.org/x/net/context"
)

type nodeType string

const (
worker nodeType = "worker"
manager nodeType = "manager"
ebsAttachment string = "ebs"
ebsAttachment string = "ebs"
)

// NewSwarmFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm.
func NewSwarmFlavor(dockerClient client.APIClient, templ *template.Template) flavor.Plugin {
return &swarmFlavor{client: dockerClient, initScript: templ}
}

type swarmFlavor struct {
client client.APIClient
initScript *template.Template
}

type schema struct {
Type nodeType
Attachments map[instance.LogicalID][]instance.Attachment
DockerRestartCommand string
}
Expand All @@ -48,7 +37,9 @@ func parseProperties(flavorProperties json.RawMessage) (schema, error) {
return s, err
}

func validateIDsAndAttachments(logicalIDs []instance.LogicalID, attachments map[instance.LogicalID][]instance.Attachment) error {
func validateIDsAndAttachments(logicalIDs []instance.LogicalID,
attachments map[instance.LogicalID][]instance.Attachment) error {

// Each attachment association must be represented by a logical ID.
idsMap := map[instance.LogicalID]bool{}
for _, id := range logicalIDs {
Expand Down Expand Up @@ -97,49 +88,14 @@ func validateIDsAndAttachments(logicalIDs []instance.LogicalID, attachments map[
return nil
}

func (s swarmFlavor) Validate(flavorProperties json.RawMessage, allocation types.AllocationMethod) error {
properties, err := parseProperties(flavorProperties)
if err != nil {
return err
}

if properties.DockerRestartCommand == "" {
return errors.New("DockerRestartCommand must be specified")
}

switch properties.Type {
case manager:
numIDs := len(allocation.LogicalIDs)
if numIDs != 1 && numIDs != 3 && numIDs != 5 {
return errors.New("Must have 1, 3, or 5 manager logical IDs")
}
case worker:

default:
return errors.New("Unrecognized node Type")
}

if properties.Type == manager {
for _, id := range allocation.LogicalIDs {
if att, exists := properties.Attachments[id]; !exists || len(att) == 0 {
log.Warnf("LogicalID %s has no attachments, which is needed for durability", id)
}
}
}

if err := validateIDsAndAttachments(allocation.LogicalIDs, properties.Attachments); err != nil {
return err
}

return nil
}

const (
// associationTag is a machine tag added to associate machines with Swarm nodes.
associationTag = "swarm-association-id"
)

func generateInitScript(templ *template.Template, joinIP, joinToken, associationID, restartCommand string) (string, error) {
func generateInitScript(templ *template.Template,
joinIP, joinToken, associationID, restartCommand string) (string, error) {

var buffer bytes.Buffer
err := templ.Execute(&buffer, map[string]string{
"MY_IP": joinIP,
Expand All @@ -153,9 +109,25 @@ func generateInitScript(templ *template.Template, joinIP, joinToken, association
return buffer.String(), nil
}

func (s swarmFlavor) Validate(flavorProperties json.RawMessage, allocation types.AllocationMethod) error {
properties, err := parseProperties(flavorProperties)
if err != nil {
return err
}
if properties.DockerRestartCommand == "" {
return errors.New("DockerRestartCommand must be specified")
}
if err := validateIDsAndAttachments(allocation.LogicalIDs, properties.Attachments); err != nil {
return err
}
return nil
}

// Healthy determines whether an instance is healthy. This is determined by whether it has successfully joined the
// Swarm.
func (s swarmFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) {
func healthy(client client.APIClient,
flavorProperties json.RawMessage, inst instance.Description) (flavor.Health, error) {

associationID, exists := inst.Tags[associationTag]
if !exists {
log.Info("Reporting unhealthy for instance without an association tag", inst.ID)
Expand All @@ -165,7 +137,7 @@ func (s swarmFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Des
filter := filters.NewArgs()
filter.Add("label", fmt.Sprintf("%s=%s", associationTag, associationID))

nodes, err := s.client.NodeList(context.Background(), docker_types.NodeListOptions{Filters: filter})
nodes, err := client.NodeList(context.Background(), docker_types.NodeListOptions{Filters: filter})
if err != nil {
return flavor.Unknown, err
}
Expand All @@ -183,128 +155,3 @@ func (s swarmFlavor) Healthy(flavorProperties json.RawMessage, inst instance.Des
return flavor.Healthy, nil
}
}

func (s swarmFlavor) Drain(flavorProperties json.RawMessage, inst instance.Description) error {
properties, err := parseProperties(flavorProperties)
if err != nil {
return err
}

// Only explicitly remove worker nodes, not manager nodes. Manager nodes are assumed to have an
// attached volume for state, and fixed IP addresses. This allows them to rejoin as the same node.
if properties.Type != worker {
return nil
}

associationID, exists := inst.Tags[associationTag]
if !exists {
return fmt.Errorf("Unable to drain %s without an association tag", inst.ID)
}

filter := filters.NewArgs()
filter.Add("label", fmt.Sprintf("%s=%s", associationTag, associationID))

nodes, err := s.client.NodeList(context.Background(), docker_types.NodeListOptions{Filters: filter})
if err != nil {
return err
}

switch {
case len(nodes) == 0:
return fmt.Errorf("Unable to drain %s, not found in swarm", inst.ID)

case len(nodes) == 1:
err := s.client.NodeRemove(
context.Background(),
nodes[0].ID,
docker_types.NodeRemoveOptions{Force: true})
if err != nil {
return err
}

return nil

default:
return fmt.Errorf("Expected at most one node with label %s, but found %s", associationID, nodes)
}
}

func (s *swarmFlavor) Prepare(
flavorProperties json.RawMessage,
spec instance.Spec,
allocation types.AllocationMethod) (instance.Spec, error) {

properties, err := parseProperties(flavorProperties)
if err != nil {
return spec, err
}

swarmStatus, err := s.client.SwarmInspect(context.Background())
if err != nil {
return spec, fmt.Errorf("Failed to fetch Swarm join tokens: %s", err)
}

nodeInfo, err := s.client.Info(context.Background())
if err != nil {
return spec, fmt.Errorf("Failed to fetch node self info: %s", err)
}

self, _, err := s.client.NodeInspectWithRaw(context.Background(), nodeInfo.Swarm.NodeID)
if err != nil {
return spec, fmt.Errorf("Failed to fetch Swarm node status: %s", err)
}

if self.ManagerStatus == nil {
return spec, errors.New(
"Swarm node status did not include manager status. Need to run 'docker swarm init`?")
}

associationID := util.RandomAlphaNumericString(8)
spec.Tags[associationTag] = associationID

switch properties.Type {
case worker:
initScript, err :=
generateInitScript(
s.initScript,
self.ManagerStatus.Addr,
swarmStatus.JoinTokens.Worker,
associationID,
properties.DockerRestartCommand)
if err != nil {
return spec, err
}
spec.Init = initScript

case manager:
if spec.LogicalID == nil {
return spec, errors.New("Manager nodes require a LogicalID, " +
"which will be used as an assigned private IP address")
}

initScript, err := generateInitScript(
s.initScript,
self.ManagerStatus.Addr,
swarmStatus.JoinTokens.Manager,
associationID,
properties.DockerRestartCommand)
if err != nil {
return spec, err
}
spec.Init = initScript
default:
return spec, errors.New("Unsupported node type")
}

if spec.LogicalID != nil {
if attachments, exists := properties.Attachments[*spec.LogicalID]; exists {
spec.Attachments = append(spec.Attachments, attachments...)
}
}

// TODO(wfarner): Use the cluster UUID to scope instances for this swarm separately from instances in another
// swarm. This will require plumbing back to Scaled (membership tags).
spec.Tags["swarm-id"] = swarmStatus.ID

return spec, nil
}
34 changes: 15 additions & 19 deletions pkg/example/flavor/swarm/flavor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,46 +29,42 @@ func TestValidate(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

swarmFlavor := NewSwarmFlavor(mock_client.NewMockAPIClient(ctrl), templ())
managerFlavor := NewManagerFlavor(mock_client.NewMockAPIClient(ctrl), templ())
workerFlavor := NewWorkerFlavor(mock_client.NewMockAPIClient(ctrl), templ())

require.NoError(t, swarmFlavor.Validate(
json.RawMessage(`{"Type": "worker", "DockerRestartCommand": "systemctl restart docker"}`),
require.NoError(t, workerFlavor.Validate(
json.RawMessage(`{"DockerRestartCommand": "systemctl restart docker"}`),
types.AllocationMethod{Size: 5}))
require.NoError(t, swarmFlavor.Validate(
json.RawMessage(`{"Type": "manager", "DockerRestartCommand": "systemctl restart docker"}`),
require.NoError(t, managerFlavor.Validate(
json.RawMessage(`{"DockerRestartCommand": "systemctl restart docker"}`),
types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}}))

// Logical ID with multiple attachments is allowed.
require.NoError(t, swarmFlavor.Validate(
require.NoError(t, managerFlavor.Validate(
json.RawMessage(`{
"Type": "manager",
"DockerRestartCommand": "systemctl restart docker",
"Attachments": {"127.0.0.1": [{"ID": "a", "Type": "ebs"}, {"ID": "b", "Type": "ebs"}]}}`),
types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}}))

require.Error(t, swarmFlavor.Validate(json.RawMessage(`{"type": "other"}`), types.AllocationMethod{Size: 5}))

// Logical ID used more than once.
err := swarmFlavor.Validate(
json.RawMessage(`{"Type": "manager", "DockerRestartCommand": "systemctl restart docker"}`),
err := managerFlavor.Validate(
json.RawMessage(`{"DockerRestartCommand": "systemctl restart docker"}`),
types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1", "127.0.0.1", "127.0.0.2"}})
require.Error(t, err)
require.Equal(t, "LogicalID 127.0.0.1 specified more than once", err.Error())

// Attachment cannot be associated with multiple Logical IDs.
err = swarmFlavor.Validate(
err = managerFlavor.Validate(
json.RawMessage(`{
"Type": "manager",
"DockerRestartCommand": "systemctl restart docker",
"Attachments": {"127.0.0.1": [{"ID": "a", "Type": "ebs"}], "127.0.0.2": [{"ID": "a", "Type": "ebs"}]}}`),
types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1", "127.0.0.2", "127.0.0.3"}})
require.Error(t, err)
require.Equal(t, "Attachment a specified more than once", err.Error())

// Unsupported Attachment Type.
err = swarmFlavor.Validate(
err = managerFlavor.Validate(
json.RawMessage(`{
"Type": "manager",
"DockerRestartCommand": "systemctl restart docker",
"Attachments": {"127.0.0.1": [{"ID": "a", "Type": "keyboard"}]}}`),
types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}})
Expand All @@ -82,7 +78,7 @@ func TestWorker(t *testing.T) {

client := mock_client.NewMockAPIClient(ctrl)

flavorImpl := NewSwarmFlavor(client, templ())
flavorImpl := NewWorkerFlavor(client, templ())

swarmInfo := swarm.Swarm{
ClusterInfo: swarm.ClusterInfo{ID: "ClusterUUID"},
Expand All @@ -99,7 +95,7 @@ func TestWorker(t *testing.T) {
client.EXPECT().NodeInspectWithRaw(gomock.Any(), nodeID).Return(nodeInfo, nil, nil)

details, err := flavorImpl.Prepare(
json.RawMessage(`{"Type": "worker"}`),
json.RawMessage(`{}`),
instance.Spec{Tags: map[string]string{"a": "b"}},
types.AllocationMethod{Size: 5})
require.NoError(t, err)
Expand Down Expand Up @@ -144,7 +140,7 @@ func TestManager(t *testing.T) {

client := mock_client.NewMockAPIClient(ctrl)

flavorImpl := NewSwarmFlavor(client, templ())
flavorImpl := NewManagerFlavor(client, templ())

swarmInfo := swarm.Swarm{
ClusterInfo: swarm.ClusterInfo{ID: "ClusterUUID"},
Expand All @@ -162,7 +158,7 @@ func TestManager(t *testing.T) {

id := instance.LogicalID("127.0.0.1")
details, err := flavorImpl.Prepare(
json.RawMessage(`{"Type": "manager", "Attachments": {"127.0.0.1": [{"ID": "a", "Type": "gpu"}]}}`),
json.RawMessage(`{"Attachments": {"127.0.0.1": [{"ID": "a", "Type": "gpu"}]}}`),
instance.Spec{Tags: map[string]string{"a": "b"}, LogicalID: &id},
types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}})
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 89b031a

Please sign in to comment.