Skip to content

Commit

Permalink
Support structured logging
Browse files Browse the repository at this point in the history
  • Loading branch information
rlia committed Dec 18, 2023
1 parent a2cca0e commit dc23d0e
Show file tree
Hide file tree
Showing 357 changed files with 73,390 additions and 41 deletions.
52 changes: 33 additions & 19 deletions cmd/csi-node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
_ "net/http/pprof"
Expand All @@ -29,6 +30,10 @@ import (

"github.com/kubernetes-csi/csi-lib-utils/connection"
csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)

Expand Down Expand Up @@ -87,7 +92,8 @@ func newRegistrationServer(driverName string, endpoint string, versions []string

// GetInfo is the RPC invoked by plugin watcher
func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
klog.Infof("Received GetInfo call: %+v", req)
logger := klog.FromContext(ctx)
logger.Info("Received GetInfo call", "request", req)

return &registerapi.PluginInfo{
Type: registerapi.CSIPlugin,
Expand All @@ -98,9 +104,10 @@ func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRe
}

func (e registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
klog.Infof("Received NotifyRegistrationStatus call: %+v", status)
logger := klog.FromContext(ctx)
logger.Info("Received NotifyRegistrationStatus call", "status", status)
if !status.PluginRegistered {
klog.Errorf("Registration process failed with error: %+v, restarting registration container.", status.Error)
logger.Error(errors.New(status.Error), "Registration process failed with error, restarting registration container")
os.Exit(1)
}

Expand All @@ -112,18 +119,25 @@ func modeIsKubeletRegistrationProbe() bool {
}

func main() {
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
fg := featuregate.NewFeatureGate()
logsapi.AddFeatureGates(fg)
c := logsapi.NewLoggingConfiguration()
logsapi.AddGoFlags(c, flag.CommandLine)
logs.InitLogs()
flag.Parse()
if err := logsapi.ValidateAndApply(c, fg); err != nil {
klog.ErrorS(err, "LoggingConfiguration is invalid")
os.Exit(1)
}

if *showVersion {
fmt.Println(os.Args[0], version)
return
}

if *kubeletRegistrationPath == "" {
klog.Error("kubelet-registration-path is a required parameter")
os.Exit(1)
klog.ErrorS(nil, "kubelet-registration-path is a required parameter")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if modeIsKubeletRegistrationProbe() {
Expand All @@ -132,12 +146,12 @@ func main() {
os.Exit(0)
}

klog.Infof("Version: %s", version)
klog.Infof("Running node-driver-registrar in mode=%s", *mode)
klog.InfoS("Version", "version", version)
klog.InfoS("Running node-driver-registrar", "mode", *mode)

if *healthzPort > 0 && *httpEndpoint != "" {
klog.Error("only one of `--health-port` and `--http-endpoint` can be set.")
os.Exit(1)
klog.ErrorS(nil, "Only one of `--health-port` and `--http-endpoint` can be set")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
var addr string
if *healthzPort > 0 {
Expand All @@ -147,30 +161,30 @@ func main() {
}

if *connectionTimeout != 0 {
klog.Warning("--connection-timeout is deprecated and will have no effect")
klog.InfoS("--connection-timeout is deprecated and will have no effect")
}

// Once https://github.com/container-storage-interface/spec/issues/159 is
// resolved, if plugin does not support PUBLISH_UNPUBLISH_VOLUME, then we
// can skip adding mapping to "csi.volume.kubernetes.io/nodeid" annotation.

klog.V(1).Infof("Attempting to open a gRPC connection with: %q", *csiAddress)
klog.V(1).InfoS("Attempting to open a gRPC connection", "csiAddress", *csiAddress)
csiConn, err := connection.ConnectWithoutMetrics(*csiAddress)
if err != nil {
klog.Errorf("error connecting to CSI driver: %v", err)
os.Exit(1)
klog.ErrorS(err, "Error connecting to CSI driver")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

klog.V(1).Infof("Calling CSI driver to discover driver name")
klog.V(1).InfoS("Calling CSI driver to discover driver name")
ctx, cancel := context.WithTimeout(context.Background(), *operationTimeout)
defer cancel()

csiDriverName, err := csirpc.GetDriverName(ctx, csiConn)
if err != nil {
klog.Errorf("error retreiving CSI driver name: %v", err)
os.Exit(1)
klog.ErrorS(err, "Error retreiving CSI driver name")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
klog.V(2).Infof("CSI driver name: %q", csiDriverName)
klog.V(2).InfoS("CSI driver name", "csiDriverName", csiDriverName)
defer closeGrpcConnection(*csiAddress, csiConn)

// Run forever
Expand Down
43 changes: 22 additions & 21 deletions cmd/csi-node-driver-registrar/node_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func nodeRegister(csiDriverName, httpEndpoint string) {
registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
socketPath := buildSocketPath(csiDriverName)
if err := util.CleanupSocketFile(socketPath); err != nil {
klog.Errorf("%+v", err)
os.Exit(1)
klog.ErrorS(err, "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

var oldmask int
Expand All @@ -52,16 +52,16 @@ func nodeRegister(csiDriverName, httpEndpoint string) {
oldmask, _ = util.Umask(0077)
}

klog.Infof("Starting Registration Server at: %s\n", socketPath)
klog.InfoS("Starting Registration Server", "socketPath", socketPath)
lis, err := net.Listen("unix", socketPath)
if err != nil {
klog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
os.Exit(1)
klog.ErrorS(err, "Failed to listen on socket", "socketPath", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if runtime.GOOS == "linux" {
util.Umask(oldmask)
}
klog.Infof("Registration Server started at: %s\n", socketPath)
klog.InfoS("Registration Server started", "socketPath", socketPath)
grpcServer := grpc.NewServer()

// Registers kubelet plugin watcher api.
Expand All @@ -71,8 +71,8 @@ func nodeRegister(csiDriverName, httpEndpoint string) {
go removeRegSocket(csiDriverName)
// Starts service
if err := grpcServer.Serve(lis); err != nil {
klog.Errorf("Registration Server stopped serving: %v", err)
os.Exit(1)
klog.ErrorS(err, "Registration Server stopped serving")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

// If gRPC server is gracefully shutdown, cleanup and exit
Expand All @@ -85,10 +85,10 @@ func buildSocketPath(csiDriverName string) string {

func httpServer(socketPath string, httpEndpoint string, csiDriverName string) {
if httpEndpoint == "" {
klog.Infof("Skipping HTTP server because endpoint is set to: %q", httpEndpoint)
klog.InfoS("Skipping HTTP server")
return
}
klog.Infof("Starting HTTP server at endpoint: %v\n", httpEndpoint)
klog.InfoS("Starting HTTP server", "endpoint", httpEndpoint)

// Prepare http endpoint for healthz + profiling (if enabled)
mux := http.NewServeMux()
Expand All @@ -99,20 +99,20 @@ func httpServer(socketPath string, httpEndpoint string, csiDriverName string) {
if grpcSocketCheckError != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(grpcSocketCheckError.Error()))
klog.Errorf("health check failed: %+v", grpcSocketCheckError)
klog.ErrorS(grpcSocketCheckError, "Health check failed")
} else {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`ok`))
klog.V(5).Infof("health check succeeded")
klog.V(5).InfoS("Health check succeeded")
}
} else if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
klog.Errorf("health check failed: %+v", err)
klog.ErrorS(err, "Health check failed")
} else if !socketExists {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("registration socket does not exist"))
klog.Errorf("health check failed, registration socket does not exist")
klog.ErrorS(nil, "Health check failed, registration socket does not exist")
}
})

Expand All @@ -126,19 +126,20 @@ func httpServer(socketPath string, httpEndpoint string, csiDriverName string) {
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

klog.Fatal(http.ListenAndServe(httpEndpoint, mux))
klog.ErrorS(http.ListenAndServe(httpEndpoint, mux), "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

func checkLiveRegistrationSocket(socketFile, csiDriverName string) error {
klog.V(2).Infof("Attempting to open a gRPC connection with: %q", socketFile)
klog.V(2).InfoS("Attempting to open a gRPC connection", "socketfile", socketFile)
grpcConn, err := connection.ConnectWithoutMetrics(socketFile)
if err != nil {
return fmt.Errorf("error connecting to node-registrar socket %s: %v", socketFile, err)
}

defer closeGrpcConnection(socketFile, grpcConn)

klog.V(2).Infof("Calling node registrar to check if it still responds")
klog.V(2).InfoS("Calling node registrar to check if it still responds")
ctx, cancel := context.WithTimeout(context.Background(), *operationTimeout)
defer cancel()

Expand All @@ -160,8 +161,8 @@ func checkLiveRegistrationSocket(socketFile, csiDriverName string) error {
func closeGrpcConnection(socketFile string, conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
klog.Errorf("error closing socket %s: %v", socketFile, err)
os.Exit(1)
klog.ErrorS(err, "Error closing socket", "socketfile", socketFile)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

Expand All @@ -172,8 +173,8 @@ func removeRegSocket(csiDriverName string) {
socketPath := buildSocketPath(csiDriverName)
err := os.Remove(socketPath)
if err != nil && !os.IsNotExist(err) {
klog.Errorf("failed to remove socket: %s with error: %+v", socketPath, err)
os.Exit(1)
klog.ErrorS(err, "Failed to remove socket with error", "socket", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
os.Exit(0)
}
13 changes: 12 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
golang.org/x/sys v0.15.0
google.golang.org/grpc v1.59.0
k8s.io/client-go v0.29.0-rc.1
k8s.io/component-base v0.29.0-rc.1
k8s.io/klog/v2 v2.110.1
k8s.io/kubelet v0.29.0-rc.1
)
Expand All @@ -19,14 +20,17 @@ require (
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -38,18 +42,25 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect
go.opentelemetry.io/otel v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.20.0 // indirect
go.opentelemetry.io/otel/trace v1.20.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.19.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.29.0-rc.1 // indirect
k8s.io/component-base v0.29.0-rc.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)
Loading

0 comments on commit dc23d0e

Please sign in to comment.