refactor: scheduler grpc #1310

Merged · 2 commits · May 18, 2022
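In summary, this change replaces the hand-written scheduler client wrapper with the generated gRPC types: schedulerclient.SchedulerClient becomes schedulerclient.Client, the custom PeerPacketStream is dropped in favor of the generated scheduler.Scheduler_ReportPieceResultClient, ReportPieceResult no longer takes a separate task ID argument (the task ID is now set on the request itself), the piece-result stream is explicitly half-closed with CloseSend when a peer task completes or fails, the test mocks are regenerated against the new interfaces, the compatibility E2E workflow pins v2.0.3-beta.3 as the stable image, and rpcserver.go fixes a transposed PeerID/TaskID pair.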
2 changes: 1 addition & 1 deletion .github/workflows/compatibility-e2e.yml
@@ -15,7 +15,7 @@ env:
   KIND_VERSION: v0.11.1
   CONTAINERD_VERSION: v1.5.2
   KIND_CONFIG_PATH: test/testdata/kind/config.yaml
-  DRAGONFLY_STABLE_IMAGE_TAG: v2.0.3-beta.2
+  DRAGONFLY_STABLE_IMAGE_TAG: v2.0.3-beta.3
   DRAGONFLY_CHARTS_PATH: deploy/helm-charts/charts/dragonfly
   DRAGONFLY_CHARTS_CONFIG_PATH: test/testdata/charts/config.yaml
   DRAGONFLY_FILE_SERVER_PATH: test/testdata/k8s/file-server.yaml
7 changes: 5 additions & 2 deletions client/daemon/daemon.go
@@ -90,7 +90,7 @@ type clientDaemon struct {
 	dfpath          dfpath.Dfpath
 	schedulers      []*manager.Scheduler
 	managerClient   managerclient.Client
-	schedulerClient schedulerclient.SchedulerClient
+	schedulerClient schedulerclient.Client
 }

 func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
@@ -144,7 +144,10 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {

 	var opts []grpc.DialOption
 	if opt.Options.Telemetry.Jaeger != "" {
-		opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
+		opts = append(opts,
+			grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
+			grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()),
+		)
 	}
 	sched, err := schedulerclient.GetClientByAddr(addrs, opts...)
 	if err != nil {
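For context, this hunk uses the standard way to attach OpenTelemetry tracing to a gRPC client: the otelgrpc interceptors are chained into the dial options only when a Jaeger endpoint is configured. A minimal sketch of the same wiring, where the jaegerEndpoint parameter is a stand-in for opt.Options.Telemetry.Jaeger:

```go
package example

import (
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"google.golang.org/grpc"
)

// dialOptions mirrors the conditional wiring in daemon.go: tracing
// interceptors are only chained in when telemetry is configured, so the
// non-traced path pays no interceptor overhead.
func dialOptions(jaegerEndpoint string) []grpc.DialOption {
	var opts []grpc.DialOption
	if jaegerEndpoint != "" {
		opts = append(opts,
			grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
			grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()),
		)
	}
	return opts
}
```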
13 changes: 10 additions & 3 deletions client/daemon/peer/peertask_conductor.go
@@ -89,7 +89,7 @@ type peerTaskConductor struct {

 	// schedule options
 	schedulerOption config.SchedulerOption
-	schedulerClient schedulerclient.SchedulerClient
+	schedulerClient schedulerclient.Client

 	// peer task meta info
 	peerID string
@@ -108,7 +108,7 @@ type peerTaskConductor struct {
 	tinyData *TinyData

 	// peerPacketStream stands schedulerclient.PeerPacketStream from scheduler
-	peerPacketStream schedulerclient.PeerPacketStream
+	peerPacketStream scheduler.Scheduler_ReportPieceResultClient
 	// peerPacket is the latest available peers from peerPacketCh
 	// Deprecated: remove in future release
 	peerPacket atomic.Value // *scheduler.PeerPacket
@@ -179,6 +179,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
 	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))

 	taskID := idgen.TaskID(request.Url, request.UrlMeta)
+	request.TaskId = taskID

 	var (
 		log *logger.SugaredLoggerOnWith
@@ -324,7 +325,7 @@ func (pt *peerTaskConductor) register() error {
 		}
 	}

-	peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, result.TaskId, pt.request)
+	peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, pt.request)
 	pt.Infof("step 2: start report piece result")
 	if err != nil {
 		pt.span.RecordError(err)
@@ -1469,6 +1470,9 @@ func (pt *peerTaskConductor) done() {
 		schedulerclient.NewEndOfPiece(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
 	pt.Debugf("end piece result sent: %v, peer task finished", err)

+	err = pt.peerPacketStream.CloseSend()
+	pt.Debugf("close stream result: %v", err)
+
 	err = pt.schedulerClient.ReportPeerResult(
 		peerResultCtx,
 		&scheduler.PeerResult{
@@ -1521,6 +1525,9 @@ func (pt *peerTaskConductor) fail() {
 		schedulerclient.NewEndOfPiece(pt.taskID, pt.peerID, pt.readyPieces.Settled()))
 	pt.Debugf("end piece result sent: %v, peer task finished", err)

+	err = pt.peerPacketStream.CloseSend()
+	pt.Debugf("close stream result: %v", err)
+
 	ctx := trace.ContextWithSpan(context.Background(), trace.SpanFromContext(pt.ctx))
 	peerResultCtx, peerResultSpan := tracer.Start(ctx, config.SpanReportPeerResult)
 	defer peerResultSpan.End()
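The two new CloseSend calls follow the usual gRPC bidirectional-streaming contract: after the final piece result is sent, the client half-closes its side of the stream so the scheduler observes a clean end-of-stream rather than a cancellation. A minimal sketch of that shutdown order, using a hypothetical finishStream helper that is not part of the PR:

```go
package example

import (
	"log"

	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)

// finishStream reports the final piece result, then half-closes the
// client side of the bidirectional stream. The scheduler may still
// deliver pending PeerPackets via Recv until it closes its own side.
func finishStream(stream scheduler.Scheduler_ReportPieceResultClient, end *scheduler.PieceResult) {
	// Best-effort: the peer task is already done or failed, so errors
	// are logged rather than propagated, matching the diff above.
	if err := stream.Send(end); err != nil {
		log.Printf("send end of piece: %v", err)
	}
	if err := stream.CloseSend(); err != nil {
		log.Printf("close send: %v", err)
	}
}
```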
12 changes: 8 additions & 4 deletions client/daemon/peer/peertask_dummy.go
@@ -25,7 +25,6 @@ import (
 	"d7y.io/dragonfly/v2/pkg/dfnet"
 	"d7y.io/dragonfly/v2/pkg/rpc/base"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
-	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
 )

 // when scheduler is not available, use dummySchedulerClient to back source
@@ -36,7 +35,7 @@ func (d *dummySchedulerClient) RegisterPeerTask(ctx context.Context, request *sc
 	panic("should not call this function")
 }

-func (d *dummySchedulerClient) ReportPieceResult(ctx context.Context, s string, request *scheduler.PeerTaskRequest, option ...grpc.CallOption) (schedulerclient.PeerPacketStream, error) {
+func (d *dummySchedulerClient) ReportPieceResult(ctx context.Context, request *scheduler.PeerTaskRequest, option ...grpc.CallOption) (scheduler.Scheduler_ReportPieceResultClient, error) {
 	return &dummyPeerPacketStream{}, nil
 }

@@ -68,13 +67,18 @@ func (d *dummySchedulerClient) GetState() []dfnet.NetAddr {
 }

 type dummyPeerPacketStream struct {
+	grpc.ClientStream
 }

-func (d *dummyPeerPacketStream) Recv() (pp *scheduler.PeerPacket, err error) {
+func (d *dummyPeerPacketStream) Recv() (*scheduler.PeerPacket, error) {
 	// TODO set base.Code_SchedNeedBackSource in *scheduler.PeerPacket instead of error
 	return nil, dferrors.New(base.Code_SchedNeedBackSource, "")
 }

-func (d *dummyPeerPacketStream) Send(pr *scheduler.PieceResult) (err error) {
+func (d *dummyPeerPacketStream) Send(pr *scheduler.PieceResult) error {
 	return nil
 }
+
+func (d *dummyPeerPacketStream) CloseSend() error {
+	return nil
+}
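A note on the embedded grpc.ClientStream: the generated scheduler.Scheduler_ReportPieceResultClient interface includes all of grpc.ClientStream's methods (Context, Header, Trailer, SendMsg, RecvMsg, CloseSend). Embedding the interface satisfies them at compile time without stubbing each one; calling an unimplemented method would panic on the nil embedded value, which is acceptable for a back-source fallback that only ever uses Send, Recv, and CloseSend. A minimal sketch of the same pattern against a hypothetical fake stream:

```go
package example

import (
	"io"

	"google.golang.org/grpc"
)

// fakeStream compiles as a grpc.ClientStream implementation because the
// embedded interface supplies Context, Header, Trailer, SendMsg, RecvMsg
// and CloseSend; only the methods a test actually exercises are overridden.
type fakeStream struct {
	grpc.ClientStream // nil: any unoverridden method panics if called
}

// RecvMsg is overridden so a consumer reading from the stream sees a
// clean end-of-stream instead of a nil-pointer panic.
func (f *fakeStream) RecvMsg(m interface{}) error {
	return io.EOF
}
```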
4 changes: 2 additions & 2 deletions client/daemon/peer/peertask_manager.go
@@ -116,7 +116,7 @@ func init() {

 type peerTaskManager struct {
 	host            *scheduler.PeerHost
-	schedulerClient schedulerclient.SchedulerClient
+	schedulerClient schedulerclient.Client
 	schedulerOption config.SchedulerOption
 	pieceManager    PieceManager
 	storageManager  storage.Manager
@@ -140,7 +140,7 @@ func NewPeerTaskManager(
 	host *scheduler.PeerHost,
 	pieceManager PieceManager,
 	storageManager storage.Manager,
-	schedulerClient schedulerclient.SchedulerClient,
+	schedulerClient schedulerclient.Client,
 	schedulerOption config.SchedulerOption,
 	perPeerRateLimit rate.Limit,
 	multiplex bool,
19 changes: 11 additions & 8 deletions client/daemon/peer/peertask_manager_test.go
@@ -57,7 +57,8 @@ import (
 	daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
-	mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
+	mock_scheduler_client "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
+	mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
 	"d7y.io/dragonfly/v2/pkg/source"
 	"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
 	sourceMock "d7y.io/dragonfly/v2/pkg/source/mock"
@@ -85,7 +86,7 @@ type componentsOption struct {

 //go:generate mockgen -package mock_server_grpc -source ../../../pkg/rpc/dfdaemon/dfdaemon.pb.go -destination ../test/mock/daemongrpc/daemon_server_grpc.go
 func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
-	schedulerclient.SchedulerClient, storage.Manager) {
+	schedulerclient.Client, storage.Manager) {
 	port := int32(freeport.GetPort())
 	// 1. set up a mock daemon server for uploading pieces info
 	var daemon = mock_daemon.NewMockDaemonServer(ctrl)
@@ -177,7 +178,7 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
 	time.Sleep(100 * time.Millisecond)

 	// 2. setup a scheduler
-	pps := mock_scheduler.NewMockPeerPacketStream(ctrl)
+	pps := mock_scheduler.NewMockScheduler_ReportPieceResultClient(ctrl)
 	pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(
 		func(pr *scheduler.PieceResult) error {
 			return nil
@@ -212,7 +213,9 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
 			StealPeers: nil,
 		}, nil
 	})
-	sched := mock_scheduler.NewMockSchedulerClient(ctrl)
+	pps.EXPECT().CloseSend().AnyTimes()
+
+	sched := mock_scheduler_client.NewMockClient(ctrl)
 	sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
 		func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) {
 			switch opt.scope {
@@ -250,9 +253,9 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
 			DirectPiece: nil,
 		}, nil
 	})
-	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
-		func(ctx context.Context, taskId string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (
-			schedulerclient.PeerPacketStream, error) {
+	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
+		func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (
+			scheduler.Scheduler_ReportPieceResultClient, error) {
 			return pps, nil
 		})
 	sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
@@ -274,7 +277,7 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
 type mockManager struct {
 	testSpec        *testSpec
 	peerTaskManager *peerTaskManager
-	schedulerClient schedulerclient.SchedulerClient
+	schedulerClient schedulerclient.Client
 	storageManager  storage.Manager
 }
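Since the client now returns the generated stream type directly, the tests mock two layers: the generated Scheduler_ReportPieceResultClient stream and the refactored schedulerclient.Client. A condensed sketch of the wiring above (the test name and body are illustrative, not taken from the PR; the mock constructors come from the two mocks packages imported in the diff):

```go
package peer_test

import (
	"testing"

	"github.com/golang/mock/gomock"

	mock_scheduler_client "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
	mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
)

func TestReportPieceResultMockWiring(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Mock the generated bidirectional stream: Send succeeds, and since
	// the conductor now half-closes the stream, CloseSend is expected too.
	pps := mock_scheduler.NewMockScheduler_ReportPieceResultClient(ctrl)
	pps.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
	pps.EXPECT().CloseSend().AnyTimes()

	// Mock the refactored client: ReportPieceResult now takes only
	// (ctx, request) and hands back the mocked stream.
	sched := mock_scheduler_client.NewMockClient(ctrl)
	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any()).Return(pps, nil).AnyTimes()
}
```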
12 changes: 7 additions & 5 deletions client/daemon/peer/peertask_stream_backsource_partial_test.go
@@ -51,15 +51,16 @@ import (
 	daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
-	mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
+	mock_scheduler_client "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
+	mock_scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler/mocks"
 	"d7y.io/dragonfly/v2/pkg/source"
 	"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
 	sourceMock "d7y.io/dragonfly/v2/pkg/source/mock"
 	"d7y.io/dragonfly/v2/pkg/util/digestutils"
 )

 func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) (
-	schedulerclient.SchedulerClient, storage.Manager) {
+	schedulerclient.Client, storage.Manager) {
 	port := int32(freeport.GetPort())
 	// 1. set up a mock daemon server for uploading pieces info
 	var daemon = mock_daemon.NewMockDaemonServer(ctrl)
@@ -111,7 +112,7 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) (
 	time.Sleep(100 * time.Millisecond)

 	// 2. setup a scheduler
-	pps := mock_scheduler.NewMockPeerPacketStream(ctrl)
+	pps := mock_scheduler.NewMockScheduler_ReportPieceResultClient(ctrl)
 	var (
 		wg             = sync.WaitGroup{}
 		backSourceSent = atomic.Bool{}
@@ -159,7 +160,8 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) (
 			StealPeers: nil,
 		}, nil
 	})
-	sched := mock_scheduler.NewMockSchedulerClient(ctrl)
+	pps.EXPECT().CloseSend().AnyTimes()
+	sched := mock_scheduler_client.NewMockClient(ctrl)
 	sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
 		func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) {
 			return &scheduler.RegisterResult{
@@ -169,7 +171,7 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) (
 		}, nil
 	})
 	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
-		func(ctx context.Context, taskId string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (schedulerclient.PeerPacketStream, error) {
+		func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (scheduler.Scheduler_ReportPieceResultClient, error) {
 			return pps, nil
 		})
 	sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
4 changes: 2 additions & 2 deletions client/daemon/rpcserver/rpcserver.go
@@ -196,8 +196,8 @@ func (s *server) SyncPieceTasks(sync dfdaemongrpc.Daemon_SyncPieceTasksServer) error {
 		if !attributeSent && len(p.PieceInfos) > 0 {
 			exa, e := s.storageManager.GetExtendAttribute(ctx,
 				&storage.PeerTaskMetadata{
-					PeerID: request.TaskId,
-					TaskID: request.DstPid,
+					PeerID: request.DstPid,
+					TaskID: request.TaskId,
 				})
 			if e != nil {
 				log.Errorf("get extend attribute error: %s", e.Error())
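This last hunk is a straight bug fix: the peer ID and task ID were transposed when building the storage lookup key, so GetExtendAttribute queried metadata that could never exist. A hypothetical regression-test sketch for the corrected mapping follows; the test, its import paths, and the PieceTaskRequest field usage are my assumptions, not part of the PR (only PeerTaskMetadata's PeerID/TaskID fields appear in the diff):

```go
package rpcserver_test

import (
	"testing"

	"d7y.io/dragonfly/v2/client/daemon/storage" // assumed path of the storage package
	"d7y.io/dragonfly/v2/pkg/rpc/base"          // assumed home of PieceTaskRequest
)

func TestExtendAttributeMetadataMapping(t *testing.T) {
	// request mirrors the piece task request handled by SyncPieceTasks.
	request := &base.PieceTaskRequest{
		TaskId: "task-1",
		DstPid: "peer-1",
	}
	// The corrected mapping: peer ID from DstPid, task ID from TaskId.
	meta := storage.PeerTaskMetadata{
		PeerID: request.DstPid,
		TaskID: request.TaskId,
	}
	if meta.PeerID != "peer-1" || meta.TaskID != "task-1" {
		t.Fatalf("transposed metadata: %+v", meta)
	}
}
```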