Skip to content

Commit

Permalink
support structured logging
Browse files: browse the repository at this point in the history
  • Loading branch information
AiRanthem committed Jul 23, 2024
1 parent c5c6df7 commit 190126f
Show file tree
Hide file tree
Showing 73 changed files with 332 additions and 323 deletions.
6 changes: 3 additions & 3 deletions cmd/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ func main() {
if _, err := os.Stat(*pluginConfigFile); err == nil {
err = plugin.RegisterCredentialProviderPlugins(*pluginConfigFile, *pluginBinDir)
if err != nil {
klog.Errorf("Failed to register credential provider plugins: %v", err)
klog.ErrorS(err, "Failed to register credential provider plugins")
}
} else if os.IsNotExist(err) {
klog.Infof("No plugin config file found, skipping: %s", *pluginConfigFile)
klog.InfoS("No plugin config file found, skipping", "configFile", *pluginConfigFile)
} else {
klog.Errorf("Failed to check plugin config file: %v", err)
klog.ErrorS(err, "Failed to check plugin config file")
}
// make sure the new docker key ring is made and set after the credential plugins are registered
secret.MakeAndSetKeyring()
Expand Down
30 changes: 15 additions & 15 deletions pkg/daemon/containermeta/container_meta_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Controller) Run(stop <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting containermeta Controller")
klog.Info("Starting containermeta Controller")
go c.restarter.Run(stop)
for i := 0; i < workers; i++ {
go wait.Until(func() {
Expand Down Expand Up @@ -239,7 +239,7 @@ func (c *Controller) processNextWorkItem() bool {
func (c *Controller) sync(key string) (retErr error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Warningf("Invalid key: %s", key)
klog.InfoS("Invalid key", "key", key)
return nil
}

Expand All @@ -248,7 +248,7 @@ func (c *Controller) sync(key string) (retErr error) {
if errors.IsNotFound(err) {
return nil
}
klog.Errorf("Failed to get Pod %s/%s from lister: %v", namespace, name, err)
klog.ErrorS(err, "Failed to get Pod %s/%s from lister", "namespace", namespace, "name", name)
return err
} else if pod.DeletionTimestamp != nil || len(pod.Status.ContainerStatuses) == 0 {
return nil
Expand All @@ -258,24 +258,24 @@ func (c *Controller) sync(key string) (retErr error) {
if duration < maxExpectationWaitDuration {
return nil
}
klog.Warningf("Wait for Pod %s/%s resourceVersion expectation over %v", namespace, name, duration)
klog.InfoS("Waiting Pod resourceVersion expectation time out", "namespace", namespace, "name", name, "duration", duration)
resourceVersionExpectation.Delete(pod)
}

criRuntime, kubeRuntime, err := c.getRuntimeForPod(pod)
if err != nil {
klog.Errorf("Failed to get runtime for Pod %s/%s: %v", namespace, name, err)
klog.ErrorS(err, "Failed to get runtime for Pod", "namespace", namespace, "name", name)
return nil
} else if criRuntime == nil {
return nil
}

klog.V(3).Infof("Start syncing for %s/%s", namespace, name)
klog.V(3).InfoS("Start syncing", "namespace", namespace, "name", name)
defer func() {
if retErr != nil {
klog.Errorf("Failed to sync for %s/%s: %v", namespace, name, retErr)
klog.ErrorS(retErr, "Failed to sync", "namespace", namespace, "name", name)
} else {
klog.V(3).Infof("Finished syncing for %s/%s", namespace, name)
klog.V(3).InfoS("Finished syncing", "namespace", namespace, "name", name)
}
}()

Expand All @@ -286,7 +286,7 @@ func (c *Controller) sync(key string) (retErr error) {

oldMetaSet, err := appspub.GetRuntimeContainerMetaSet(pod)
if err != nil {
klog.Warningf("Failed to get old runtime meta from Pod %s/%s: %v", namespace, name, err)
klog.ErrorS(err, "Failed to get old runtime meta from Pod", "namespace", namespace, "name", name)
}
newMetaSet := c.manageContainerMetaSet(pod, kubePodStatus, oldMetaSet, criRuntime)

Expand All @@ -302,7 +302,7 @@ func (c *Controller) reportContainerMetaSet(pod *v1.Pod, oldMetaSet, newMetaSet
ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
}
containerMetaSetStr := util.DumpJSON(newMetaSet)
klog.Infof("Reporting container meta changed in Pod %s/%s: %v", pod.Namespace, pod.Name, containerMetaSetStr)
klog.InfoS("Reporting container meta changed in Pod", "namespace", pod.Namespace, "name", pod.Name, "containerMetaSetStr", containerMetaSetStr)
mergePatch, _ := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]string{
Expand Down Expand Up @@ -354,11 +354,11 @@ func (c *Controller) manageContainerMetaSet(pod *v1.Pod, kubePodStatus *kubeletc
envGetter := wrapEnvGetter(criRuntime, status.ID.ID, fmt.Sprintf("container %s (%s) in Pod %s/%s", containerSpec.Name, status.ID.String(), pod.Namespace, pod.Name))
containerMeta.Hashes.ExtractedEnvFromMetadataHash, err = envHasher.GetCurrentHash(containerSpec, envGetter)
if err != nil {
klog.Errorf("Failed to hash container %s (%s) with env for Pod %s/%s: %v", containerSpec.Name, status.ID.String(), pod.Namespace, pod.Name, err)
klog.ErrorS(err, "Failed to hash container with env for Pod", "containerName", containerSpec.Name, "containerID", status.ID.String(), "namespace", pod.Namespace, "podName", pod.Name)
enqueueAfter(c.queue, pod, time.Second*3)
} else {
klog.V(4).Infof("Extracted env from metadata for container %s (%s) in Pod %s/%s, hash: %v",
containerSpec.Name, status.ID.String(), pod.Namespace, pod.Name, containerMeta.Hashes.ExtractedEnvFromMetadataHash)
klog.V(4).InfoS("Extracted env from metadata for container",
"containerName", containerSpec.Name, "containerID", status.ID.String(), "namespace", pod.Namespace, "podName", pod.Name, "hash", containerMeta.Hashes.ExtractedEnvFromMetadataHash)
}
}

Expand All @@ -368,7 +368,7 @@ func (c *Controller) manageContainerMetaSet(pod *v1.Pod, kubePodStatus *kubeletc
// Trigger restarting when expected env hash is not equal to current hash
if containerMeta.Hashes.ExtractedEnvFromMetadataHash > 0 && containerMeta.Hashes.ExtractedEnvFromMetadataHash != envHasher.GetExpectHash(containerSpec, pod) {
// Maybe checking PlainHash inconsistent here can skip to trigger restart. But it is not a good idea for some special scenarios.
klog.V(2).Infof("Triggering container %s (%s) in Pod %s/%s to restart, for it has inconsistent hash of env from metadata", containerSpec.Name, status.ID.String(), pod.Namespace, pod.Name)
klog.V(2).InfoS("Triggering container in Pod to restart, for it has inconsistent hash of env from metadata", "containerName", containerSpec.Name, "containerID", status.ID.String(), "namespace", pod.Namespace, "podName", pod.Name)
c.restarter.queue.AddRateLimited(status.ID)
}
}
Expand Down Expand Up @@ -401,7 +401,7 @@ func wrapEnvGetter(criRuntime criapi.RuntimeService, containerID, logID string)
if getErr != nil {
return "", getErr
}
klog.V(4).Infof("Got env %s=%s in %s", key, envMap[key], logID)
klog.V(4).InfoS("Got env", "key", key, "value", envMap[key], "logID", logID)
return envMap[key], nil
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/daemon/containermeta/container_meta_restarter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,21 @@ func (c *restartController) processNextWorkItem() bool {
func (c *restartController) sync(containerID kubeletcontainer.ContainerID) error {
criRuntime := c.runtimeFactory.GetRuntimeServiceByName(containerID.Type)
if criRuntime == nil {
klog.Errorf("Not found runtime service for %s in daemon", containerID.Type)
klog.InfoS("Not found runtime service in daemon", "type", containerID.Type)
return nil
}

containers, err := criRuntime.ListContainers(context.TODO(), &runtimeapi.ContainerFilter{Id: containerID.ID})
if err != nil {
klog.Errorf("Failed to list containers by %s: %v", containerID.String(), err)
klog.ErrorS(err, "Failed to list containers", "id", containerID.String())
return err
}
if len(containers) == 0 || containers[0].State != runtimeapi.ContainerState_CONTAINER_RUNNING {
klog.V(4).Infof("Skip to kill container %s because of not found or non-running state.", containerID.String())
klog.V(4).InfoS("Skip to kill container because of not found or non-running state.", "id", containerID.String())
return nil
}

klog.V(3).Infof("Preparing to stop container %s", containerID.String())
klog.V(3).InfoS("Preparing to stop container", "id", containerID.String())
kubeRuntime := kuberuntime.NewGenericRuntime(containerID.Type, criRuntime, c.eventRecorder, &http.Client{})
msg := fmt.Sprintf("Stopping containerID %s by container meta restarter", containerID.String())
err = kubeRuntime.KillContainer(nil, containerID, "", msg, nil)
Expand Down
28 changes: 14 additions & 14 deletions pkg/daemon/containerrecreate/crr_daemon_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (c *Controller) Run(stop <-chan struct{}) {
return
}

klog.Infof("Starting crr daemon controller")
klog.Info("Starting crr daemon controller")
for i := 0; i < workers; i++ {
go wait.Until(func() {
for c.processNextWorkItem() {
Expand Down Expand Up @@ -210,7 +210,7 @@ func (c *Controller) processNextWorkItem() bool {
func (c *Controller) sync(key string) (retErr error) {
namespace, podName, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Warningf("Invalid key: %s", key)
klog.InfoS("Invalid key", "key", key)
return nil
}

Expand All @@ -235,12 +235,12 @@ func (c *Controller) sync(key string) (retErr error) {
return err
}

klog.V(3).Infof("Start syncing for %s/%s", namespace, crr.Name)
klog.V(3).InfoS("Start syncing", "namespace", namespace, "name", crr.Name)
defer func() {
if retErr != nil {
klog.Errorf("Failed to sync for %s/%s: %v", namespace, crr.Name, retErr)
klog.ErrorS(retErr, "Failed to sync", "namespace", namespace, "name", crr.Name)
} else {
klog.V(3).Infof("Finished syncing for %s/%s", namespace, crr.Name)
klog.V(3).InfoS("Finished syncing", "namespace", namespace, "name", crr.Name)
}
}()

Expand All @@ -252,19 +252,19 @@ func (c *Controller) sync(key string) (retErr error) {
if crr.Spec.Strategy.UnreadyGracePeriodSeconds != nil {
unreadyTimeStr := crr.Annotations[appsv1alpha1.ContainerRecreateRequestUnreadyAcquiredKey]
if unreadyTimeStr == "" {
klog.Infof("CRR %s/%s is waiting for unready acquirement.", crr.Namespace, crr.Name)
klog.InfoS("CRR is waiting for unready acquirement", "namespace", crr.Namespace, "name", crr.Name)
return nil
}

unreadyTime, err := time.Parse(time.RFC3339, unreadyTimeStr)
if err != nil {
klog.Errorf("CRR %s/%s failed to parse unready time %s: %v", crr.Namespace, crr.Name, unreadyTimeStr, err)
klog.ErrorS(err, "CRR failed to parse unready time", "namespace", crr.Namespace, "name", crr.Name, "unreadyTimeStr", unreadyTimeStr)
return c.completeCRRStatus(crr, fmt.Sprintf("failed to parse unready time %s: %v", unreadyTimeStr, err))
}

leftTime := time.Duration(*crr.Spec.Strategy.UnreadyGracePeriodSeconds)*time.Second - time.Since(unreadyTime)
if leftTime > 0 {
klog.Infof("CRR %s/%s is waiting for unready grace period %v left time.", crr.Namespace, crr.Name, leftTime)
klog.InfoS("CRR is waiting for unready grace period", "namespace", crr.Namespace, "name", crr.Name, "leftTime", leftTime)
c.queue.AddAfter(crr.Namespace+"/"+crr.Spec.PodName, leftTime+100*time.Millisecond)
return nil
}
Expand All @@ -287,7 +287,7 @@ func (c *Controller) pickRecreateRequest(crrList []*appsv1alpha1.ContainerRecrea
if duration < maxExpectationWaitDuration {
break
}
klog.Warningf("Wait for CRR %s/%s resourceVersion expectation over %v", crr.Namespace, crr.Name, duration)
klog.InfoS("Wait for CRR resourceVersion expectation", "namespace", crr.Namespace, "name", crr.Name, "duration", duration)
resourceVersionExpectation.Delete(crr)
}

Expand All @@ -296,7 +296,7 @@ func (c *Controller) pickRecreateRequest(crrList []*appsv1alpha1.ContainerRecrea
picked = crr
} else if crr.Status.Phase == "" {
if err := c.updateCRRPhase(crr, appsv1alpha1.ContainerRecreateRequestPending); err != nil {
klog.Errorf("Failed to update CRR %s/%s status to Pending: %v", crr.Namespace, crr.Name, err)
klog.ErrorS(err, "Failed to update CRR status to Pending", "namespace", crr.Namespace, "name", crr.Name)
return nil, err
}
}
Expand All @@ -307,7 +307,7 @@ func (c *Controller) pickRecreateRequest(crrList []*appsv1alpha1.ContainerRecrea
func (c *Controller) manage(crr *appsv1alpha1.ContainerRecreateRequest) error {
runtimeManager, err := c.newRuntimeManager(c.runtimeFactory, crr)
if err != nil {
klog.Errorf("Failed to find runtime service for %s/%s: %v", crr.Namespace, crr.Name, err)
klog.ErrorS(err, "Failed to find runtime service", "namespace", crr.Namespace, "name", crr.Name)
return c.completeCRRStatus(crr, fmt.Sprintf("failed to find runtime service: %v", err))
}

Expand All @@ -317,7 +317,7 @@ func (c *Controller) manage(crr *appsv1alpha1.ContainerRecreateRequest) error {
if err != nil {
return fmt.Errorf("failed to GetPodStatus %s/%s with uid %s: %v", pod.Namespace, pod.Name, pod.UID, err)
}
klog.V(5).Infof("CRR %s/%s for Pod %s GetPodStatus: %v", crr.Namespace, crr.Name, pod.Name, util.DumpJSON(podStatus))
klog.V(5).InfoS("CRR for Pod GetPodStatus", "namespace", crr.Namespace, "name", crr.Name, "podName", pod.Name, "podStatus", util.DumpJSON(podStatus))

newCRRContainerRecreateStates := getCurrentCRRContainersRecreateStates(crr, podStatus)
if !reflect.DeepEqual(crr.Status.ContainerRecreateStates, newCRRContainerRecreateStates) {
Expand Down Expand Up @@ -356,7 +356,7 @@ func (c *Controller) manage(crr *appsv1alpha1.ContainerRecreateRequest) error {
msg := fmt.Sprintf("Stopping container %s by ContainerRecreateRequest %s", state.Name, crr.Name)
err := runtimeManager.KillContainer(pod, kubeContainerStatus.ID, state.Name, msg, nil)
if err != nil {
klog.Errorf("Failed to kill container %s in Pod %s/%s for CRR %s/%s: %v", state.Name, pod.Namespace, pod.Name, crr.Namespace, crr.Name, err)
klog.ErrorS(err, "Failed to kill container in Pod for CRR", "containerName", state.Name, "podNamespace", pod.Namespace, "podName", pod.Name, "crrNamespace", crr.Namespace, "crrName", crr.Name)
state.Phase = appsv1alpha1.ContainerRecreateRequestFailed
state.Message = fmt.Sprintf("kill container error: %v", err)
if crr.Spec.Strategy.FailurePolicy == appsv1alpha1.ContainerRecreateRequestFailurePolicyIgnore {
Expand Down Expand Up @@ -385,7 +385,7 @@ func (c *Controller) manage(crr *appsv1alpha1.ContainerRecreateRequest) error {
}

func (c *Controller) patchCRRContainerRecreateStates(crr *appsv1alpha1.ContainerRecreateRequest, newCRRContainerRecreateStates []appsv1alpha1.ContainerRecreateRequestContainerRecreateState) error {
klog.V(3).Infof("CRR %s/%s patch containerRecreateStates: %v", crr.Namespace, crr.Name, util.DumpJSON(newCRRContainerRecreateStates))
klog.V(3).InfoS("CRR patch containerRecreateStates", "namespace", crr.Namespace, "name", crr.Name, "states", util.DumpJSON(newCRRContainerRecreateStates))
crr = crr.DeepCopy()
body := fmt.Sprintf(`{"status":{"containerRecreateStates":%s}}`, util.DumpJSON(newCRRContainerRecreateStates))
oldRev := crr.ResourceVersion
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/containerrecreate/crr_daemon_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func getCRRSyncContainerStatuses(crr *appsv1alpha1.ContainerRecreateRequest) map
}
var syncContainerStatuses []appsv1alpha1.ContainerRecreateRequestSyncContainerStatus
if err := json.Unmarshal([]byte(str), &syncContainerStatuses); err != nil {
klog.Errorf("Failed to unmarshal CRR %s/%s syncContainerStatuses %s: %v", crr.Namespace, crr.Name, str, err)
klog.ErrorS(err, "Failed to unmarshal CRR syncContainerStatuses", "namespace", crr.Namespace, "name", crr.Name, "rawString", str)
return nil
}

Expand Down
29 changes: 16 additions & 13 deletions pkg/daemon/criruntime/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,39 +95,39 @@ func NewFactory(varRunPath string, accountManager daemonutil.ImagePullAccountMan
case ContainerRuntimeContainerd, ContainerRuntimeCommonCRI, ContainerRuntimePouch:
addr, _, err := kubeletutil.GetAddressAndDialer(cfg.runtimeRemoteURI)
if err != nil {
klog.Warningf("Failed to get address for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
klog.ErrorS(err, "Failed to get address", "runtimeType", cfg.runtimeType, "runtimeURI", cfg.runtimeURI, "runtimeRemoteURI", cfg.runtimeRemoteURI)
continue
}
imageService, err = runtimeimage.NewCRIImageService(addr, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
klog.ErrorS(err, "Failed to new image service", "runtimeType", cfg.runtimeType, "runtimeURI", cfg.runtimeURI, "runtimeRemoteURI", cfg.runtimeRemoteURI)
continue
}
case ContainerRuntimeDocker:
imageService, err = runtimeimage.NewDockerImageService(cfg.runtimeURI, accountManager)
if err != nil {
klog.Warningf("Failed to new image service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
klog.ErrorS(err, "Failed to new image service", "runtimeType", cfg.runtimeType, "runtimeURI", cfg.runtimeURI, "runtimeRemoteURI", cfg.runtimeRemoteURI)
continue
}
}

if _, err = imageService.ListImages(context.TODO()); err != nil {
klog.Warningf("Failed to list images for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
klog.ErrorS(err, "Failed to list images", "runtimeType", cfg.runtimeType, "runtimeURI", cfg.runtimeURI, "runtimeRemoteURI", cfg.runtimeRemoteURI)
continue
}

runtimeService, err = criremote.NewRemoteRuntimeService(cfg.runtimeRemoteURI, time.Second*5, oteltrace.NewNoopTracerProvider())
if err != nil {
klog.Warningf("Failed to new runtime service for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
klog.ErrorS(err, "Failed to new runtime service", "runtimeType", cfg.runtimeType, "runtimeURI", cfg.runtimeURI, "runtimeRemoteURI", cfg.runtimeRemoteURI)
continue
}
typedVersion, err = runtimeService.Version(context.TODO(), kubeRuntimeAPIVersion)
if err != nil {
klog.Warningf("Failed to get runtime typed version for %v (%s, %s): %v", cfg.runtimeType, cfg.runtimeURI, cfg.runtimeRemoteURI, err)
klog.ErrorS(err, "Failed to get runtime typed version", "runtimeType", cfg.runtimeType, "runtimeURI", cfg.runtimeURI, "runtimeRemoteURI", cfg.runtimeRemoteURI)
continue
}

klog.V(2).Infof("Add runtime impl %v, URI: (%s, %s)", typedVersion.RuntimeName, cfg.runtimeURI, cfg.runtimeRemoteURI)
klog.V(2).InfoS("Add runtime", "runtimeName", typedVersion.RuntimeName, "runtimeURI", cfg.runtimeURI, "runtimeRemoteURI", cfg.runtimeRemoteURI)
f.impls = append(f.impls, &runtimeImpl{
cfg: cfg,
runtimeName: typedVersion.RuntimeName,
Expand Down Expand Up @@ -170,9 +170,9 @@ func detectRuntime(varRunPath string) (cfgs []runtimeConfig) {
runtimeType: ContainerRuntimeCommonCRI,
runtimeRemoteURI: fmt.Sprintf("unix://%s/%s", varRunPath, *CRISocketFileName),
})
klog.Infof("Find configured CRI socket %s with given flag", filePath)
klog.InfoS("Find configured CRI socket with given flag", "filePath", filePath)
} else {
klog.Errorf("Failed to stat the CRI socket %s with given flag: %v", filePath, err)
klog.ErrorS(err, "Failed to stat the CRI socket with given flag", "filePath", filePath)
}
return
}
Expand All @@ -190,9 +190,10 @@ func detectRuntime(varRunPath string) (cfgs []runtimeConfig) {
runtimeRemoteURI: fmt.Sprintf("unix://%s/pouchcri.sock", varRunPath),
})
} else if err1 == nil && err2 != nil {
klog.Errorf("%s/pouchd.sock exists, but not found %s/pouchcri.sock", varRunPath, varRunPath)
klog.ErrorS(err2, "pouchd.sock exists, but not found pouchcri.sock", "varRunPath", varRunPath)
} else if err1 != nil && err2 == nil {
klog.Errorf("%s/pouchdcri.sock exists, but not found %s/pouchd.sock", varRunPath, varRunPath)
// structured logging does not seem necessary here
klog.ErrorS(err1, "pouchdcri.sock exists, but not found pouchd.sock", "varRunPath", varRunPath)
}
}

Expand All @@ -207,9 +208,11 @@ func detectRuntime(varRunPath string) (cfgs []runtimeConfig) {
runtimeRemoteURI: fmt.Sprintf("unix://%s/dockershim.sock", varRunPath),
})
} else if err1 == nil && err2 != nil {
klog.Errorf("%s/docker.sock exists, but not found %s/dockershim.sock", varRunPath, varRunPath)
// structured logging does not seem necessary here
klog.ErrorS(err2, "docker.sock exists, but not found dockershim.sock", "varRunPath", varRunPath)
} else if err1 != nil && err2 == nil {
klog.Errorf("%s/dockershim.sock exists, but not found %s/docker.sock", varRunPath, varRunPath)
// structured logging does not seem necessary here
klog.ErrorS(err1, "dockershim.sock exists, but not found docker.sock", "varRunPath", varRunPath)
}
}

Expand Down
Loading

0 comments on commit 190126f

Please sign in to comment.