Skip to content

Commit

Permalink
Merge pull request #1253 from feiskyer/log-latency
Browse files Browse the repository at this point in the history
chore: add verbose logs for latency and operation start timestamps
  • Loading branch information
k8s-ci-robot authored Mar 16, 2022
2 parents 72ebb26 + 2898d2c commit f61169b
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 34 deletions.
25 changes: 19 additions & 6 deletions pkg/metrics/azure_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"

"sigs.k8s.io/cloud-provider-azure/pkg/consts"
"sigs.k8s.io/cloud-provider-azure/pkg/retry"
Expand Down Expand Up @@ -80,23 +81,35 @@ func (mc *MetricContext) ThrottledCount() {
}

// Observe observes the request latency and failed requests.
func (mc *MetricContext) Observe(rerr *retry.Error) {
apiMetrics.latency.WithLabelValues(mc.attributes...).Observe(
time.Since(mc.start).Seconds())
func (mc *MetricContext) Observe(rerr *retry.Error, labelAndValues ...interface{}) {
latency := time.Since(mc.start).Seconds()
apiMetrics.latency.WithLabelValues(mc.attributes...).Observe(latency)
if rerr != nil {
errorCode := rerr.ServiceErrorCode()
attributes := append(mc.attributes, errorCode)
apiMetrics.errors.WithLabelValues(attributes...).Inc()
}
mc.logLatency(latency, append(labelAndValues, "error_code", rerr.ServiceErrorCode())...)
}

// ObserveOperationWithResult observes the request latency and failed requests of an operation.
func (mc *MetricContext) ObserveOperationWithResult(isOperationSucceeded bool) {
operationMetrics.operationLatency.WithLabelValues(mc.attributes...).Observe(
time.Since(mc.start).Seconds())
func (mc *MetricContext) ObserveOperationWithResult(isOperationSucceeded bool, labelAndValues ...interface{}) {
latency := time.Since(mc.start).Seconds()
operationMetrics.operationLatency.WithLabelValues(mc.attributes...).Observe(latency)
resultCode := "succeeded"
if !isOperationSucceeded {
resultCode = "failed"
mc.CountFailedOperation()
}
mc.logLatency(latency, append(labelAndValues, "result_code", resultCode)...)
}

func (mc *MetricContext) logLatency(latency float64, additionalKeysAndValues ...interface{}) {
keysAndValues := []interface{}{"latency_seconds", latency}
for i, label := range metricLabels {
keysAndValues = append(keysAndValues, label, mc.attributes[i])
}
klog.V(3).InfoS("Observed Request Latency", append(keysAndValues, additionalKeysAndValues...)...)
}

// CountFailedOperation increase the number of failed operations
Expand Down
77 changes: 55 additions & 22 deletions pkg/provider/azure_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,9 @@ func getPublicIPDomainNameLabel(service *v1.Service) (string, bool) {
return "", false
}

// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
// When a client updates the internal load balancer annotation,
// the service may be switched from an internal LB to a public one, or vise versa.
// Here we'll firstly ensure service do not lie in the opposite LB.
// reconcileService reconcile the LoadBalancer service. It returns LoadBalancerStatus on success.
func (az *Cloud) reconcileService(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
serviceName := getServiceName(service)
klog.V(5).Infof("ensureloadbalancer(%s): START clusterName=%q, service: %v", serviceName, clusterName, service)

mc := metrics.NewMetricContext("services", "ensure_loadbalancer", az.ResourceGroup, az.SubscriptionID, serviceName)
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
}()

lb, err := az.reconcileLoadBalancer(clusterName, service, nodes, true /* wantLb */)
if err != nil {
klog.Errorf("reconcileLoadBalancer(%s) failed: %v", serviceName, err)
Expand All @@ -115,7 +104,7 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
if lbStatus != nil && len(lbStatus.Ingress) > 0 {
serviceIP = &lbStatus.Ingress[0].IP
}
klog.V(2).Infof("EnsureLoadBalancer: reconciling security group for service %q with IP %q, wantLb = true", serviceName, logSafe(serviceIP))
klog.V(2).Infof("reconcileService: reconciling security group for service %q with IP %q, wantLb = true", serviceName, logSafe(serviceIP))
if _, err := az.reconcileSecurityGroup(clusterName, service, serviceIP, true /* wantLb */); err != nil {
klog.Errorf("reconcileSecurityGroup(%s) failed: %#v", serviceName, err)
return nil, err
Expand All @@ -129,30 +118,70 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
}

// lb is not reused here because the ETAG may be changed in above operations, hence reconcilePublicIP() would get lb again from cache.
klog.V(2).Infof("EnsureLoadBalancer: reconciling pip")
klog.V(2).Infof("reconcileService: reconciling pip")
if _, err := az.reconcilePublicIP(clusterName, updateService, to.String(lb.Name), true /* wantLb */); err != nil {
klog.Errorf("reconcilePublicIP(%s) failed: %#v", serviceName, err)
return nil, err
}

return lbStatus, nil
}

// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
// When a client updates the internal load balancer annotation,
// the service may be switched from an internal LB to a public one, or vise versa.
// Here we'll firstly ensure service do not lie in the opposite LB.
var err error
serviceName := getServiceName(service)
mc := metrics.NewMetricContext("services", "ensure_loadbalancer", az.ResourceGroup, az.SubscriptionID, serviceName)
klog.V(5).InfoS("EnsureLoadBalancer Start", "service", serviceName, "cluster", clusterName, "service_spec", service)

isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
klog.V(5).InfoS("EnsureLoadBalancer Finish", "service", serviceName, "cluster", clusterName, "service_spec", service, "error", err)
}()

lbStatus, err := az.reconcileService(ctx, clusterName, service, nodes)
if err != nil {
return nil, err
}

isOperationSucceeded = true
return lbStatus, nil
}

