Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
add stream mode which do not up dfget progress.
Browse files Browse the repository at this point in the history
Signed-off-by: allen.wq <[email protected]>
  • Loading branch information
jim3ma authored and wangforthinker committed Mar 30, 2020
1 parent b40524a commit 101a072
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 37 deletions.
53 changes: 47 additions & 6 deletions cmd/dfdaemon/app/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,58 @@ import (

"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
"github.com/dragonflyoss/Dragonfly/dfdaemon/constant"
dfgetcfg "github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/util"
"github.com/dragonflyoss/Dragonfly/pkg/dflog"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
statutil "github.com/dragonflyoss/Dragonfly/pkg/stat"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// adjustSupernodeList adjusts the super nodes [a,b] to [a,b,b,a]
func adjustSupernodeList(nodes []string) []string {
switch nodesLen := len(nodes); nodesLen {
case 0:
return nodes
case 1:
return append(nodes, nodes[0])
default:
util.Shuffle(nodesLen, func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
return append(nodes, nodes...)
}
}

// getLocalIP return the localIP which connects to supper node
func getLocalIP(nodes []string) (localIP string) {
var (
e error
)
for _, n := range nodes {
ip, port := netutils.GetIPAndPortFromNode(n, dfgetcfg.DefaultSupernodePort)
if localIP, e = httputils.CheckConnect(ip, port, 1000); e == nil {
return localIP
}
logrus.Warnf("Connect to node:%s error: %v", n, e)
}
return ""
}

// initDfdaemon sets up running environment for dfdaemon according to the given config.
func initDfdaemon(cfg config.Properties) error {
func initDfdaemon(cfg *config.Properties) error {
// if Options.MaxProcs <= 0, programs run with GOMAXPROCS set to the number of cores available.
if cfg.MaxProcs > 0 {
runtime.GOMAXPROCS(cfg.MaxProcs)
}

if err := initLogger(cfg); err != nil {
if err := initLogger(*cfg); err != nil {
return errors.Wrap(err, "init logger")
}

Expand All @@ -58,14 +93,20 @@ func initDfdaemon(cfg config.Properties) error {
"ensure local repo %s exists", cfg.DFRepo,
)
}
cfg.SuperNodes = adjustSupernodeList(cfg.SuperNodes)
if stringutils.IsEmptyStr(cfg.LocalIP) {
cfg.LocalIP = getLocalIP(cfg.SuperNodes)
}

go cleanLocalRepo(cfg.DFRepo)

dfgetVersion, err := exec.Command(cfg.DFPath, "version").CombinedOutput()
if err != nil {
return errors.Wrap(err, "get dfget version")
if !cfg.StreamMode {
dfgetVersion, err := exec.Command(cfg.DFPath, "version").CombinedOutput()
if err != nil {
return errors.Wrap(err, "get dfget version")
}
logrus.Infof("use %s from %s", bytes.TrimSpace(dfgetVersion), cfg.DFPath)
}
logrus.Infof("use %s from %s", bytes.TrimSpace(dfgetVersion), cfg.DFPath)

return nil
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/dfdaemon/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var rootCmd = &cobra.Command{
return errors.Wrap(err, "get config from viper")
}

if err := initDfdaemon(*cfg); err != nil {
if err := initDfdaemon(cfg); err != nil {
return errors.Wrap(err, "init dfdaemon")
}

Expand All @@ -72,6 +72,10 @@ var rootCmd = &cobra.Command{
if err != nil {
return errors.Wrap(err, "create dfdaemon from config")
}
// if stream mode, launch peer server in dfdaemon progress
if cfg.StreamMode {
go dfdaemon.LaunchPeerServer(*cfg)
}
return s.Start()
},
}
Expand All @@ -93,6 +97,8 @@ func init() {
// http server config
rf.String("hostIp", "127.0.0.1", "dfdaemon host ip, default: 127.0.0.1")
rf.Uint("port", 65001, "dfdaemon will listen the port")
rf.Uint("peerPort", 0, "peerserver will listen the port")
rf.Bool("streamMode", false, "dfdaemon will run in stream mode")
rf.String("certpem", "", "cert.pem file path")
rf.String("keypem", "", "key.pem file path")

Expand Down
17 changes: 8 additions & 9 deletions dfdaemon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/x509"
"encoding/json"
"net/url"
"os"
"path/filepath"
"regexp"

Expand Down Expand Up @@ -111,7 +110,10 @@ type Properties struct {
DFRepo string `yaml:"localrepo" json:"localrepo"`
DFPath string `yaml:"dfpath" json:"dfpath"`

LogConfig dflog.LogConfig `yaml:"logConfig" json:"logConfig"`
LogConfig dflog.LogConfig `yaml:"logConfig" json:"logConfig"`
LocalIP string `yaml:"localIP" json:"localIP"`
PeerPort int `yaml:"peerPort" json:"peerPort"`
StreamMode bool `yaml:"streamMode" json:"streamMode"`
}

// Validate validates the config
Expand All @@ -130,13 +132,6 @@ func (p *Properties) Validate() error {
)
}

if _, err := os.Stat(p.DFPath); err != nil && os.IsNotExist(err) {
return dferr.Newf(
constant.CodeExitDfgetNotFound,
"dfpath %s not found", p.DFPath,
)
}

return nil
}

Expand All @@ -156,6 +151,8 @@ func (p *Properties) DFGetConfig() DFGetConfig {
RateLimit: p.RateLimit.String(),
DFRepo: p.DFRepo,
DFPath: p.DFPath,
LocalIP: p.LocalIP,
PeerPort: p.PeerPort,
}
if p.HijackHTTPS != nil {
dfgetConfig.HostsConfig = p.HijackHTTPS.Hosts
Expand All @@ -181,6 +178,8 @@ type DFGetConfig struct {
DFRepo string `yaml:"localrepo"`
DFPath string `yaml:"dfpath"`
HostsConfig []*HijackHost `yaml:"hosts" json:"hosts"`
PeerPort int `yaml:"peerPort"`
LocalIP string `yaml:"localIP"`
}

// RegistryMirror configures the mirror of the official docker registry
Expand Down
13 changes: 0 additions & 13 deletions dfdaemon/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"math/rand"
"os"
"strings"
"testing"
"time"

"github.com/dragonflyoss/Dragonfly/dfdaemon/constant"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand Down Expand Up @@ -85,17 +83,6 @@ func (ts *configTestSuite) TestValidateDFRepo() {
r.Equal(constant.CodeExitPathNotAbs, getCode(c.Validate()))
}

func (ts *configTestSuite) TestValidateDFPath() {
c := defaultConfig()
r := ts.Require()

c.DFPath = "/"
r.Nil(c.Validate())

c.DFPath = fmt.Sprintf("/df-test-%d-%d", time.Now().UnixNano(), rand.Int())
r.Equal(constant.CodeExitDfgetNotFound, getCode(c.Validate()))
}

func (ts *configTestSuite) TestURLNew() {
r := ts.Require()

Expand Down
12 changes: 11 additions & 1 deletion dfdaemon/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package downloader

import "context"
import (
"context"
"io"
)

// Interface specifies on how an plugin can download a file.
type Interface interface {
Expand All @@ -25,5 +28,12 @@ type Interface interface {
DownloadContext(ctx context.Context, url string, header map[string][]string, name string) (string, error)
}

type Stream interface {
// DownloadContext downloads the resource as specified in url, and it accepts
// a context parameter so that it can handle timeouts correctly.
DownloadStreamContext(ctx context.Context, url string, header map[string][]string, name string) (io.Reader, error)
}

// Factory is a function that returns a new downloader.
type Factory func() Interface
type StreamFactory func() Stream
40 changes: 40 additions & 0 deletions dfdaemon/downloader/p2p/dfclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package p2p

import (
"context"
"errors"
"io"

"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
)

type Client struct {
}

func (c *Client) DownloadContext(ctx context.Context, url string, header map[string][]string, name string) (string, error) {
return "", errors.New("Not Implementation")
}

func (c *Client) DownloadStreamContext(ctx context.Context, url string, header map[string][]string, name string) (io.Reader, error) {
return nil, errors.New("Not Implementation")
}

func NewClient(cfg config.DFGetConfig) *Client {
return &Client{}
}
25 changes: 23 additions & 2 deletions dfdaemon/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/dfget"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/p2p"
"github.com/dragonflyoss/Dragonfly/dfdaemon/transport"

"github.com/pkg/errors"
Expand Down Expand Up @@ -101,6 +102,20 @@ func WithDownloaderFactory(f downloader.Factory) Option {
}
}

func WithStreamMode(streamMode bool) Option {
return func(p *Proxy) error {
p.streamMode = streamMode
return nil
}
}

func WithStreamDownloaderFactory(f downloader.StreamFactory) Option {
return func(p *Proxy) error {
p.streamDownloadFactory = f
return nil
}
}

// New returns a new transparent proxy with the given rules
func New(opts ...Option) (*Proxy, error) {
proxy := &Proxy{
Expand All @@ -124,6 +139,10 @@ func NewFromConfig(c config.Properties) (*Proxy, error) {
WithDownloaderFactory(func() downloader.Interface {
return dfget.NewGetter(c.DFGetConfig())
}),
WithStreamDownloaderFactory(func() downloader.Stream {
return p2p.NewClient(c.DFGetConfig())
}),
WithStreamMode(c.StreamMode),
}

logrus.Infof("registry mirror: %s", c.RegistryMirror.Remote)
Expand Down Expand Up @@ -171,7 +190,9 @@ type Proxy struct {
// directHandler are used to handle non proxy requests
directHandler http.Handler
// downloadFactory returns the downloader used for p2p downloading
downloadFactory downloader.Factory
downloadFactory downloader.Factory
streamDownloadFactory downloader.StreamFactory
streamMode bool
}

func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -239,7 +260,7 @@ func (proxy *Proxy) handleHTTP(w http.ResponseWriter, req *http.Request) {

func (proxy *Proxy) roundTripper(tlsConfig *tls.Config) http.RoundTripper {
rt, _ := transport.New(
transport.WithDownloader(proxy.downloadFactory()),
transport.WithStreamDownloader(proxy.streamDownloadFactory()),
transport.WithTLS(tlsConfig),
transport.WithCondition(proxy.shouldUseDfget),
)
Expand Down
15 changes: 15 additions & 0 deletions dfdaemon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
"github.com/dragonflyoss/Dragonfly/dfdaemon/handler"
"github.com/dragonflyoss/Dragonfly/dfdaemon/proxy"
dfgetConfig "github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
"github.com/dragonflyoss/Dragonfly/version"

"github.com/pkg/errors"
Expand Down Expand Up @@ -115,6 +117,19 @@ func NewFromConfig(cfg config.Properties) (*Server, error) {
return New(opts...)
}

func LaunchPeerServer(cfg config.Properties) error {
peerServerConfig := dfgetConfig.NewConfig()
peerServerConfig.RV.LocalIP = cfg.LocalIP
peerServerConfig.RV.PeerPort = cfg.PeerPort
peerServerConfig.RV.ServerAliveTime = 0
port, err := uploader.LaunchPeerServer(peerServerConfig)
if err != nil {
return err
}
peerServerConfig.RV.PeerPort = port
return nil
}

// Start runs dfdaemon's http server.
func (s *Server) Start() error {
_ = proxy.WithDirectHandler(handler.New())(s.proxy)
Expand Down
Loading

0 comments on commit 101a072

Please sign in to comment.