Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition with async API operations #1380

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 54 additions & 42 deletions api/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,6 @@
}

resources = append(resources, string(snapshot.ResourceKey()))
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
return
}

sources = append(sources, snapshot)
}
Expand All @@ -173,28 +168,13 @@
return
}

resources = append(resources, string(localRepo.Key()))
err = localCollection.LoadComplete(localRepo)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
}

sources = append(sources, localRepo)
}
} else {
AbortWithJSONError(c, 400, fmt.Errorf("unknown SourceKind"))
return
}

published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, b.MultiDist)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
return
}

resources = append(resources, string(published.Key()))
collection := collectionFactory.PublishedRepoCollection()

taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskDetail := task.PublishDetail{
Expand All @@ -205,6 +185,29 @@
PublishDetail: taskDetail,
}

for _, source := range sources {
switch s := source.(type) {
case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s)
default:
err = fmt.Errorf("unexpected type for source: %T", source)

Check warning on line 197 in api/publish.go

View check run for this annotation

Codecov / codecov/patch

api/publish.go#L196-L197

Added lines #L196 - L197 were not covered by tests
}
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}

Check warning on line 201 in api/publish.go

View check run for this annotation

Codecov / codecov/patch

api/publish.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}

published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, b.MultiDist)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}

Check warning on line 207 in api/publish.go

View check run for this annotation

Codecov / codecov/patch

api/publish.go#L206-L207

Added lines #L206 - L207 were not covered by tests

resources = append(resources, string(published.Key()))

if b.Origin != "" {
published.Origin = b.Origin
}
Expand All @@ -230,13 +233,14 @@
published.AcquireByHash = *b.AcquireByHash
}

collection := collectionFactory.PublishedRepoCollection()
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}

err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
Expand Down Expand Up @@ -282,47 +286,29 @@

collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()

published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, 404, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to update: %s", err))
return
}

var updatedComponents []string
var updatedSnapshots []string
var resources []string

if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
AbortWithJSONError(c, 400, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return
}
updatedComponents = published.Components()
for _, component := range updatedComponents {
published.UpdateLocalRepo(component)
}
} else if published.SourceKind == "snapshot" {
for _, snapshotInfo := range b.Snapshots {
snapshotCollection := collectionFactory.SnapshotCollection()
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
AbortWithJSONError(c, 404, err2)
AbortWithJSONError(c, http.StatusNotFound, err2)

Check warning on line 309 in api/publish.go

View check run for this annotation

Codecov / codecov/patch

api/publish.go#L309

Added line #L309 was not covered by tests
return
}

err2 = snapshotCollection.LoadComplete(snapshot)
if err2 != nil {
AbortWithJSONError(c, 500, err2)
return
}

published.UpdateSnapshot(snapshotInfo.Component, snapshot)
updatedComponents = append(updatedComponents, snapshotInfo.Component)
updatedSnapshots = append(updatedSnapshots, snapshot.Name)
}
Expand All @@ -347,10 +333,36 @@
published.MultiDist = *b.MultiDist
}

var resources []string
resources = append(resources, string(published.Key()))
taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("Unable to update: %s", err)
}

Check warning on line 343 in api/publish.go

View check run for this annotation

Codecov / codecov/patch

api/publish.go#L342-L343

Added lines #L342 - L343 were not covered by tests

if published.SourceKind == deb.SourceLocalRepo {
for _, component := range updatedComponents {
published.UpdateLocalRepo(component)
}
} else if published.SourceKind == "snapshot" {
for _, snapshotInfo := range b.Snapshots {
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err2
}

Check warning on line 354 in api/publish.go

View check run for this annotation

Codecov / codecov/patch

api/publish.go#L353-L354

Added lines #L353 - L354 were not covered by tests

err2 = snapshotCollection.LoadComplete(snapshot)
if err2 != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err2
}

Check warning on line 359 in api/publish.go

View check run for this annotation

Codecov / codecov/patch

api/publish.go#L358-L359

Added lines #L358 - L359 were not covered by tests

