Skip to content

Commit

Permalink
fix: reduce the high db cpu usage for tag retention (goharbor#17296)
Browse files Browse the repository at this point in the history
1. Add two indexes to database migrations.
2. Skip refresh quota in middleware for requests from jobservice.
3. Refresh quota by self in the end of tag retention job.

Closes: goharbor#14708

Signed-off-by: chlins <[email protected]>
  • Loading branch information
chlins authored and sluetze committed Oct 29, 2022
1 parent 0de5b32 commit faaa825
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 6 deletions.
6 changes: 5 additions & 1 deletion make/migrations/postgresql/0090_2.6.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ BEGIN
UPDATE execution SET end_time=(SELECT MAX(end_time) FROM task WHERE execution_id=exec.id) WHERE id=exec.id;
END IF;
END LOOP;
END $$;
END $$;

/* Add indexes to improve the performance of tag retention */
CREATE INDEX IF NOT EXISTS idx_artifact_blob_digest_blob ON artifact_blob (digest_blob);
CREATE INDEX IF NOT EXISTS idx_artifact_digest_project_id ON artifact (digest,project_id);
9 changes: 7 additions & 2 deletions src/controller/quota/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option
return quotas, nil
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), retryOpts ...retry.Option) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
Expand Down Expand Up @@ -202,6 +202,11 @@ func (c *controller) updateUsageWithRetry(ctx context.Context, reference, refere
log.G(ctx).Debugf("failed to update the quota usage for %s %s, error: %v", reference, referenceID, err)
}),
}
// append for override default retry options
if len(retryOpts) > 0 {
options = append(options, retryOpts...)
}

return retry.Retry(f, options...)
}

Expand All @@ -223,7 +228,7 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string,
return newUsed, err
}

return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation))
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), opts.RetryOptions...)
}

func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
Expand Down
11 changes: 11 additions & 0 deletions src/controller/quota/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@

package quota

import "github.com/goharbor/harbor/src/lib/retry"

// Option option for `Refresh` method of `Controller`
type Option func(*Options)

// Options options used by `Refresh`, `Get`, `List` methods of `Controller`
type Options struct {
IgnoreLimitation bool
WithReferenceObject bool
// RetryOptions is the sets of options but for retry function.
RetryOptions []retry.Option
}

// IgnoreLimitation set IgnoreLimitation for the Options
Expand All @@ -37,6 +41,13 @@ func WithReferenceObject() func(*Options) {
}
}

// WithRetryOptions set RetryOptions to Options
func WithRetryOptions(retryOpts []retry.Option) func(*Options) {
return func(opts *Options) {
opts.RetryOptions = retryOpts
}
}