// UpdateLoadBalancer updates hosts under the specified load balancer.
func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
var err error
serviceName := getServiceName(service)
mc := metrics.NewMetricContext("services", "update_loadbalancer", az.ResourceGroup, az.SubscriptionID, serviceName)
klog.V(5).InfoS("UpdateLoadBalancer Start", "service", serviceName, "cluster", clusterName, "service_spec", service)
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
klog.V(5).InfoS("UpdateLoadBalancer Finish", "service", serviceName, "cluster", clusterName, "service_spec", service, "error", err)
}()

shouldUpdateLB, err := az.shouldUpdateLoadBalancer(clusterName, service, nodes)
if err != nil {
return err
}

if !shouldUpdateLB {
isOperationSucceeded = true
klog.V(2).Infof("UpdateLoadBalancer: skipping service %s because it is either being deleted or does not exist anymore", service.Name)
return nil
}

_, err = az.EnsureLoadBalancer(ctx, clusterName, service, nodes)
return err
_, err = az.reconcileService(ctx, clusterName, service, nodes)
if err != nil {
return err
}

isOperationSucceeded = true
return nil
}

// EnsureLoadBalancerDeleted deletes the specified load balancer if it
Expand All @@ -162,14 +191,15 @@ func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, ser
// have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around.
func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
var err error
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
klog.V(5).Infof("Delete service (%s): START clusterName=%q", serviceName, clusterName)

mc := metrics.NewMetricContext("services", "ensure_loadbalancer_deleted", az.ResourceGroup, az.SubscriptionID, serviceName)
klog.V(5).InfoS("EnsureLoadBalancerDeleted Start", "service", serviceName, "cluster", clusterName, "service_spec", service)
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
klog.V(5).InfoS("EnsureLoadBalancerDeleted Finish", "service", serviceName, "cluster", clusterName, "service_spec", service, "error", err)
}()

serviceIPToCleanup, err := az.findServiceIPAddress(ctx, clusterName, service, isInternal)
Expand All @@ -178,15 +208,18 @@ func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName stri
}

