Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
redefine the scheduler and update other interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: allen.wq <[email protected]>
  • Loading branch information
wangforthinker committed May 14, 2020
1 parent c9332ef commit e5191eb
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,21 @@
* limitations under the License.
*/

package distributiondata
package basic

import (
"io"
)

// DistributionData defines the protocol of distribute data which is exchanged in peers.
type DistributionData interface {
// EncodeData encodes the original data.
Encode() (out io.Reader, err error)

// DecodeData decodes the encoded data.
Decode() (out io.Reader, err error)
// Response defines the response.
type Response interface {
Success() bool
Data() interface{}
}

// Offset gets the offset in file of original data.
// RangeRequest defines the range request.
type RangeRequest interface {
URL() string
Offset() int64

// Size gets the size of original data.
Size() int64
Header() map[string]string

// Extra gets the extra info.
Extra() interface{}
}
8 changes: 4 additions & 4 deletions dfget/corev2/clientstream/client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ import (
"context"
"io"

"github.com/dragonflyoss/Dragonfly/dfget/protocol/distributiondata"
"github.com/dragonflyoss/Dragonfly/pkg/protocol/distributiondata"
)

// ClientStream defines how to organize distribution data and get read stream for proxy request.
// An instance binds to a proxy request.
// ClientStream defines how to organize distribution data and get read stream for range request.
// An instance binds to a range request.
// It may receive a lot of distribution data.
type ClientStream interface {
// WriteData writes the distribution data from other peers, it may be called more times.
WriteData(data distributiondata.DistributionData) error

// ReadStream provides the read stream for proxy request
// ReadStream provides the read stream for range request.
ReadStream(ctx context.Context) (io.ReadCloser, error)
}
39 changes: 32 additions & 7 deletions dfget/corev2/datascheduler/data_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,49 @@ package datascheduler
import (
"context"

"github.com/dragonflyoss/Dragonfly/dfget/corev2/basic"

strfmt "github.com/go-openapi/strfmt"
)

// PlaceInfo represents the target address which is provided the download data.
type PlaceInfo struct {
// PeerInfo represents the target address which is provided the download data.
type PeerInfo struct {
IP strfmt.IPv4
Port int32
Path string
// ID represents the client ID of peer.
ID string
}

// SchedulerResult represents the result of schedule.
type SchedulerResult struct {
// SchedulerResult defines the result of schedule of range data.
type SchedulePieceDataResult struct {
Off int64
Size int64
PI []*PlaceInfo

// PeerInfos represents the schedule peers which to get the range data.
PeerInfos []*PeerInfo
}

// SchedulerResult defines the schedule result of request range.
// For some implementation, developer could do more than one schedule for the same request range.
type SchedulerResult interface {
// Result get the schedule result for range data which may not include all data of request range.
Result() []*SchedulePieceDataResult

// State gets the temporary states of this schedule which binds to range request.
State() ScheduleState
}

// ScheduleState defines the state of this schedule.
type ScheduleState interface {
// Again allows to call "Schedule" once more to obtain the rest range.
Again() bool

// Finish tells caller all range has been scheduled over. Do not use the instance to schedule again.
Finish() bool
}

// DataScheduler defines how to schedule peers for proxy request. An instance binds to a proxy request.
// DataScheduler defines how to schedule peers for range request.
type DataScheduler interface {
Schedule(ctx context.Context) ([]*SchedulerResult, error)
Schedule(ctx context.Context, rr basic.RangeRequest, state ScheduleState) (SchedulerResult, error)
}
2 changes: 1 addition & 1 deletion dfget/corev2/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"io"
)

// Downloader defines how to download file range from peer/source/cdn, an instance binds to <task, target address>.
// Downloader defines how to download file range from peer/cdn, an instance binds to <task, target address>.
type Downloader interface {
// Download downloads range data.
Download(ctx context.Context, off, size int64) (io.ReadCloser, error)
Expand Down
4 changes: 3 additions & 1 deletion dfget/corev2/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package regist

import "github.com/dragonflyoss/Dragonfly/dfget/corev2/basic"

// SupernodeRegister encapsulates the Register steps into a struct.
type SupernodeRegister interface {
Register(peerPort int) (registerResult interface{}, err error)
Register(peerPort int) (response basic.Response, err error)
}
8 changes: 5 additions & 3 deletions dfget/corev2/report/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package report

import "github.com/dragonflyoss/Dragonfly/dfget/locator"
import (
"github.com/dragonflyoss/Dragonfly/dfget/corev2/basic"
"github.com/dragonflyoss/Dragonfly/dfget/locator"
)

// Reporter defines how to report resource to suprnode.
// developer could
type Reporter interface {
Report(supernode *locator.Supernode) (interface{}, error)
Report(supernode *locator.Supernode) (basic.Response, error)
}
4 changes: 2 additions & 2 deletions dfget/corev2/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ import "io"

// Uploader defines how to upload range by path.
type Uploader interface {
// UploadRange defines how to upload range by path.
UploadRange(path string, off, size int64) (io.ReadCloser, error)
// UploadRange defines how to upload range by path.
UploadRange(path string, off, size int64, opt interface{}) (io.ReadCloser, error)
}
49 changes: 0 additions & 49 deletions dfget/protocol/url/protocol.go

This file was deleted.

57 changes: 57 additions & 0 deletions pkg/protocol/distributiondata/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package distributiondata

import (
"context"
"io"
)

type DataEncoder interface {
// Encode data.
Encode(io.Reader) (io.Reader, error)

// Decode data.
Decode(io.Reader) (io.Reader, error)
}

// DistributionData defines the protocol of distribute data which is exchanged in peers.
type DistributionData interface {
// Size gets the size of data.
Size() int64

// Metadata gets the metadata.
Metadata() interface{}

// Content gets the content of data.
Content(ctx context.Context) (io.Reader, error)
}

// EOFDistributionData represents the eof of file.
type EOFDistributionData struct{}

func (eof *EOFDistributionData) Size() int64 {
return 0
}

func (eof *EOFDistributionData) Metadata() interface{} {
return nil
}

func (eof *EOFDistributionData) Content(ctx context.Context) (io.Reader, error) {
return nil, io.EOF
}
57 changes: 57 additions & 0 deletions pkg/protocol/url/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package url

import (
"context"
"io"
)

// ProtocolClient defines the way how to get data from remote resource.
// An instance should bind a resource.
// Developers can implement their own ProtocolClient which could support different protocol.
type ProtocolClient interface {
// Read gets range data from the binding resource.
Read(ctx context.Context, off int64, size int64) (io.ReadCloser, error)

// Length gets the length of binding resource.
Length(ctx context.Context) (int64, error)

// Metadata gets the metadata of binding resource.
Metadata(ctx context.Context) (interface{}, error)

// Expire gets if the binding resource is expired.
Expire(ctx context.Context) (bool, interface{}, error)

// Call calls the user defined method.
Call(ctx context.Context, method string, request interface{}) (response interface{}, err error)
}

// ProtocolClientNewInstanceInterface defines how to create an instance of ProtocolClient.
type ProtocolClientNewInstanceInterface interface {
// NewProtocolClient creates an instance of ProtocolClient.
NewProtocolClient(url string, header map[string]string, opts interface{}) (ProtocolClient, error)
}

// ClientRegister defines how to register pair <protocol, ProtocolClientNewInstanceInterface>.
type ClientRegister interface {
// RegisterProtocol registers pair <protocol, ProtocolClientNewInstanceInterface>.
RegisterProtocol(protocol string, inf ProtocolClientNewInstanceInterface)

// GetProtocolClientNewInstanceInterface gets the ProtocolClientNewInstanceInterface by protocol.
GetProtocolClientNewInstanceInterface(protocol string) (ProtocolClientNewInstanceInterface, error)
}

0 comments on commit e5191eb

Please sign in to comment.