Skip to content

Commit

Permalink
cmd/go/internal: remove some users of par.Work
Browse files Browse the repository at this point in the history
par.Work is used in a number of places as a parallel
work queue. This change replaces it with goroutines
and channels in a number of simpler places where it's
used.

This is the same CL as golang.org/cl/240062 and golang.org/cl/248326
except for the following changes in convert.go (all line numbers
from this CL), as well as fixing up imports in download.go:
- On line 44, the "*" before modules.Versions is removed (we were
trying to assign to a nil value on lines 72 and 73).
- Line 64 is new, and ensures that we receive on the semaphore
channel once the goroutine function exits. (The previous versions
of this CL only received at the end of the function, ignoring
the return point in the branch in the middle of the function.)
- The semaphore channel receive right before line 74 is gone,
replaced with the deferred receive above.
- The if block at line 83 is new, accounting for cases where
modfetch.ImportRepoRev returned an error in the goroutine,
so that versions[i] is ignored.

Change-Id: I0e33670bb2eb0a1e4d7a5fa693a471e61ffbc8b7
Reviewed-on: https://go-review.googlesource.com/c/go/+/249020
Run-TryBot: Michael Matloob <[email protected]>
TryBot-Result: Gobot Gobot <[email protected]>
Reviewed-by: Jay Conrod <[email protected]>
  • Loading branch information
matloob committed Aug 19, 2020
1 parent 64350f1 commit 18239be
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 91 deletions.
69 changes: 39 additions & 30 deletions src/cmd/go/internal/modcmd/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"context"
"encoding/json"
"os"
"runtime"

"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modfetch"
"cmd/go/internal/modload"
"cmd/go/internal/par"
"cmd/go/internal/modfetch"
"cmd/go/internal/work"

"golang.org/x/mod/module"
Expand Down Expand Up @@ -102,33 +102,7 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) {
}
}

var mods []*moduleJSON
var work par.Work
listU := false
listVersions := false
for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
if info.Replace != nil {
info = info.Replace
}
if info.Version == "" && info.Error == nil {
// main module or module replaced with file path.
// Nothing to download.
continue
}
m := &moduleJSON{
Path: info.Path,
Version: info.Version,
}
mods = append(mods, m)
if info.Error != nil {
m.Error = info.Error.Err
continue
}
work.Add(m)
}

work.Do(10, func(item interface{}) {
m := item.(*moduleJSON)
downloadModule := func(m *moduleJSON) {
var err error
m.Info, err = modfetch.InfoFile(m.Path, m.Version)
if err != nil {
Expand Down Expand Up @@ -157,7 +131,42 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) {
m.Error = err.Error()
return
}
})
}

var mods []*moduleJSON
listU := false
listVersions := false
type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
if info.Replace != nil {
info = info.Replace
}
if info.Version == "" && info.Error == nil {
// main module or module replaced with file path.
// Nothing to download.
continue
}
m := &moduleJSON{
Path: info.Path,
Version: info.Version,
}
mods = append(mods, m)
if info.Error != nil {
m.Error = info.Error.Err
continue
}
sem <- token{}
go func() {
downloadModule(m)
<-sem
}()
}

// Fill semaphore channel to wait for goroutines to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

