Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dragonflyoss/Dragonfly2 into feature/stream-send-error-code
Browse files Browse the repository at this point in the history

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jan 17, 2022
2 parents 516ef64 + 55350fe commit 946b3fe
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 42 deletions.
1 change: 0 additions & 1 deletion client/config/constants_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,5 @@ const (
SpanWritePiece = "write-piece"
SpanWriteBackPiece = "write-back-piece"
SpanWaitPieceLimit = "wait-limit"
SpanPushPieceResult = "push-result"
SpanPeerGC = "peer-gc"
)
24 changes: 21 additions & 3 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,25 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
err = errors.Errorf("empty schedule result")
return nil, err
}
log := logger.With("peer", request.PeerId, "task", result.TaskId, "component", "PeerTask")

var (
log *logger.SugaredLoggerOnWith
traceID = span.SpanContext().TraceID()
)

if traceID.IsValid() {
log = logger.With(
"peer", request.PeerId,
"task", result.TaskId,
"component", "PeerTask",
"trace", traceID.String())
} else {
log = logger.With(
"peer", request.PeerId,
"task", result.TaskId,
"component", "PeerTask")
}

span.SetAttributes(config.AttributeTaskID.String(result.TaskId))
log.Infof("register task success, SizeScope: %s", base.SizeScope_name[int32(result.SizeScope)])

Expand Down Expand Up @@ -1015,7 +1033,7 @@ func (pt *peerTaskConductor) ReportPieceResult(request *DownloadPieceRequest, re
}

func (pt *peerTaskConductor) reportSuccessResult(request *DownloadPieceRequest, result *DownloadPieceResult) {
_, span := tracer.Start(pt.ctx, config.SpanPushPieceResult)
_, span := tracer.Start(pt.ctx, config.SpanReportPieceResult)
span.SetAttributes(config.AttributeWritePieceSuccess.Bool(true))

err := pt.peerPacketStream.Send(
Expand All @@ -1040,7 +1058,7 @@ func (pt *peerTaskConductor) reportSuccessResult(request *DownloadPieceRequest,
}

func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, result *DownloadPieceResult, code base.Code) {
_, span := tracer.Start(pt.ctx, config.SpanPushPieceResult)
_, span := tracer.Start(pt.ctx, config.SpanReportPieceResult)
span.SetAttributes(config.AttributeWritePieceSuccess.Bool(false))

err := pt.peerPacketStream.Send(&scheduler.PieceResult{
Expand Down
6 changes: 5 additions & 1 deletion client/daemon/peer/peertask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ func (ptm *peerTaskManager) findPeerTaskConductor(taskID string) (*peerTaskCondu
return pt.(*peerTaskConductor), true
}

func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(ctx context.Context, taskID string, request *scheduler.PeerTaskRequest, limit rate.Limit) (*peerTaskConductor, error) {
func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
ctx context.Context,
taskID string,
request *scheduler.PeerTaskRequest,
limit rate.Limit) (*peerTaskConductor, error) {
if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
return ptc, nil
Expand Down
65 changes: 38 additions & 27 deletions client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,30 +379,6 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
return sourceClient
},
},
{
name: "normal size scope - schedule timeout - auto back source",
taskData: testBytes,
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerPacketDelay: []time.Duration{time.Second},
scheduleTimeout: time.Nanosecond,
urlGenerator: func(ts *testSpec) string {
server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
n, err := w.Write(testBytes)
assert.Nil(err)
assert.Equal(len(ts.taskData), n)
}))
ts.cleanUp = append(ts.cleanUp, func() {
server.Close()
})
return server.URL
},
sizeScope: base.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: nil,
},
{
name: "normal size scope - back source - no content length",
taskData: testBytes,
Expand Down Expand Up @@ -449,6 +425,30 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
return sourceClient
},
},
{
name: "normal size scope - schedule timeout - auto back source",
taskData: testBytes,
pieceParallelCount: 4,
pieceSize: 1024,
peerID: "peer-0",
peerPacketDelay: []time.Duration{time.Second},
scheduleTimeout: time.Nanosecond,
urlGenerator: func(ts *testSpec) string {
server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
n, err := w.Write(testBytes)
assert.Nil(err)
assert.Equal(len(ts.taskData), n)
}))
ts.cleanUp = append(ts.cleanUp, func() {
server.Close()
})
return server.URL
},
sizeScope: base.SizeScope_NORMAL,
mockPieceDownloader: nil,
mockHTTPSourceClient: nil,
},
}

for _, _tc := range testCases {
Expand Down Expand Up @@ -651,12 +651,23 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
assert.True(r, fmt.Sprintf("task %d result should be true", i))
}

var taskCount int
var (
runningTaskCount int
success bool
)
select {
case <-ptc.successCh:
success = true
case <-ptc.failCh:
case <-time.After(10 * time.Minute):
}
assert.True(success, "task should success")

ptm.runningPeerTasks.Range(func(key, value interface{}) bool {
taskCount++
runningTaskCount++
return true
})
assert.Equal(0, taskCount, "no running tasks")
assert.Equal(0, runningTaskCount, "no running tasks")

// test reuse stream task
rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(), request)
Expand Down
10 changes: 7 additions & 3 deletions client/daemon/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/golang/groupcache/lru"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -248,8 +249,8 @@ func (proxy *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer func() {
span.End()
}()
// update ctx for transfer trace id
// TODO(jim): only support HTTP scheme, need support HTTPS scheme

// update ctx to transfer trace id
r = r.WithContext(ctx)