func newOptions(options ...Option) *Options {
opts := &Options{}
for _, f := range options {
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/clients/core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type ChartClient interface {
}

// New returns an instance of the client which is a default implement for Client
func New(url string, httpclient *http.Client, authorizer modifier.Modifier) Client {
func New(url string, httpclient *http.Client, modifiers ...modifier.Modifier) Client {
return &client{
url: url,
httpclient: chttp.NewClient(httpclient, authorizer),
httpclient: chttp.NewClient(httpclient, modifiers...),
}
}

Expand Down
14 changes: 13 additions & 1 deletion src/pkg/retention/dep/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/clients/core"
)
Expand Down Expand Up @@ -59,6 +60,17 @@ type Client interface {
Delete(candidate *selector.Candidate) error
}

type injectVendorType struct{}

// injectVendorType injects vendor type to request header.
func (i *injectVendorType) Modify(req *http.Request) error {
if req != nil {
req.Header.Set("VendorType", job.Retention)
}

return nil
}

// NewClient new a basic client
func NewClient(client ...*http.Client) Client {
var c *http.Client
Expand All @@ -73,7 +85,7 @@ func NewClient(client ...*http.Client) Client {
internalCoreURL := config.GetCoreURL()
jobserviceSecret := config.GetAuthSecret()
authorizer := auth.NewSecretAuthorizer(jobserviceSecret)
coreClient := core.New(internalCoreURL, c, authorizer)
coreClient := core.New(internalCoreURL, c, authorizer, &injectVendorType{})

return &basicClient{
internalCoreURL: internalCoreURL,
Expand Down
12 changes: 12 additions & 0 deletions src/pkg/retention/dep/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dep

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -143,6 +144,17 @@ func (c *clientTestSuite) TestDelete() {
require.NotNil(c.T(), err)
}

func (c *clientTestSuite) TestInjectVendorType() {
injector := &injectVendorType{}
req, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
assert.NoError(c.T(), err)
assert.Equal(c.T(), "", req.Header.Get("VendorType"))
// after injecting should appear vendor type in header
err = injector.Modify(req)
assert.NoError(c.T(), err)
assert.Equal(c.T(), "RETENTION", req.Header.Get("VendorType"))
}

func TestClientTestSuite(t *testing.T) {
suite.Run(t, new(clientTestSuite))
}
37 changes: 37 additions & 0 deletions src/pkg/retention/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ package retention

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/olekukonko/tablewriter"

"github.com/goharbor/harbor/src/controller/quota"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/retention/dep"
"github.com/goharbor/harbor/src/pkg/retention/policy"
Expand Down Expand Up @@ -116,6 +119,14 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
return logError(myLogger, err)
}

// refreshQuota after the deleting candidates
if !isDryRun {
if err = refreshQuota(ctx.SystemContext(), results); err != nil {
// just log error if refresh quota error
myLogger.Errorf("Refresh quota error after deleting candidates, error: %v", err)
}
}

// Log stage: results with table view
logResults(myLogger, allCandidates, results)

Expand Down Expand Up @@ -288,3 +299,29 @@ func getParamMeta(params job.Parameters) (*lwp.Metadata, error) {

return meta, nil
}

// refreshQuota refreshes quota by deleted results.
func refreshQuota(ctx context.Context, results []*selector.Result) error {
projects := make(map[int64]struct{})
for _, res := range results {
if res != nil && res.Target != nil {
projects[res.Target.NamespaceID] = struct{}{}
}
}

// refresh quota by project
for pid := range projects {
// retry options, enable backoff to reduce the db CPU resource usage.
opts := []retry.Option{
retry.Backoff(true),
// the interval value was determined based on experimental results as a way to achieve a faster total time with less cpu.
retry.InitialInterval(5 * time.Second),
retry.MaxInterval(10 * time.Second),
}
if err := quota.Ctl.Refresh(ctx, quota.ProjectReference, fmt.Sprintf("%d", pid), quota.WithRetryOptions(opts)); err != nil {
return err
}
}

return nil
}
10 changes: 10 additions & 0 deletions src/server/middleware/quota/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/goharbor/harbor/src/pkg/quota"
"github.com/goharbor/harbor/src/pkg/quota/types"
"github.com/goharbor/harbor/src/server/middleware"
"github.com/goharbor/harbor/src/server/middleware/security"
)

var (
Expand Down Expand Up @@ -219,6 +220,15 @@ func RefreshMiddleware(config RefreshConfig, skipers ...middleware.Skipper) func
return nil
}

// if the request is from jobservice and is retention job, ignore refresh as tag retention
// delete artifact, and if the number of artifact is large that will
// cause huge db CPU resource for refresh quota, so ignore here and let
// task call the refresh on the own initiative.
if security.FromJobservice(r) && security.FromJobRetention(r) {
logger.Debugf("quota is skipped for %s %s, because this request is from jobservice retention job", reference, referenceID)
return nil
}

if err = quotaController.Refresh(r.Context(), reference, referenceID, cq.IgnoreLimitation(config.IgnoreLimitation)); err != nil {
logger.Errorf("refresh quota for %s %s failed, error: %v", reference, referenceID, err)

Expand Down
23 changes: 23 additions & 0 deletions src/server/middleware/security/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package security
import (
"net/http"
"strings"

commonsecret "github.com/goharbor/harbor/src/common/secret"
"github.com/goharbor/harbor/src/common/security"
"github.com/goharbor/harbor/src/jobservice/job"
)

func bearerToken(req *http.Request) string {
Expand All @@ -16,3 +20,22 @@ func bearerToken(req *http.Request) string {
}
return strings.TrimSpace(token[1])
}

// FromJobservice detects whether this request is from jobservice.
func FromJobservice(req *http.Request) bool {
sc, ok := security.FromContext(req.Context())
if !ok {
return false
}
// check whether the user is jobservice user
return sc.GetUsername() == commonsecret.JobserviceUser
}

// FromJobRetention detects whether this request is from tag retention job.
func FromJobRetention(req *http.Request) bool {
if req != nil && req.Header != nil {
return req.Header.Get("VendorType") == job.Retention
}

return false
}
36 changes: 36 additions & 0 deletions src/server/middleware/security/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"net/http"
"testing"

"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/security"
"github.com/goharbor/harbor/src/common/security/local"
securitysecret "github.com/goharbor/harbor/src/common/security/secret"
"github.com/goharbor/harbor/src/lib/config"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -38,3 +43,34 @@ func TestBearerToken(t *testing.T) {
assert.Equal(t, c.token, bearerToken(c.request))
}
}

func TestFromJobservice(t *testing.T) {
// no security ctx should return false
req1, _ := http.NewRequest(http.MethodHead, "/api", nil)
assert.False(t, FromJobservice(req1))
// other username should return false
req2, _ := http.NewRequest(http.MethodHead, "/api", nil)
secCtx1 := local.NewSecurityContext(&models.User{UserID: 1, Username: "test-user"})
req2 = req2.WithContext(security.NewContext(req2.Context(), secCtx1))
assert.False(t, FromJobservice(req2))
// secret ctx from jobservice should return true
req3, _ := http.NewRequest(http.MethodHead, "/api", nil)
config.Init()
secCtx2 := securitysecret.NewSecurityContext(config.JobserviceSecret(), config.SecretStore)
req3 = req3.WithContext(security.NewContext(req3.Context(), secCtx2))
assert.True(t, FromJobservice(req3))
}

func TestFromJobRetention(t *testing.T) {
// return false if req is nil
assert.False(t, FromJobRetention(nil))
// return false if req has no header
req1, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
assert.NoError(t, err)
assert.False(t, FromJobRetention(req1))
// return true if header has retention vendor type
req2, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
assert.NoError(t, err)
req2.Header.Set("VendorType", "RETENTION")
assert.True(t, FromJobRetention(req2))
}

0 comments on commit faaa825

Please sign in to comment.