From 07f83a01ab25490a901d270c167b171cbf8d005e Mon Sep 17 00:00:00 2001 From: i344628 Date: Thu, 7 Jun 2018 01:00:02 +0530 Subject: [PATCH] More refactoring. Framework for external driver client. Unit tests. --- hack/fakegrpcserver/fakegrpcserver.go | 53 ++++++- pkg/grpc/infraclient/infraclient.go | 184 +++++++++++++++++++++++ pkg/grpc/infraclient/provider.go | 34 +++++ pkg/grpc/infraserver/driver.go | 84 +++++++---- pkg/grpc/infraserver/infraserver.go | 80 +++------- pkg/grpc/infraserver/infraserver_test.go | 149 ++++++++++++++++++ pkg/grpc/infraserver/suite_test.go | 29 ++++ 7 files changed, 524 insertions(+), 89 deletions(-) create mode 100644 pkg/grpc/infraclient/infraclient.go create mode 100644 pkg/grpc/infraclient/provider.go create mode 100644 pkg/grpc/infraserver/infraserver_test.go create mode 100644 pkg/grpc/infraserver/suite_test.go diff --git a/hack/fakegrpcserver/fakegrpcserver.go b/hack/fakegrpcserver/fakegrpcserver.go index 72ab8b4c0..471cf064b 100644 --- a/hack/fakegrpcserver/fakegrpcserver.go +++ b/hack/fakegrpcserver/fakegrpcserver.go @@ -1,9 +1,60 @@ package main import ( + "log" + "path" + "time" + svr "github.com/gardener/machine-controller-manager/pkg/grpc/infraserver" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/testdata" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + tls = false //"Connection uses TLS if true, else plain TCP" + certFile = "" //"The TLS cert file" + keyFile = "" //"The TLS key file" + port = 10000 //"The server port" ) func main() { - svr.StartServer() + server := &svr.ExternalDriverManager{ + Port: port, + UseTLS: tls, + } + if tls { + if certFile == "" { + certFile = testdata.Path("server1.pem") + } + if keyFile == "" { + keyFile = testdata.Path("server1.key") + } + creds, err := credentials.NewServerTLSFromFile(certFile, keyFile) + if err != nil { + log.Fatalf("Failed to generate credentials %v", err) + } + server.Options = []grpc.ServerOption{grpc.Creds(creds)} + } + + server.Start() + + // Test go routine to test end to end flow + go func() { + for { + driver, _ := server.GetDriver(metav1.TypeMeta{ + Kind: "driver", + APIVersion: path.Join("poc", "alpha"), + }) + + time.Sleep(5 * time.Second) + log.Printf("Calling create") + driver.Create("fakeDriver", "a", "b") + + time.Sleep(5 * time.Second) + log.Printf("Calling delete") + driver.Delete("fakeDriver", "a", "b") + } + }() } diff --git a/pkg/grpc/infraclient/infraclient.go b/pkg/grpc/infraclient/infraclient.go new file mode 100644 index 000000000..c95f5072c --- /dev/null +++ b/pkg/grpc/infraclient/infraclient.go @@ -0,0 +1,184 @@ +/* +Copyright (c) 2017 SAP SE or an SAP affiliate company. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package infraclient + +import ( + "context" + "io" + + pb "github.com/gardener/machine-controller-manager/pkg/grpc/infrapb" + "github.com/golang/glog" + "google.golang.org/grpc" +) + +// ExternalDriver structure mediates the communication with the machine-controller-manager +type ExternalDriver struct { + serverAddr string + options []grpc.DialOption + provider ExternalDriverProvider + connection *grpc.ClientConn + stream pb.Infragrpc_RegisterClient +} + +// NewExternalDriver creates a new Driver instance. +func NewExternalDriver(serverAddr string, options []grpc.DialOption, provider ExternalDriverProvider) *ExternalDriver { + return &ExternalDriver{ + serverAddr: serverAddr, + options: options, + provider: provider, + } +} + +// Start starts the external driver. +func (d *ExternalDriver) Start() error { + conn, err := grpc.Dial(d.serverAddr, d.options...) + if err != nil { + glog.Fatalf("fail to dial: %v", err) + return err + } + d.connection = conn + client := pb.NewInfragrpcClient(conn) + + go func() { + d.serveMCM(client) + }() + + return nil +} + +// Stop stops the external driver. +func (d *ExternalDriver) Stop() error { + stream := d.stream + //connection := d.connection + + d.stream = nil + d.connection = nil + + if stream != nil && stream.Context().Err() == nil { + stream.Send(&pb.DriverSide{ + OperationType: "unregister", + }) + stream.CloseSend() + } + var err error + /* + if connection != nil { + err = connection.Close() + } + */ + + return err +} + +func (d *ExternalDriver) serveMCM(client pb.InfragrpcClient) error { + glog.Infof("Registering with MCM...") + ctx := context.Background() + + stream, err := client.Register(ctx) + if err != nil { + glog.Fatalf("%v.Register(_) = _, %v: ", client, err) + return err + } + + d.stream = stream + + for { + in, err := stream.Recv() + if err == io.EOF { + // read done. + return err + } + if err != nil { + glog.Fatalf("Failed to receive: %v", err) + return err + } + + glog.Infof("Operation %s", in.OperationType) + opParams := in.GetOperationparams() + glog.Infof("create parameters: %v", opParams) + + resp := pb.DriverSide{} + resp.OperationID = in.OperationID + resp.OperationType = in.OperationType + + switch in.OperationType { + case "register": + machineClassType := d.provider.Register() + pMachineClassType := &machineClassType + gvk := pMachineClassType.GroupVersionKind() + resp.Response = &pb.DriverSide_RegisterResp{ + RegisterResp: &pb.DriverSideRegisterationResp{ + Name: "externalDriver", + Kind: gvk.Kind, + Group: gvk.Group, + Version: gvk.Version, + }, + } + case "create": + var machineClass *MachineClassMeta + if opParams.MachineClassMetaData != nil { + machineClass = &MachineClassMeta{ + Name: opParams.MachineClassMetaData.Name, + Revision: opParams.MachineClassMetaData.Revision, + } + } + providerID, nodename, err := d.provider.Create(machineClass, opParams.Credentials, opParams.MachineID, opParams.MachineName) + + var sErr string + if err != nil { + sErr = err.Error() + } + resp.Response = &pb.DriverSide_Createresponse{ + Createresponse: &pb.DriverSideCreateResp{ + ProviderID: providerID, + Nodename: nodename, + Error: sErr, + }, + } + case "delete": + err := d.provider.Delete(opParams.Credentials, opParams.MachineID) + var sErr string + if err != nil { + sErr = err.Error() + } + resp.Response = &pb.DriverSide_Deleteresponse{ + Deleteresponse: &pb.DriverSideDeleteResp{ + Error: sErr, + }, + } + case "list": + vms, err := d.provider.List(opParams.MachineID) + list := []string{} + + var sErr string + if err == nil { + for _, machineID := range vms { + list = append(list, machineID) + } + } else { + sErr = err.Error() + } + resp.Response = &pb.DriverSide_Listresponse{ + Listresponse: &pb.DriverSideListResp{ + List: list, + Error: sErr, + }, + } + } + + stream.Send(&resp) + } +} diff --git a/pkg/grpc/infraclient/provider.go b/pkg/grpc/infraclient/provider.go new file mode 100644 index 000000000..b88cfd53a --- /dev/null +++ b/pkg/grpc/infraclient/provider.go @@ -0,0 +1,34 @@ +/* +Copyright (c) 2017 SAP SE or an SAP affiliate company. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package infraclient + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// MachineClassMeta has metadata about the machine class. +type MachineClassMeta struct { + Name string + Revision int32 +} + +// ExternalDriverProvider interface must be implemented by the providers. +type ExternalDriverProvider interface { + Register() metav1.TypeMeta + Create(machineclass *MachineClassMeta, credentials, machineID, machineName string) (string, string, error) + Delete(credentials, machineID string) error + List(machineID string) (map[string]string, error) +} diff --git a/pkg/grpc/infraserver/driver.go b/pkg/grpc/infraserver/driver.go index 29b03dc59..5d6663162 100644 --- a/pkg/grpc/infraserver/driver.go +++ b/pkg/grpc/infraserver/driver.go @@ -3,7 +3,6 @@ package infraserver import ( "errors" "fmt" - "log" "sync/atomic" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -12,10 +11,16 @@ import ( "github.com/golang/glog" ) +// MachineClassMeta has metadata about the machine class. +type MachineClassMeta struct { + Name string + Revision int32 +} + // Driver interface mediates the communication with the external driver type Driver interface { - Create(providerName, machineclass, machineID string) (string, string, error) - Delete(providerName, machineclass, machineID string) error + Create(machineClass *MachineClassMeta, credentials, machineID, machineName string) (string, string, error) + Delete(credentials, machineID string) error } // driver also implements the interface Infragrpc_RegisterServer as a proxy to unregister the driver automatically on error during Send or Recv. @@ -50,11 +55,17 @@ func (d *driver) recv() (*pb.DriverSide, error) { } func (d *driver) close() { - close(d.stopCh) + ch := d.stopCh + if ch != nil { + d.stopCh = nil + close(ch) + } } func (d *driver) wait() { - <-d.stopCh + if d.stopCh != nil { + <-d.stopCh + } } func (d *driver) nextRequestID() int32 { @@ -68,6 +79,11 @@ func (d *driver) receiveAndDispatch() error { return err } + if msg.OperationType == "unregister" { + d.close() + return nil + } + if ch, ok := d.pendingRequests[msg.OperationID]; ok { ch <- msg } else { @@ -85,7 +101,7 @@ func (d *driver) sendAndWait(params *pb.MCMsideOperationParams, opType string) ( } if err := d.send(&msg); err != nil { - log.Fatalf("Failed to send request: %v", err) + glog.Fatalf("Failed to send request: %v", err) return nil, err } @@ -98,6 +114,7 @@ func (d *driver) sendAndWait(params *pb.MCMsideOperationParams, opType string) ( response := <-ch delete(d.pendingRequests, id) + close(ch) if response == nil { return nil, fmt.Errorf("Received nil response from driver %v", d.machineClassType) @@ -107,53 +124,60 @@ func (d *driver) sendAndWait(params *pb.MCMsideOperationParams, opType string) ( } // Create sends create request to the driver over the grpc stream -func (d *driver) Create(providerName, machineclass, machineID string) (string, string, error) { +func (d *driver) Create(machineClass *MachineClassMeta, credentials, machineID, machineName string) (string, string, error) { createParams := pb.MCMsideOperationParams{ - MachineClassMetaData: &pb.MachineClassMeta{ - Name: "fakeclass", - Revision: 1, - }, - Credentials: "fakeCredentials", - MachineID: "fakeID", - MachineName: "fakename", + Credentials: credentials, + MachineID: machineID, + MachineName: machineName, + } + if machineClass != nil { + createParams.MachineClassMetaData = &pb.MachineClassMeta{ + Name: machineClass.Name, + Revision: machineClass.Revision, + } } createResp, err := d.sendAndWait(&createParams, "create") if err != nil { - log.Fatalf("Failed to send create req: %v", err) + glog.Fatalf("Failed to send create req: %v", err) + return "", "", err } if createResp == nil { - log.Printf("nil") return "", "", fmt.Errorf("Create response empty") } + + //TODO type check response := createResp.(*pb.DriverSide_Createresponse).Createresponse - log.Printf("Create. Return: %s %s %s", response.ProviderID, response.Nodename, response.Error) - return response.ProviderID, response.Nodename, errors.New(response.Error) + glog.Infof("Create. Return: %s %s %s", response.ProviderID, response.Nodename, response.Error) + + err = nil + if response.Error != "" { + err = errors.New(response.Error) + } + + return response.ProviderID, response.Nodename, err } // Delete sends delete request to the driver over the grpc stream -func (d *driver) Delete(providerName, machineclass, machineID string) error { +func (d *driver) Delete(credentials, machineID string) error { deleteParams := pb.MCMsideOperationParams{ - MachineClassMetaData: &pb.MachineClassMeta{ - Name: "fakeclass", - Revision: 1, - }, - Credentials: "fakeCredentials", - MachineID: "fakeID", - MachineName: "fakename", + Credentials: credentials, + MachineID: machineID, } deleteResp, err := d.sendAndWait(&deleteParams, "delete") if err != nil { - log.Fatalf("Failed to send delete req: %v", err) + return err } if deleteResp == nil { - log.Printf("nil") return fmt.Errorf("Delete response empty") } response := deleteResp.(*pb.DriverSide_Deleteresponse).Deleteresponse - log.Printf("Delete Return: %s", response.Error) - return errors.New(response.Error) + glog.Infof("Delete Return: %s", response.Error) + if response.Error != "" { + return errors.New(response.Error) + } + return nil } diff --git a/pkg/grpc/infraserver/infraserver.go b/pkg/grpc/infraserver/infraserver.go index ac8890932..4c7f6797a 100644 --- a/pkg/grpc/infraserver/infraserver.go +++ b/pkg/grpc/infraserver/infraserver.go @@ -4,40 +4,30 @@ import ( "context" "fmt" "io" - "log" "net" "path" - "time" "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/testdata" - pb "github.com/gardener/machine-controller-manager/pkg/grpc/infrapb" "github.com/golang/glog" ) -var ( - tls = false //"Connection uses TLS if true, else plain TCP" - certFile = "" //"The TLS cert file" - keyFile = "" //"The TLS key file" - port = 10000 //"The server port" -) - // ExternalDriverManager manages the registered drivers. type ExternalDriverManager struct { // a map of machine class type to the corresponding driver. - drivers map[metav1.TypeMeta]*driver + drivers map[metav1.TypeMeta]*driver + Port int + Options []grpc.ServerOption + grpcServer *grpc.Server } //GetDriver gets a registered and working driver stream for the given machine class type. func (s *ExternalDriverManager) GetDriver(machineClassType metav1.TypeMeta) (Driver, error) { driver := s.drivers[machineClassType] if driver == nil { - log.Printf("No driver available for machine class type %s", machineClassType) - return nil, nil + return nil, fmt.Errorf("No driver available for machine class type %s", machineClassType) } stream := driver.stream @@ -54,7 +44,7 @@ func (s *ExternalDriverManager) GetDriver(machineClassType metav1.TypeMeta) (Dri func (s *ExternalDriverManager) registerDriver(machineClassType metav1.TypeMeta, stream pb.Infragrpc_RegisterServer) (*driver, error) { if stream == nil { - return nil, fmt.Errorf("Cannot register invalid driver stream for machine class type %s", machineClassType) + return nil, fmt.Errorf("Cannot register invalid driver stream for machine class type %v", machineClassType) } err := stream.Context().Err() @@ -63,13 +53,11 @@ func (s *ExternalDriverManager) registerDriver(machineClassType metav1.TypeMeta, } d, err := s.GetDriver(machineClassType) - if err != nil { - return nil, err - } else if d != nil { - return nil, fmt.Errorf("Driver for machineClassType %s already registered", machineClassType) + if err == nil && d != nil { + return nil, fmt.Errorf("Driver for machineClassType %v already registered", machineClassType) } - log.Printf("Registering new driver") + glog.Infof("Registering new driver") stopCh := make(chan interface{}) newDriver := &driver{ @@ -84,6 +72,8 @@ func (s *ExternalDriverManager) registerDriver(machineClassType metav1.TypeMeta, } s.drivers[machineClassType] = newDriver + glog.Infof("Registered new driver %v", machineClassType) + go func() { <-stopCh @@ -103,6 +93,7 @@ func (s *ExternalDriverManager) Stop() { for _, driver := range s.drivers { driver.close() } + s.grpcServer.Stop() } // Register Requests driver to send it's details, and sets up stream @@ -125,7 +116,7 @@ func (s *ExternalDriverManager) Register(stream pb.Infragrpc_RegisterServer) err return err } if err != nil { - log.Printf("received error %v", err) + glog.Warningf("received error %v", err) return err } @@ -155,45 +146,18 @@ func (s *ExternalDriverManager) GetMachineClass(ctx context.Context, in *pb.Mach return nil, nil } -// StartServer start the grpc server -func StartServer() { - lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) +// Start start the grpc server +func (s *ExternalDriverManager) Start() { + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", s.Port)) if err != nil { - log.Fatalf("failed to listen: %v", err) - } - var opts []grpc.ServerOption - if tls { - if certFile == "" { - certFile = testdata.Path("server1.pem") - } - if keyFile == "" { - keyFile = testdata.Path("server1.key") - } - creds, err := credentials.NewServerTLSFromFile(certFile, keyFile) - if err != nil { - log.Fatalf("Failed to generate credentials %v", err) - } - opts = []grpc.ServerOption{grpc.Creds(creds)} + glog.Fatalf("failed to listen: %v", err) } - driverManager := &ExternalDriverManager{} - // Test go routine to test end to end flow + glog.Infof("Starting grpc server...") + s.grpcServer = grpc.NewServer(s.Options...) + pb.RegisterInfragrpcServer(s.grpcServer, s) + go func() { - for { - for _, driver := range driverManager.drivers { - time.Sleep(5 * time.Second) - log.Printf("Calling create") - driver.Create("fakeDriver", "a", "b") - - time.Sleep(5 * time.Second) - log.Printf("Calling delete") - driver.Delete("fakeDriver", "a", "b") - } - } + s.grpcServer.Serve(lis) }() - - log.Printf("Starting grpc server") - grpcServer := grpc.NewServer(opts...) - pb.RegisterInfragrpcServer(grpcServer, driverManager) - grpcServer.Serve(lis) } diff --git a/pkg/grpc/infraserver/infraserver_test.go b/pkg/grpc/infraserver/infraserver_test.go new file mode 100644 index 000000000..0cabc9633 --- /dev/null +++ b/pkg/grpc/infraserver/infraserver_test.go @@ -0,0 +1,149 @@ +/* +Copyright (c) 2017 SAP SE or an SAP affiliate company. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package infraserver + +import ( + "fmt" + "path" + "time" + + "github.com/gardener/machine-controller-manager/pkg/grpc/infraclient" + "github.com/golang/glog" + . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" + "google.golang.org/grpc" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type testdata struct { + machineClass *MachineClassMeta + credentials string + machineID string + machineName string + providerID string + nodeName string + err error +} + +var _ = Describe("ExternalDriverManager", func() { + DescribeTable("##Start", + func(machineClassType *metav1.TypeMeta, creates, deletes []*testdata) { + server := &ExternalDriverManager{ + Port: 50000, + } + + defer server.Stop() + server.Start() + + fakeDriverProvider := &fakeExternalDriverProvider{ + machineClassType: machineClassType, + creates: creates, + deletes: deletes, + } + externalDriver := infraclient.NewExternalDriver("localhost:50000", []grpc.DialOption{ + grpc.WithInsecure(), + }, fakeDriverProvider) + + defer externalDriver.Stop() + externalDriver.Start() + + var ( + driver Driver + err error + ) + for i := 0; i < 1; i++ { + time.Sleep(2 * time.Second) + driver, err = server.GetDriver(*machineClassType) + glog.Infof("%v", err) + if err == nil { + break + } + } + + Expect(err).To(BeNil()) + Expect(driver).To(Not(BeNil())) + + for _, t := range creates { + providerID, nodeName, err := driver.Create(t.machineClass, t.credentials, t.machineID, t.machineName) + Expect(providerID).To(BeEquivalentTo(t.providerID)) + Expect(nodeName).To(BeEquivalentTo(t.nodeName)) + if t.err == nil { + Expect(err).To(BeNil()) + } else { + Expect(err.Error()).To(BeEquivalentTo(t.err.Error())) + } + } + + for _, t := range deletes { + err := driver.Delete(t.credentials, t.machineID) + if t.err == nil { + Expect(err).To(BeNil()) + } else { + Expect(err.Error()).To(BeEquivalentTo(t.err.Error())) + } + } + }, + Entry("happy path", &metav1.TypeMeta{ + Kind: "driver", + APIVersion: path.Join("poc", "alpha"), + }, []*testdata{&testdata{ + credentials: "c", + machineID: "a", + machineName: "b", + providerID: "fakeID", + nodeName: "fakename", + err: nil, + }}, []*testdata{&testdata{ + credentials: "c", + machineID: "a", + err: nil, + }}), + ) +}) + +type fakeExternalDriverProvider struct { + machineClassType *metav1.TypeMeta + creates []*testdata + deletes []*testdata +} + +func (f *fakeExternalDriverProvider) Register() metav1.TypeMeta { + return *f.machineClassType +} + +func (f *fakeExternalDriverProvider) Create(machineclass *infraclient.MachineClassMeta, credentials, machineID, machineName string) (string, string, error) { + for _, t := range f.creates { + if t.machineID == machineID { + return t.providerID, t.nodeName, t.err + } + } + return "", "", fmt.Errorf("No fake data found for %v", machineID) +} + +func (f *fakeExternalDriverProvider) Delete(credentials, machineID string) error { + for _, t := range f.creates { + if t.machineID == machineID { + return t.err + } + } + return fmt.Errorf("No fake data found for %v", machineID) +} + +func (f *fakeExternalDriverProvider) List(machineID string) (map[string]string, error) { + //TODO + return nil, nil +} diff --git a/pkg/grpc/infraserver/suite_test.go b/pkg/grpc/infraserver/suite_test.go new file mode 100644 index 000000000..13c1e7984 --- /dev/null +++ b/pkg/grpc/infraserver/suite_test.go @@ -0,0 +1,29 @@ +/* +Copyright (c) 2017 SAP SE or an SAP affiliate company. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package infraserver + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestMachineControllerManagerSuite(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Machine Controller Manager External Driver suite") +}