From 3cd96a041a178e80dd729495e52e9dd253524d2a Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Tue, 19 May 2020 10:27:15 +0800 Subject: [PATCH] add implementation of http/https protocol Signed-off-by: allen.wq --- pkg/protocol/http/http.go | 191 +++++++++++++++++++++++++++++ pkg/protocol/http/md.go | 51 ++++++++ pkg/protocol/http/md_test.go | 65 ++++++++++ pkg/protocol/http/resource.go | 134 ++++++++++++++++++++ pkg/protocol/http/resource_test.go | 113 +++++++++++++++++ pkg/protocol/url.go | 51 ++++++++ 6 files changed, 605 insertions(+) create mode 100644 pkg/protocol/http/http.go create mode 100644 pkg/protocol/http/md.go create mode 100644 pkg/protocol/http/md_test.go create mode 100644 pkg/protocol/http/resource.go create mode 100644 pkg/protocol/http/resource_test.go diff --git a/pkg/protocol/http/http.go b/pkg/protocol/http/http.go new file mode 100644 index 000000000..5ea904e68 --- /dev/null +++ b/pkg/protocol/http/http.go @@ -0,0 +1,191 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "time" + + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/protocol" +) + +var ( + // DefaultTransport is default implementation of http.Transport. + DefaultTransport = newDefaultTransport() + + // DefaultClient is default implementation of Client. + DefaultClient = &Client{ + client: &http.Client{Transport: DefaultTransport}, + transport: DefaultTransport, + } +) + +const ( + // http protocol name + ProtocolHTTPName = "http" + + // https protocol name + ProtocolHTTPSName = "https" +) + +func init() { + protocol.RegisterProtocol(ProtocolHTTPName, &ClientBuilder{}) + protocol.RegisterProtocol(ProtocolHTTPSName, &ClientBuilder{supportHTTPS: true}) +} + +const ( + HTTPTransport = "http.transport" + TLSConfig = "tls.config" +) + +func newDefaultTransport() *http.Transport { + // copy from http.DefaultTransport + return &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } +} + +// ClientOpt is the argument of NewProtocolClient. +// ClientOpt supports some opt by key, such as "http.transport", "tls.config". +// if not set, default opt will be used. +type ClientOpt struct { + opt map[string]interface{} +} + +func NewClientOpt() *ClientOpt { + return &ClientOpt{ + opt: make(map[string]interface{}), + } +} + +func (opt *ClientOpt) Set(key string, value interface{}) error { + switch key { + case HTTPTransport: + if _, ok := value.(*http.Transport); !ok { + return errortypes.ErrConvertFailed + } + break + case TLSConfig: + if _, ok := value.(*tls.Config); !ok { + return errortypes.ErrConvertFailed + } + break + default: + return fmt.Errorf("not support") + } + + opt.opt[key] = value + return nil +} + +func (opt *ClientOpt) Get(key string) interface{} { + v, ok := opt.opt[key] + if !ok { + return nil + } + + return v +} + +var _ protocol.Client = &Client{} + +// Client is an implementation of protocol.Client for http protocol. +type Client struct { + client *http.Client + transport http.RoundTripper +} + +func (cli *Client) GetResource(url string, md protocol.Metadata) protocol.Resource { + var ( + hd *Headers + ) + + if md != nil { + h, ok := md.(*Headers) + if ok { + hd = h + } + } + + return &Resource{ + url: url, + hd: hd, + client: cli, + } +} + +// ClientBuilder is an implementation of protocol.ClientBuilder for http protocol. +type ClientBuilder struct { + supportHTTPS bool +} + +func (cb *ClientBuilder) NewProtocolClient(clientOpt interface{}) (protocol.Client, error) { + var ( + transport = DefaultTransport + tlsConfig *tls.Config + ) + + if clientOpt != nil { + opt, ok := clientOpt.(*ClientOpt) + if !ok { + return nil, errortypes.ErrConvertFailed + } + + tran := opt.Get(HTTPTransport) + if tran != nil { + transport = tran.(*http.Transport) + } + + config := opt.Get(TLSConfig) + if config != nil { + tlsConfig = config.(*tls.Config) + } + + // set tls config to transport + if config != nil { + if transport == DefaultTransport { + transport = newDefaultTransport() + } + + transport.TLSClientConfig = tlsConfig + } + } + + if cb.supportHTTPS { + if transport.TLSClientConfig == nil || transport.DialTLS == nil { + return nil, fmt.Errorf("in https mode, tls should be set") + } + } + + return &Client{ + client: &http.Client{Transport: transport}, + transport: transport, + }, nil +} diff --git a/pkg/protocol/http/md.go b/pkg/protocol/http/md.go new file mode 100644 index 000000000..3e0aee497 --- /dev/null +++ b/pkg/protocol/http/md.go @@ -0,0 +1,51 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http + +import ( + "net/http" + + "github.com/dragonflyoss/Dragonfly/pkg/protocol" +) + +// NewHTTPMetaData generates an instance of protocol.Metadata. +func NewHTTPMetaData() protocol.Metadata { + return &Headers{ + Header: make(http.Header), + } +} + +// Headers is an implementation of protocol.Metadata. +type Headers struct { + http.Header +} + +func (hd *Headers) Get(key string) (interface{}, error) { + return hd.Header.Get(key), nil +} + +func (hd *Headers) Set(key string, value interface{}) { + hd.Header.Set(key, value.(string)) +} + +func (hd *Headers) Del(key string) { + hd.Header.Del(key) +} + +func (hd *Headers) All() interface{} { + return hd.Header +} diff --git a/pkg/protocol/http/md_test.go b/pkg/protocol/http/md_test.go new file mode 100644 index 000000000..fbc822729 --- /dev/null +++ b/pkg/protocol/http/md_test.go @@ -0,0 +1,65 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http + +import ( + "net/http" + "testing" + + "github.com/dragonflyoss/Dragonfly/dfget/core/helper" + + "github.com/go-check/check" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type HTTPSuite struct { + host string + port int + server *helper.MockFileServer +} + +func init() { + check.Suite(&HTTPSuite{}) +} + +func (suite *HTTPSuite) TestMd(c *check.C) { + md := NewHTTPMetaData() + md.Set("k1", "v1") + md.Set("k2", "v2") + + v1, err := md.Get("k1") + c.Assert(err, check.IsNil) + c.Assert(v1.(string), check.Equals, "v1") + + v2, err := md.Get("k2") + c.Assert(err, check.IsNil) + c.Assert(v2.(string), check.Equals, "v2") + + md.Del("k1") + v1, err = md.Get("k1") + c.Assert(err, check.IsNil) + c.Assert(v1.(string), check.Equals, "") + + hd, ok := md.All().(http.Header) + c.Assert(ok, check.Equals, true) + + v2 = hd.Get("k2") + c.Assert(v2, check.Equals, "v2") +} diff --git a/pkg/protocol/http/resource.go b/pkg/protocol/http/resource.go new file mode 100644 index 000000000..b66258d2c --- /dev/null +++ b/pkg/protocol/http/resource.go @@ -0,0 +1,134 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http + +import ( + "context" + "fmt" + "io" + "net/http" + "strconv" + "time" + + "github.com/dragonflyoss/Dragonfly/dfget/config" + "github.com/dragonflyoss/Dragonfly/pkg/protocol" +) + +// Resource is an implementation of protocol.Resource for http protocol. +type Resource struct { + url string + hd *Headers + client *Client +} + +func (rs *Resource) Read(ctx context.Context, off int64, size int64) (rc io.ReadCloser, err error) { + req, err := rs.newRequest(ctx, off, size) + if err != nil { + return nil, err + } + + res, err := rs.doRequest(req) + if err != nil { + return nil, err + } + + defer func() { + if err != nil && res.Body != nil { + res.Body.Close() + } + }() + + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent { + return nil, fmt.Errorf("respnose code is not 200 or 206") + } + + return res.Body, nil +} + +func (rs *Resource) Length(ctx context.Context) (int64, error) { + timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + req, err := rs.newRequest(timeoutCtx, 0, 0) + if err != nil { + return 0, err + } + + res, err := rs.doRequest(req) + if err != nil { + return 0, err + } + + defer res.Body.Close() + lenStr := res.Header.Get(config.StrContentLength) + if lenStr == "" { + return 0, fmt.Errorf("failed to get content length") + } + + length, err := strconv.ParseInt(lenStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to prase %s to length: %sv", lenStr, err) + } + + return length, nil +} + +func (rs *Resource) Metadata(ctx context.Context) (protocol.Metadata, error) { + return rs.hd, nil +} + +func (rs *Resource) Expire(ctx context.Context) (bool, interface{}, error) { + // need to implementation + return false, nil, nil +} + +func (rs *Resource) Call(ctx context.Context, request interface{}) (response interface{}, err error) { + return nil, protocol.ErrNotImplementation +} + +func (rs *Resource) Close() error { + return nil +} + +func (rs *Resource) newRequest(ctx context.Context, off, size int64) (*http.Request, error) { + // off == 0 && size == 0 means all data. + if (off < 0 || size < 0) || (off > 0 && size == 0) { + return nil, fmt.Errorf("invalid argument") + } + + req, err := http.NewRequest(http.MethodGet, rs.url, nil) + if err != nil { + return nil, err + } + + if rs.hd != nil { + for k, v := range rs.hd.Header { + req.Header.Set(k, v[0]) + } + } + + if size > 0 { + req.Header.Set(config.StrRange, fmt.Sprintf("bytes=%d-%d", off, off+size-1)) + } + + req = req.WithContext(ctx) + return req, nil +} + +func (rs *Resource) doRequest(req *http.Request) (*http.Response, error) { + return rs.client.client.Do(req) +} diff --git a/pkg/protocol/http/resource_test.go b/pkg/protocol/http/resource_test.go new file mode 100644 index 000000000..1dc4fbd22 --- /dev/null +++ b/pkg/protocol/http/resource_test.go @@ -0,0 +1,113 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "math/rand" + "time" + + "github.com/dragonflyoss/Dragonfly/dfget/core/helper" + "github.com/dragonflyoss/Dragonfly/pkg/httputils" + + "github.com/go-check/check" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func (suite *HTTPSuite) SetUpTest(c *check.C) { + suite.port = rand.Intn(1000) + 63000 + 15 + suite.host = fmt.Sprintf("127.0.0.1:%d", suite.port) + + suite.server = helper.NewMockFileServer() + err := suite.server.StartServer(context.Background(), suite.port) + c.Assert(err, check.IsNil) + + // 500KB + err = suite.server.RegisterFile("fileA", 500*1024, "abcde0123456789") + c.Assert(err, check.IsNil) +} + +func (suite *HTTPSuite) readFromFileServer(path string, off int64, size int64) ([]byte, error) { + url := fmt.Sprintf("http://%s/%s", suite.host, path) + header := map[string]string{} + + if size > 0 { + header["Range"] = fmt.Sprintf("bytes=%d-%d", off, off+size-1) + } + + code, data, err := httputils.GetWithHeaders(url, header, 5*time.Second) + if err != nil { + return nil, err + } + + if code >= 400 { + return nil, fmt.Errorf("resp code %d", code) + } + + return data, nil +} + +func (suite *HTTPSuite) checkDataWithFileServer(c *check.C, path string, off int64, size int64, obtainedRc io.ReadCloser) { + obtained, err := ioutil.ReadAll(obtainedRc) + c.Assert(err, check.IsNil) + defer obtainedRc.Close() + + expected, err := suite.readFromFileServer(path, off, size) + c.Assert(err, check.IsNil) + if string(obtained) != string(expected) { + c.Errorf("path %s, range [%d-%d]: get %s, expect %s", path, off, off+size-1, + string(obtained), string(expected)) + } + + c.Assert(string(obtained), check.Equals, string(expected)) +} + +func (suite *HTTPSuite) TestResource(c *check.C) { + fileName := "fileA" + fileLen := int64(500 * 1024) + + res := DefaultClient.GetResource(fmt.Sprintf("http://%s/%s", suite.host, fileName), nil) + length, err := res.Length(context.Background()) + c.Assert(err, check.IsNil) + c.Assert(length, check.Equals, fileLen) + + rc, err := res.Read(context.Background(), 0, 100) + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, fileName, 0, 100, rc) + + rc, err = res.Read(context.Background(), 1000, 100) + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, fileName, 1000, 100, rc) + + // all bytes + rc, err = res.Read(context.Background(), 0, 0) + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, fileName, 0, 0, rc) + + _, err = res.Read(context.Background(), 1000, 0) + c.Assert(err, check.NotNil) + + rc, err = res.Read(context.Background(), 1000, length-1000+1) + c.Assert(err, check.IsNil) + suite.checkDataWithFileServer(c, fileName, 1000, length-1000+1, rc) +} diff --git a/pkg/protocol/url.go b/pkg/protocol/url.go index 0aeeb4b8e..d1c97f261 100644 --- a/pkg/protocol/url.go +++ b/pkg/protocol/url.go @@ -18,7 +18,10 @@ package protocol import ( "context" + "errors" + "fmt" "io" + "sync" ) // Metadata defines how to operate the metadata. @@ -72,3 +75,51 @@ type ClientRegister interface { // GetClientBuilder gets the ClientBuilder by protocol. GetClientBuilder(protocol string) (ClientBuilder, error) } + +var ( + ErrNotImplementation = errors.New("not implementation") + ErrProtocolNotRegister = errors.New("protocol not register") + register = &defaultClientRegister{ + registerMap: make(map[string]ClientBuilder), + } +) + +// RegisterProtocol registers pair to defaultClientRegister. +func RegisterProtocol(protocol string, builder ClientBuilder) { + register.RegisterProtocol(protocol, builder) +} + +// GetClientBuilder get ClientBuilder by protocol in defaultClientRegister. +func GetClientBuilder(protocol string) (ClientBuilder, error) { + return register.GetClientBuilder(protocol) +} + +// defaultClientRegister is an implementation of ClientRegister. +type defaultClientRegister struct { + sync.RWMutex + registerMap map[string]ClientBuilder +} + +func (cliRegister *defaultClientRegister) RegisterProtocol(protocol string, builder ClientBuilder) { + cliRegister.Lock() + defer cliRegister.Unlock() + + _, ok := cliRegister.registerMap[protocol] + if ok { + panic(fmt.Sprintf("protocol %s has been register", protocol)) + } + + cliRegister.registerMap[protocol] = builder +} + +func (cliRegister *defaultClientRegister) GetClientBuilder(protocol string) (ClientBuilder, error) { + cliRegister.RLock() + defer cliRegister.RUnlock() + + builder, ok := cliRegister.registerMap[protocol] + if !ok { + return nil, ErrProtocolNotRegister + } + + return builder, nil +}