Skip to content

Commit

Permalink
Merge pull request #24 from scality/feature/COSI-44-instroduce-rpc-fa…
Browse files Browse the repository at this point in the history
…ctory

COSI-44: Integrate Local gRPC Provisioner Server, and Refactor COSI Driver for Improved Maintainability and Observability
  • Loading branch information
anurag4DSB authored Nov 21, 2024
2 parents b7f5bc2 + 9214660 commit da6e298
Show file tree
Hide file tree
Showing 15 changed files with 682 additions and 8 deletions.
4 changes: 2 additions & 2 deletions cmd/scality-cosi-driver/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/scality/cosi-driver/pkg/driver"
"k8s.io/klog/v2"

"sigs.k8s.io/container-object-storage-interface-provisioner-sidecar/pkg/provisioner"
"github.com/scality/cosi-driver/pkg/grpcfactory"
)

const (
Expand Down Expand Up @@ -60,7 +60,7 @@ func run(ctx context.Context) error {
return fmt.Errorf("failed to initialize Scality driver: %w", err)
}

server, err := provisioner.NewDefaultCOSIProvisionerServer(*driverAddress, identityServer, bucketProvisioner)
server, err := grpcfactory.NewDefaultCOSIProvisionerServer(*driverAddress, identityServer, bucketProvisioner)
if err != nil {
return fmt.Errorf("failed to start the provisioner server: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
k8s.io/client-go v0.31.2
k8s.io/klog/v2 v2.130.1
sigs.k8s.io/container-object-storage-interface-api v0.1.0
sigs.k8s.io/container-object-storage-interface-provisioner-sidecar v0.1.0
sigs.k8s.io/container-object-storage-interface-spec v0.1.0
)

Expand Down Expand Up @@ -57,7 +56,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/errors v0.9.1
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,6 @@ k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/container-object-storage-interface-api v0.1.0 h1:8tB6JFQhbQIC1hwGQ+q4+tmSSNfjKemb7bFI6C0CK/4=
sigs.k8s.io/container-object-storage-interface-api v0.1.0/go.mod h1:YiB+i/UGkzqgODDhRG3u7jkbWkQcoUeLEJ7hwOT/2Qk=
sigs.k8s.io/container-object-storage-interface-provisioner-sidecar v0.1.0 h1:S5Qh/VAd745a2vMyZfK6qLiWJxvGpbUOddBtEVX1nU4=
sigs.k8s.io/container-object-storage-interface-provisioner-sidecar v0.1.0/go.mod h1:JhfV605PePyAvL4F8wTjJ9ZiSAaGkrMmpn0liMPftJ4=
sigs.k8s.io/container-object-storage-interface-spec v0.1.0 h1:WHeei3OywFyebPwBkVUuuV1SuGjG6Qm4BBmnfFTVa1Y=
sigs.k8s.io/container-object-storage-interface-spec v0.1.0/go.mod h1:SzF/yVSh88TgYdBOAXqhT96XjU8pCQtoeQKxzIOOmWQ=
sigs.k8s.io/controller-runtime v0.12.3 h1:FCM8xeY/FI8hoAfh/V4XbbYMY20gElh9yh+A98usMio=
Expand Down
4 changes: 2 additions & 2 deletions pkg/driver/driver_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
. "github.com/onsi/gomega"
)

func TestCosiDev(t *testing.T) {
func TestDriverSuite(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Driver Suite")
RunSpecs(t, "Driver Test Suite")
}
File renamed without changes.
File renamed without changes.
32 changes: 32 additions & 0 deletions pkg/grpcfactory/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright 2024 Scality, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpcfactory

import (
"google.golang.org/grpc"
cosi "sigs.k8s.io/container-object-storage-interface-spec"
)

var (
_ cosi.IdentityClient = &COSIProvisionerClient{}
_ cosi.ProvisionerClient = &COSIProvisionerClient{}
)

type COSIProvisionerClient struct {
address string
conn *grpc.ClientConn
cosi.IdentityClient
cosi.ProvisionerClient
}
130 changes: 130 additions & 0 deletions pkg/grpcfactory/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package grpcfactory_test

import (
"context"
"net"
"os"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/scality/cosi-driver/pkg/grpcfactory"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
cosi "sigs.k8s.io/container-object-storage-interface-spec"
)

type MockIdentityServer struct {
cosi.UnimplementedIdentityServer
}

type MockProvisionerServer struct {
cosi.UnimplementedProvisionerServer
}

var _ = Describe("gRPC Factory Client", func() {
var (
client *grpcfactory.COSIProvisionerClient
grpcServer *grpc.Server
listener net.Listener
address string
)

BeforeEach(func() {
address = "unix:///tmp/test.sock"
grpcServer = grpc.NewServer()

// Remove any existing socket file to avoid "address already in use" errors
_ = os.Remove(address[7:])

// Create the listener
var err error
listener, err = net.Listen("unix", address[7:])
Expect(err).NotTo(HaveOccurred(), "Failed to create Unix listener for gRPC server")

// Register mock servers
cosi.RegisterIdentityServer(grpcServer, &MockIdentityServer{})
cosi.RegisterProvisionerServer(grpcServer, &MockProvisionerServer{})

// Start the gRPC server in a separate goroutine
go func() {
err := grpcServer.Serve(listener)
if err != nil && err != grpc.ErrServerStopped {
GinkgoWriter.Println("gRPC server encountered an error:", err)
}
}()
})

AfterEach(func() {
// Stop the gRPC server and close the listener
grpcServer.Stop()
if listener != nil {
listener.Close()
}
// Remove the Unix socket file to clean up after each test
_ = os.Remove(address[7:])
})

Describe("Initialization and Connection", func() {
It("should initialize and connect COSIProvisionerClient", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// Add insecure credentials to the dial options for Unix socket
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}

var err error
client, err = grpcfactory.NewCOSIProvisionerClient(ctx, address, dialOpts, nil)
Expect(err).NotTo(HaveOccurred())
Expect(client).NotTo(BeNil())
Expect(client.IdentityClient).NotTo(BeNil())
Expect(client.ProvisionerClient).NotTo(BeNil())
})
})

Describe("Interface Implementation", func() {
It("should implement cosi.IdentityClient and cosi.ProvisionerClient interfaces", func() {
client = &grpcfactory.COSIProvisionerClient{
IdentityClient: cosi.NewIdentityClient(nil),
ProvisionerClient: cosi.NewProvisionerClient(nil),
}

var _ cosi.IdentityClient = client
var _ cosi.ProvisionerClient = client
})
})

Describe("Interceptor Usage", func() {
It("should use ApiLogger as an interceptor if debug is true", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

client, err := grpcfactory.NewDefaultCOSIProvisionerClient(ctx, address, true)
Expect(err).NotTo(HaveOccurred())
Expect(client).NotTo(BeNil())
})

It("should initialize without interceptors if debug is false", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

client, err := grpcfactory.NewDefaultCOSIProvisionerClient(ctx, address, false)
Expect(err).NotTo(HaveOccurred())
Expect(client).NotTo(BeNil())
})
})

