Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support structured logging #349

Merged
merged 5 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ CMDS=csi-node-driver-registrar
all: build

include release-tools/build.make

test: test-logcheck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will need more changes in the PR. As @bells17 pointed out, we should focus on the structured logging and not add test-logcheck here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mistaken. . I deleted the line.

52 changes: 33 additions & 19 deletions cmd/csi-node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
_ "net/http/pprof"
Expand All @@ -29,6 +30,10 @@ import (

"github.com/kubernetes-csi/csi-lib-utils/connection"
csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)

Expand Down Expand Up @@ -87,7 +92,8 @@ func newRegistrationServer(driverName string, endpoint string, versions []string

// GetInfo is the RPC invoked by plugin watcher
func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
klog.Infof("Received GetInfo call: %+v", req)
logger := klog.FromContext(ctx)
logger.Info("Received GetInfo call", "request", req)

return &registerapi.PluginInfo{
Type: registerapi.CSIPlugin,
Expand All @@ -98,9 +104,10 @@ func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRe
}

func (e registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
klog.Infof("Received NotifyRegistrationStatus call: %+v", status)
logger := klog.FromContext(ctx)
logger.Info("Received NotifyRegistrationStatus call", "status", status)
if !status.PluginRegistered {
klog.Errorf("Registration process failed with error: %+v, restarting registration container.", status.Error)
logger.Error(errors.New(status.Error), "Registration process failed with error, restarting registration container")
os.Exit(1)
}

Expand All @@ -112,18 +119,25 @@ func modeIsKubeletRegistrationProbe() bool {
}

func main() {
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
fg := featuregate.NewFeatureGate()
logsapi.AddFeatureGates(fg)
c := logsapi.NewLoggingConfiguration()
logsapi.AddGoFlags(c, flag.CommandLine)
logs.InitLogs()
flag.Parse()
if err := logsapi.ValidateAndApply(c, fg); err != nil {
klog.ErrorS(err, "LoggingConfiguration is invalid")
os.Exit(1)
}

if *showVersion {
fmt.Println(os.Args[0], version)
return
}

if *kubeletRegistrationPath == "" {
klog.Error("kubelet-registration-path is a required parameter")
os.Exit(1)
klog.ErrorS(nil, "kubelet-registration-path is a required parameter")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if modeIsKubeletRegistrationProbe() {
Expand All @@ -132,12 +146,12 @@ func main() {
os.Exit(0)
}

klog.Infof("Version: %s", version)
klog.Infof("Running node-driver-registrar in mode=%s", *mode)
klog.InfoS("Version", "version", version)
klog.InfoS("Running node-driver-registrar", "mode", *mode)

if *healthzPort > 0 && *httpEndpoint != "" {
klog.Error("only one of `--health-port` and `--http-endpoint` can be set.")
os.Exit(1)
klog.ErrorS(nil, "Only one of `--health-port` and `--http-endpoint` can be set")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
var addr string
if *healthzPort > 0 {
Expand All @@ -147,31 +161,31 @@ func main() {
}

if *connectionTimeout != 0 {
klog.Warning("--connection-timeout is deprecated and will have no effect")
klog.InfoS("--connection-timeout is deprecated and will have no effect")
}

// Once https://github.com/container-storage-interface/spec/issues/159 is
// resolved, if plugin does not support PUBLISH_UNPUBLISH_VOLUME, then we
// can skip adding mapping to "csi.volume.kubernetes.io/nodeid" annotation.

klog.V(1).Infof("Attempting to open a gRPC connection with: %q", *csiAddress)
klog.V(1).InfoS("Attempting to open a gRPC connection", "csiAddress", *csiAddress)
ctx := context.Background()
csiConn, err := connection.ConnectWithoutMetrics(ctx, *csiAddress)
if err != nil {
klog.Errorf("error connecting to CSI driver: %v", err)
os.Exit(1)
klog.ErrorS(err, "Error connecting to CSI driver")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

klog.V(1).Infof("Calling CSI driver to discover driver name")
klog.V(1).InfoS("Calling CSI driver to discover driver name")
ctx, cancel := context.WithTimeout(ctx, *operationTimeout)
defer cancel()

csiDriverName, err := csirpc.GetDriverName(ctx, csiConn)
if err != nil {
klog.Errorf("error retreiving CSI driver name: %v", err)
os.Exit(1)
klog.ErrorS(err, "Error retreiving CSI driver name")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
klog.V(2).Infof("CSI driver name: %q", csiDriverName)
klog.V(2).InfoS("CSI driver name", "csiDriverName", csiDriverName)
defer closeGrpcConnection(*csiAddress, csiConn)

// Run forever
Expand Down
43 changes: 22 additions & 21 deletions cmd/csi-node-driver-registrar/node_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func nodeRegister(ctx context.Context, csiDriverName, httpEndpoint string) {
registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
socketPath := buildSocketPath(csiDriverName)
if err := util.CleanupSocketFile(socketPath); err != nil {
klog.Errorf("%+v", err)
os.Exit(1)
klog.ErrorS(err, "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

var oldmask int
Expand All @@ -52,16 +52,16 @@ func nodeRegister(ctx context.Context, csiDriverName, httpEndpoint string) {
oldmask, _ = util.Umask(0077)
}

klog.Infof("Starting Registration Server at: %s\n", socketPath)
klog.InfoS("Starting Registration Server", "socketPath", socketPath)
lis, err := net.Listen("unix", socketPath)
if err != nil {
klog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
os.Exit(1)
klog.ErrorS(err, "Failed to listen on socket", "socketPath", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if runtime.GOOS == "linux" {
util.Umask(oldmask)
}
klog.Infof("Registration Server started at: %s\n", socketPath)
klog.InfoS("Registration Server started", "socketPath", socketPath)
grpcServer := grpc.NewServer()

// Registers kubelet plugin watcher api.
Expand All @@ -71,8 +71,8 @@ func nodeRegister(ctx context.Context, csiDriverName, httpEndpoint string) {
go removeRegSocket(csiDriverName)
// Starts service
if err := grpcServer.Serve(lis); err != nil {
klog.Errorf("Registration Server stopped serving: %v", err)
os.Exit(1)
klog.ErrorS(err, "Registration Server stopped serving")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

// If gRPC server is gracefully shutdown, cleanup and exit
Expand All @@ -85,10 +85,10 @@ func buildSocketPath(csiDriverName string) string {

func httpServer(ctx context.Context, socketPath string, httpEndpoint string, csiDriverName string) {
if httpEndpoint == "" {
klog.Infof("Skipping HTTP server because endpoint is set to: %q", httpEndpoint)
klog.InfoS("Skipping HTTP server")
return
}
klog.Infof("Starting HTTP server at endpoint: %v\n", httpEndpoint)
klog.InfoS("Starting HTTP server", "endpoint", httpEndpoint)

// Prepare http endpoint for healthz + profiling (if enabled)
mux := http.NewServeMux()
Expand All @@ -99,20 +99,20 @@ func httpServer(ctx context.Context, socketPath string, httpEndpoint string, csi
if grpcSocketCheckError != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(grpcSocketCheckError.Error()))
klog.Errorf("health check failed: %+v", grpcSocketCheckError)
klog.ErrorS(grpcSocketCheckError, "Health check failed")
} else {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`ok`))
klog.V(5).Infof("health check succeeded")
klog.V(5).InfoS("Health check succeeded")
}
} else if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
klog.Errorf("health check failed: %+v", err)
klog.ErrorS(err, "Health check failed")
} else if !socketExists {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("registration socket does not exist"))
klog.Errorf("health check failed, registration socket does not exist")
klog.ErrorS(nil, "Health check failed, registration socket does not exist")
}
})

Expand All @@ -126,19 +126,20 @@ func httpServer(ctx context.Context, socketPath string, httpEndpoint string, csi
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

klog.Fatal(http.ListenAndServe(httpEndpoint, mux))
klog.ErrorS(http.ListenAndServe(httpEndpoint, mux), "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

func checkLiveRegistrationSocket(ctx context.Context, socketFile, csiDriverName string) error {
klog.V(2).Infof("Attempting to open a gRPC connection with: %q", socketFile)
klog.V(2).InfoS("Attempting to open a gRPC connection", "socketfile", socketFile)
grpcConn, err := connection.ConnectWithoutMetrics(ctx, socketFile)
if err != nil {
return fmt.Errorf("error connecting to node-registrar socket %s: %v", socketFile, err)
}

defer closeGrpcConnection(socketFile, grpcConn)

klog.V(2).Infof("Calling node registrar to check if it still responds")
klog.V(2).InfoS("Calling node registrar to check if it still responds")
ctx, cancel := context.WithTimeout(context.Background(), *operationTimeout)
defer cancel()

Expand All @@ -160,8 +161,8 @@ func checkLiveRegistrationSocket(ctx context.Context, socketFile, csiDriverName
func closeGrpcConnection(socketFile string, conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
klog.Errorf("error closing socket %s: %v", socketFile, err)
os.Exit(1)
klog.ErrorS(err, "Error closing socket", "socketfile", socketFile)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

Expand All @@ -172,8 +173,8 @@ func removeRegSocket(csiDriverName string) {
socketPath := buildSocketPath(csiDriverName)
err := os.Remove(socketPath)
if err != nil && !os.IsNotExist(err) {
klog.Errorf("failed to remove socket: %s with error: %+v", socketPath, err)
os.Exit(1)
klog.ErrorS(err, "Failed to remove socket with error", "socket", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
os.Exit(0)
}
16 changes: 15 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require (
k8s.io/kubelet v0.30.0
)

require k8s.io/component-base v0.30.0

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
Expand All @@ -19,33 +21,45 @@ require (
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.14.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240509183442-62759503f434 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.30.0 // indirect
k8s.io/component-base v0.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20240430033511-f0e62f92d13f // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)
Loading