klog.V(2).Infof("EnsureLoadBalancerDeleted: reconciling security group for service %q with IP %q, wantLb = false", serviceName, serviceIPToCleanup)
if _, err := az.reconcileSecurityGroup(clusterName, service, &serviceIPToCleanup, false /* wantLb */); err != nil {
_, err = az.reconcileSecurityGroup(clusterName, service, &serviceIPToCleanup, false /* wantLb */)
if err != nil {
return err
}

if _, err := az.reconcileLoadBalancer(clusterName, service, nil, false /* wantLb */); err != nil && !retry.HasStatusForbiddenOrIgnoredError(err) {
_, err = az.reconcileLoadBalancer(clusterName, service, nil, false /* wantLb */)
if err != nil && !retry.HasStatusForbiddenOrIgnoredError(err) {
return err
}

if _, err := az.reconcilePublicIP(clusterName, service, "", false /* wantLb */); err != nil {
_, err = az.reconcilePublicIP(clusterName, service, "", false /* wantLb */)
if err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/azure_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (az *Cloud) createRouteTable() error {
// route.Name will be ignored, although the cloud-provider may use nameHint
// to create a more user-meaningful name.
func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, kubeRoute *cloudprovider.Route) error {
mc := metrics.NewMetricContext("routes", "create_route", az.ResourceGroup, az.SubscriptionID, "")
mc := metrics.NewMetricContext("routes", "create_route", az.ResourceGroup, az.SubscriptionID, string(kubeRoute.TargetNode))
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
Expand Down Expand Up @@ -449,7 +449,7 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s
// DeleteRoute deletes the specified managed route
// Route should be as returned by ListRoutes
func (az *Cloud) DeleteRoute(ctx context.Context, clusterName string, kubeRoute *cloudprovider.Route) error {
mc := metrics.NewMetricContext("routes", "delete_route", az.ResourceGroup, az.SubscriptionID, "")
mc := metrics.NewMetricContext("routes", "delete_route", az.ResourceGroup, az.SubscriptionID, string(kubeRoute.TargetNode))
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/azure_standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ func (as *availabilitySet) EnsureHostInPool(service *v1.Service, nodeName types.
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string) error {
mc := metrics.NewMetricContext("services", "vmas_ensure_hosts_in_pool", as.ResourceGroup, as.SubscriptionID, service.Name)
mc := metrics.NewMetricContext("services", "vmas_ensure_hosts_in_pool", as.ResourceGroup, as.SubscriptionID, getServiceName(service))
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
Expand Down Expand Up @@ -1041,7 +1041,7 @@ func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backend
return nil
}

mc := metrics.NewMetricContext("services", "vmas_ensure_backend_pool_deleted", as.ResourceGroup, as.SubscriptionID, service.Name)
mc := metrics.NewMetricContext("services", "vmas_ensure_backend_pool_deleted", as.ResourceGroup, as.SubscriptionID, getServiceName(service))
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
Expand Down
4 changes: 2 additions & 2 deletions pkg/provider/azure_vmss.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,7 +1269,7 @@ func (ss *ScaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (ss *ScaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetNameOfLB string) error {
mc := metrics.NewMetricContext("services", "vmss_ensure_hosts_in_pool", ss.ResourceGroup, ss.SubscriptionID, service.Name)
mc := metrics.NewMetricContext("services", "vmss_ensure_hosts_in_pool", ss.ResourceGroup, ss.SubscriptionID, getServiceName(service))
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
Expand Down Expand Up @@ -1529,7 +1529,7 @@ func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID,
return nil
}

mc := metrics.NewMetricContext("services", "vmss_ensure_backend_pool_deleted", ss.ResourceGroup, ss.SubscriptionID, service.Name)
mc := metrics.NewMetricContext("services", "vmss_ensure_backend_pool_deleted", ss.ResourceGroup, ss.SubscriptionID, getServiceName(service))
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
Expand Down

0 comments on commit f61169b

Please sign in to comment.