Skip to content

Commit

Permalink
cmd/go/internal: remove some users of par.Work
Browse files Browse the repository at this point in the history
par.Work is used in a number of places as a parallel
work queue. This change replaces it with goroutines
and channels in a number of simpler places where it's
used.

Change-Id: I0620eda46ec7b2c0599a8b9361639af7bb73a05a
Reviewed-on: https://go-review.googlesource.com/c/go/+/248326
Run-TryBot: Michael Matloob <[email protected]>
TryBot-Result: Gobot Gobot <[email protected]>
Reviewed-by: Bryan C. Mills <[email protected]>
  • Loading branch information
matloob committed Aug 17, 2020
1 parent 2ac4bf3 commit f30044a
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 92 deletions.
69 changes: 39 additions & 30 deletions src/cmd/go/internal/modcmd/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
package modcmd

import (
"cmd/go/internal/modfetch"
"context"
"encoding/json"
"os"
"runtime"

"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modfetch"
"cmd/go/internal/modload"
"cmd/go/internal/par"
"cmd/go/internal/work"

"golang.org/x/mod/module"
Expand Down Expand Up @@ -102,33 +102,7 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) {
}
}

var mods []*moduleJSON
var work par.Work
listU := false
listVersions := false
for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
if info.Replace != nil {
info = info.Replace
}
if info.Version == "" && info.Error == nil {
// main module or module replaced with file path.
// Nothing to download.
continue
}
m := &moduleJSON{
Path: info.Path,
Version: info.Version,
}
mods = append(mods, m)
if info.Error != nil {
m.Error = info.Error.Err
continue
}
work.Add(m)
}

work.Do(10, func(item interface{}) {
m := item.(*moduleJSON)
downloadModule := func(m *moduleJSON) {
var err error
m.Info, err = modfetch.InfoFile(m.Path, m.Version)
if err != nil {
Expand Down Expand Up @@ -157,7 +131,42 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) {
m.Error = err.Error()
return
}
})
}

var mods []*moduleJSON
listU := false
listVersions := false
type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
if info.Replace != nil {
info = info.Replace
}
if info.Version == "" && info.Error == nil {
// main module or module replaced with file path.
// Nothing to download.
continue
}
m := &moduleJSON{
Path: info.Path,
Version: info.Version,
}
mods = append(mods, m)
if info.Error != nil {
m.Error = info.Error.Err
continue
}
sem <- token{}
go func() {
downloadModule(m)
<-sem
}()
}

// Fill semaphore channel to wait for goroutines to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

