-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FileManager #112
FileManager #112
Changes from 25 commits
a53328f
e589e33
4b8572b
24c303f
2081766
c09e0cf
23c1177
380b942
bac9cc8
b96c270
c3dbe34
f756307
4ab2a7f
463df28
ff95f05
b2d919f
419a678
cb75cac
737f7a3
3636d04
9c41efc
ef53907
34ce72c
97938cb
fb19caf
42b0890
026669a
7e5f9df
db395cd
6f219e9
d6b2bd0
bbee71e
20d13a4
cfd5767
2750df9
1b3aa33
7a32638
a17e0b1
8966691
2485891
4c7cf4e
930b8cd
00880ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package main | ||
|
||
import ( | ||
"flag" | ||
"fmt" | ||
"net/http" | ||
|
||
"github.com/PaddlePaddle/cloud/go/filemanager/pfsserver" | ||
log "github.com/golang/glog" | ||
) | ||
|
||
func main() { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpicking here, I know new line can separate regions of code of different functionalities, but a new line at the beginning of a function does not seem necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
port := flag.Int("port", 8080, "port of server") | ||
ip := flag.String("ip", "0.0.0.0", "ip of server") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, when will the ip of server necessary? I usually do ":8080" rather than "0.0.0.0:8080" so that I don't need to specify the ip. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Listening on an IP address is always useful, ":8080" is the same as "0.0.0.0:8080". When we need to bind TCP server to the specified network(in a LAN or VLAN or VPC), we need to specify the IP of the network card. |
||
flag.Parse() | ||
|
||
router := pfsserver.NewRouter() | ||
addr := fmt.Sprintf("%s:%d", *ip, *port) | ||
|
||
log.Infof("server on:%s\n", addr) | ||
log.Fatal(http.ListenAndServe(addr, router)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package pfsmodules | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/url" | ||
"os" | ||
"strconv" | ||
|
||
log "github.com/golang/glog" | ||
) | ||
|
||
// Chunk respresents a chunk info | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please see https://github.com/golang/go/wiki/CodeReviewComments#comment-sentences , comments should be a full sentence. (end with a period ".") There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
type Chunk struct { | ||
Path string | ||
Offset int64 | ||
Size int64 | ||
} | ||
|
||
// NewChunk get a new Chunk | ||
func NewChunk(path string, offset, chunkSize int64) *Chunk { | ||
return &Chunk{ | ||
Path: path, | ||
Offset: offset, | ||
Size: chunkSize, | ||
} | ||
} | ||
|
||
// ToURLParam encodes variables to url encoding parameters | ||
func (p *Chunk) ToURLParam() string { | ||
parameters := url.Values{} | ||
parameters.Add("path", p.Path) | ||
|
||
str := fmt.Sprint(p.Offset) | ||
parameters.Add("offset", str) | ||
|
||
str = fmt.Sprint(p.Size) | ||
parameters.Add("chunksize", str) | ||
|
||
return parameters.Encode() | ||
} | ||
|
||
// ParseChunk get a Chunk struct from path | ||
// path example: | ||
// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096 | ||
func ParseChunk(path string) (*Chunk, error) { | ||
cmd := Chunk{} | ||
|
||
m, err := url.ParseQuery(path) | ||
if err != nil || | ||
len(m["path"]) == 0 || | ||
len(m["offset"]) == 0 || | ||
len(m["chunksize"]) == 0 { | ||
return nil, errors.New(StatusJSONErr) | ||
} | ||
|
||
cmd.Path = m["path"][0] | ||
cmd.Offset, err = strconv.ParseInt(m["offset"][0], 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusJSONErr) | ||
} | ||
|
||
chunkSize, err := strconv.ParseInt(m["chunksize"][0], 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
cmd.Size = chunkSize | ||
|
||
return &cmd, nil | ||
} | ||
|
||
// LoadChunkData loads a specified chunk to io.Writer | ||
func (p *Chunk) LoadChunkData(w io.Writer) error { | ||
f, err := os.Open(p.Path) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
writen, err := io.CopyN(w, f, p.Size) | ||
log.V(2).Infof("writen:%d\n", writen) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought this function loads data, why here says "written"? Maybe "loaded" is more appropriate? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return err | ||
} | ||
|
||
// SaveChunkData save data from io.Reader | ||
func (p *Chunk) SaveChunkData(r io.Reader) error { | ||
f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
writen, err := io.CopyN(f, r, p.Size) | ||
log.V(2).Infof("chunksize:%d writen:%d\n", p.Size, writen) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
package pfsmodules | ||
|
||
import ( | ||
"crypto/md5" | ||
"encoding/hex" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"sort" | ||
"strconv" | ||
) | ||
|
||
const ( | ||
defaultMaxChunkSize = 4 * 1024 * 1024 | ||
defaultMinChunkSize = 4 * 1024 | ||
) | ||
const ( | ||
chunkMetaCmdName = "GetChunkMeta" | ||
) | ||
|
||
// ChunkMeta holds the chunk meta's info | ||
type ChunkMeta struct { | ||
Offset int64 `json:"offset"` | ||
Checksum string `json:"checksum"` | ||
Len int64 `json:"len"` | ||
} | ||
|
||
// ChunkMetaCmd is a command | ||
type ChunkMetaCmd struct { | ||
Method string `json:"method"` | ||
FilePath string `json:"path"` | ||
ChunkSize int64 `json:"chunksize"` | ||
} | ||
|
||
// ToURLParam encodes ChunkMetaCmd to URL encoding string | ||
func (p *ChunkMetaCmd) ToURLParam() string { | ||
parameters := url.Values{} | ||
parameters.Add("method", p.Method) | ||
parameters.Add("path", p.FilePath) | ||
|
||
str := fmt.Sprint(p.ChunkSize) | ||
parameters.Add("chunksize", str) | ||
|
||
return parameters.Encode() | ||
} | ||
|
||
// ToJSON encodes ChunkMetaCmd to JSON string | ||
func (p *ChunkMetaCmd) ToJSON() ([]byte, error) { | ||
return json.Marshal(p) | ||
} | ||
|
||
// Run is a functions which run ChunkMetaCmd | ||
func (p *ChunkMetaCmd) Run() (interface{}, error) { | ||
return GetChunkMeta(p.FilePath, p.ChunkSize) | ||
} | ||
|
||
func (p *ChunkMetaCmd) checkChunkSize() error { | ||
if p.ChunkSize < defaultMinChunkSize || | ||
p.ChunkSize > defaultMaxChunkSize { | ||
return errors.New(StatusBadChunkSize) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// CloudCheck checks the conditions when running on cloud | ||
func (p *ChunkMetaCmd) CloudCheck() error { | ||
if !IsCloudPath(p.FilePath) { | ||
return errors.New(StatusShouldBePfsPath + ": p.FilePath") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 惭愧,搞错了。 |
||
} | ||
if !CheckUser(p.FilePath) { | ||
return errors.New(StatusUnAuthorized + ":" + p.FilePath) | ||
} | ||
return p.checkChunkSize() | ||
} | ||
|
||
// LocalCheck checks the conditions when running local | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. running local -> running locally There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
func (p *ChunkMetaCmd) LocalCheck() error { | ||
return p.checkChunkSize() | ||
} | ||
|
||
// NewChunkMetaCmdFromURLParam get a new ChunkMetaCmd | ||
func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) { | ||
method := r.URL.Query().Get("method") | ||
path := r.URL.Query().Get("path") | ||
chunkStr := r.URL.Query().Get("chunksize") | ||
|
||
if len(method) == 0 || | ||
method != chunkMetaCmdName || | ||
len(path) < 4 || | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does the constant 4 mean? I guess because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
len(chunkStr) == 0 { | ||
return nil, errors.New(http.StatusText(http.StatusBadRequest)) | ||
} | ||
|
||
chunkSize, err := strconv.ParseInt(chunkStr, 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
|
||
return &ChunkMetaCmd{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is using struct literal ( To reduce complexity, I recommend unless there is some other initialization that a struct literal can not do, don't use a new function for it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个当时主要考虑可以少写一部分,如 |
||
Method: method, | ||
FilePath: path, | ||
ChunkSize: chunkSize, | ||
}, nil | ||
} | ||
|
||
// NewChunkMetaCmd get a new ChunkMetaCmd with path and chunksize | ||
func NewChunkMetaCmd(path string, chunkSize int64) *ChunkMetaCmd { | ||
return &ChunkMetaCmd{ | ||
Method: chunkMetaCmdName, | ||
FilePath: path, | ||
ChunkSize: chunkSize, | ||
} | ||
} | ||
|
||
type metaSlice []ChunkMeta | ||
|
||
func (a metaSlice) Len() int { return len(a) } | ||
func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } | ||
func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset } | ||
|
||
// GetDiffChunkMeta gets difference between srcMeta and dstMeta | ||
func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function needs a test case. Btw, there is no test case in this PR. Conservatively, We need to aim for 70% test coverage. Anyway, this PR is getting big, other test cases can be submitted by another PR. (but I think this function need a test case for this PR.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 好的 |
||
if len(dstMeta) == 0 || len(srcMeta) == 0 { | ||
return srcMeta, nil | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(srcMeta)) { | ||
sort.Sort(metaSlice(srcMeta)) | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(dstMeta)) { | ||
sort.Sort(metaSlice(dstMeta)) | ||
} | ||
|
||
dstIdx := 0 | ||
srcIdx := 0 | ||
diff := make([]ChunkMeta, 0, len(srcMeta)) | ||
|
||
for { | ||
if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
srcIdx++ | ||
} else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset { | ||
dstIdx++ | ||
} else { | ||
if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
} | ||
|
||
dstIdx++ | ||
srcIdx++ | ||
} | ||
|
||
if dstIdx >= len(dstMeta) { | ||
break | ||
} | ||
|
||
if srcIdx >= len(srcMeta) { | ||
break | ||
} | ||
} | ||
|
||
if srcIdx < len(srcMeta) { | ||
diff = append(diff, srcMeta[srcIdx:]...) | ||
} | ||
|
||
return diff, nil | ||
} | ||
|
||
// GetChunkMeta gets chunk metas from path of file | ||
func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) { | ||
f, err := os.Open(path) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer Close(f) | ||
|
||
if len > defaultMaxChunkSize || len < defaultMinChunkSize { | ||
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
|
||
fi, err := f.Stat() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
metas := make([]ChunkMeta, 0, fi.Size()/len+1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I don't understand why there is a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
data := make([]byte, len) | ||
offset := int64(0) | ||
|
||
for { | ||
n, err := f.Read(data) | ||
if err != nil && err != io.EOF { | ||
return metas, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't partial meta useless, maybe just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
if err == io.EOF { | ||
break | ||
} | ||
|
||
m := ChunkMeta{} | ||
m.Offset = offset | ||
sum := md5.Sum(data[:n]) | ||
m.Checksum = hex.EncodeToString(sum[:]) | ||
m.Len = int64(n) | ||
|
||
metas = append(metas, m) | ||
|
||
offset += int64(n) | ||
} | ||
|
||
return metas, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use "github.com/namsral/flag" (a drop in replacement of "flag") instead of "flag", since the former can parse env variable as input.
Please see this discussion: PaddlePaddle/Paddle#2245 (comment)
Usage of "github.com/namsral/flag" see: https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/go/cmd/pserver/pserver.go#L9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@helinwang @typhoonzero
github.com/golang/glog
里边判断了而它用的是go原生的
flag
,所以用了github.com/namsral/flag
,每一行打印中都会出现类似ERROR: logging before flag.Parse: I0531 11:34:47.868818 3294 main.go:21]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
鉴于需要使用
glog
的level日志的功能,同时flag
的功能已经足够用了。先用flag
。There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. It's fine as long as we don't need to parse environment variable for file server.