diff --git a/dfget/corev2/basic/interface.go b/dfget/corev2/basic/interface.go index 9493c32d3..696696585 100644 --- a/dfget/corev2/basic/interface.go +++ b/dfget/corev2/basic/interface.go @@ -32,3 +32,19 @@ type RangeRequest interface { // Extra gets the extra info. Extra() interface{} } + +// NotifyResult defines the result of notify. +type NotifyResult interface { + Success() bool + Error() error + Data() interface{} +} + +// Notify defines how to notify asynchronous call if finished and get the result. +type Notify interface { + // Done returns a channel that's closed when work done. + Done() <-chan struct{} + + // Result returns the NotifyResult and only valid after Done channel is closed. + Result() NotifyResult +} diff --git a/dfget/corev2/clientwriter/client_writer.go b/dfget/corev2/clientwriter/client_writer.go index 2dcbfcc01..8cc4ed25b 100644 --- a/dfget/corev2/clientwriter/client_writer.go +++ b/dfget/corev2/clientwriter/client_writer.go @@ -17,15 +17,24 @@ package clientwriter import ( + "context" + "io" + + "github.com/dragonflyoss/Dragonfly/dfget/corev2/basic" "github.com/dragonflyoss/Dragonfly/pkg/protocol" ) -// ClientStream defines how to organize distribution data for range request. +// ClientWriter defines how to organize distribution data for range request. // An instance binds to a range request. // It may receive a lot of distribution data. -// Developer could add a io.WriteCloser in constructor of instance, and the ClientWriter will +// Developer could call Run() to start the loop in which ClientWriter will // write request data to io.Writer. type ClientWriter interface { // WriteData writes the distribution data from other peers, it may be called more times. PutData(data protocol.DistributionData) error + + // Run starts the loop and ClientWriter will write request data to wc. + // Run should only be called once. + // caller gets the result by Notify. + Run(ctx context.Context, wc io.WriteCloser) (basic.Notify, error) } diff --git a/dfget/corev2/common/notify.go b/dfget/corev2/common/notify.go new file mode 100644 index 000000000..1ba3ad83c --- /dev/null +++ b/dfget/corev2/common/notify.go @@ -0,0 +1,89 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "fmt" + "sync" + + "github.com/dragonflyoss/Dragonfly/dfget/corev2/basic" +) + +// NewNotify creates Notify which implements basic.Notify. +func NewNotify() *Notify { + return &Notify{} +} + +// Notify is an implementation of basic.Notify. +type Notify struct { + sync.RWMutex + done chan struct{} + result basic.NotifyResult +} + +func (notify *Notify) Done() <-chan struct{} { + return notify.done +} + +// Result returns the NotifyResult and only valid after Done channel is closed. +func (notify *Notify) Result() basic.NotifyResult { + notify.RLock() + defer notify.RUnlock() + + return notify.result +} + +// Finish sets result and close done channel to notify work done. +func (notify *Notify) Finish(result basic.NotifyResult) error { + notify.Lock() + defer notify.Unlock() + + if notify.result != nil { + return fmt.Errorf("result have been set once") + } + + notify.result = result + close(notify.done) + return nil +} + +// NotifyResult is an implementation of basic.NotifyResult. +type notifyResult struct { + success bool + err error + data interface{} +} + +func NewNotifyResult(success bool, err error, data interface{}) basic.NotifyResult { + return ¬ifyResult{ + success: success, + err: err, + data: data, + } +} + +func (nr notifyResult) Success() bool { + return nr.success +} + +func (nr notifyResult) Error() error { + return nr.err +} + +func (nr notifyResult) Data() interface{} { + return nr.data +}