Skip to content

Commit

Permalink
Added running modes, reusing kubelet-registration-path in the exec mode
Browse files Browse the repository at this point in the history
  • Loading branch information
mauriciopoppe committed Jul 29, 2021
1 parent 5387d2b commit 58bd237
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 10 deletions.
55 changes: 51 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,23 @@ There are two UNIX domain sockets used by the node-driver-registrar:
operations (e.g. `/csi/csi.sock`).
* `--kubelet-registration-path`: This is the path to the CSI driver socket on
the host node that kubelet will use to issue CSI operations (e.g.
`/var/lib/kubelet/plugins/<drivername.example.com>/csi.sock). Note this is NOT
`/var/lib/kubelet/plugins/<drivername.example.com>/csi.sock`). Note this is NOT
the path to the registration socket.

### Optional arguments

* `--http-endpoint`: "The TCP network address where the HTTP server for diagnostics, including
* `--http-endpoint`: The TCP network address where the HTTP server for diagnostics, including
the health check indicating whether the registration socket exists, will listen (example:
`:8080`). The default is empty string, which means the server is disabled.

* `--health-port`: (deprecated) This is the port of the health check server for the
node-driver-registrar, which checks if the registration socket exists. A value <= 0 disables
node-driver-registrar, which checks if the registration socket exists. A value <= 0 disables
the server. Server is disabled by default.

* `--timeout <duration>`: Timeout of all calls to CSI driver. It should be set to a value that accommodates the `GetDriverName` calls. 1 second is used by default.

* `--mode <mode>` (default: `--mode=registration`): The running mode of node-driver-registrar. `registration` runs node-driver-registrar as a long running process. `kubelet-registration-probe` runs as a health check and returns a status code of 0 if the driver was registered successfully. In the probe definition, make sure that the value of `--kubelet-registration-path` is the same as in the container.

### Required permissions

The node-driver-registrar does not interact with the Kubernetes API, so no RBAC
Expand All @@ -76,11 +78,56 @@ permissions to:
* Access the registration socket (typically in `/var/lib/kubelet/plugins_registry/`).
* Used by the `node-driver-registrar` to register the driver with kubelet.

### Health Check
### Health Check with the http server

If `--http-endpoint` is set, the node-driver-registrar exposes a health check endpoint at the
specified address and the path `/healthz`, indicating whether the registration socket exists.

### Health Check with an exec probe

If `--mode=kubelet-registration-probe` is set, node-driver-registrar can act as a probe that checks whether kubelet has registered the driver, i.e. that the kubelet plugin registration succeeded. This is useful to detect if the registration got stuck, as seen in issue [#143](https://github.com/kubernetes-csi/node-driver-registrar/issues/143).

The value of `--kubelet-registration-path` must be the same as the one set in the container args. `--csi-address` is not required in this mode. For example:

**Linux**

```yaml
containers:
- name: csi-driver-registrar
image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.3.0
args:
- "--v=5"
- "--csi-address=/csi/csi.sock"
- "--kubelet-registration-path=/var/lib/kubelet/plugins/pd.csi.storage.gke.io/csi.sock"
livenessProbe:
exec:
command:
- /csi-node-driver-registrar
- --kubelet-registration-path=/var/lib/kubelet/plugins/pd.csi.storage.gke.io/csi.sock
- --mode=kubelet-registration-probe
initialDelaySeconds: 3
```
**Windows**
```yaml
containers:
- name: csi-driver-registrar
image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.3.0
args:
- --v=5
- --csi-address=unix://C:\\csi\\csi.sock
- --kubelet-registration-path=C:\\var\\lib\\kubelet\\plugins\\pd.csi.storage.gke.io\\csi.sock
livenessProbe:
exec:
command:
- /csi-node-driver-registrar.exe
- --kubelet-registration-path=C:\\var\\lib\\kubelet\\plugins\\pd.csi.storage.gke.io\\csi.sock
- --mode=kubelet-registration-probe
initialDelaySeconds: 3
```
Related issue [#143](https://github.com/kubernetes-csi/node-driver-registrar/issues/143)
### Example
Here is an example sidecar spec in the driver DaemonSet. `<drivername.example.com>` should be replaced by
Expand Down
62 changes: 57 additions & 5 deletions cmd/csi-node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"flag"
"fmt"
"os"
"path/filepath"
"strconv"
"time"

"github.com/kubernetes-csi/csi-lib-utils/metrics"
"github.com/kubernetes-csi/node-driver-registrar/pkg/util"
"k8s.io/klog/v2"

"github.com/kubernetes-csi/csi-lib-utils/connection"
Expand All @@ -41,6 +43,21 @@ const (
sleepDuration = 2 * time.Minute
)

const (
	// ModeRegistration is the default value of the --mode flag:
	// node-driver-registrar runs as a long running process.
	ModeRegistration = "registration"

	// ModeKubeletRegistrationProbe turns node-driver-registrar into an exec
	// probe that checks whether the kubelet plugin registration succeeded.
	ModeKubeletRegistrationProbe = "kubelet-registration-probe"
)

// registrationProbePath is the path of the file that gets created when the
// kubelet plugin registration succeeds. It starts out empty and is assigned
// once the program runs.
var registrationProbePath string

// Command line flags
var (
connectionTimeout = flag.Duration("connection-timeout", 0, "The --connection-timeout flag is deprecated")
Expand All @@ -51,7 +68,10 @@ var (
healthzPort = flag.Int("health-port", 0, "(deprecated) TCP port for healthz requests. Set to 0 to disable the healthz server. Only one of `--health-port` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including the health check indicating whether the registration socket exists, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--health-port` and `--http-endpoint` can be set.")
showVersion = flag.Bool("version", false, "Show version.")
version = "unknown"
mode = flag.String("mode", ModeRegistration, `The running mode of node-driver-registrar. "registration" runs node-driver-registrar as a long running process. "kubelet-registration-probe" runs as a health check and returns a status code of 0 if the driver was registered successfully, in the probe definition make sure that the value of --kubelet-registration-path is the same as in the container.`)

// Set during compilation time
version = "unknown"

// List of supported versions
supportedVersions = []string{"1.0.0"}
Expand All @@ -78,6 +98,14 @@ func newRegistrationServer(driverName string, endpoint string, versions []string
// GetInfo is the RPC invoked by plugin watcher
func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
klog.Infof("Received GetInfo call: %+v", req)

// on successful registration, create the registration probe file
err := util.TouchFile(registrationProbePath)
if err != nil {
klog.ErrorS(err, "Failed to create registration probe file", "registrationProbePath", registrationProbePath)
}
klog.InfoS("Kubelet registration probe created", "path", registrationProbePath)

return &registerapi.PluginInfo{
Type: registerapi.CSIPlugin,
Name: e.driverName,
Expand All @@ -96,21 +124,45 @@ func (e registrationServer) NotifyRegistrationStatus(ctx context.Context, status
return &registerapi.RegistrationStatusResponse{}, nil
}

// modeIsKubeletRegistrationProbe reports whether the value of the --mode flag
// equals ModeKubeletRegistrationProbe.
func modeIsKubeletRegistrationProbe() bool {
	selected := *mode
	return selected == ModeKubeletRegistrationProbe
}

func main() {
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
flag.Parse()

if *showVersion {
fmt.Println(os.Args[0], version)
return
}

if *kubeletRegistrationPath == "" {
klog.Error("kubelet-registration-path is a required parameter")
os.Exit(1)
}

if *showVersion {
fmt.Println(os.Args[0], version)
return
// set after we made sure that *kubeletRegistrationPath exists
kubeletRegistrationPathDir := filepath.Dir(*kubeletRegistrationPath)
registrationProbePath = filepath.Join(kubeletRegistrationPathDir, "registration")

// with the mode kubelet-registration-probe
if modeIsKubeletRegistrationProbe() {
lockfileExists, err := util.DoesFileExist(registrationProbePath)
if err != nil {
fmt.Printf("Failed to check if registration path exists, registrationProbePath=%s err=%v", registrationProbePath, err)
os.Exit(1)
}
if !lockfileExists {
fmt.Printf("Kubelet plugin registration hasn't succeeded yet, file=%s doesn't exist.", registrationProbePath)
os.Exit(1)
}
fmt.Printf("Kubelet plugin registration succeeded.")
os.Exit(0)
}

klog.Infof("Version: %s", version)
klog.Infof("Running node-driver-registrar in mode=%s", *mode)

if *healthzPort > 0 && *httpEndpoint != "" {
klog.Error("only one of `--health-port` and `--http-endpoint` can be set.")
Expand Down
10 changes: 9 additions & 1 deletion cmd/csi-node-driver-registrar/node_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func nodeRegister(csiDriverName, httpEndpoint string) {
}
klog.Infof("Registration Server started at: %s\n", socketPath)
grpcServer := grpc.NewServer()

// Before registing node-driver-registrar with the kubelet ensure that the lockfile doesn't exist
// a lockfile may exist because the container was forcefully shutdown
util.CleanupFile(registrationProbePath)

// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrar)

Expand All @@ -70,7 +75,10 @@ func nodeRegister(csiDriverName, httpEndpoint string) {
klog.Errorf("Registration Server stopped serving: %v", err)
os.Exit(1)
}
// If gRPC server is gracefully shutdown, exit

// clean the file on graceful shutdown
util.CleanupFile(registrationProbePath)
// If gRPC server is gracefully shutdown, cleanup and exit
os.Exit(0)
}

Expand Down
46 changes: 46 additions & 0 deletions pkg/util/util_linux.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build linux
// +build linux

/*
Expand All @@ -21,6 +22,7 @@ package util
import (
"fmt"
"os"
"path/filepath"

"golang.org/x/sys/unix"
)
Expand Down Expand Up @@ -55,3 +57,47 @@ func DoesSocketExist(socketPath string) (bool, error) {
}
return false, nil
}

// CleanupFile removes the file at filePath when DoesFileExist reports that it
// exists; otherwise it does nothing.
//
// It returns the error from the existence check, or a wrapped error when the
// removal fails. A file that is already gone is not treated as a failure.
func CleanupFile(filePath string) error {
	fileExists, err := DoesFileExist(filePath)
	if err != nil {
		return err
	}
	if !fileExists {
		return nil
	}

	// The file can disappear between the check above and the removal; a
	// missing file is exactly the state the caller asked for, so only other
	// failures are reported. %w keeps the cause available to errors.Is/As.
	if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to remove stale file=%s with error: %w", filePath, err)
	}
	return nil
}

// DoesFileExist reports whether filePath refers to an existing regular file.
//
// The path is inspected with os.Stat. It returns (false, nil) when the path
// does not exist or exists but is not a regular file (for example a
// directory). Any other stat failure is returned as a wrapped error so that
// callers can still inspect the cause with errors.Is / errors.As.
func DoesFileExist(filePath string) (bool, error) {
	info, err := os.Stat(filePath)
	switch {
	case err == nil:
		return info.Mode().IsRegular(), nil
	case os.IsNotExist(err):
		// A missing file is an expected answer, not a failure.
		return false, nil
	default:
		return false, fmt.Errorf("Failed to stat the file=%s with error: %w", filePath, err)
	}
}

// TouchFile makes sure a file exists at filePath.
//
// When DoesFileExist reports that the file is already there, nothing is done.
// Otherwise the parent directory is created with os.MkdirAll (mode 0755) and
// the file is created with os.Create. Every failure, including a failure to
// close the newly created file, is returned to the caller.
func TouchFile(filePath string) error {
	exists, err := DoesFileExist(filePath)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}

	if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
		return err
	}

	file, err := os.Create(filePath)
	if err != nil {
		return err
	}
	// Report a failed close instead of silently dropping it.
	return file.Close()
}
39 changes: 39 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

var socketFileName = "reg.sock"
var kubeletRegistrationPath = "/var/lib/kubelet/plugins/csi-dummy/registration"

// TestSocketFileDoesNotExist - Test1: file does not exist. So clean up should be successful.
func TestSocketFileDoesNotExist(t *testing.T) {
Expand Down Expand Up @@ -173,3 +174,41 @@ func TestSocketRegularFile(t *testing.T) {
}
}
}

// TestTouchFile creates a file if it doesn't exist
// TestTouchFile checks that TouchFile creates a file that did not exist
// before, and that CleanupFile succeeds when there is nothing to remove.
func TestTouchFile(t *testing.T) {
	// Work inside a throwaway directory that is removed when the test ends.
	tmpDir, err := utiltesting.MkTmpdir("csi-test")
	if err != nil {
		t.Fatalf("could not create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	probeFile := filepath.Join(tmpDir, kubeletRegistrationPath)

	// Precondition: nothing exists at the target path yet.
	switch present, err := DoesFileExist(probeFile); {
	case err != nil:
		t.Fatalf("Failed to execute file exist: %+v", err)
	case present:
		t.Fatalf("File %s must not exist", probeFile)
	}

	// Cleaning up a file that is not there must not be an error.
	if err := CleanupFile(probeFile); err != nil {
		t.Fatalf("Failed to execute file cleanup: %+v", err)
	}

	if err := TouchFile(probeFile); err != nil {
		t.Fatalf("Failed to execute file touch: %+v", err)
	}

	// Postcondition: the file is now present.
	switch present, err := DoesFileExist(probeFile); {
	case err != nil:
		t.Fatalf("Failed to execute file exist: %+v", err)
	case !present:
		t.Fatalf("File %s must exist", probeFile)
	}
}
46 changes: 46 additions & 0 deletions pkg/util/util_windows.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build windows
// +build windows

/*
Expand All @@ -22,6 +23,7 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
)

func Umask(mask int) (int, error) {
Expand Down Expand Up @@ -53,3 +55,47 @@ func DoesSocketExist(socketPath string) (bool, error) {
}
return true, nil
}

// CleanupFile removes the file at filePath when DoesFileExist reports that it
// exists; otherwise it does nothing.
//
// It returns the error from the existence check, or a wrapped error when the
// removal fails. A file that is already gone is not treated as a failure.
func CleanupFile(filePath string) error {
	fileExists, err := DoesFileExist(filePath)
	if err != nil {
		return err
	}
	if !fileExists {
		return nil
	}

	// The file can disappear between the check above and the removal; a
	// missing file is exactly the state the caller asked for, so only other
	// failures are reported. %w keeps the cause available to errors.Is/As.
	if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to remove stale file=%s with error: %w", filePath, err)
	}
	return nil
}

// DoesFileExist reports whether filePath refers to an existing regular file.
//
// The path is inspected with os.Lstat, so a symbolic link is examined itself
// rather than followed. It returns (false, nil) when the path does not exist
// or exists but is not a regular file (for example a directory). Any other
// failure is returned as a wrapped error so that callers can still inspect
// the cause with errors.Is / errors.As.
func DoesFileExist(filePath string) (bool, error) {
	info, err := os.Lstat(filePath)
	switch {
	case err == nil:
		return info.Mode().IsRegular(), nil
	case os.IsNotExist(err):
		// A missing file is an expected answer, not a failure.
		return false, nil
	default:
		return false, fmt.Errorf("Failed to stat the file=%s with error: %w", filePath, err)
	}
}

// TouchFile makes sure a file exists at filePath.
//
// When DoesFileExist reports that the file is already there, nothing is done.
// Otherwise the parent directory is created with os.MkdirAll (mode 0755) and
// the file is created with os.Create. Every failure, including a failure to
// close the newly created file, is returned to the caller.
func TouchFile(filePath string) error {
	exists, err := DoesFileExist(filePath)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}

	if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
		return err
	}

	file, err := os.Create(filePath)
	if err != nil {
		return err
	}
	// Report a failed close instead of silently dropping it.
	return file.Close()
}

0 comments on commit 58bd237

Please sign in to comment.