diff --git a/internal/git/repository.go b/internal/git/repository.go index 4ab62bc..f839ab5 100644 --- a/internal/git/repository.go +++ b/internal/git/repository.go @@ -362,38 +362,55 @@ func (r *Repository) ListCommits(ctx context.Context, branch, from string, filte }), nil } -type ViewUpdateOptions struct { +type BranchOptions struct { + // base on CreateBranchIfNotExists sets the created branch to be based on the head of base + // base on View is not used + base string + // branch on View resolves to the branch if revision is not explicitly supplied + // branch on Update will commit to and push the branch branch string // revision on View predicates it to the specific hash // revision on Update returns a conflict error if the branch head does not match revision plumbing.Hash // force configures an update to ignore any conflicts when attempting to push force bool + // pushIfEmpty on Update causes a push to happen even if commit is empty + pushIfEmpty bool } -func (r *Repository) getOptions(opts ...containers.Option[ViewUpdateOptions]) *ViewUpdateOptions { - defaultOptions := &ViewUpdateOptions{branch: r.defaultBranch} +func (r *Repository) getOptions(opts ...containers.Option[BranchOptions]) *BranchOptions { + defaultOptions := &BranchOptions{base: r.defaultBranch, branch: r.defaultBranch} containers.ApplyAll(defaultOptions, opts...) return defaultOptions } -func WithBranch(branch string) containers.Option[ViewUpdateOptions] { - return func(vuo *ViewUpdateOptions) { - vuo.branch = branch +func WithBase(name string) containers.Option[BranchOptions] { + return func(o *BranchOptions) { + o.base = name + } +} + +func WithBranch(branch string) containers.Option[BranchOptions] { + return func(o *BranchOptions) { + o.branch = branch } } -func WithRevision(rev plumbing.Hash) containers.Option[ViewUpdateOptions] { - return func(vuo *ViewUpdateOptions) { - vuo.revision = rev +func WithRevision(rev plumbing.Hash) containers.Option[BranchOptions] { + return func(o *BranchOptions) { + o.revision = rev } } -func WithForce(vuo *ViewUpdateOptions) { - vuo.force = true +func WithForce(o *BranchOptions) { + o.force = true +} + +func WithPushIfEmpty(o *BranchOptions) { + o.pushIfEmpty = true } -func (r *Repository) View(ctx context.Context, fn func(hash plumbing.Hash, fs fs.Filesystem) error, opts ...containers.Option[ViewUpdateOptions]) (err error) { +func (r *Repository) View(ctx context.Context, fn func(hash plumbing.Hash, fs fs.Filesystem) error, opts ...containers.Option[BranchOptions]) (err error) { r.mu.RLock() defer r.mu.RUnlock() @@ -417,7 +434,7 @@ func (r *Repository) View(ctx context.Context, fn func(hash plumbing.Hash, fs fs return fn(hash, fs) } -func (r *Repository) UpdateAndPush(ctx context.Context, fn func(fs fs.Filesystem) (string, error), opts ...containers.Option[ViewUpdateOptions]) (hash plumbing.Hash, err error) { +func (r *Repository) UpdateAndPush(ctx context.Context, fn func(fs fs.Filesystem) (string, error), opts ...containers.Option[BranchOptions]) (hash plumbing.Hash, err error) { r.mu.Lock() defer r.mu.Unlock() @@ -449,7 +466,19 @@ func (r *Repository) UpdateAndPush(ctx context.Context, fn func(fs fs.Filesystem commit, err := fs.commit(ctx, msg) if err != nil { - return hash, err + if !errors.Is(err, ErrEmptyCommit) { + return hash, err + } + + if !options.pushIfEmpty { + return hash, err + } + + // fetch commit for hash and we will attempt to re-push + commit, err = r.repo.CommitObject(hash) + if err != nil { + return hash, err + } } if r.remote != nil { @@ -534,17 +563,7 @@ func (r *Repository) Resolve(branch string) (plumbing.Hash, error) { return reference.Hash(), nil } -type CreateBranchOptions struct { - base string -} - -func WithBase(name string) containers.Option[CreateBranchOptions] { - return func(cbo *CreateBranchOptions) { - cbo.base = name - } -} - -func (r *Repository) CreateBranchIfNotExists(branch string, opts ...containers.Option[CreateBranchOptions]) error { +func (r *Repository) CreateBranchIfNotExists(branch string, opts ...containers.Option[BranchOptions]) error { r.mu.Lock() defer r.mu.Unlock() @@ -559,9 +578,7 @@ func (r *Repository) CreateBranchIfNotExists(branch string, opts ...containers.O return nil } - opt := CreateBranchOptions{base: r.defaultBranch} - - containers.ApplyAll(&opt, opts...) + opt := r.getOptions(opts...) reference, err := r.repo.Reference(plumbing.NewRemoteReferenceName(remoteName, opt.base), true) if err != nil { diff --git a/pkg/phases/git/git.go b/pkg/phases/git/git.go index 7da08d0..8a1fd84 100644 --- a/pkg/phases/git/git.go +++ b/pkg/phases/git/git.go @@ -22,11 +22,10 @@ import ( ) const ( - AnnotationGitBaseRefKey = "dev.getglu.git.base_ref" - AnnotationGitHeadSHAKey = "dev.getglu.git.head_sha" - AnnotationGitCommitURLKey = "dev.getglu.git.commit.url" - AnnotationProposalNumberKey = "dev.getglu.git.proposal.number" - AnnotationProposalURLKey = "dev.getglu.git.proposal.url" + AnnotationGitBaseRefKey = "dev.getglu.git.base_ref" + AnnotationGitHeadSHAKey = "dev.getglu.git.head_sha" + AnnotationGitCommitURLKey = "dev.getglu.git.commit.url" + AnnotationProposalURLKey = "dev.getglu.git.proposal.url" ) var ( @@ -47,14 +46,18 @@ type Resource interface { // Proposer is a type which can be used to create and manage proposals. type Proposer interface { GetCurrentProposal(_ context.Context, baseBranch, branchPrefix string) (*Proposal, error) + IsProposalOpen(context.Context, *Proposal) bool CreateProposal(context.Context, *Proposal, ProposalOption) error - MergeProposal(context.Context, *Proposal) error CloseProposal(context.Context, *Proposal) error + CommentProposal(context.Context, *Proposal, string) error } // Proposal contains the fields necessary to propose a resource update // to a Repository. type Proposal struct { + ID string + URL string + BaseRevision string BaseBranch string Branch string @@ -69,14 +72,16 @@ type Proposal struct { // Phase is a Git storage backed phase implementation. // It is used to manage the state of a resource as represented in a target Git repository. type Phase[R Resource] struct { - pipeline string - meta core.Metadata - newFn func() R - repo *git.Repository + pipeline string + meta core.Metadata + newFn func() R + repo *git.Repository + logger typed.PhaseLogger[R] + proposer Proposer proposeChange bool proposalOptions ProposalOption - logger typed.PhaseLogger[R] + currentProposal *Proposal } // Descriptor returns the phases descriptor. @@ -144,6 +149,11 @@ func New[R Resource]( return nil, err } + // attempt to populate cache with current proposal + if _, err := phase.getCurrentProposal(ctx); err != nil && !errors.Is(err, ErrProposalNotFound) { + return nil, err + } + return phase, nil } @@ -207,8 +217,7 @@ func (p *Phase[R]) Notify(ctx context.Context, refs map[string]string) error { return p.recordPhaseState(ctx, git.WithRevision(plumbing.NewHash(ref))) } -func (p *Phase[R]) recordPhaseState(ctx context.Context, opts ...containers.Option[git.ViewUpdateOptions]) (err error) { - +func (p *Phase[R]) recordPhaseState(ctx context.Context, opts ...containers.Option[git.BranchOptions]) (err error) { var ( r = p.newFn() hash plumbing.Hash @@ -276,20 +285,13 @@ func (p *Phase[R]) Update(ctx context.Context, to R, opts ...containers.Option[t return nil, err } - // use the target resources branch if it implementes an override - desc := p.Descriptor() - baseBranch := p.repo.DefaultBranch() - if branched, ok := core.Resource(to).(Branched); ok { - baseBranch = branched.Branch(desc) - } - annotations := map[string]string{ - AnnotationGitBaseRefKey: baseBranch, + AnnotationGitBaseRefKey: p.branch(), } updateOpts := typed.NewUpdateOptions(opts...) if !p.proposeChange { - annotations[AnnotationGitHeadSHAKey], err = p.updateAndPush(ctx, from, to, updateOpts, git.WithBranch(baseBranch)) + annotations[AnnotationGitHeadSHAKey], err = p.updateAndPush(ctx, from, to, updateOpts, git.WithBranch(p.branch())) if err != nil { return nil, err } @@ -301,7 +303,7 @@ func (p *Phase[R]) Update(ctx context.Context, to R, opts ...containers.Option[t return nil, errors.New("proposal requested but not configured") } - annotations, err = p.propose(ctx, from, to, baseBranch, updateOpts) + annotations, err = p.propose(ctx, from, to, updateOpts) if err != nil { return nil, err } @@ -309,10 +311,11 @@ func (p *Phase[R]) Update(ctx context.Context, to R, opts ...containers.Option[t return &core.Result{Annotations: annotations}, nil } -func (p *Phase[R]) propose(ctx context.Context, from, to R, baseBranch string, updateOpts *typed.UpdateOptions) (map[string]string, error) { +func (p *Phase[R]) propose(ctx context.Context, from, to R, updateOpts *typed.UpdateOptions) (map[string]string, error) { slog := slog.With("name", p.meta.Name) desc := p.Descriptor() + baseBranch := p.branch() baseRev, err := p.repo.Resolve(baseBranch) if err != nil { return nil, fmt.Errorf("resolving base branch %q: %w", baseBranch, err) @@ -323,60 +326,76 @@ func (p *Phase[R]) propose(ctx context.Context, from, to R, baseBranch string, u return nil, err } - digest, err := to.Digest() + toDigest, err := to.Digest() if err != nil { return nil, err } - slog.Debug("proposing update", "from", fromDigest, "to", digest) + slog.Debug("proposing update", "from", fromDigest, "to", toDigest) // create branch name and check if this phase, resource and state has previously been observed - var ( - branchPrefix = fmt.Sprintf("glu/%s/%s", p.pipeline, p.meta.Name) - branch = path.Join(branchPrefix, digest) - ) + branch := path.Join(p.branchPrefix(), toDigest) // ensure branch exists locally either way if err := p.repo.CreateBranchIfNotExists(branch, git.WithBase(baseBranch)); err != nil { return nil, err } - proposal, err := p.proposer.GetCurrentProposal(ctx, baseBranch, branchPrefix) - if err != nil { - if !errors.Is(err, ErrProposalNotFound) { - return nil, err - } - - slog.Debug("proposal not found") + options := []containers.Option[git.BranchOptions]{ + git.WithBranch(branch), + git.WithBase(baseBranch), + git.WithPushIfEmpty, } - options := []containers.Option[git.ViewUpdateOptions]{git.WithBranch(branch)} - if proposal != nil { + makeComment := func(*Proposal) error { return nil } + + proposal, err := p.getCurrentProposal(ctx) + if err == nil { // there is an existing proposal - slog.Debug("proposal found", "base", proposal.BaseBranch, "base_revision", proposal.BaseRevision) + slog.Debug("proposal found", + "base", proposal.BaseBranch, "base_revision", proposal.BaseRevision, + "proposal_digest", proposal.Digest, + "destination_digest", toDigest, + ) + + // we're potentially going to force update the branch to move the base + options = append(options, git.WithForce) + if proposal.BaseRevision != baseRev.String() { - // we're potentially going to force update the branch to move the base - options = append(options, git.WithForce) - } else if proposal.Digest == digest { + head, err := p.updateAndPush(ctx, from, to, updateOpts, options...) + if err != nil { + return nil, fmt.Errorf("updating existing proposal: %w", err) + } + + // we're updating the head position of an existing proposal + // so we need to update the value of head in the returned annotations + annotations := annotations(proposal) + annotations[AnnotationGitHeadSHAKey] = head + + return annotations, nil + } else if proposal.Digest == toDigest { // nothing has changed since the last promotion and proposals slog.Debug("skipping proposal", "reason", "AlreadyExistsAndUpToDate") return annotations(proposal), nil } - head, err := p.updateAndPush(ctx, from, to, updateOpts, options...) - if err != nil { - return nil, fmt.Errorf("updating existing proposal: %w", err) + // close the proposal as we're creating a new branch for the new proposal + if err := p.proposer.CloseProposal(ctx, proposal); err != nil { + return nil, fmt.Errorf("closing existing proposal: %w", err) } - // we're updating the head position of an existing proposal - // so we need to update the value of head in the returned annotations - annotations := annotations(proposal) - annotations[AnnotationGitHeadSHAKey] = head - - return annotations, nil + // configure commenting on the old proposal with a link to the new one + oldProposal := proposal + makeComment = func(newProposal *Proposal) error { + return p.proposer.CommentProposal(ctx, oldProposal, fmt.Sprintf("Closed in favour of new proposal #%v", newProposal.ID)) + } + } else if !errors.Is(err, ErrProposalNotFound) { + return nil, err } + slog.Debug("proposal not found") + if head, err := p.updateAndPush(ctx, from, to, updateOpts, options...); err != nil { slog.Error("while attempting update", "head", head, "error", err) return nil, err @@ -396,6 +415,7 @@ func (p *Phase[R]) propose(ctx context.Context, from, to R, baseBranch string, u BaseRevision: baseRev.String(), BaseBranch: baseBranch, Branch: branch, + Digest: toDigest, Title: title, Body: body, } @@ -404,10 +424,42 @@ func (p *Phase[R]) propose(ctx context.Context, from, to R, baseBranch string, u return nil, err } - return annotations(proposal), nil + // set current proposal + p.currentProposal = proposal + + return annotations(proposal), makeComment(proposal) +} + +func (p *Phase[R]) getCurrentProposal(ctx context.Context) (*Proposal, error) { + if p.proposer == nil { + return nil, ErrProposalNotFound + } + + if p.currentProposal != nil { + if !p.proposer.IsProposalOpen(ctx, p.currentProposal) { + p.currentProposal = nil + + return nil, ErrProposalNotFound + } + + return p.currentProposal, nil + } + + proposal, err := p.proposer.GetCurrentProposal(ctx, p.branch(), p.branchPrefix()) + if err != nil { + return nil, err + } + + p.currentProposal = proposal + + return proposal, nil +} + +func (p *Phase[R]) branchPrefix() string { + return fmt.Sprintf("glu/%s/%s", p.pipeline, p.meta.Name) } -func (p *Phase[R]) updateAndPush(ctx context.Context, from, to R, updateOpts *typed.UpdateOptions, opts ...containers.Option[git.ViewUpdateOptions]) (string, error) { +func (p *Phase[R]) updateAndPush(ctx context.Context, from, to R, updateOpts *typed.UpdateOptions, opts ...containers.Option[git.BranchOptions]) (string, error) { desc := p.Descriptor() update := func(fs fs.Filesystem) (message string, err error) { if err := to.WriteTo(ctx, desc, fs); err != nil { @@ -431,8 +483,9 @@ func (p *Phase[R]) updateAndPush(ctx context.Context, from, to R, updateOpts *ty func annotations(proposal *Proposal) map[string]string { a := map[string]string{ - AnnotationGitBaseRefKey: proposal.BaseBranch, - AnnotationGitHeadSHAKey: proposal.HeadRevision, + AnnotationProposalURLKey: proposal.URL, + AnnotationGitBaseRefKey: proposal.BaseBranch, + AnnotationGitHeadSHAKey: proposal.HeadRevision, } maps.Insert(a, maps.All(proposal.Annotations)) diff --git a/pkg/phases/logger/logger.go b/pkg/phases/logger/logger.go index 728d280..6568084 100644 --- a/pkg/phases/logger/logger.go +++ b/pkg/phases/logger/logger.go @@ -32,7 +32,6 @@ type PhaseLogger[R core.Resource] struct { db kv.DB encoder func(any) ([]byte, error) decoder func([]byte, any) error - last map[string]map[string]version } func New[R core.Resource](db kv.DB) *PhaseLogger[R] { @@ -40,7 +39,6 @@ func New[R core.Resource](db kv.DB) *PhaseLogger[R] { db: db, encoder: json.Marshal, decoder: json.Unmarshal, - last: map[string]map[string]version{}, } } @@ -131,7 +129,7 @@ func (l *PhaseLogger[R]) RecordLatest(ctx context.Context, phase core.Descriptor func (l *PhaseLogger[R]) isUpToDate(refs kv.Bucket, phase core.Descriptor, digest string) bool { slog := slog.With("pipeline", phase.Pipeline, "phase", phase.Metadata.Name) - curLatest, ok := l.getLatestVersion(refs, phase) + curLatest, ok := l.getLatestVersion(refs) if ok && bytes.Equal(curLatest.Digest, []byte(digest)) { slog.Debug("skipped recording latest", "reason", "NoChange") return true @@ -148,7 +146,7 @@ func (l *PhaseLogger[R]) GetLatestResource(ctx context.Context, phase core.Descr return err } - curLatest, ok := l.getLatestVersion(refs, phase) + curLatest, ok := l.getLatestVersion(refs) if !ok { return fmt.Errorf("latest version: %w", ErrNotFound) } @@ -204,27 +202,7 @@ func (l *PhaseLogger[R]) GetResourceAtVersion(ctx context.Context, phase core.De }) } -func (l *PhaseLogger[R]) getLatestVersion(refs kv.Bucket, phase core.Descriptor) (version, bool) { - phases, ok := l.last[phase.Pipeline] - if !ok { - phases = map[string]version{} - l.last[phase.Metadata.Name] = phases - } - - version, ok := phases[phase.Metadata.Name] - if !ok { - version, ok = l.fetchLatestVersion(refs) - if !ok { - return version, false - } - - phases[phase.Metadata.Name] = version - } - - return version, true -} - -func (l *PhaseLogger[R]) fetchLatestVersion(refs kv.Bucket) (v version, _ bool) { +func (l *PhaseLogger[R]) getLatestVersion(refs kv.Bucket) (v version, _ bool) { _, data, err := refs.Last() if err != nil { return v, false diff --git a/pkg/scm/github/github.go b/pkg/scm/github/github.go index 431db7a..9cb3206 100644 --- a/pkg/scm/github/github.go +++ b/pkg/scm/github/github.go @@ -32,20 +32,23 @@ func (s *SCM) GetCurrentProposal(ctx context.Context, baseBranch, branchPrefix s for pr := range prs.All() { parts := strings.Split(pr.Head.GetRef(), "/") - if strings.HasPrefix(pr.Head.GetRef(), branchPrefix) { - proposal = &git.Proposal{ - BaseBranch: pr.Base.GetRef(), - BaseRevision: pr.Base.GetSHA(), - Branch: pr.Head.GetRef(), - HeadRevision: pr.Head.GetSHA(), - Digest: parts[len(parts)-1], - Annotations: map[string]string{ - git.AnnotationProposalNumberKey: strconv.Itoa(pr.GetNumber()), - git.AnnotationProposalURLKey: pr.GetHTMLURL(), - }, - } - break + if !strings.HasPrefix(pr.Head.GetRef(), branchPrefix) { + continue + } + + proposal = &git.Proposal{ + ID: strconv.Itoa(pr.GetNumber()), + URL: pr.GetHTMLURL(), + + BaseBranch: pr.Base.GetRef(), + BaseRevision: pr.Base.GetSHA(), + Branch: pr.Head.GetRef(), + HeadRevision: pr.Head.GetSHA(), + Digest: parts[len(parts)-1], + + Annotations: map[string]string{}, } + break } if err := prs.Err(); err != nil { @@ -59,12 +62,25 @@ func (s *SCM) GetCurrentProposal(ctx context.Context, baseBranch, branchPrefix s return proposal, nil } +func (s *SCM) IsProposalOpen(ctx context.Context, proposal *git.Proposal) bool { + number, ok := prNumber(proposal) + if !ok { + return false + } + + pr, _, err := s.client.PullRequests.Get(ctx, s.repoOwner, s.repoName, number) + if err != nil { + slog.Warn("could not check if PR is open", "reason", "error getting PR", "error", err) + return false + } + + return pr.GetState() == "open" +} + func (s *SCM) CreateProposal(ctx context.Context, proposal *git.Proposal, opts git.ProposalOption) error { - slog.Debug("creating proposal", + slog := slog.With( "branch", proposal.Branch, "base", proposal.BaseBranch, - "title", proposal.Title, - "body", proposal.Body, ) pr, _, err := s.client.PullRequests.Create(ctx, s.repoOwner, s.repoName, &github.NewPullRequest{ @@ -79,12 +95,11 @@ func (s *SCM) CreateProposal(ctx context.Context, proposal *git.Proposal, opts g slog.Info("proposal created", "scm_type", "github", "proposal_url", pr.GetHTMLURL()) + proposal.ID = strconv.Itoa(pr.GetNumber()) + proposal.URL = pr.GetHTMLURL() proposal.BaseRevision = pr.Base.GetSHA() proposal.HeadRevision = pr.Head.GetSHA() - proposal.Annotations = map[string]string{ - git.AnnotationProposalNumberKey: strconv.Itoa(pr.GetNumber()), - git.AnnotationProposalURLKey: pr.GetHTMLURL(), - } + proposal.Annotations = map[string]string{} if len(opts.Labels) > 0 { if _, _, err := s.client.Issues.AddLabelsToIssue(ctx, s.repoOwner, s.repoName, pr.GetNumber(), opts.Labels); err != nil { @@ -95,32 +110,56 @@ func (s *SCM) CreateProposal(ctx context.Context, proposal *git.Proposal, opts g return nil } -func (s *SCM) MergeProposal(ctx context.Context, proposal *git.Proposal) error { - number, err := strconv.Atoi(proposal.Annotations[git.AnnotationProposalNumberKey]) - if err != nil { - slog.Warn("could not close pr", "reason", "missing PR number on proposal", "error", err) +func (s *SCM) CloseProposal(ctx context.Context, proposal *git.Proposal) error { + slog := slog.With( + "branch", proposal.Branch, + "base", proposal.BaseBranch, + ) + + number, ok := prNumber(proposal) + if !ok { return nil } - _, _, err = s.client.PullRequests.Merge(ctx, s.repoOwner, s.repoName, number, "", &github.PullRequestOptions{ - MergeMethod: "merge", + pr, _, err := s.client.PullRequests.Edit(ctx, s.repoOwner, s.repoName, number, &github.PullRequest{ + State: github.String("closed"), }) + if err != nil { + return err + } + + slog.Info("proposal closed", "scm_type", "github", "proposal_url", pr.GetHTMLURL()) + + proposal.BaseRevision = pr.Base.GetSHA() + proposal.HeadRevision = pr.Head.GetSHA() - return err + return nil } -func (s *SCM) CloseProposal(ctx context.Context, proposal *git.Proposal) error { - number, err := strconv.Atoi(proposal.Annotations[git.AnnotationProposalNumberKey]) - if err != nil { - slog.Warn("could not close pr", "reason", "missing PR number on proposal", "error", err) +func (s *SCM) CommentProposal(ctx context.Context, proposal *git.Proposal, message string) error { + number, ok := prNumber(proposal) + if !ok { return nil } - _, _, err = s.client.PullRequests.Edit(ctx, s.repoOwner, s.repoName, number, &github.PullRequest{ - State: github.String("closed"), + _, _, err := s.client.Issues.CreateComment(ctx, s.repoOwner, s.repoName, number, &github.IssueComment{ + Body: github.String(message), }) + if err != nil { + return err + } + + return nil +} + +func prNumber(proposal *git.Proposal) (int, bool) { + number, err := strconv.Atoi(proposal.ID) + if err != nil { + slog.Warn("could not check if PR is open", "reason", "missing PR number on proposal", "error", err) + return 0, false + } - return err + return number, true } type prs struct {