// check authenticity
Expand Down Expand Up @@ -316,7 +317,7 @@ func proxyBasicAuth(r *http.Request) (username, password string, ok bool) {
// "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==" returns ("Aladdin", "open sesame", true).
func parseBasicAuth(auth string) (username, password string, ok bool) {
const prefix = "Basic "
// Case insensitive prefix match. See Issue 22736.
// Case-insensitive prefix match. See Issue 22736.
if len(auth) < len(prefix) || !strings.EqualFold(auth[:len(prefix)], prefix) {
return
}
Expand All @@ -333,6 +334,7 @@ func parseBasicAuth(auth string) (username, password string, ok bool) {
}

func (proxy *Proxy) handleHTTP(span trace.Span, w http.ResponseWriter, req *http.Request) {
// FIXME did not need create a transport per request
resp, err := proxy.newTransport(nil).RoundTrip(req)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
Expand Down Expand Up @@ -425,6 +427,8 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {

rp := &httputil.ReverseProxy{
Director: func(req *http.Request) {
// we can not change req.ctx in Director, so inject trace with header
propagation.TraceContext{}.Inject(r.Context(), propagation.HeaderCarrier(req.Header))
req.URL.Host = req.Host
req.URL.Scheme = schemaHTTPS
if proxy.dumpHTTPContent {
Expand Down
24 changes: 20 additions & 4 deletions client/daemon/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package transport

import (
"context"
"crypto/tls"
"fmt"
"net"
Expand All @@ -27,6 +28,7 @@ import (
"time"

"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel/propagation"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/metrics"
Expand All @@ -42,7 +44,8 @@ var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation

var (
// layerReg the regex to determine if it is an image download
layerReg = regexp.MustCompile("^.+/blobs/sha256.*$")
layerReg = regexp.MustCompile("^.+/blobs/sha256.*$")
traceContext = propagation.TraceContext{}
)

// transport implements RoundTripper for dragonfly.
Expand Down Expand Up @@ -150,9 +153,10 @@ func (rt *transport) RoundTrip(req *http.Request) (resp *http.Response, err erro
// delete the Accept-Encoding header to avoid returning the same cached
// result for different requests
req.Header.Del("Accept-Encoding")
ctx := traceContext.Extract(req.Context(), propagation.HeaderCarrier(req.Header))
logger.Debugf("round trip with dragonfly: %s", req.URL.String())
metrics.ProxyRequestViaDragonflyCount.Add(1)
resp, err = rt.download(req)
resp, err = rt.download(ctx, req)
} else {
logger.Debugf("round trip directly, method: %s, url: %s", req.Method, req.URL.String())
req.Host = req.URL.Host
Expand Down Expand Up @@ -182,7 +186,8 @@ func NeedUseDragonfly(req *http.Request) bool {
}

// download uses dragonfly to download.
func (rt *transport) download(req *http.Request) (*http.Response, error) {
// the ctx has span info from transport, did not use the ctx from request
func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Response, error) {
url := req.URL.String()
peerID := idgen.PeerID(rt.peerHost.Ip)
log := logger.With("peer", peerID, "component", "transport")
Expand All @@ -209,7 +214,7 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) {
meta.Filter = filter

body, attr, err := rt.peerTaskManager.StartStreamTask(
req.Context(),
ctx,
&peer.StreamTaskRequest{
URL: url,
URLMeta: meta,
Expand Down Expand Up @@ -292,6 +297,8 @@ func defaultHTTPTransport(cfg *tls.Config) *http.Transport {
// obsoleted RFC 2616 (section 13.5.1) and are used for backward
// compatibility.
// copy from net/http/httputil/reverseproxy.go

// dragonfly need generate task id with header, need to remove some other headers
var hopHeaders = []string{
"Connection",
"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
Expand All @@ -302,11 +309,20 @@ var hopHeaders = []string{
"Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
"Transfer-Encoding",
"Upgrade",

// remove by dragonfly
"Accept",
"User-Agent",
"X-Forwarded-For",
}

// delHopHeaders strips hop-by-hop headers and trace-propagation headers
// from the given header set so they are not forwarded to the next hop.
func delHopHeaders(header http.Header) {
	// hop-by-hop headers plus the dragonfly-specific removals
	for _, name := range hopHeaders {
		header.Del(name)
	}
	// drop trace-context headers so no stale correlation leaks downstream
	for _, name := range traceContext.Fields() {
		header.Del(name)
	}
}
2 changes: 1 addition & 1 deletion deploy/helm-charts
2 changes: 1 addition & 1 deletion docs/en/deployment/configuration/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ server:
# port is the ip and port scheduler server listens on.
port: 8002
# limit the number of requests
listenLimit: 1000
listenLimit: 10000
# cacheDir is dynconfig cache storage directory
# in linux, default value is /var/cache/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/cache
Expand Down
2 changes: 1 addition & 1 deletion docs/zh-CN/deployment/configuration/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ server:
# 服务监听端口
port:
# 限制请求并发数
listenLimit: 1000
listenLimit: 10000
# daemon 动态配置缓存目录
# linux 上默认目录 /var/cache/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/cache
Expand Down
13 changes: 13 additions & 0 deletions scheduler/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,19 @@ func TestScheduler_ScheduleParent(t *testing.T) {
assert.False(ok)
},
},
{
name: "parent is peer's ancestor",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(mockPeer)
mockPeer.StoreChild(peer)
},
expect: func(t *testing.T, parents []*resource.Peer, ok bool) {
assert := assert.New(t)
assert.False(ok)
},
},
{
name: "parent free upload load is zero",
mock: func(peer *resource.Peer, mockPeer *resource.Peer, blocklist set.SafeSet, stream rpcscheduler.Scheduler_ReportPieceResultServer, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder) {
Expand Down

0 comments on commit 946b3fe

Please sign in to comment.