Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce the high db cpu usage for tag retention #17296

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion make/migrations/postgresql/0090_2.6.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ BEGIN
UPDATE execution SET end_time=(SELECT MAX(end_time) FROM task WHERE execution_id=exec.id) WHERE id=exec.id;
END IF;
END LOOP;
END $$;
END $$;

/* Add indexes to improve the performance of tag retention */
CREATE INDEX IF NOT EXISTS idx_artifact_blob_digest_blob ON artifact_blob (digest_blob);
CREATE INDEX IF NOT EXISTS idx_artifact_digest_project_id ON artifact (digest,project_id);
9 changes: 7 additions & 2 deletions src/controller/quota/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option
return quotas, nil
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), retryOpts ...retry.Option) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
Expand Down Expand Up @@ -202,6 +202,11 @@ func (c *controller) updateUsageWithRetry(ctx context.Context, reference, refere
log.G(ctx).Debugf("failed to update the quota usage for %s %s, error: %v", reference, referenceID, err)
}),
}
// append the caller's retry options last so that they override the default ones
if len(retryOpts) > 0 {
options = append(options, retryOpts...)
}

return retry.Retry(f, options...)
}

Expand All @@ -223,7 +228,7 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string,
return newUsed, err
}

return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation))
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), opts.RetryOptions...)
}

func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
Expand Down
11 changes: 11 additions & 0 deletions src/controller/quota/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@

package quota

import "github.com/goharbor/harbor/src/lib/retry"

// Option option for `Refresh` method of `Controller`
type Option func(*Options)

// Options holds the settings collected from Option functions; it is used by
// the `Refresh`, `Get` and `List` methods of `Controller`.
type Options struct {
IgnoreLimitation bool
WithReferenceObject bool
// RetryOptions holds the options for the retry function; `Refresh` forwards
// them to the retry that wraps the quota usage update.
RetryOptions []retry.Option
}

// IgnoreLimitation set IgnoreLimitation for the Options
Expand All @@ -37,6 +41,13 @@ func WithReferenceObject() func(*Options) {
}
}

// WithRetryOptions returns an option that stores retryOpts in the
// RetryOptions field of Options.
func WithRetryOptions(retryOpts []retry.Option) func(*Options) {
	apply := func(o *Options) {
		o.RetryOptions = retryOpts
	}

	return apply
}

