From b3222f561dc8a2a48670625b0c407e98aee39180 Mon Sep 17 00:00:00 2001 From: "allen.wq" Date: Wed, 1 Apr 2020 18:27:33 +0800 Subject: [PATCH] add lru queue and mockFileServer Signed-off-by: allen.wq --- dfget/core/helper/helper_test.go | 117 ++++++++++++++++++++ dfget/core/helper/test_helper.go | 184 +++++++++++++++++++++++++++++++ pkg/queue/lru_queue.go | 167 ++++++++++++++++++++++++++++ pkg/queue/lru_queue_test.go | 108 ++++++++++++++++++ 4 files changed, 576 insertions(+) create mode 100644 pkg/queue/lru_queue.go create mode 100644 pkg/queue/lru_queue_test.go diff --git a/dfget/core/helper/helper_test.go b/dfget/core/helper/helper_test.go index ed4cc9003..fb74ffa87 100644 --- a/dfget/core/helper/helper_test.go +++ b/dfget/core/helper/helper_test.go @@ -17,12 +17,17 @@ package helper import ( + "context" + "fmt" + "io/ioutil" "reflect" "runtime" "strings" "testing" "github.com/dragonflyoss/Dragonfly/dfget/config" + "github.com/dragonflyoss/Dragonfly/pkg/httputils" + "github.com/go-check/check" ) @@ -74,3 +79,115 @@ func (s *HelperTestSuite) TestDownloadPattern(c *check.C) { check.Commentf("f:%v pattern:%s", name(v.f), v.pattern)) } } + +func (s *HelperTestSuite) readFromFileServer(port int, path string, off int64, size int64) ([]byte, error) { + url := fmt.Sprintf("http://127.0.0.1:%d/%s", port, path) + header := map[string]string{} + + if size > 0 { + header["Range"] = fmt.Sprintf("bytes=%d-%d", off, off+size-1) + } + + resp, err := httputils.HTTPGet(url, header) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("resp code %d", resp.StatusCode) + } + + return ioutil.ReadAll(resp.Body) +} + +func (s *HelperTestSuite) getRepeatStr(data []byte, size int64) []byte { + for { + if int64(len(data)) >= size { + break + } + + newData := make([]byte, len(data)*2) + copy(newData, data) + copy(newData[len(data):], data) + data = newData + } + + return data[:size] +} + +func (s *HelperTestSuite) TestMockFileServer(c *check.C) { + var ( + data []byte + err error + ) + + // run server + mfs := NewMockFileServer() + err = mfs.StartServer(context.Background(), 10011) + c.Assert(err, check.IsNil) + + // register fileA + err = mfs.RegisterFile("fileA", 100, "a") + c.Assert(err, check.IsNil) + + // test fileA + // read 0-9 + data, err = s.readFromFileServer(mfs.Port, "fileA", 0, 10) + c.Assert(err, check.IsNil) + c.Assert(string(data), check.Equals, string(s.getRepeatStr([]byte("a"), 10))) + + // read 1-20 + data, err = s.readFromFileServer(mfs.Port, "fileA", 1, 20) + c.Assert(err, check.IsNil) + c.Assert(string(data), check.Equals, string(s.getRepeatStr([]byte("a"), 20))) + + // read 1-100 + data, err = s.readFromFileServer(mfs.Port, "fileA", 1, 99) + c.Assert(err, check.IsNil) + c.Assert(string(data), check.Equals, string(s.getRepeatStr([]byte("a"), 99))) + + // read all + data, err = s.readFromFileServer(mfs.Port, "fileA", 0, -1) + c.Assert(err, check.IsNil) + c.Assert(string(data), check.Equals, string(s.getRepeatStr([]byte("a"), 100))) + + // read 0-99 + data, err = s.readFromFileServer(mfs.Port, "fileA", 0, 100) + c.Assert(err, check.IsNil) + c.Assert(string(data), check.Equals, string(s.getRepeatStr([]byte("a"), 100))) + + // register fileB + err = mfs.RegisterFile("fileB", 10000, "abcde") + c.Assert(err, check.IsNil) + + // read 0-9 + data, err = s.readFromFileServer(mfs.Port, "fileB", 0, 10) + c.Assert(err, check.IsNil) + c.Assert(string(data), check.Equals, string(s.getRepeatStr([]byte("abcde"), 10))) + + // read 8-100 + data, err = s.readFromFileServer(mfs.Port, "fileB", 8, 93) + c.Assert(err, check.IsNil) + expectStr := s.getRepeatStr([]byte("abcde"), 101) + c.Assert(string(data), check.Equals, string(expectStr[8:])) + + // read 1000-9999 + data, err = s.readFromFileServer(mfs.Port, "fileB", 1000, 9000) + c.Assert(err, check.IsNil) + expectStr = s.getRepeatStr([]byte("abcde"), 10000) + c.Assert(string(data), check.Equals, string(expectStr[1000:])) + + // read 1001-9000 + data, err = s.readFromFileServer(mfs.Port, "fileB", 1001, 8000) + c.Assert(err, check.IsNil) + expectStr = s.getRepeatStr([]byte("abcde"), 9001) + c.Assert(string(data), check.Equals, string(expectStr[1001:])) + + // read all + data, err = s.readFromFileServer(mfs.Port, "fileB", 0, -1) + c.Assert(err, check.IsNil) + expectStr = s.getRepeatStr([]byte("abcde"), 10000) + c.Assert(string(data), check.Equals, string(expectStr)) +} diff --git a/dfget/core/helper/test_helper.go b/dfget/core/helper/test_helper.go index 23d556406..8ccd8acb9 100644 --- a/dfget/core/helper/test_helper.go +++ b/dfget/core/helper/test_helper.go @@ -17,12 +17,18 @@ package helper import ( + "context" "fmt" "io" "io/ioutil" + "math" "math/rand" + "net" + "net/http" "os" "path/filepath" + "strings" + "sync" api_types "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/dfget/config" @@ -30,6 +36,7 @@ import ( "github.com/dragonflyoss/Dragonfly/dfget/types" "github.com/dragonflyoss/Dragonfly/pkg/constants" "github.com/dragonflyoss/Dragonfly/pkg/fileutils" + "github.com/dragonflyoss/Dragonfly/pkg/httputils" "github.com/sirupsen/logrus" ) @@ -210,3 +217,180 @@ func CreateRegisterFunc() RegisterFuncType { return nil, nil } } + +// MockFileServer mocks the file server. +type MockFileServer struct { + sync.Mutex + Port int + fileMap map[string]*mockFile + sr *http.Server +} + +func NewMockFileServer() *MockFileServer { + return &MockFileServer{ + fileMap: make(map[string]*mockFile), + } +} + +// StartServer asynchronously starts the server, it will not be blocked. +func (fs *MockFileServer) StartServer(ctx context.Context, port int) error { + addr, err := net.ResolveTCPAddr("", fmt.Sprintf(":%d", port)) + if err != nil { + return err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return err + } + + fs.Port = addr.Port + sr := &http.Server{} + sr.Handler = fs + fs.sr = sr + + go func() { + if err := fs.sr.Serve(l); err != nil { + panic(err) + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + fs.sr.Close() + return + } + } + }() + + return nil +} + +func (fs *MockFileServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + var ( + err error + reqRange *httputils.RangeStruct + ) + + if req.Method != http.MethodGet { + resp.WriteHeader(http.StatusNotFound) + return + } + + path := req.URL.Path + path = strings.Trim(path, "/") + + fs.Lock() + mf, exist := fs.fileMap[path] + fs.Unlock() + + if !exist { + resp.WriteHeader(http.StatusNotFound) + return + } + + rangeSt := []*httputils.RangeStruct{} + rangeStr := req.Header.Get("Range") + + if rangeStr != "" { + rangeSt, err = httputils.GetRangeSE(rangeStr, math.MaxInt64) + if err != nil { + resp.WriteHeader(http.StatusBadRequest) + return + } + } + + if len(rangeSt) > 0 { + reqRange = rangeSt[0] + } + + fs.MockResp(resp, mf, reqRange) +} + +func (fs *MockFileServer) RegisterFile(path string, size int64, repeatStr string) error { + fs.Lock() + defer fs.Unlock() + + path = strings.Trim(path, "/") + _, exist := fs.fileMap[path] + if exist { + return os.ErrExist + } + + data := []byte(repeatStr) + if len(data) < 1024 { + for { + newData := make([]byte, len(data)*2) + copy(newData, data) + copy(newData[len(data):], data) + data = newData + + if len(data) >= 1024 { + break + } + } + } + + fs.fileMap[path] = &mockFile{ + path: path, + size: size, + repeatStr: data, + } + + return nil +} + +func (fs *MockFileServer) UnRegisterFile(path string) { + fs.Lock() + defer fs.Unlock() + + delete(fs.fileMap, strings.Trim(path, "/")) +} + +func (fs *MockFileServer) MockResp(resp http.ResponseWriter, mf *mockFile, rangeSt *httputils.RangeStruct) { + var ( + respCode int + start int64 + end = mf.size - 1 + ) + if rangeSt != nil { + start = rangeSt.StartIndex + if rangeSt.EndIndex < end { + end = rangeSt.EndIndex + } + respCode = http.StatusPartialContent + } else { + respCode = http.StatusOK + } + + resp.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1)) + resp.WriteHeader(respCode) + + repeatStrLen := int64(len(mf.repeatStr)) + strIndex := start % int64(repeatStrLen) + + for { + if start > end { + break + } + + copyDataLen := repeatStrLen - strIndex + if copyDataLen > (end - start + 1) { + copyDataLen = end - start + 1 + } + + resp.Write(mf.repeatStr[strIndex : copyDataLen+strIndex]) + strIndex = 0 + start += copyDataLen + } + + return +} + +type mockFile struct { + path string + size int64 + repeatStr []byte +} diff --git a/pkg/queue/lru_queue.go b/pkg/queue/lru_queue.go new file mode 100644 index 000000000..02ae7a337 --- /dev/null +++ b/pkg/queue/lru_queue.go @@ -0,0 +1,167 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue + +import ( + "container/list" + "sync" + + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" +) + +// cQElementData is the value of list.Element.Value. +// It records the key and data of item. +type cQElementData struct { + key string + data interface{} +} + +// LRUQueue is implementation of LRU. +type LRUQueue struct { + lock sync.Mutex + capacity int + + itemMap map[string]*list.Element + l *list.List +} + +func NewLRUQueue(capacity int) *LRUQueue { + return &LRUQueue{ + capacity: capacity, + itemMap: make(map[string]*list.Element, capacity), + l: list.New(), + } +} + +// Put puts item to front, return the obsolete item +func (q *LRUQueue) Put(key string, data interface{}) (obsoleteKey string, obsoleteData interface{}) { + q.lock.Lock() + defer q.lock.Unlock() + + if i, ok := q.itemMap[key]; ok { + i.Value.(*cQElementData).data = data + q.putAtFront(i) + return + } + + if len(q.itemMap) >= q.capacity { + // remove the earliest item + i := q.removeFromTail() + if i != nil { + delete(q.itemMap, i.Value.(*cQElementData).key) + obsoleteKey = i.Value.(*cQElementData).key + obsoleteData = i.Value.(*cQElementData).data + } + } + + i := q.putValue(&cQElementData{key: key, data: data}) + q.itemMap[key] = i + return +} + +// Get will return the item by key. And it will put the item to front. +func (q *LRUQueue) Get(key string) (interface{}, error) { + q.lock.Lock() + defer q.lock.Unlock() + + data, exist := q.itemMap[key] + if !exist { + return nil, errortypes.ErrDataNotFound + } + + q.putAtFront(data) + + return data.Value.(*cQElementData).data, nil +} + +// GetFront will get several items from front and not poll out them. +func (q *LRUQueue) GetFront(count int) []interface{} { + if count <= 0 { + return nil + } + + q.lock.Lock() + defer q.lock.Unlock() + + result := make([]interface{}, count) + item := q.l.Front() + index := 0 + for { + if item == nil { + break + } + + result[index] = item.Value.(*cQElementData).data + index++ + if index >= count { + break + } + + item = item.Next() + } + + return result[:index] +} + +// GetItemByKey will return the item by key. But it will not put the item to front. +func (q *LRUQueue) GetItemByKey(key string) (interface{}, error) { + q.lock.Lock() + defer q.lock.Unlock() + + if data, exist := q.itemMap[key]; exist { + return data.Value.(*cQElementData).data, nil + } + + return nil, errortypes.ErrDataNotFound +} + +// Delete deletes the item by key, return the deleted item if item exists. +func (q *LRUQueue) Delete(key string) interface{} { + q.lock.Lock() + defer q.lock.Unlock() + + data, exist := q.itemMap[key] + if !exist { + return nil + } + + retData := data.Value.(*cQElementData).data + delete(q.itemMap, key) + q.removeElement(data) + + return retData +} + +func (q *LRUQueue) putAtFront(i *list.Element) { + q.l.MoveToFront(i) +} + +func (q *LRUQueue) putValue(data interface{}) *list.Element { + e := q.l.PushFront(data) + return e +} + +func (q *LRUQueue) removeFromTail() *list.Element { + e := q.l.Back() + q.l.Remove(e) + + return e +} + +func (q *LRUQueue) removeElement(i *list.Element) { + q.l.Remove(i) +} diff --git a/pkg/queue/lru_queue_test.go b/pkg/queue/lru_queue_test.go new file mode 100644 index 000000000..e7d063b5a --- /dev/null +++ b/pkg/queue/lru_queue_test.go @@ -0,0 +1,108 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue + +import ( + "github.com/go-check/check" +) + +func (suite *DFGetUtilSuite) TestLRUQueue(c *check.C) { + q := NewLRUQueue(5) + + q.Put("key1", 1) + + v1, err := q.GetItemByKey("key1") + c.Assert(err, check.IsNil) + c.Assert(v1.(int), check.Equals, 1) + + items := q.GetFront(1) + c.Assert(len(items), check.Equals, 1) + c.Assert(items[0], check.Equals, 1) + + q.Put("key2", 2) + q.Put("key1", 3) + + v1, err = q.GetItemByKey("key1") + c.Assert(err, check.IsNil) + c.Assert(v1.(int), check.Equals, 3) + + items = q.GetFront(10) + c.Assert(len(items), check.Equals, 2) + c.Assert(items[0], check.Equals, 3) + c.Assert(items[1], check.Equals, 2) + + items = q.GetFront(1) + c.Assert(len(items), check.Equals, 1) + c.Assert(items[0], check.Equals, 3) + + _, err = q.GetItemByKey("key3") + c.Assert(err, check.NotNil) + + obsoleteKey, _ := q.Put("key3", "data3") + c.Assert(obsoleteKey, check.Equals, "") + obsoleteKey, _ = q.Put("key4", "data4") + c.Assert(obsoleteKey, check.Equals, "") + obsoleteKey, _ = q.Put("key5", "data5") + c.Assert(obsoleteKey, check.Equals, "") + + items = q.GetFront(10) + c.Assert(len(items), check.Equals, 5) + c.Assert(items[0], check.Equals, "data5") + c.Assert(items[1], check.Equals, "data4") + c.Assert(items[2], check.Equals, "data3") + c.Assert(items[3], check.Equals, 3) + c.Assert(items[4], check.Equals, 2) + + obsoleteKey, obsoleteData := q.Put("key6", "data6") + c.Assert(obsoleteKey, check.Equals, "key2") + c.Assert(obsoleteData.(int), check.Equals, 2) + _, err = q.GetItemByKey("key2") + c.Assert(err, check.NotNil) + + items = q.GetFront(5) + c.Assert(len(items), check.Equals, 5) + c.Assert(items[0], check.Equals, "data6") + c.Assert(items[1], check.Equals, "data5") + c.Assert(items[2], check.Equals, "data4") + c.Assert(items[3], check.Equals, "data3") + c.Assert(items[4], check.Equals, 3) + + v1, err = q.Get("key5") + c.Assert(err, check.IsNil) + c.Assert(v1.(string), check.Equals, "data5") + + items = q.GetFront(5) + c.Assert(len(items), check.Equals, 5) + c.Assert(items[0], check.Equals, "data5") + c.Assert(items[1], check.Equals, "data6") + c.Assert(items[2], check.Equals, "data4") + c.Assert(items[3], check.Equals, "data3") + c.Assert(items[4], check.Equals, 3) + + v1 = q.Delete("key3") + c.Assert(v1, check.Equals, "data3") + + items = q.GetFront(5) + c.Assert(len(items), check.Equals, 4) + c.Assert(items[0], check.Equals, "data5") + c.Assert(items[1], check.Equals, "data6") + c.Assert(items[2], check.Equals, "data4") + c.Assert(items[3], check.Equals, 3) + + v1 = q.Delete("key3") + c.Assert(v1, check.IsNil) +}