if *downloadJSON {
for _, m := range mods {
Expand Down
19 changes: 10 additions & 9 deletions src/cmd/go/internal/modcmd/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modload"
"cmd/go/internal/par"
"cmd/go/internal/work"

"golang.org/x/mod/module"
Expand Down Expand Up @@ -59,23 +58,25 @@ func runGraph(ctx context.Context, cmd *base.Command, args []string) {
return m.Path + "@" + m.Version
}

// Note: using par.Work only to manage work queue.
// No parallelism here, so no locking.
var out []string
var deps int // index in out where deps start
var work par.Work
work.Add(modload.Target)
work.Do(1, func(item interface{}) {
m := item.(module.Version)
seen := map[module.Version]bool{modload.Target: true}
queue := []module.Version{modload.Target}
for len(queue) > 0 {
var m module.Version
m, queue = queue[0], queue[1:]
list, _ := reqs.Required(m)
for _, r := range list {
work.Add(r)
if !seen[r] {
queue = append(queue, r)
seen[r] = true
}
out = append(out, format(m)+" "+format(r)+"\n")
}
if m == modload.Target {
deps = len(out)
}
})
}

sort.Slice(out[deps:], func(i, j int) bool {
return out[deps+i][0] < out[deps+j][0]
Expand Down
59 changes: 32 additions & 27 deletions src/cmd/go/internal/modconv/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ package modconv
import (
"fmt"
"os"
"runtime"
"sort"
"strings"
"sync"

"cmd/go/internal/base"
"cmd/go/internal/modfetch"
"cmd/go/internal/par"

"golang.org/x/mod/modfile"
"golang.org/x/mod/module"
Expand Down Expand Up @@ -42,46 +41,52 @@ func ConvertLegacyConfig(f *modfile.File, file string, data []byte) error {

// Convert requirements block, which may use raw SHA1 hashes as versions,
// to valid semver requirement list, respecting major versions.
var (
work par.Work
mu sync.Mutex
need = make(map[string]string)
replace = make(map[string]*modfile.Replace)
)
versions := make([]*module.Version, len(mf.Require))
replace := make(map[string]*modfile.Replace)

for _, r := range mf.Replace {
replace[r.New.Path] = r
replace[r.Old.Path] = r
}
for _, r := range mf.Require {

type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
for i, r := range mf.Require {
m := r.Mod
if m.Path == "" {
continue
}
if re, ok := replace[m.Path]; ok {
work.Add(re.New)
continue
m = re.New
}
work.Add(r.Mod)
sem <- token{}
go func(i int, m module.Version) {
repo, info, err := modfetch.ImportRepoRev(m.Path, m.Version)
if err != nil {
fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), m.Path, m.Version, err)
return
}

path := repo.ModulePath()
versions[i].Path = path
versions[i].Version = info.Version

<-sem
}(i, m)
}
// Fill semaphore channel to wait for all tasks to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

work.Do(10, func(item interface{}) {
r := item.(module.Version)
repo, info, err := modfetch.ImportRepoRev(r.Path, r.Version)
if err != nil {
fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), r.Path, r.Version, err)
return
}
mu.Lock()
path := repo.ModulePath()
need := map[string]string{}
for _, v := range versions {
// Don't use semver.Max here; need to preserve +incompatible suffix.
if v, ok := need[path]; !ok || semver.Compare(v, info.Version) < 0 {
need[path] = info.Version
if needv, ok := need[v.Path]; !ok || semver.Compare(needv, v.Version) < 0 {
need[v.Path] = v.Version
}
mu.Unlock()
})

var paths []string
}
paths := make([]string, 0, len(need))
for path := range need {
paths = append(paths, path)
}
Expand Down
41 changes: 28 additions & 13 deletions src/cmd/go/internal/modget/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
Expand All @@ -21,7 +22,6 @@ import (
"cmd/go/internal/load"
"cmd/go/internal/modload"
"cmd/go/internal/mvs"
"cmd/go/internal/par"
"cmd/go/internal/search"
"cmd/go/internal/work"

Expand Down Expand Up @@ -725,18 +725,8 @@ func runGet(ctx context.Context, cmd *base.Command, args []string) {
// reported. A map from module paths to queries is returned, which includes
// queries and modOnly.
func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*query, modOnly map[string]*query) map[string]*query {
var lookup par.Work
for _, q := range queries {
if cached := cache[q.querySpec]; cached != nil {
*q = *cached
} else {
cache[q.querySpec] = q
lookup.Add(q)
}
}

lookup.Do(10, func(item interface{}) {
q := item.(*query)
runQuery := func(q *query) {
if q.vers == "none" {
// Wait for downgrade step.
q.m = module.Version{Path: q.path, Version: "none"}
Expand All @@ -747,7 +737,32 @@ func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*quer
base.Errorf("go get %s: %v", q.arg, err)
}
q.m = m
})
}

type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
for _, q := range queries {
if cached := cache[q.querySpec]; cached != nil {
*q = *cached
} else {
sem <- token{}
go func(q *query) {
runQuery(q)
<-sem
}(q)
}
}

// Fill semaphore channel to wait for goroutines to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

// Add to cache after concurrent section to avoid races...
for _, q := range queries {
cache[q.querySpec] = q
}

base.ExitIfErrors()

byPath := make(map[string]*query)
Expand Down
37 changes: 24 additions & 13 deletions src/cmd/go/internal/modload/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,48 @@ import (
"errors"
"fmt"
"os"
"runtime"
"strings"

"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modinfo"
"cmd/go/internal/par"
"cmd/go/internal/search"

"golang.org/x/mod/module"
)

func ListModules(ctx context.Context, args []string, listU, listVersions bool) []*modinfo.ModulePublic {
mods := listModules(ctx, args, listVersions)

type token struct{}
sem := make(chan token, runtime.GOMAXPROCS(0))
if listU || listVersions {
var work par.Work
for _, m := range mods {
work.Add(m)
add := func(m *modinfo.ModulePublic) {
sem <- token{}
go func() {
if listU {
addUpdate(m)
}
if listVersions {
addVersions(m)
}
<-sem
}()
}

add(m)
if m.Replace != nil {
work.Add(m.Replace)
add(m.Replace)
}
}
work.Do(10, func(item interface{}) {
m := item.(*modinfo.ModulePublic)
if listU {
addUpdate(m)
}
if listVersions {
addVersions(m)
}
})
}
// Fill semaphore channel to wait for all tasks to finish.
for n := cap(sem); n > 0; n-- {
sem <- token{}
}

return mods
}

Expand Down

0 comments on commit f30044a

Please sign in to comment.