-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FileManager #112
FileManager #112
Changes from 20 commits
a53328f
e589e33
4b8572b
24c303f
2081766
c09e0cf
23c1177
380b942
bac9cc8
b96c270
c3dbe34
f756307
4ab2a7f
463df28
ff95f05
b2d919f
419a678
cb75cac
737f7a3
3636d04
9c41efc
ef53907
34ce72c
97938cb
fb19caf
42b0890
026669a
7e5f9df
db395cd
6f219e9
d6b2bd0
bbee71e
20d13a4
cfd5767
2750df9
1b3aa33
7a32638
a17e0b1
8966691
2485891
4c7cf4e
930b8cd
00880ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package main | ||
|
||
import ( | ||
"flag" | ||
"fmt" | ||
"github.com/PaddlePaddle/cloud/go/filemanager/pfsserver" | ||
"net/http" | ||
|
||
log "github.com/golang/glog" | ||
) | ||
|
||
func main() { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpicking here, I know new line can separate regions of code of different functionalities, but a new line at the beginning of a function does not seem necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
router := pfsserver.NewRouter() | ||
|
||
portPtr := flag.Int("port", 8080, "listen port") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
flag.Parse() | ||
|
||
addr := fmt.Sprintf("0.0.0.0:%d", *portPtr) | ||
|
||
log.Infof("server on:%s\n", addr) | ||
log.Fatal(http.ListenAndServe(addr, router)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
package pfsmod | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This package name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
log "github.com/golang/glog" | ||
"io" | ||
"net/url" | ||
"os" | ||
"strconv" | ||
) | ||
|
||
//ChunkCmd structure | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code next line already shows it's a struct, I think we need a more descriptive comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
type ChunkCmd struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need a comment that describe what is ChunkCmd, I am confused why there is "cmd", from the definition, I can not see what "cmd" does? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
Path string | ||
Offset int64 | ||
ChunkSize int64 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Else if you intend to save the size of current chunk, it should be named There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
Data []byte | ||
} | ||
|
||
//NewChunkCmd get a new ChunkCmd | ||
func NewChunkCmd(path string, offset, chunkSize int64) *ChunkCmd { | ||
return &ChunkCmd{ | ||
Path: path, | ||
Offset: offset, | ||
ChunkSize: chunkSize, | ||
} | ||
} | ||
|
||
//ToURLParam encodes variables to url encoding parameters | ||
func (p *ChunkCmd) ToURLParam() string { | ||
parameters := url.Values{} | ||
parameters.Add("path", p.Path) | ||
|
||
str := fmt.Sprint(p.Offset) | ||
parameters.Add("offset", str) | ||
|
||
str = fmt.Sprint(p.ChunkSize) | ||
parameters.Add("chunksize", str) | ||
|
||
return parameters.Encode() | ||
} | ||
|
||
//ToJSON encodes chunkcmd to json string | ||
func (p *ChunkCmd) ToJSON() ([]byte, error) { | ||
return nil, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why it's returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
//Run function runs a ChunkCmd | ||
func (p *ChunkCmd) Run() (interface{}, error) { | ||
return nil, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why it's returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
//NewChunkCmdFromURLParam get a ChunkCmd structure | ||
// path example: | ||
// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096 | ||
func NewChunkCmdFromURLParam(path string) (*ChunkCmd, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
cmd := ChunkCmd{} | ||
|
||
m, err := url.ParseQuery(path) | ||
if err != nil || | ||
len(m["path"]) == 0 || | ||
len(m["offset"]) == 0 || | ||
len(m["chunksize"]) == 0 { | ||
return nil, errors.New(StatusText(StatusJSONErr)) | ||
} | ||
|
||
//var err error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line should be removed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
cmd.Path = m["path"][0] | ||
cmd.Offset, err = strconv.ParseInt(m["offset"][0], 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusText(StatusJSONErr)) | ||
} | ||
|
||
chunkSize, err := strconv.ParseInt(m["chunksize"][0], 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusText(StatusBadChunkSize)) | ||
} | ||
cmd.ChunkSize = chunkSize | ||
|
||
return &cmd, nil | ||
} | ||
|
||
//LoadChunkData loads a specified chunk to w | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
func (p *ChunkCmd) LoadChunkData(w io.Writer) error { | ||
f, err := os.Open(p.Path) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
writen, err := io.CopyN(w, f, p.ChunkSize) | ||
log.V(2).Infof("writen:%d\n", writen) | ||
return err | ||
} | ||
|
||
//SaveChunkData save data from r | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
func (p *ChunkCmd) SaveChunkData(r io.Reader) error { | ||
f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
writen, err := io.CopyN(f, r, p.ChunkSize) | ||
log.V(2).Infof("chunksize:%d writen:%d\n", p.ChunkSize, writen) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
package pfsmod | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个后续补上。 |
||
|
||
import ( | ||
"crypto/md5" | ||
"encoding/hex" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"sort" | ||
"strconv" | ||
) | ||
|
||
const ( | ||
defaultMaxChunkSize = 4 * 1024 * 1024 | ||
defaultMinChunkSize = 4 * 1024 | ||
//DefaultChunkSize is the default chunk's size | ||
DefaultChunkSize = 2 * 1024 * 1024 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 哦。有道理。我本来是想把关于同一个主题的都放到一个文件中。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
) | ||
const ( | ||
chunkMetaCmdName = "GetChunkMeta" | ||
) | ||
|
||
//ChunkMeta holds the chunk meta's info | ||
type ChunkMeta struct { | ||
Offset int64 `json:"offset"` | ||
Checksum string `json:"checksum"` | ||
Len int64 `json:"len"` | ||
} | ||
|
||
//ChunkMetaCmd is a command | ||
type ChunkMetaCmd struct { | ||
Method string `json:"method"` | ||
FilePath string `json:"path"` | ||
ChunkSize int64 `json:"chunksize"` | ||
} | ||
|
||
//ToURLParam encodes ChunkMetaCmd to URL encoding string | ||
func (p *ChunkMetaCmd) ToURLParam() string { | ||
parameters := url.Values{} | ||
parameters.Add("method", p.Method) | ||
parameters.Add("path", p.FilePath) | ||
|
||
str := fmt.Sprint(p.ChunkSize) | ||
parameters.Add("chunksize", str) | ||
|
||
return parameters.Encode() | ||
} | ||
|
||
//ToJSON encodes ChunkMetaCmd to JSON string | ||
func (p *ChunkMetaCmd) ToJSON() ([]byte, error) { | ||
return json.Marshal(p) | ||
} | ||
|
||
//Run is a functions which run ChunkMetaCmd | ||
func (p *ChunkMetaCmd) Run() (interface{}, error) { | ||
return GetChunkMeta(p.FilePath, p.ChunkSize) | ||
} | ||
|
||
func (p *ChunkMetaCmd) checkChunkSize() error { | ||
if p.ChunkSize < defaultMinChunkSize || | ||
p.ChunkSize > defaultMaxChunkSize { | ||
return errors.New(StatusText(StatusBadChunkSize)) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
//CloudCheck checks the conditions when running on cloud | ||
func (p *ChunkMetaCmd) CloudCheck() error { | ||
if !IsCloudPath(p.FilePath) { | ||
return errors.New(StatusText(StatusShouldBePfsPath) + ": p.FilePath") | ||
} | ||
if !CheckUser(p.FilePath) { | ||
return errors.New(StatusText(StatusUnAuthorized) + ":" + p.FilePath) | ||
} | ||
return p.checkChunkSize() | ||
} | ||
|
||
//LocalCheck checks the conditions when running local | ||
func (p *ChunkMetaCmd) LocalCheck() error { | ||
return p.checkChunkSize() | ||
} | ||
|
||
//NewChunkMetaCmdFromURLParam get a new ChunkMetaCmd | ||
func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) { | ||
method := r.URL.Query().Get("method") | ||
path := r.URL.Query().Get("path") | ||
chunkStr := r.URL.Query().Get("chunksize") | ||
|
||
if len(method) == 0 || | ||
method != chunkMetaCmdName || | ||
len(path) < 4 || | ||
len(chunkStr) == 0 { | ||
return nil, errors.New(http.StatusText(http.StatusBadRequest)) | ||
} | ||
|
||
chunkSize, err := strconv.ParseInt(chunkStr, 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusText(StatusBadChunkSize)) | ||
} | ||
|
||
return &ChunkMetaCmd{ | ||
Method: method, | ||
FilePath: path, | ||
ChunkSize: chunkSize, | ||
}, nil | ||
} | ||
|
||
//NewChunkMetaCmd get a new ChunkMetaCmd with path and chunksize | ||
func NewChunkMetaCmd(path string, chunkSize int64) *ChunkMetaCmd { | ||
return &ChunkMetaCmd{ | ||
Method: chunkMetaCmdName, | ||
FilePath: path, | ||
ChunkSize: chunkSize, | ||
} | ||
} | ||
|
||
type metaSlice []ChunkMeta | ||
|
||
func (a metaSlice) Len() int { return len(a) } | ||
func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } | ||
func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset } | ||
|
||
//GetDiffChunkMeta gets difference between srcMeta and dstMeta | ||
func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) { | ||
if len(dstMeta) == 0 || len(srcMeta) == 0 { | ||
return srcMeta, nil | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(srcMeta)) { | ||
sort.Sort(metaSlice(srcMeta)) | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(dstMeta)) { | ||
sort.Sort(metaSlice(dstMeta)) | ||
} | ||
|
||
dstIdx := 0 | ||
srcIdx := 0 | ||
diff := make([]ChunkMeta, 0, len(srcMeta)) | ||
|
||
for { | ||
if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
srcIdx++ | ||
} else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset { | ||
dstIdx++ | ||
} else { | ||
if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
} | ||
|
||
dstIdx++ | ||
srcIdx++ | ||
} | ||
|
||
if dstIdx >= len(dstMeta) { | ||
break | ||
} | ||
|
||
if srcIdx >= len(srcMeta) { | ||
break | ||
} | ||
} | ||
|
||
if srcIdx < len(srcMeta) { | ||
diff = append(diff, srcMeta[srcIdx:]...) | ||
} | ||
|
||
return diff, nil | ||
} | ||
|
||
//GetChunkMeta gets chunk metas from path of file | ||
func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) { | ||
f, err := os.Open(path) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer Close(f) | ||
|
||
if len > defaultMaxChunkSize || len < defaultMinChunkSize { | ||
return nil, errors.New(StatusText(StatusBadChunkSize)) | ||
} | ||
|
||
fi, err := f.Stat() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
metas := make([]ChunkMeta, 0, fi.Size()/len+1) | ||
data := make([]byte, len) | ||
offset := int64(0) | ||
|
||
for { | ||
n, err := f.Read(data) | ||
if err != nil && err != io.EOF { | ||
return metas, err | ||
} | ||
|
||
if err == io.EOF { | ||
break | ||
} | ||
|
||
//log.Println(n) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please no commented out code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
m := ChunkMeta{} | ||
m.Offset = offset | ||
sum := md5.Sum(data[:n]) | ||
m.Checksum = hex.EncodeToString(sum[:]) | ||
m.Len = int64(n) | ||
|
||
metas = append(metas, m) | ||
|
||
offset += int64(n) | ||
} | ||
|
||
return metas, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use "github.com/namsral/flag" (a drop in replacement of "flag") instead of "flag", since the former can parse env variable as input.
Please see this discussion: PaddlePaddle/Paddle#2245 (comment)
Usage of "github.com/namsral/flag" see: https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/go/cmd/pserver/pserver.go#L9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@helinwang @typhoonzero
github.com/golang/glog
里边判断了而它用的是go原生的
flag
,所以用了github.com/namsral/flag
,每一行打印中都会出现类似ERROR: logging before flag.Parse: I0531 11:34:47.868818 3294 main.go:21]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
鉴于需要使用
glog
的level日志的功能,同时flag
的功能已经足够用了。先用flag
。There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. It's fine as long as we don't need to parse environment variable for file server.