if *downloadJSON {
for _, m := range mods {
Expand Down
19 changes: 10 additions & 9 deletions src/cmd/go/internal/modcmd/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modload"
"cmd/go/internal/par"
"cmd/go/internal/work"

"golang.org/x/mod/module"
Expand Down Expand Up @@ -59,23 +58,25 @@ func runGraph(ctx context.Context, cmd *base.Command, args []string) {
return m.Path + "@" + m.Version
}

// Note: using par.Work only to manage work queue.
// No parallelism here, so no locking.
var out []string
var deps int // index in out where deps start
var work par.Work
work.Add(modload.Target)
work.Do(1, func(item interface{}) {
m := item.(module.Version)
seen := map[module.Version]bool{modload.Target: true}
queue := []module.Version{modload.Target}
for len(queue) > 0 {
var m module.Version
m, queue = queue[0], queue[1:]
list, _ := reqs.Required(m)
for _, r := range list {
work.Add(r)
if !seen[r] {
queue = append(queue, r)
seen[r] = true
}
out = append(out, format(m)+" "+format(r)+"\n")
}
if m == modload.Target {
deps = len(out)
}
})
}

sort.Slice(out[deps:], func(i, j int) bool {
return out[deps+i][0] < out[deps+j][0]
Expand Down
59 changes: 33 additions & 26 deletions src/cmd/go/internal/modconv/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ package modconv
import (
"fmt"
"os"
"runtime"
"sort"
"strings"
"sync"

"cmd/go/internal/base"
"cmd/go/internal/modfetch"
"cmd/go/internal/par"

"golang.org/x/mod/modfile"
"golang.org/x/mod/module"
Expand Down Expand Up @@ -42,46 +41,54 @@ func ConvertLegacyConfig(f *modfile.File, file string, data []byte) error {

// Convert requirements block, which may use raw SHA1 hashes as versions,
// to valid semver requirement list, respecting major versions.
var (
work par.Work
mu sync.Mutex
need = make(map[string]string)
replace = make(map[string]*modfile.Replace)
)
versions := make([]module.Version, len(mf.Require))
replace := make(map[string]*modfile.Replace)

for _, r := range mf.Replace {
replace[r.New.Path] = r
replace[r.Old.Path] = r
}
for _, r := range mf.Require {

type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
for i, r := range mf.Require {
m := r.Mod
if m.Path == "" {
continue
}
if re, ok := replace[m.Path]; ok {
work.Add(re.New)
continue
m = re.New
}
work.Add(r.Mod)
sem <- token{}
go func(i int, m module.Version) {
defer func() { <-sem }()
repo, info, err := modfetch.ImportRepoRev(m.Path, m.Version)
if err != nil {
fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), m.Path, m.Version, err)
return
}

path := repo.ModulePath()
versions[i].Path = path
versions[i].Version = info.Version
}(i, m)
}
// Fill semaphore channel to wait for all tasks to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

work.Do(10, func(item interface{}) {
r := item.(module.Version)
repo, info, err := modfetch.ImportRepoRev(r.Path, r.Version)
if err != nil {
fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), r.Path, r.Version, err)
return
need := map[string]string{}
for _, v := range versions {
if v.Path == "" {
continue
}
mu.Lock()
path := repo.ModulePath()
// Don't use semver.Max here; need to preserve +incompatible suffix.
if v, ok := need[path]; !ok || semver.Compare(v, info.Version) < 0 {
need[path] = info.Version
if needv, ok := need[v.Path]; !ok || semver.Compare(needv, v.Version) < 0 {
need[v.Path] = v.Version
}
mu.Unlock()
})

var paths []string
}
paths := make([]string, 0, len(need))
for path := range need {
paths = append(paths, path)
}
Expand Down
41 changes: 28 additions & 13 deletions src/cmd/go/internal/modget/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
Expand All @@ -21,7 +22,6 @@ import (
"cmd/go/internal/load"
"cmd/go/internal/modload"
"cmd/go/internal/mvs"
"cmd/go/internal/par"
"cmd/go/internal/search"
"cmd/go/internal/work"

Expand Down Expand Up @@ -725,18 +725,8 @@ func runGet(ctx context.Context, cmd *base.Command, args []string) {
// reported. A map from module paths to queries is returned, which includes
// queries and modOnly.
func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*query, modOnly map[string]*query) map[string]*query {
var lookup par.Work
for _, q := range queries {
if cached := cache[q.querySpec]; cached != nil {
*q = *cached
} else {
cache[q.querySpec] = q
lookup.Add(q)
}
}

lookup.Do(10, func(item interface{}) {
q := item.(*query)
runQuery := func(q *query) {
if q.vers == "none" {
// Wait for downgrade step.
q.m = module.Version{Path: q.path, Version: "none"}
Expand All @@ -747,7 +737,32 @@ func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*quer
base.Errorf("go get %s: %v", q.arg, err)
}
q.m = m
})
}

type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
for _, q := range queries {
if cached := cache[q.querySpec]; cached != nil {
*q = *cached
} else {
sem <- token{}
go func(q *query) {
runQuery(q)
<-sem
}(q)
}
}

// Fill semaphore channel to wait for goroutines to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

// Add to cache after concurrent section to avoid races...
for _, q := range queries {
cache[q.querySpec] = q
}

base.ExitIfErrors()

byPath := make(map[string]*query)
Expand Down
37 changes: 24 additions & 13 deletions src/cmd/go/internal/modload/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,48 @@ import (
"errors"
"fmt"
"os"
"runtime"
"strings"

"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modinfo"
"cmd/go/internal/par"
"cmd/go/internal/search"

"golang.org/x/mod/module"
)

func ListModules(ctx context.Context, args []string, listU, listVersions bool) []*modinfo.ModulePublic {
mods := listModules(ctx, args, listVersions)

type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
if listU || listVersions {
var work par.Work
for _, m := range mods {
work.Add(m)
add := func(m *modinfo.ModulePublic) {
sem <- token{}
go func() {
if listU {
addUpdate(m)
}
if listVersions {
addVersions(m)
}
<-sem
}()
}

add(m)
if m.Replace != nil {
work.Add(m.Replace)
add(m.Replace)
}
}
work.Do(10, func(item interface{}) {
m := item.(*modinfo.ModulePublic)
if listU {
addUpdate(m)
}
if listVersions {
addVersions(m)
}
})
}
// Fill semaphore channel to wait for all tasks to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

return mods
}

Expand Down

0 comments on commit 18239be

Please sign in to comment.