Skip to content

Commit

Permalink
Feature/download in stream mode (#755)
Browse files Browse the repository at this point in the history
  • Loading branch information
9547 authored Sep 22, 2020
1 parent 800fc9f commit 2d57d0b
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ bin/
/tests/tiup_mirrors/
/logs
docker/secret/
*__failpoint_binding__.go
*__failpoint_stash__
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ tools/bin/failpoint-ctl: go.mod
$(GO) build -o $@ github.com/pingcap/failpoint/failpoint-ctl

pkger:
$(GO) run tools/pkger/main.go -s templates -d pkg/cluster/embed
$(GO) run tools/pkger/main.go -s templates -d pkg/cluster/embed

fmt:
@echo "gofmt (simplify)"
Expand All @@ -121,4 +121,3 @@ tools/bin/revive: tools/check/go.mod

tools/bin/golangci-lint:
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b ./tools/bin v1.27.0

1 change: 1 addition & 0 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type testCmdSuite struct {
func (s *testCmdSuite) SetUpSuite(c *C) {
s.testDir = filepath.Join(currentDir(), "testdata")
s.mirror = repository.NewMirror(s.testDir, repository.MirrorOptions{})
_ = s.mirror.Open()
}

/*
Expand Down
1 change: 1 addition & 0 deletions docker/control/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ FROM golang:1.14
apt-get install -qqy \
dos2unix \
default-mysql-client \
psmisc \
vim # not required by tiup-cluster itself, just for ease of use


Expand Down
18 changes: 4 additions & 14 deletions pkg/cluster/clusterutil/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package clusterutil

import (
"io"
"os"

"github.com/pingcap/tiup/pkg/environment"
Expand All @@ -41,6 +40,9 @@ func NewRepository(os, arch string) (Repository, error) {
mirror := repository.NewMirror(environment.Mirror(), repository.MirrorOptions{
Progress: repository.DisableProgress{},
})
if err := mirror.Open(); err != nil {
return nil, err
}
local, err := v1manifest.NewManifests(profile)
if err != nil {
return nil, err
Expand All @@ -59,19 +61,7 @@ func (r *repositoryT) DownloadComponent(comp, version, target string) error {
return err
}

reader, err := r.repo.FetchComponent(versionItem)
if err != nil {
return err
}

file, err := os.Create(target)
if err != nil {
return err
}
defer file.Close()

_, err = io.Copy(file, reader)
return err
return r.repo.DownloadComponent(versionItem, target)
}

func (r *repositoryT) VerifyComponent(comp, version, target string) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/environment/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Mirror() string {
m := os.Getenv(repository.EnvMirrors)
if m != "" {
if cfg.Mirror != m {
fmt.Printf(`WARNING: both mirror config(%s)
fmt.Printf(`WARNING: both mirror config(%s)
and TIUP_MIRRORS(%s) have been set.
Setting mirror to TIUP_MIRRORS(%s)
`, cfg.Mirror, m, m)
Expand Down Expand Up @@ -94,6 +94,9 @@ func InitEnv(options repository.Options) (*Environment, error) {
// Replace the mirror if some sub-commands use different mirror address
mirrorAddr := Mirror()
mirror := repository.NewMirror(mirrorAddr, repository.MirrorOptions{})
if err := mirror.Open(); err != nil {
return nil, err
}

var repo *repository.Repository
var v1repo *repository.V1Repository
Expand Down
3 changes: 3 additions & 0 deletions pkg/localdata/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ const (
// EnvNameSCPPath is the variable name by which user can specific the executable scp binary path
EnvNameSCPPath = "TIUP_SCP_PATH"

// EnvNameKeepSourceTarget is the variable name by which user can keep the source target or not
EnvNameKeepSourceTarget = "TIUP_KEEP_SOURCE_TARGET"

// MetaFilename represents the process meta file name
MetaFilename = "tiup_process_meta"
)
22 changes: 21 additions & 1 deletion pkg/repository/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ func (l *httpMirror) Download(resource, targetDir string) error {
return errors.Trace(err)
}
defer r.Close()

if err := os.MkdirAll(targetDir, 0755); err != nil {
return errors.Trace(err)
}
return utils.Move(tmpFilePath, dstFilePath)
}

Expand Down Expand Up @@ -287,7 +291,23 @@ func (l *MockMirror) Open() error {

// Download implements Mirror.
func (l *MockMirror) Download(resource, targetDir string) error {
return errors.New("MockMirror::Download not implemented")
content, ok := l.Resources[resource]
if !ok {
return errors.Annotatef(ErrNotFound, "resource %s", resource)
}

if err := os.MkdirAll(targetDir, 0755); err != nil {
return err
}
target := filepath.Join(targetDir, resource)

file, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
if err != nil {
return err
}
defer file.Close()
_, err = file.Write([]byte(content))
return err
}

// Fetch implements Mirror.
Expand Down
64 changes: 59 additions & 5 deletions pkg/repository/v1_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
stderrors "errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/fatih/color"
cjson "github.com/gibson042/canonicaljson-go"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/localdata"
"github.com/pingcap/tiup/pkg/repository/v0manifest"
"github.com/pingcap/tiup/pkg/repository/v1manifest"
"github.com/pingcap/tiup/pkg/utils"
Expand Down Expand Up @@ -101,6 +104,10 @@ func (r *V1Repository) UpdateComponents(specs []ComponentSpec) error {
return errors.Trace(err)
}

keepSource := false
if v := os.Getenv(localdata.EnvNameKeepSourceTarget); v == "enable" || v == "true" {
keepSource = true
}
var errs []string
for _, spec := range specs {
manifest, err := r.updateComponentManifest(spec.ID, false)
Expand Down Expand Up @@ -153,20 +160,35 @@ func (r *V1Repository) UpdateComponents(specs []ComponentSpec) error {
}
}

reader, err := r.FetchComponent(versionItem)
if spec.Version == "" {
spec.Version = version
}

targetDir := filepath.Join(r.local.TargetRootDir(), localdata.ComponentParentDir, spec.ID, spec.Version)
target := filepath.Join(targetDir, versionItem.URL)
err = r.DownloadComponent(versionItem, target)
if err != nil {
errs = append(errs, err.Error())
continue
}

if spec.Version == "" {
spec.Version = version
}
err = r.local.InstallComponent(reader, spec.TargetDir, spec.ID, spec.Version, versionItem.URL, r.DisableDecompress)
reader, err := os.Open(target)
if err != nil {
errs = append(errs, err.Error())
continue
}

err = r.local.InstallComponent(reader, targetDir, spec.ID, spec.Version, versionItem.URL, r.DisableDecompress)
reader.Close()

if err != nil {
errs = append(errs, err.Error())
}

// remove the source gzip target if expand is on
if !r.DisableDecompress && !keepSource {
_ = os.Remove(target)
}
}

if len(errs) > 0 {
Expand Down Expand Up @@ -486,6 +508,38 @@ func (r *V1Repository) FetchComponent(item *v1manifest.VersionItem) (io.Reader,
return checkHash(reader, item.Hashes[v1manifest.SHA256])
}

// DownloadComponent downloads the component specified by item into local file,
// the component will be removed if hash is not correct
func (r *V1Repository) DownloadComponent(item *v1manifest.VersionItem, target string) error {
targetDir := filepath.Dir(target)
err := r.mirror.Download(item.URL, targetDir)
if err != nil {
return errors.Trace(err)
}

// the downloaded file is named by item.URL, which maybe differ to target name
if downloaded := path.Join(targetDir, item.URL); downloaded != target {
err := os.Rename(downloaded, target)
if err != nil {
return errors.Trace(err)
}
}

reader, err := os.Open(target)
if err != nil {
return err
}

_, err = checkHash(reader, item.Hashes[v1manifest.SHA256])
reader.Close()

// remove the target compoonent to avoid attacking
if err != nil {
_ = os.Remove(target)
}
return err
}

// FetchTimestamp downloads the timestamp file, validates it, and checks if the snapshot hash in it
// has the same value of our local one. (not hashing the snapshot file itself)
// Return weather the manifest is changed compared to the one in local ts and the FileHash of snapshot.
Expand Down
18 changes: 13 additions & 5 deletions pkg/repository/v1manifest/local_manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type LocalManifests interface {
// ManifestVersion opens filename, if it exists and is a manifest, returns its manifest version number. Otherwise
// returns 0.
ManifestVersion(filename string) uint

// TargetRootDir returns the root directory of target
TargetRootDir() string
}

// FsManifests represents a collection of v1 manifests on disk.
Expand Down Expand Up @@ -230,11 +233,6 @@ func (ms *FsManifests) ComponentInstalled(component, version string) (bool, erro

// InstallComponent implements LocalManifests.
func (ms *FsManifests) InstallComponent(reader io.Reader, targetDir, component, version, filename string, noExpand bool) error {
// TODO factor path construction to profile (also used by v0 repo).
if targetDir == "" {
targetDir = ms.profile.Path(localdata.ComponentParentDir, component, version)
}

if !noExpand {
return utils.Untar(reader, targetDir)
}
Expand Down Expand Up @@ -281,6 +279,11 @@ func (ms *FsManifests) ManifestVersion(filename string) uint {
return base.Version
}

// TargetRootDir implements LocalManifests.
func (ms *FsManifests) TargetRootDir() string {
return ms.profile.Root()
}

// MockManifests is a LocalManifests implementation for testing.
type MockManifests struct {
Manifests map[string]*Manifest
Expand Down Expand Up @@ -392,6 +395,11 @@ func (ms *MockManifests) KeyStore() *KeyStore {
return ms.Ks
}

// TargetRootDir implements LocalManifests.
func (ms *MockManifests) TargetRootDir() string {
return ""
}

// ManifestVersion implements LocalManifests.
func (ms *MockManifests) ManifestVersion(filename string) uint {
manifest, ok := ms.Manifests[filename]
Expand Down

0 comments on commit 2d57d0b

Please sign in to comment.