Skip to content

Commit

Permalink
Unique repo path and permissions (#8517)
Browse files Browse the repository at this point in the history
Unique repo path and permissions (#8517)

Signed-off-by: Alexander Matyushentsev <[email protected]>
  • Loading branch information
alexmt authored Feb 24, 2022
1 parent ffa74bb commit c7da148
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 79 deletions.
13 changes: 9 additions & 4 deletions reposerver/repository/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type repositoryLock struct {
}

// Lock acquires lock unless lock is already acquired with the same commit and allowConcurrent is set to true
func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool, init func() error) (io.Closer, error) {
func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool, init func() (io.Closer, error)) (io.Closer, error) {
r.lock.Lock()
state, ok := r.stateByKey[path]
if !ok {
Expand All @@ -30,26 +30,30 @@ func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool
state.cond.L.Lock()
notify := false
state.processCount--
var err error
if state.processCount == 0 {
notify = true
state.revision = ""
err = state.initCloser.Close()
}

state.cond.L.Unlock()
if notify {
state.cond.Broadcast()
}
return nil
return err
})

for {
state.cond.L.Lock()
if state.revision == "" {
// no in progress operation for that repo. Go ahead.
if err := init(); err != nil {
initCloser, err := init()
if err != nil {
state.cond.L.Unlock()
return nil, err
}

state.initCloser = initCloser
state.revision = revision
state.processCount = 1
state.allowConcurrent = allowConcurrent
Expand All @@ -71,6 +75,7 @@ func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool
type repositoryState struct {
cond *sync.Cond
revision string
initCloser io.Closer
processCount int
allowConcurrent bool
}
14 changes: 7 additions & 7 deletions reposerver/repository/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func lockQuickly(action func() (io.Closer, error)) (io.Closer, bool) {
}
}

func numberOfInits(initializedTimes *int) func() error {
return func() error {
func numberOfInits(initializedTimes *int) func() (io.Closer, error) {
return func() (io.Closer, error) {
*initializedTimes++
return nil
return util.NopCloser, nil
}
}

Expand Down Expand Up @@ -120,8 +120,8 @@ func TestLock_FailedInitialization(t *testing.T) {
lock := NewRepositoryLock()

closer1, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, func() error {
return errors.New("failed")
return lock.Lock("myRepo", "1", true, func() (io.Closer, error) {
return util.NopCloser, errors.New("failed")
})
})

Expand All @@ -132,8 +132,8 @@ func TestLock_FailedInitialization(t *testing.T) {
assert.Nil(t, closer1)

closer2, done := lockQuickly(func() (io.Closer, error) {
return lock.Lock("myRepo", "1", true, func() error {
return nil
return lock.Lock("myRepo", "1", true, func() (io.Closer, error) {
return util.NopCloser, nil
})
})

Expand Down
130 changes: 103 additions & 27 deletions reposerver/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
goio "io"
"io/fs"
"io/ioutil"
"net/url"
"os"
Expand All @@ -16,17 +18,15 @@ import (
"strings"
"time"

"github.com/google/go-jsonnet"

"github.com/argoproj/argo-cd/v2/util/argo"

"github.com/Masterminds/semver/v3"
"github.com/TomOnTime/utfutil"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
textutils "github.com/argoproj/gitops-engine/pkg/utils/text"
"github.com/argoproj/pkg/sync"
jsonpatch "github.com/evanphx/json-patch"
"github.com/ghodss/yaml"
gogit "github.com/go-git/go-git/v5"
"github.com/google/go-jsonnet"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
Expand All @@ -44,10 +44,12 @@ import (
"github.com/argoproj/argo-cd/v2/reposerver/metrics"
"github.com/argoproj/argo-cd/v2/util/app/discovery"
argopath "github.com/argoproj/argo-cd/v2/util/app/path"
"github.com/argoproj/argo-cd/v2/util/argo"
executil "github.com/argoproj/argo-cd/v2/util/exec"
"github.com/argoproj/argo-cd/v2/util/git"
"github.com/argoproj/argo-cd/v2/util/glob"
"github.com/argoproj/argo-cd/v2/util/gpg"
"github.com/argoproj/argo-cd/v2/util/grpc"
"github.com/argoproj/argo-cd/v2/util/helm"
"github.com/argoproj/argo-cd/v2/util/io"
"github.com/argoproj/argo-cd/v2/util/ksonnet"
Expand All @@ -68,12 +70,16 @@ const (
// Service implements ManifestService interface
type Service struct {
gitCredsStore git.CredsStore
rootDir string
gitRepoPaths *io.TempPaths
chartPaths *io.TempPaths
gitRepoInitializer func(rootPath string) goio.Closer
repoLock *repositoryLock
cache *reposervercache.Cache
parallelismLimitSemaphore *semaphore.Weighted
metricsServer *metrics.MetricsServer
resourceTracking argo.ResourceTracking
newGitClient func(rawRepoURL string, creds git.Creds, insecure bool, enableLfs bool, proxy string, opts ...git.ClientOpts) (git.Client, error)
newGitClient func(rawRepoURL string, root string, creds git.Creds, insecure bool, enableLfs bool, proxy string, opts ...git.ClientOpts) (git.Client, error)
newHelmClient func(repoURL string, creds helm.Creds, enableOci bool, proxy string, opts ...helm.ClientOpts) helm.Client
initConstants RepoServerInitConstants
// now is usually just time.Now, but may be replaced by unit tests for testing purposes
Expand All @@ -89,7 +95,7 @@ type RepoServerInitConstants struct {
}

// NewService returns a new instance of the Manifest service
func NewService(metricsServer *metrics.MetricsServer, cache *reposervercache.Cache, initConstants RepoServerInitConstants, resourceTracking argo.ResourceTracking, gitCredsStore git.CredsStore) *Service {
func NewService(metricsServer *metrics.MetricsServer, cache *reposervercache.Cache, initConstants RepoServerInitConstants, resourceTracking argo.ResourceTracking, gitCredsStore git.CredsStore, rootDir string) *Service {
var parallelismLimitSemaphore *semaphore.Weighted
if initConstants.ParallelismLimit > 0 {
parallelismLimitSemaphore = semaphore.NewWeighted(initConstants.ParallelismLimit)
Expand All @@ -100,15 +106,54 @@ func NewService(metricsServer *metrics.MetricsServer, cache *reposervercache.Cac
repoLock: repoLock,
cache: cache,
metricsServer: metricsServer,
newGitClient: git.NewClient,
newGitClient: git.NewClientExt,
resourceTracking: resourceTracking,
newHelmClient: func(repoURL string, creds helm.Creds, enableOci bool, proxy string, opts ...helm.ClientOpts) helm.Client {
return helm.NewClientWithLock(repoURL, creds, sync.NewKeyLock(), enableOci, proxy, opts...)
},
initConstants: initConstants,
now: time.Now,
gitCredsStore: gitCredsStore,
initConstants: initConstants,
now: time.Now,
gitCredsStore: gitCredsStore,
gitRepoPaths: io.NewTempPaths(rootDir),
chartPaths: io.NewTempPaths(rootDir),
gitRepoInitializer: directoryPermissionInitializer,
rootDir: rootDir,
}
}

func (s *Service) Init() error {
_, err := os.Stat(s.rootDir)
if os.IsNotExist(err) {
return os.MkdirAll(s.rootDir, 0300)
}
if err == nil {
// give itself read permissions to list previously written directories
err = os.Chmod(s.rootDir, 0700)
}
var files []fs.FileInfo
if err == nil {
files, err = ioutil.ReadDir(s.rootDir)
}
if err != nil {
log.Warnf("Failed to restore cloned repositories paths: %v", err)
return nil
}

for _, file := range files {
if !file.IsDir() {
continue
}
fullPath := filepath.Join(s.rootDir, file.Name())
closer := s.gitRepoInitializer(fullPath)
if repo, err := gogit.PlainOpen(fullPath); err == nil {
if remotes, err := repo.Remotes(); err == nil && len(remotes) > 0 && len(remotes[0].Config().URLs) > 0 {
s.gitRepoPaths.Add(git.NormalizeGitURL(remotes[0].Config().URLs[0]), fullPath)
}
}
io.Close(closer)
}
// remove read permissions since no-one should be able to list the directories
return os.Chmod(s.rootDir, 0300)
}

// List a subset of the refs (currently, branches and tags) of a git repo
Expand Down Expand Up @@ -148,8 +193,8 @@ func (s *Service) ListApps(ctx context.Context, q *apiclient.ListAppsRequest) (*
s.metricsServer.IncPendingRepoRequest(q.Repo.Repo)
defer s.metricsServer.DecPendingRepoRequest(q.Repo.Repo)

closer, err := s.repoLock.Lock(gitClient.Root(), commitSHA, true, func() error {
return checkoutRevision(gitClient, commitSHA, s.initConstants.SubmoduleEnabled)
closer, err := s.repoLock.Lock(gitClient.Root(), commitSHA, true, func() (goio.Closer, error) {
return s.checkoutRevision(gitClient, commitSHA, s.initConstants.SubmoduleEnabled)
})

if err != nil {
Expand Down Expand Up @@ -207,6 +252,11 @@ func (s *Service) runRepoOperation(
operation func(repoRoot, commitSHA, cacheKey string, ctxSrc operationContextSrc) error,
settings operationSettings) error {

if sanitizer, ok := grpc.SanitizerFromContext(ctx); ok {
// make sure randomized path replaced with '.' in the error message
sanitizer.AddRegexReplacement(regexp.MustCompile(`(`+s.rootDir+`/.*?)/`), ".")
}

var gitClient git.Client
var helmClient helm.Client
var err error
Expand Down Expand Up @@ -260,8 +310,8 @@ func (s *Service) runRepoOperation(
return &operationContext{chartPath, ""}, nil
})
} else {
closer, err := s.repoLock.Lock(gitClient.Root(), revision, settings.allowConcurrent, func() error {
return checkoutRevision(gitClient, revision, s.initConstants.SubmoduleEnabled)
closer, err := s.repoLock.Lock(gitClient.Root(), revision, settings.allowConcurrent, func() (goio.Closer, error) {
return s.checkoutRevision(gitClient, revision, s.initConstants.SubmoduleEnabled)
})

if err != nil {
Expand Down Expand Up @@ -1626,8 +1676,8 @@ func (s *Service) GetRevisionMetadata(ctx context.Context, q *apiclient.RepoServ
s.metricsServer.IncPendingRepoRequest(q.Repo.Repo)
defer s.metricsServer.DecPendingRepoRequest(q.Repo.Repo)

closer, err := s.repoLock.Lock(gitClient.Root(), q.Revision, true, func() error {
return checkoutRevision(gitClient, q.Revision, s.initConstants.SubmoduleEnabled)
closer, err := s.repoLock.Lock(gitClient.Root(), q.Revision, true, func() (goio.Closer, error) {
return s.checkoutRevision(gitClient, q.Revision, s.initConstants.SubmoduleEnabled)
})

if err != nil {
Expand Down Expand Up @@ -1675,8 +1725,12 @@ func fileParameters(q *apiclient.RepoServerAppDetailsQuery) []v1alpha1.HelmFileP
}

func (s *Service) newClient(repo *v1alpha1.Repository, opts ...git.ClientOpts) (git.Client, error) {
repoPath, err := s.gitRepoPaths.GetPath(git.NormalizeGitURL(repo.Repo))
if err != nil {
return nil, err
}
opts = append(opts, git.WithEventHandlers(metrics.NewGitClientEventHandlers(s.metricsServer)))
return s.newGitClient(repo.Repo, repo.GetGitCreds(s.gitCredsStore), repo.IsInsecure(), repo.EnableLFS, repo.Proxy, opts...)
return s.newGitClient(repo.Repo, repoPath, repo.GetGitCreds(s.gitCredsStore), repo.IsInsecure(), repo.EnableLFS, repo.Proxy, opts...)
}

// newClientResolveRevision is a helper to perform the common task of instantiating a git client
Expand All @@ -1695,7 +1749,7 @@ func (s *Service) newClientResolveRevision(repo *v1alpha1.Repository, revision s

func (s *Service) newHelmClientResolveRevision(repo *v1alpha1.Repository, revision string, chart string, noRevisionCache bool) (helm.Client, string, error) {
enableOCI := repo.EnableOCI || helm.IsHelmOciRepo(repo.Repo)
helmClient := s.newHelmClient(repo.Repo, repo.GetHelmCreds(), enableOCI, repo.Proxy, helm.WithIndexCache(s.cache))
helmClient := s.newHelmClient(repo.Repo, repo.GetHelmCreds(), enableOCI, repo.Proxy, helm.WithIndexCache(s.cache), helm.WithChartPaths(s.chartPaths))
// OCI helm registers don't support semver ranges. Assuming that given revision is exact version
if helm.IsVersion(revision) || enableOCI {
return helmClient, revision, nil
Expand All @@ -1719,13 +1773,35 @@ func (s *Service) newHelmClientResolveRevision(repo *v1alpha1.Repository, revisi
return helmClient, version.String(), nil
}

// directoryPermissionInitializer ensures the directory has read/write/execute permissions and returns
// a function that can be used to remove all permissions.
func directoryPermissionInitializer(rootPath string) goio.Closer {
if _, err := os.Stat(rootPath); err == nil {
if err := os.Chmod(rootPath, 0700); err != nil {
log.Warnf("Failed to restore read/write/execute permissions on %s: %v", rootPath, err)
} else {
log.Debugf("Successfully restored read/write/execute permissions on %s", rootPath)
}
}

return io.NewCloser(func() error {
if err := os.Chmod(rootPath, 0000); err != nil {
log.Warnf("Failed to remove permissions on %s: %v", rootPath, err)
} else {
log.Debugf("Successfully removed permissions on %s", rootPath)
}
return nil
})
}

// checkoutRevision is a convenience function to initialize a repo, fetch, and checkout a revision
// Returns the 40 character commit SHA after the checkout has been performed
// nolint:unparam
func checkoutRevision(gitClient git.Client, revision string, submoduleEnabled bool) error {
func (s *Service) checkoutRevision(gitClient git.Client, revision string, submoduleEnabled bool) (goio.Closer, error) {
closer := s.gitRepoInitializer(gitClient.Root())
err := gitClient.Init()
if err != nil {
return status.Errorf(codes.Internal, "Failed to initialize git repo: %v", err)
return closer, status.Errorf(codes.Internal, "Failed to initialize git repo: %v", err)
}

err = gitClient.Fetch(revision)
Expand All @@ -1735,25 +1811,25 @@ func checkoutRevision(gitClient git.Client, revision string, submoduleEnabled bo
log.Infof("Fallback to fetch default")
err = gitClient.Fetch("")
if err != nil {
return status.Errorf(codes.Internal, "Failed to fetch default: %v", err)
return closer, status.Errorf(codes.Internal, "Failed to fetch default: %v", err)
}
err = gitClient.Checkout(revision, submoduleEnabled)
if err != nil {
return status.Errorf(codes.Internal, "Failed to checkout revision %s: %v", revision, err)
return closer, status.Errorf(codes.Internal, "Failed to checkout revision %s: %v", revision, err)
}
return err
return closer, err
}

err = gitClient.Checkout("FETCH_HEAD", submoduleEnabled)
if err != nil {
return status.Errorf(codes.Internal, "Failed to checkout FETCH_HEAD: %v", err)
return closer, status.Errorf(codes.Internal, "Failed to checkout FETCH_HEAD: %v", err)
}

return err
return closer, err
}

func (s *Service) GetHelmCharts(ctx context.Context, q *apiclient.HelmChartsRequest) (*apiclient.HelmChartsResponse, error) {
index, err := s.newHelmClient(q.Repo.Repo, q.Repo.GetHelmCreds(), q.Repo.EnableOCI, q.Repo.Proxy).GetIndex(true)
index, err := s.newHelmClient(q.Repo.Repo, q.Repo.GetHelmCreds(), q.Repo.EnableOCI, q.Repo.Proxy, helm.WithChartPaths(s.chartPaths)).GetIndex(true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1814,7 +1890,7 @@ func (s *Service) ResolveRevision(ctx context.Context, q *apiclient.ResolveRevis
if helm.IsVersion(ambiguousRevision) {
return &apiclient.ResolveRevisionResponse{Revision: ambiguousRevision, AmbiguousRevision: ambiguousRevision}, nil
}
client := helm.NewClient(repo.Repo, repo.GetHelmCreds(), repo.EnableOCI || app.Spec.Source.IsHelmOci(), repo.Proxy)
client := helm.NewClient(repo.Repo, repo.GetHelmCreds(), repo.EnableOCI || app.Spec.Source.IsHelmOci(), repo.Proxy, helm.WithChartPaths(s.chartPaths))
index, err := client.GetIndex(false)
if err != nil {
return &apiclient.ResolveRevisionResponse{Revision: "", AmbiguousRevision: ""}, err
Expand Down
Loading

0 comments on commit c7da148

Please sign in to comment.