Skip to content

Commit

Permalink
Support contextual logging
Browse files Browse the repository at this point in the history
  • Loading branch information
bells17 committed May 16, 2024
1 parent 5763404 commit 5885e9b
Show file tree
Hide file tree
Showing 9 changed files with 638 additions and 38 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ all: build

include release-tools/build.make

test: test-logcheck
29 changes: 15 additions & 14 deletions cmd/csi-node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ func main() {
logsapi.AddGoFlags(c, flag.CommandLine)
logs.InitLogs()
flag.Parse()
logger := klog.Background()
if err := logsapi.ValidateAndApply(c, fg); err != nil {
klog.ErrorS(err, "LoggingConfiguration is invalid")
os.Exit(1)
logger.Error(err, "LoggingConfiguration is invalid")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if *showVersion {
Expand All @@ -136,7 +137,7 @@ func main() {
}

if *kubeletRegistrationPath == "" {
klog.ErrorS(nil, "kubelet-registration-path is a required parameter")
logger.Error(nil, "kubelet-registration-path is a required parameter")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

Expand All @@ -146,11 +147,11 @@ func main() {
os.Exit(0)
}

klog.InfoS("Version", "version", version)
klog.InfoS("Running node-driver-registrar", "mode", *mode)
logger.Info("Version", "version", version)
logger.Info("Running node-driver-registrar", "mode", *mode)

if *healthzPort > 0 && *httpEndpoint != "" {
klog.ErrorS(nil, "Only one of `--health-port` and `--http-endpoint` can be set")
logger.Error(nil, "Only one of `--health-port` and `--http-endpoint` can be set")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
var addr string
Expand All @@ -161,32 +162,32 @@ func main() {
}

if *connectionTimeout != 0 {
klog.InfoS("--connection-timeout is deprecated and will have no effect")
logger.Info("--connection-timeout is deprecated and will have no effect")
}

// Once https://github.com/container-storage-interface/spec/issues/159 is
// resolved, if plugin does not support PUBLISH_UNPUBLISH_VOLUME, then we
// can skip adding mapping to "csi.volume.kubernetes.io/nodeid" annotation.

klog.V(1).InfoS("Attempting to open a gRPC connection", "csiAddress", *csiAddress)
ctx := context.Background()
logger.V(1).Info("Attempting to open a gRPC connection", "csiAddress", *csiAddress)
ctx := klog.NewContext(context.Background(), logger)
csiConn, err := connection.ConnectWithoutMetrics(ctx, *csiAddress)
if err != nil {
klog.ErrorS(err, "Error connecting to CSI driver")
logger.Error(err, "Error connecting to CSI driver")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

klog.V(1).InfoS("Calling CSI driver to discover driver name")
logger.V(1).Info("Calling CSI driver to discover driver name")
ctx, cancel := context.WithTimeout(ctx, *operationTimeout)
defer cancel()

csiDriverName, err := csirpc.GetDriverName(ctx, csiConn)
if err != nil {
klog.ErrorS(err, "Error retreiving CSI driver name")
logger.Error(err, "Error retreiving CSI driver name")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
klog.V(2).InfoS("CSI driver name", "csiDriverName", csiDriverName)
defer closeGrpcConnection(*csiAddress, csiConn)
logger.V(2).Info("CSI driver name", "csiDriverName", csiDriverName)
defer closeGrpcConnection(logger, *csiAddress, csiConn)

// Run forever
nodeRegister(ctx, csiDriverName, addr)
Expand Down
54 changes: 32 additions & 22 deletions cmd/csi-node-driver-registrar/node_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ func nodeRegister(ctx context.Context, csiDriverName, httpEndpoint string) {
// When kubeletRegistrationPath is specified then driver-registrar ONLY acts
// as gRPC server which replies to registration requests initiated by kubelet's
// plugins watcher infrastructure. Node labeling is done by kubelet's csi code.
logger := klog.FromContext(ctx)
registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
socketPath := buildSocketPath(csiDriverName)
if err := util.CleanupSocketFile(socketPath); err != nil {
klog.ErrorS(err, "")
logger.Error(err, "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

Expand All @@ -52,26 +53,32 @@ func nodeRegister(ctx context.Context, csiDriverName, httpEndpoint string) {
oldmask, _ = util.Umask(0077)
}

klog.InfoS("Starting Registration Server", "socketPath", socketPath)
logger.Info("Starting Registration Server", "socketPath", socketPath)
lis, err := net.Listen("unix", socketPath)
if err != nil {
klog.ErrorS(err, "Failed to listen on socket", "socketPath", socketPath)
logger.Error(err, "Failed to listen on socket", "socketPath", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if runtime.GOOS == "linux" {
util.Umask(oldmask)
}
klog.InfoS("Registration Server started", "socketPath", socketPath)
grpcServer := grpc.NewServer()

logger.Info("Registration Server started", "socketPath", socketPath)
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return handler(klog.NewContext(ctx, logger), req)
}),
}
grpcServer := grpc.NewServer(opts...)

// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrar)

go httpServer(ctx, socketPath, httpEndpoint, csiDriverName)
go removeRegSocket(csiDriverName)
go removeRegSocket(logger, csiDriverName)
// Starts service
if err := grpcServer.Serve(lis); err != nil {
klog.ErrorS(err, "Registration Server stopped serving")
logger.Error(err, "Registration Server stopped serving")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

Expand All @@ -84,40 +91,42 @@ func buildSocketPath(csiDriverName string) string {
}

func httpServer(ctx context.Context, socketPath string, httpEndpoint string, csiDriverName string) {
logger := klog.FromContext(ctx)
if httpEndpoint == "" {
klog.InfoS("Skipping HTTP server")
logger.Info("Skipping HTTP server")
return
}
klog.InfoS("Starting HTTP server", "endpoint", httpEndpoint)
logger.Info("Starting HTTP server", "endpoint", httpEndpoint)

// Prepare http endpoint for healthz + profiling (if enabled)
mux := http.NewServeMux()
mux.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) {
logger := klog.FromContext(req.Context())
socketExists, err := util.DoesSocketExist(socketPath)
if err == nil && socketExists {
grpcSocketCheckError := checkLiveRegistrationSocket(ctx, socketPath, csiDriverName)
if grpcSocketCheckError != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(grpcSocketCheckError.Error()))
klog.ErrorS(grpcSocketCheckError, "Health check failed")
logger.Error(grpcSocketCheckError, "Health check failed")
} else {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`ok`))
klog.V(5).InfoS("Health check succeeded")
logger.V(5).Info("Health check succeeded")
}
} else if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
klog.ErrorS(err, "Health check failed")
logger.Error(err, "Health check failed")
} else if !socketExists {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("registration socket does not exist"))
klog.ErrorS(nil, "Health check failed, registration socket does not exist")
logger.Error(nil, "Health check failed, registration socket does not exist")
}
})

if *enableProfile {
klog.InfoS("Starting profiling", "endpoint", httpEndpoint)
logger.Info("Starting profiling", "endpoint", httpEndpoint)

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand All @@ -126,20 +135,21 @@ func httpServer(ctx context.Context, socketPath string, httpEndpoint string, csi
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

klog.ErrorS(http.ListenAndServe(httpEndpoint, mux), "")
logger.Error(http.ListenAndServe(httpEndpoint, mux), "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

func checkLiveRegistrationSocket(ctx context.Context, socketFile, csiDriverName string) error {
klog.V(2).InfoS("Attempting to open a gRPC connection", "socketfile", socketFile)
logger := klog.FromContext(ctx)
logger.V(2).Info("Attempting to open a gRPC connection", "socketfile", socketFile)
grpcConn, err := connection.ConnectWithoutMetrics(ctx, socketFile)
if err != nil {
return fmt.Errorf("error connecting to node-registrar socket %s: %v", socketFile, err)
}

defer closeGrpcConnection(socketFile, grpcConn)
defer closeGrpcConnection(logger, socketFile, grpcConn)

klog.V(2).InfoS("Calling node registrar to check if it still responds")
logger.V(2).Info("Calling node registrar to check if it still responds")
ctx, cancel := context.WithTimeout(context.Background(), *operationTimeout)
defer cancel()

Expand All @@ -158,22 +168,22 @@ func checkLiveRegistrationSocket(ctx context.Context, socketFile, csiDriverName
return fmt.Errorf("invalid driver name %s", info.Name)
}

func closeGrpcConnection(socketFile string, conn *grpc.ClientConn) {
func closeGrpcConnection(logger klog.Logger, socketFile string, conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
klog.ErrorS(err, "Error closing socket", "socketfile", socketFile)
logger.Error(err, "Error closing socket", "socketfile", socketFile)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

func removeRegSocket(csiDriverName string) {
func removeRegSocket(logger klog.Logger, csiDriverName string) {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM)
<-sigc
socketPath := buildSocketPath(csiDriverName)
err := os.Remove(socketPath)
if err != nil && !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to remove socket with error", "socket", socketPath)
logger.Error(err, "Failed to remove socket with error", "socket", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
os.Exit(0)
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"testing"

utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
)

var socketFileName = "reg.sock"
Expand Down Expand Up @@ -88,9 +89,10 @@ func TestSocketPathSimple(t *testing.T) {

socketPath := filepath.Join(testDir, socketFileName)

logger, _ := ktesting.NewTestContext(t)
_, err = net.Listen("unix", socketPath)
if err != nil {
klog.ErrorS(err, "Failed to listen on socket", "socketPath", socketPath)
logger.Error(err, "Failed to listen on socket", "socketPath", socketPath)
os.Exit(1)
}

Expand Down
30 changes: 30 additions & 0 deletions vendor/k8s.io/klog/v2/ktesting/init/init.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5885e9b

Please sign in to comment.