published.UpdateSnapshot(snapshotInfo.Component, snapshot)
}
}

err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
Expand Down
54 changes: 24 additions & 30 deletions api/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@
return
}

err = collection.LoadComplete(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

resources := []string{string(repo.Key())}

maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

Check warning on line 297 in api/repos.go

View check run for this annotation

Codecov / codecov/patch

api/repos.go#L296-L297

Added lines #L296 - L297 were not covered by tests

out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
Expand Down Expand Up @@ -394,12 +394,6 @@
return
}

err = collection.LoadComplete(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

var taskName string
var sources []string
if fileParam == "" {
Expand All @@ -413,6 +407,11 @@
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

Check warning on line 413 in api/repos.go

View check run for this annotation

Codecov / codecov/patch

api/repos.go#L412-L413

Added lines #L412 - L413 were not covered by tests

verifier := context.GetVerifier()

var (
Expand Down Expand Up @@ -514,17 +513,7 @@
return
}

err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("dest repo error: %s", err))
return
}

var (
srcRefList *deb.PackageRefList
srcRepo *deb.LocalRepo
)

var srcRepo *deb.LocalRepo
srcRepo, err = collectionFactory.LocalRepoCollection().ByName(srcRepoName)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
Expand All @@ -536,17 +525,22 @@
return
}

err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
return
}

srcRefList = srcRepo.RefList()
taskName := fmt.Sprintf("Copy packages from repo %s to repo %s", srcRepoName, dstRepoName)
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}

maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}

Check warning on line 535 in api/repos.go

View check run for this annotation

Codecov / codecov/patch

api/repos.go#L534-L535

Added lines #L534 - L535 were not covered by tests

err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}

Check warning on line 540 in api/repos.go

View check run for this annotation

Codecov / codecov/patch

api/repos.go#L539-L540

Added lines #L539 - L540 were not covered by tests

srcRefList := srcRepo.RefList()

reporter := &aptly.RecordingResultReporter{
Warnings: []string{},
AddedLines: []string{},
Expand Down
45 changes: 24 additions & 21 deletions api/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,17 @@
return
}

err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
AbortWithJSONError(c, 500, err)
return
}

resources = append(resources, string(sources[i].ResourceKey()))
}

maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
for i := range sources {
err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

Check warning on line 146 in api/snapshot.go

View check run for this annotation

Codecov / codecov/patch

api/snapshot.go#L145-L146

Added lines #L145 - L146 were not covered by tests
}

list := deb.NewPackageList()

// verify package refs and build package list
Expand Down Expand Up @@ -468,17 +469,20 @@
return
}

err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}
resources[i] = string(sources[i].ResourceKey())
}

maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = snapshotCollection.LoadComplete(sources[0])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

Check warning on line 479 in api/snapshot.go

View check run for this annotation

Codecov / codecov/patch

api/snapshot.go#L478-L479

Added lines #L478 - L479 were not covered by tests
result := sources[0].RefList()
for i := 1; i < len(sources); i++ {
err = snapshotCollection.LoadComplete(sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

Check warning on line 485 in api/snapshot.go

View check run for this annotation

Codecov / codecov/patch

api/snapshot.go#L484-L485

Added lines #L484 - L485 were not covered by tests
result = result.Merge(sources[i].RefList(), overrideMatching, false)
}

Expand Down Expand Up @@ -566,27 +570,26 @@
AbortWithJSONError(c, http.StatusNotFound, err)
return
}
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}

// Load <Source> snapshot
sourceSnapshot, err := collectionFactory.SnapshotCollection().ByName(body.Source)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, err)
return
}
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}

resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

Check warning on line 587 in api/snapshot.go

View check run for this annotation

Codecov / codecov/patch

api/snapshot.go#L586-L587

Added lines #L586 - L587 were not covered by tests
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}

Check warning on line 591 in api/snapshot.go

View check run for this annotation

Codecov / codecov/patch

api/snapshot.go#L590-L591

Added lines #L590 - L591 were not covered by tests

// convert snapshots to package list
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
if err != nil {
Expand Down
Loading