Describe("Error Handling", func() {
It("should return an error if given an invalid address", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// Attempt to connect using an invalid address format
_, err := grpcfactory.NewCOSIProvisionerClient(ctx, "invalid-address", nil, nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("unsupported scheme"))
})
})
})
107 changes: 107 additions & 0 deletions pkg/grpcfactory/grpc_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2024 Scality, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpcfactory

import (
"context"
"fmt"
"net/url"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
cosi "sigs.k8s.io/container-object-storage-interface-spec"
)

const (
maxGrpcBackoff = 5 * 30 * time.Second
grpcDialTimeout = 30 * time.Second
)

func NewDefaultCOSIProvisionerClient(ctx context.Context, address string, debug bool) (*COSIProvisionerClient, error) {
backoffConfiguration := backoff.DefaultConfig
backoffConfiguration.MaxDelay = maxGrpcBackoff
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()), // strictly restricting to local Unix domain socket
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfiguration,
MinConnectTimeout: grpcDialTimeout,
}),
// Note: grpc.WithBlock() is deprecated; we proceed without it
}
interceptors := []grpc.UnaryClientInterceptor{}
if debug {
interceptors = append(interceptors, ApiLogger)
}
return NewCOSIProvisionerClient(ctx, address, dialOpts, interceptors)
}

// NewCOSIProvisionerClient creates a new GRPCClient that only supports unix domain sockets
func NewCOSIProvisionerClient(ctx context.Context, address string, dialOpts []grpc.DialOption, interceptors []grpc.UnaryClientInterceptor) (*COSIProvisionerClient, error) {
addr, err := url.Parse(address)
if err != nil {
return nil, err
}
if addr.Scheme != "unix" {
err := fmt.Errorf("unsupported scheme: expected 'unix', found '%s'", addr.Scheme)
klog.ErrorS(err, "Invalid address scheme")
return nil, err
}
if len(interceptors) > 0 {
dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(interceptors...))
}
// Proceed without grpc.WithBlock(), allowing connection to establish asynchronously
conn, err := grpc.Dial(address, dialOpts...)
if err != nil {
klog.ErrorS(err, "Connection failed", "address", address)
return nil, err
}
return &COSIProvisionerClient{
address: address,
conn: conn,
IdentityClient: cosi.NewIdentityClient(conn),
ProvisionerClient: cosi.NewProvisionerClient(conn),
}, nil
}
func NewDefaultCOSIProvisionerServer(address string,
identityServer cosi.IdentityServer,
provisionerServer cosi.ProvisionerServer) (*COSIProvisionerServer, error) {
return NewCOSIProvisionerServer(address, identityServer, provisionerServer, []grpc.ServerOption{})
}
func NewCOSIProvisionerServer(address string,
identityServer cosi.IdentityServer,
provisionerServer cosi.ProvisionerServer,
listenOpts []grpc.ServerOption) (*COSIProvisionerServer, error) {
if identityServer == nil {
err := errors.New("Identity server cannot be nil")
klog.ErrorS(err, "Invalid argument")
return nil, err
}
if provisionerServer == nil {
err := errors.New("Provisioner server cannot be nil")
klog.ErrorS(err, "Invalid argument")
return nil, err
}
return &COSIProvisionerServer{
address: address,
identityServer: identityServer,
provisionerServer: provisionerServer,
listenOpts: listenOpts,
}, nil
}
13 changes: 13 additions & 0 deletions pkg/grpcfactory/grpc_factory_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package grpcfactory_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestGRPCFactorySuite(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "gRPC Factory Test Suite")
}
Loading

0 comments on commit da6e298

Please sign in to comment.