func newOptions(options ...Option) *Options {
opts := &Options{}
for _, f := range options {
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/clients/core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type ChartClient interface {
}

// New returns an instance of the client which is a default implement for Client
func New(url string, httpclient *http.Client, authorizer modifier.Modifier) Client {
func New(url string, httpclient *http.Client, modifiers ...modifier.Modifier) Client {
return &client{
url: url,
httpclient: chttp.NewClient(httpclient, authorizer),
httpclient: chttp.NewClient(httpclient, modifiers...),
}
}

Expand Down
14 changes: 13 additions & 1 deletion src/pkg/retention/dep/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/clients/core"
)
Expand Down Expand Up @@ -59,6 +60,17 @@ type Client interface {
Delete(candidate *selector.Candidate) error
}

// injectVendorType is a request modifier that tags outgoing requests with the
// retention vendor type.
type injectVendorType struct{}

// Modify sets the "VendorType" header of req to job.Retention.
// A nil request is left untouched; the returned error is always nil.
func (i *injectVendorType) Modify(req *http.Request) error {
	if req == nil {
		return nil
	}

	// Writing to a nil http.Header panics, so make sure the map exists for
	// requests that were assembled by hand instead of via http.NewRequest.
	if req.Header == nil {
		req.Header = make(http.Header)
	}
	req.Header.Set("VendorType", job.Retention)

	return nil
}

// NewClient new a basic client
func NewClient(client ...*http.Client) Client {
var c *http.Client
Expand All @@ -73,7 +85,7 @@ func NewClient(client ...*http.Client) Client {
internalCoreURL := config.GetCoreURL()
jobserviceSecret := config.GetAuthSecret()
authorizer := auth.NewSecretAuthorizer(jobserviceSecret)
coreClient := core.New(internalCoreURL, c, authorizer)
coreClient := core.New(internalCoreURL, c, authorizer, &injectVendorType{})

return &basicClient{
internalCoreURL: internalCoreURL,
Expand Down
12 changes: 12 additions & 0 deletions src/pkg/retention/dep/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dep

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -143,6 +144,17 @@ func (c *clientTestSuite) TestDelete() {
require.NotNil(c.T(), err)
}

// TestInjectVendorType checks that Modify adds the "VendorType" header with
// the value "RETENTION" to a request that did not carry it before.
func (c *clientTestSuite) TestInjectVendorType() {
injector := &injectVendorType{}
req, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
assert.NoError(c.T(), err)
// a freshly built request carries no vendor type
assert.Equal(c.T(), "", req.Header.Get("VendorType"))
// after injecting, the vendor type should appear in the header
err = injector.Modify(req)
assert.NoError(c.T(), err)
assert.Equal(c.T(), "RETENTION", req.Header.Get("VendorType"))
}

func TestClientTestSuite(t *testing.T) {
suite.Run(t, new(clientTestSuite))
}
37 changes: 37 additions & 0 deletions src/pkg/retention/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ package retention

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/olekukonko/tablewriter"

"github.com/goharbor/harbor/src/controller/quota"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/retention/dep"
"github.com/goharbor/harbor/src/pkg/retention/policy"
Expand Down Expand Up @@ -116,6 +119,14 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
return logError(myLogger, err)
}

// refreshQuota after the deleting candidates
if !isDryRun {
if err = refreshQuota(ctx.SystemContext(), results); err != nil {
// just log error if refresh quota error
myLogger.Errorf("Refresh quota error after deleting candidates, error: %v", err)
}
}

// Log stage: results with table view
logResults(myLogger, allCandidates, results)

Expand Down Expand Up @@ -288,3 +299,29 @@ func getParamMeta(params job.Parameters) (*lwp.Metadata, error) {

return meta, nil
}

// refreshQuota refreshes the quota of every project that owns at least one
// target in results. Results that are nil or carry no target are ignored.
// It stops at the first project whose refresh fails and returns that error
// annotated with the project ID; it returns nil when every refresh succeeds.
func refreshQuota(ctx context.Context, results []*selector.Result) error {
	// collect the distinct project IDs so each project is refreshed only once
	projects := make(map[int64]struct{})
	for _, res := range results {
		if res != nil && res.Target != nil {
			projects[res.Target.NamespaceID] = struct{}{}
		}
	}

	// retry options, enable backoff to reduce the db CPU resource usage.
	// They are the same for every project, so they are built once up front.
	opts := []retry.Option{
		retry.Backoff(true),
		// the interval value was determined based on experimental results as a way to achieve a faster total time with less cpu.
		retry.InitialInterval(5 * time.Second),
		retry.MaxInterval(10 * time.Second),
	}
	withRetry := quota.WithRetryOptions(opts)

	// refresh quota by project
	for pid := range projects {
		if err := quota.Ctl.Refresh(ctx, quota.ProjectReference, fmt.Sprintf("%d", pid), withRetry); err != nil {
			// keep the original error in the chain, but say which project failed
			return fmt.Errorf("refresh quota for project %d: %w", pid, err)
		}
	}

	return nil
}
10 changes: 10 additions & 0 deletions src/server/middleware/quota/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/goharbor/harbor/src/pkg/quota"
"github.com/goharbor/harbor/src/pkg/quota/types"
"github.com/goharbor/harbor/src/server/middleware"
"github.com/goharbor/harbor/src/server/middleware/security"
)

var (
Expand Down Expand Up @@ -219,6 +220,15 @@ func RefreshMiddleware(config RefreshConfig, skipers ...middleware.Skipper) func
return nil
}

// If the request comes from jobservice and belongs to a tag retention job, skip the refresh here.
// Tag retention deletes artifacts, and when the number of artifacts is large,
// refreshing the quota for each of these requests consumes a huge amount of db CPU,
// so the refresh is skipped here and the retention task triggers it on its own initiative.
if security.FromJobservice(r) && security.FromJobRetention(r) {
logger.Debugf("quota is skipped for %s %s, because this request is from jobservice retention job", reference, referenceID)
return nil
}

if err = quotaController.Refresh(r.Context(), reference, referenceID, cq.IgnoreLimitation(config.IgnoreLimitation)); err != nil {
logger.Errorf("refresh quota for %s %s failed, error: %v", reference, referenceID, err)

Expand Down
23 changes: 23 additions & 0 deletions src/server/middleware/security/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package security
import (
"net/http"
"strings"

commonsecret "github.com/goharbor/harbor/src/common/secret"
"github.com/goharbor/harbor/src/common/security"
"github.com/goharbor/harbor/src/jobservice/job"
)

func bearerToken(req *http.Request) string {
Expand All @@ -16,3 +20,22 @@ func bearerToken(req *http.Request) string {
}
return strings.TrimSpace(token[1])
}

// FromJobservice detects whether this request is from jobservice.
func FromJobservice(req *http.Request) bool {
chlins marked this conversation as resolved.
Show resolved Hide resolved
sc, ok := security.FromContext(req.Context())
if !ok {
return false
}
// check whether the user is jobservice user
return sc.GetUsername() == commonsecret.JobserviceUser
}

// FromJobRetention reports whether the request carries a "VendorType" header
// equal to job.Retention, which marks it as sent by the tag retention job.
// A nil request or a request without a header map yields false.
func FromJobRetention(req *http.Request) bool {
	if req == nil || req.Header == nil {
		return false
	}

	vendorType := req.Header.Get("VendorType")
	return vendorType == job.Retention
}
36 changes: 36 additions & 0 deletions src/server/middleware/security/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"net/http"
"testing"

"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/security"
"github.com/goharbor/harbor/src/common/security/local"
securitysecret "github.com/goharbor/harbor/src/common/security/secret"
"github.com/goharbor/harbor/src/lib/config"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -38,3 +43,34 @@ func TestBearerToken(t *testing.T) {
assert.Equal(t, c.token, bearerToken(c.request))
}
}

// TestFromJobservice covers three cases of FromJobservice: a request without
// a security context, a request with a local user's security context, and a
// request with a secret-based security context built from the jobservice secret.
func TestFromJobservice(t *testing.T) {
// no security ctx should return false
req1, _ := http.NewRequest(http.MethodHead, "/api", nil)
assert.False(t, FromJobservice(req1))
// other username should return false
req2, _ := http.NewRequest(http.MethodHead, "/api", nil)
secCtx1 := local.NewSecurityContext(&models.User{UserID: 1, Username: "test-user"})
req2 = req2.WithContext(security.NewContext(req2.Context(), secCtx1))
assert.False(t, FromJobservice(req2))
// secret ctx from jobservice should return true
req3, _ := http.NewRequest(http.MethodHead, "/api", nil)
// config.Init() runs before the jobservice secret and the secret store are read from config
config.Init()
secCtx2 := securitysecret.NewSecurityContext(config.JobserviceSecret(), config.SecretStore)
req3 = req3.WithContext(security.NewContext(req3.Context(), secCtx2))
assert.True(t, FromJobservice(req3))
}

// TestFromJobRetention covers three cases of FromJobRetention: a nil request,
// a request without the "VendorType" header, and a request whose "VendorType"
// header is "RETENTION".
func TestFromJobRetention(t *testing.T) {
// return false if req is nil
assert.False(t, FromJobRetention(nil))
// return false if the VendorType header is not set on the request
req1, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
assert.NoError(t, err)
assert.False(t, FromJobRetention(req1))
// return true if header has retention vendor type
req2, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
assert.NoError(t, err)
req2.Header.Set("VendorType", "RETENTION")
assert.True(t, FromJobRetention(req2))
}