Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support contextual logging #410

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ all: build

include release-tools/build.make

test: test-logcheck
29 changes: 15 additions & 14 deletions cmd/csi-node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ func main() {
logsapi.AddGoFlags(c, flag.CommandLine)
logs.InitLogs()
flag.Parse()
logger := klog.Background()
if err := logsapi.ValidateAndApply(c, fg); err != nil {
klog.ErrorS(err, "LoggingConfiguration is invalid")
os.Exit(1)
logger.Error(err, "LoggingConfiguration is invalid")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if *showVersion {
Expand All @@ -136,7 +137,7 @@ func main() {
}

if *kubeletRegistrationPath == "" {
klog.ErrorS(nil, "kubelet-registration-path is a required parameter")
logger.Error(nil, "kubelet-registration-path is a required parameter")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

Expand All @@ -146,11 +147,11 @@ func main() {
os.Exit(0)
}

klog.InfoS("Version", "version", version)
klog.InfoS("Running node-driver-registrar", "mode", *mode)
logger.Info("Version", "version", version)
logger.Info("Running node-driver-registrar", "mode", *mode)

if *healthzPort > 0 && *httpEndpoint != "" {
klog.ErrorS(nil, "Only one of `--health-port` and `--http-endpoint` can be set")
logger.Error(nil, "Only one of `--health-port` and `--http-endpoint` can be set")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
var addr string
Expand All @@ -161,32 +162,32 @@ func main() {
}

if *connectionTimeout != 0 {
klog.InfoS("--connection-timeout is deprecated and will have no effect")
logger.Info("--connection-timeout is deprecated and will have no effect")
}

// Once https://github.com/container-storage-interface/spec/issues/159 is
// resolved, if plugin does not support PUBLISH_UNPUBLISH_VOLUME, then we
// can skip adding mapping to "csi.volume.kubernetes.io/nodeid" annotation.

klog.V(1).InfoS("Attempting to open a gRPC connection", "csiAddress", *csiAddress)
ctx := context.Background()
logger.V(1).Info("Attempting to open a gRPC connection", "csiAddress", *csiAddress)
ctx := klog.NewContext(context.Background(), logger)
csiConn, err := connection.ConnectWithoutMetrics(ctx, *csiAddress)
if err != nil {
klog.ErrorS(err, "Error connecting to CSI driver")
logger.Error(err, "Error connecting to CSI driver")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

klog.V(1).InfoS("Calling CSI driver to discover driver name")
logger.V(1).Info("Calling CSI driver to discover driver name")
ctx, cancel := context.WithTimeout(ctx, *operationTimeout)
defer cancel()

csiDriverName, err := csirpc.GetDriverName(ctx, csiConn)
if err != nil {
klog.ErrorS(err, "Error retreiving CSI driver name")
logger.Error(err, "Error retreiving CSI driver name")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
klog.V(2).InfoS("CSI driver name", "csiDriverName", csiDriverName)
defer closeGrpcConnection(*csiAddress, csiConn)
logger.V(2).Info("CSI driver name", "csiDriverName", csiDriverName)
defer closeGrpcConnection(logger, *csiAddress, csiConn)

// Run forever
nodeRegister(ctx, csiDriverName, addr)
Expand Down
54 changes: 32 additions & 22 deletions cmd/csi-node-driver-registrar/node_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ func nodeRegister(ctx context.Context, csiDriverName, httpEndpoint string) {
// When kubeletRegistrationPath is specified then driver-registrar ONLY acts
// as gRPC server which replies to registration requests initiated by kubelet's
// plugins watcher infrastructure. Node labeling is done by kubelet's csi code.
logger := klog.FromContext(ctx)
registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
socketPath := buildSocketPath(csiDriverName)
if err := util.CleanupSocketFile(socketPath); err != nil {
klog.ErrorS(err, "")
logger.Error(err, "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

Expand All @@ -52,26 +53,32 @@ func nodeRegister(ctx context.Context, csiDriverName, httpEndpoint string) {
oldmask, _ = util.Umask(0077)
}

klog.InfoS("Starting Registration Server", "socketPath", socketPath)
logger.Info("Starting Registration Server", "socketPath", socketPath)
lis, err := net.Listen("unix", socketPath)
if err != nil {
klog.ErrorS(err, "Failed to listen on socket", "socketPath", socketPath)
logger.Error(err, "Failed to listen on socket", "socketPath", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if runtime.GOOS == "linux" {
util.Umask(oldmask)
}
klog.InfoS("Registration Server started", "socketPath", socketPath)
grpcServer := grpc.NewServer()

logger.Info("Registration Server started", "socketPath", socketPath)
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return handler(klog.NewContext(ctx, logger), req)
}),
}
grpcServer := grpc.NewServer(opts...)

// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrar)

go httpServer(ctx, socketPath, httpEndpoint, csiDriverName)
go removeRegSocket(csiDriverName)
go removeRegSocket(logger, csiDriverName)
// Starts service
if err := grpcServer.Serve(lis); err != nil {
klog.ErrorS(err, "Registration Server stopped serving")
logger.Error(err, "Registration Server stopped serving")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

Expand All @@ -84,40 +91,42 @@ func buildSocketPath(csiDriverName string) string {
}

func httpServer(ctx context.Context, socketPath string, httpEndpoint string, csiDriverName string) {
logger := klog.FromContext(ctx)
if httpEndpoint == "" {
klog.InfoS("Skipping HTTP server")
logger.Info("Skipping HTTP server")
return
}
klog.InfoS("Starting HTTP server", "endpoint", httpEndpoint)
logger.Info("Starting HTTP server", "endpoint", httpEndpoint)

// Prepare http endpoint for healthz + profiling (if enabled)
mux := http.NewServeMux()
mux.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) {
logger := klog.FromContext(req.Context())
socketExists, err := util.DoesSocketExist(socketPath)
if err == nil && socketExists {
grpcSocketCheckError := checkLiveRegistrationSocket(ctx, socketPath, csiDriverName)
if grpcSocketCheckError != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(grpcSocketCheckError.Error()))
klog.ErrorS(grpcSocketCheckError, "Health check failed")
logger.Error(grpcSocketCheckError, "Health check failed")
} else {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`ok`))
klog.V(5).InfoS("Health check succeeded")
logger.V(5).Info("Health check succeeded")
}
} else if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
klog.ErrorS(err, "Health check failed")
logger.Error(err, "Health check failed")
} else if !socketExists {
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("registration socket does not exist"))
klog.ErrorS(nil, "Health check failed, registration socket does not exist")
logger.Error(nil, "Health check failed, registration socket does not exist")
}
})

if *enableProfile {
klog.InfoS("Starting profiling", "endpoint", httpEndpoint)
logger.Info("Starting profiling", "endpoint", httpEndpoint)

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand All @@ -126,20 +135,21 @@ func httpServer(ctx context.Context, socketPath string, httpEndpoint string, csi
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

klog.ErrorS(http.ListenAndServe(httpEndpoint, mux), "")
logger.Error(http.ListenAndServe(httpEndpoint, mux), "")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

func checkLiveRegistrationSocket(ctx context.Context, socketFile, csiDriverName string) error {
klog.V(2).InfoS("Attempting to open a gRPC connection", "socketfile", socketFile)
logger := klog.FromContext(ctx)
logger.V(2).Info("Attempting to open a gRPC connection", "socketfile", socketFile)
grpcConn, err := connection.ConnectWithoutMetrics(ctx, socketFile)
if err != nil {
return fmt.Errorf("error connecting to node-registrar socket %s: %v", socketFile, err)
}

defer closeGrpcConnection(socketFile, grpcConn)
defer closeGrpcConnection(logger, socketFile, grpcConn)

klog.V(2).InfoS("Calling node registrar to check if it still responds")
logger.V(2).Info("Calling node registrar to check if it still responds")
ctx, cancel := context.WithTimeout(context.Background(), *operationTimeout)
defer cancel()

Expand All @@ -158,22 +168,22 @@ func checkLiveRegistrationSocket(ctx context.Context, socketFile, csiDriverName
return fmt.Errorf("invalid driver name %s", info.Name)
}

func closeGrpcConnection(socketFile string, conn *grpc.ClientConn) {
func closeGrpcConnection(logger klog.Logger, socketFile string, conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
klog.ErrorS(err, "Error closing socket", "socketfile", socketFile)
logger.Error(err, "Error closing socket", "socketfile", socketFile)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

func removeRegSocket(csiDriverName string) {
func removeRegSocket(logger klog.Logger, csiDriverName string) {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM)
<-sigc
socketPath := buildSocketPath(csiDriverName)
err := os.Remove(socketPath)
if err != nil && !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to remove socket with error", "socket", socketPath)
logger.Error(err, "Failed to remove socket with error", "socket", socketPath)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
os.Exit(0)
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"testing"

utiltesting "k8s.io/client-go/util/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
)

var socketFileName = "reg.sock"
Expand Down Expand Up @@ -88,9 +89,10 @@ func TestSocketPathSimple(t *testing.T) {

socketPath := filepath.Join(testDir, socketFileName)

logger, _ := ktesting.NewTestContext(t)
_, err = net.Listen("unix", socketPath)
if err != nil {
klog.ErrorS(err, "Failed to listen on socket", "socketPath", socketPath)
logger.Error(err, "Failed to listen on socket", "socketPath", socketPath)
os.Exit(1)
}

Expand Down
30 changes: 30 additions & 0 deletions vendor/k8s.io/klog/v2/ktesting/init/init.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading