From e5191ebd885657862ab5b527e1258b0c06222ba5 Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Thu, 14 May 2020 21:29:34 +0800 Subject: [PATCH] redefine the scheduler and update other interfaces Signed-off-by: allen.wq --- .../protocol.go => corev2/basic/interface.go} | 28 +++++---- dfget/corev2/clientstream/client_stream.go | 8 +-- dfget/corev2/datascheduler/data_scheduler.go | 39 ++++++++++--- dfget/corev2/downloader/downloader.go | 2 +- dfget/corev2/regist/register.go | 4 +- dfget/corev2/report/reporter.go | 8 ++- dfget/corev2/uploader/uploader.go | 4 +- dfget/protocol/url/protocol.go | 49 ---------------- pkg/protocol/distributiondata/protocol.go | 57 +++++++++++++++++++ pkg/protocol/url/protocol.go | 57 +++++++++++++++++++ 10 files changed, 174 insertions(+), 82 deletions(-) rename dfget/{protocol/distributiondata/protocol.go => corev2/basic/interface.go} (60%) delete mode 100644 dfget/protocol/url/protocol.go create mode 100644 pkg/protocol/distributiondata/protocol.go create mode 100644 pkg/protocol/url/protocol.go diff --git a/dfget/protocol/distributiondata/protocol.go b/dfget/corev2/basic/interface.go similarity index 60% rename from dfget/protocol/distributiondata/protocol.go rename to dfget/corev2/basic/interface.go index e5cd7307f..9493c32d3 100644 --- a/dfget/protocol/distributiondata/protocol.go +++ b/dfget/corev2/basic/interface.go @@ -14,23 +14,21 @@ * limitations under the License. */ -package distributiondata +package basic -import ( - "io" -) - -// DistributionData defines the protocol of distribute data which is exchanged in peers. -type DistributionData interface { - // EncodeData encodes the original data. - Encode() (out io.Reader, err error) - - // DecodeData decodes the encoded data. - Decode() (out io.Reader, err error) +// Response defines the response. +type Response interface { + Success() bool + Data() interface{} +} - // Offset gets the offset in file of original data. +// RangeRequest defines the range request. +type RangeRequest interface { + URL() string Offset() int64 - - // Size gets the size of original data. Size() int64 + Header() map[string]string + + // Extra gets the extra info. + Extra() interface{} } diff --git a/dfget/corev2/clientstream/client_stream.go b/dfget/corev2/clientstream/client_stream.go index 4bab4a126..995d2cd5a 100644 --- a/dfget/corev2/clientstream/client_stream.go +++ b/dfget/corev2/clientstream/client_stream.go @@ -20,16 +20,16 @@ import ( "context" "io" - "github.com/dragonflyoss/Dragonfly/dfget/protocol/distributiondata" + "github.com/dragonflyoss/Dragonfly/pkg/protocol/distributiondata" ) -// ClientStream defines how to organize distribution data and get read stream for proxy request. -// An instance binds to a proxy request. +// ClientStream defines how to organize distribution data and get read stream for range request. +// An instance binds to a range request. // It may receive a lot of distribution data. type ClientStream interface { // WriteData writes the distribution data from other peers, it may be called more times. WriteData(data distributiondata.DistributionData) error - // ReadStream provides the read stream for proxy request + // ReadStream provides the read stream for range request. ReadStream(ctx context.Context) (io.ReadCloser, error) } diff --git a/dfget/corev2/datascheduler/data_scheduler.go b/dfget/corev2/datascheduler/data_scheduler.go index 27681f9eb..25869b607 100644 --- a/dfget/corev2/datascheduler/data_scheduler.go +++ b/dfget/corev2/datascheduler/data_scheduler.go @@ -19,24 +19,49 @@ package datascheduler import ( "context" + "github.com/dragonflyoss/Dragonfly/dfget/corev2/basic" + strfmt "github.com/go-openapi/strfmt" ) -// PlaceInfo represents the target address which is provided the download data. -type PlaceInfo struct { +// PeerInfo represents the target address which is provided the download data. +type PeerInfo struct { IP strfmt.IPv4 Port int32 Path string + // ID represents the client ID of peer. + ID string } -// SchedulerResult represents the result of schedule. -type SchedulerResult struct { +// SchedulerResult defines the result of schedule of range data. +type SchedulePieceDataResult struct { Off int64 Size int64 - PI []*PlaceInfo + + // PeerInfos represents the schedule peers which to get the range data. + PeerInfos []*PeerInfo +} + +// SchedulerResult defines the schedule result of request range. +// For some implementation, developer could do more than one schedule for the same request range. +type SchedulerResult interface { + // Result get the schedule result for range data which may not include all data of request range. + Result() []*SchedulePieceDataResult + + // State gets the temporary states of this schedule which binds to range request. + State() ScheduleState +} + +// ScheduleState defines the state of this schedule. +type ScheduleState interface { + // Again allows to call "Schedule" once more to obtain the rest range. + Again() bool + + // Finish tells caller all range has been scheduled over. Do not use the instance to schedule again. + Finish() bool } -// DataScheduler defines how to schedule peers for proxy request. An instance binds to a proxy request. +// DataScheduler defines how to schedule peers for range request. type DataScheduler interface { - Schedule(ctx context.Context) ([]*SchedulerResult, error) + Schedule(ctx context.Context, rr basic.RangeRequest, state ScheduleState) (SchedulerResult, error) } diff --git a/dfget/corev2/downloader/downloader.go b/dfget/corev2/downloader/downloader.go index e2b35a008..bb821dd6a 100644 --- a/dfget/corev2/downloader/downloader.go +++ b/dfget/corev2/downloader/downloader.go @@ -21,7 +21,7 @@ import ( "io" ) -// Downloader defines how to download file range from peer/source/cdn, an instance binds to . +// Downloader defines how to download file range from peer/cdn, an instance binds to . type Downloader interface { // Download downloads range data. Download(ctx context.Context, off, size int64) (io.ReadCloser, error) diff --git a/dfget/corev2/regist/register.go b/dfget/corev2/regist/register.go index f66e2fc61..f17aad7c1 100644 --- a/dfget/corev2/regist/register.go +++ b/dfget/corev2/regist/register.go @@ -16,7 +16,9 @@ package regist +import "github.com/dragonflyoss/Dragonfly/dfget/corev2/basic" + // SupernodeRegister encapsulates the Register steps into a struct. type SupernodeRegister interface { - Register(peerPort int) (registerResult interface{}, err error) + Register(peerPort int) (response basic.Response, err error) } diff --git a/dfget/corev2/report/reporter.go b/dfget/corev2/report/reporter.go index 6e8e17c41..5be4ae199 100644 --- a/dfget/corev2/report/reporter.go +++ b/dfget/corev2/report/reporter.go @@ -16,10 +16,12 @@ package report -import "github.com/dragonflyoss/Dragonfly/dfget/locator" +import ( + "github.com/dragonflyoss/Dragonfly/dfget/corev2/basic" + "github.com/dragonflyoss/Dragonfly/dfget/locator" +) // Reporter defines how to report resource to suprnode. -// developer could type Reporter interface { - Report(supernode *locator.Supernode) (interface{}, error) + Report(supernode *locator.Supernode) (basic.Response, error) } diff --git a/dfget/corev2/uploader/uploader.go b/dfget/corev2/uploader/uploader.go index 337db06fc..e55eb0f90 100644 --- a/dfget/corev2/uploader/uploader.go +++ b/dfget/corev2/uploader/uploader.go @@ -20,6 +20,6 @@ import "io" // Uploader defines how to upload range by path. type Uploader interface { - // UploadRange defines how to upload range by path. - UploadRange(path string, off, size int64) (io.ReadCloser, error) + // UploadRange defines how to upload range by path. + UploadRange(path string, off, size int64, opt interface{}) (io.ReadCloser, error) } diff --git a/dfget/protocol/url/protocol.go b/dfget/protocol/url/protocol.go deleted file mode 100644 index 6fd0aa591..000000000 --- a/dfget/protocol/url/protocol.go +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright The Dragonfly Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package url - -import ( - "context" - "io" - "net/http" -) - -// ClientReader defines the way how to get data from remote resource. -// An instance should bind a resource. -// Developers can implement their own ClientReader which could support different protocol. -type ClientReader interface { - // Read gets range data from the binding resource. - Read(ctx context.Context, off int64, size int64) (io.ReadCloser, error) - - // FullSize gets the full size of binding resource. - FullSize(ctx context.Context) (int64, error) -} - -// ClientReaderNewInstanceInterface defines how to create an instance of ClientReader. -type ClientReaderNewInstanceInterface interface { - // NewClientReader creates an instance of ClientReader. - NewClientReader(url string, header http.Header, opts interface{}) (ClientReader, error) -} - -// ClientRegister defines how to register pair . -type ClientRegister interface { - // RegisterProtocol registers pair . - RegisterProtocol(protocol string, inf ClientReaderNewInstanceInterface) - - // GetClientReaderNewInstanceInterface gets the ClientReaderNewInstanceInterface by protocol. - GetClientReaderNewInstanceInterface(protocol string) (ClientReaderNewInstanceInterface, error) -} diff --git a/pkg/protocol/distributiondata/protocol.go b/pkg/protocol/distributiondata/protocol.go new file mode 100644 index 000000000..32d6b4fea --- /dev/null +++ b/pkg/protocol/distributiondata/protocol.go @@ -0,0 +1,57 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package distributiondata + +import ( + "context" + "io" +) + +type DataEncoder interface { + // Encode data. + Encode(io.Reader) (io.Reader, error) + + // Decode data. + Decode(io.Reader) (io.Reader, error) +} + +// DistributionData defines the protocol of distribute data which is exchanged in peers. +type DistributionData interface { + // Size gets the size of data. + Size() int64 + + // Metadata gets the metadata. + Metadata() interface{} + + // Content gets the content of data. + Content(ctx context.Context) (io.Reader, error) +} + +// EOFDistributionData represents the eof of file. +type EOFDistributionData struct{} + +func (eof *EOFDistributionData) Size() int64 { + return 0 +} + +func (eof *EOFDistributionData) Metadata() interface{} { + return nil +} + +func (eof *EOFDistributionData) Content(ctx context.Context) (io.Reader, error) { + return nil, io.EOF +} diff --git a/pkg/protocol/url/protocol.go b/pkg/protocol/url/protocol.go new file mode 100644 index 000000000..1d0376a95 --- /dev/null +++ b/pkg/protocol/url/protocol.go @@ -0,0 +1,57 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package url + +import ( + "context" + "io" +) + +// ProtocolClient defines the way how to get data from remote resource. +// An instance should bind a resource. +// Developers can implement their own ProtocolClient which could support different protocol. +type ProtocolClient interface { + // Read gets range data from the binding resource. + Read(ctx context.Context, off int64, size int64) (io.ReadCloser, error) + + // Length gets the length of binding resource. + Length(ctx context.Context) (int64, error) + + // Metadata gets the metadata of binding resource. + Metadata(ctx context.Context) (interface{}, error) + + // Expire gets if the binding resource is expired. + Expire(ctx context.Context) (bool, interface{}, error) + + // Call calls the user defined method. + Call(ctx context.Context, method string, request interface{}) (response interface{}, err error) +} + +// ProtocolClientNewInstanceInterface defines how to create an instance of ProtocolClient. +type ProtocolClientNewInstanceInterface interface { + // NewProtocolClient creates an instance of ProtocolClient. + NewProtocolClient(url string, header map[string]string, opts interface{}) (ProtocolClient, error) +} + +// ClientRegister defines how to register pair . +type ClientRegister interface { + // RegisterProtocol registers pair . + RegisterProtocol(protocol string, inf ProtocolClientNewInstanceInterface) + + // GetProtocolClientNewInstanceInterface gets the ProtocolClientNewInstanceInterface by protocol. + GetProtocolClientNewInstanceInterface(protocol string) (ProtocolClientNewInstanceInterface, error) +}