-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FileManager #112
FileManager #112
Changes from 26 commits
a53328f
e589e33
4b8572b
24c303f
2081766
c09e0cf
23c1177
380b942
bac9cc8
b96c270
c3dbe34
f756307
4ab2a7f
463df28
ff95f05
b2d919f
419a678
cb75cac
737f7a3
3636d04
9c41efc
ef53907
34ce72c
97938cb
fb19caf
42b0890
026669a
7e5f9df
db395cd
6f219e9
d6b2bd0
bbee71e
20d13a4
cfd5767
2750df9
1b3aa33
7a32638
a17e0b1
8966691
2485891
4c7cf4e
930b8cd
00880ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package main | ||
|
||
import ( | ||
"flag" | ||
"fmt" | ||
"net/http" | ||
|
||
"github.com/PaddlePaddle/cloud/go/filemanager/pfsserver" | ||
log "github.com/golang/glog" | ||
) | ||
|
||
func main() { | ||
port := flag.Int("port", 8080, "port of server") | ||
ip := flag.String("ip", "0.0.0.0", "ip of server") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, when will the ip of server necessary? I usually do ":8080" rather than "0.0.0.0:8080" so that I don't need to specify the ip. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Listening on an IP address is always useful, ":8080" is the same as "0.0.0.0:8080". When we need to bind TCP server to the specified network(in a LAN or VLAN or VPC), we need to specify the IP of the network card. |
||
flag.Parse() | ||
|
||
router := pfsserver.NewRouter() | ||
addr := fmt.Sprintf("%s:%d", *ip, *port) | ||
|
||
log.Infof("server on:%s\n", addr) | ||
log.Fatal(http.ListenAndServe(addr, router)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package pfsmodules | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/url" | ||
"os" | ||
"strconv" | ||
|
||
log "github.com/golang/glog" | ||
) | ||
|
||
// Chunk respresents a chunk info. | ||
type Chunk struct { | ||
Path string | ||
Offset int64 | ||
Size int64 | ||
} | ||
|
||
// ToURLParam encodes variables to url encoding parameters. | ||
func (p *Chunk) ToURLParam() string { | ||
parameters := url.Values{} | ||
parameters.Add("path", p.Path) | ||
|
||
str := fmt.Sprint(p.Offset) | ||
parameters.Add("offset", str) | ||
|
||
str = fmt.Sprint(p.Size) | ||
parameters.Add("chunksize", str) | ||
|
||
return parameters.Encode() | ||
} | ||
|
||
// ParseChunk get a Chunk struct from path. | ||
// path example: | ||
// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096 | ||
func ParseChunk(path string) (*Chunk, error) { | ||
cmd := Chunk{} | ||
|
||
m, err := url.ParseQuery(path) | ||
if err != nil || | ||
len(m["path"]) == 0 || | ||
len(m["offset"]) == 0 || | ||
len(m["chunksize"]) == 0 { | ||
return nil, errors.New(StatusJSONErr) | ||
} | ||
|
||
cmd.Path = m["path"][0] | ||
cmd.Offset, err = strconv.ParseInt(m["offset"][0], 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusJSONErr) | ||
} | ||
|
||
chunkSize, err := strconv.ParseInt(m["chunksize"][0], 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
cmd.Size = chunkSize | ||
|
||
return &cmd, nil | ||
} | ||
|
||
// LoadChunkData loads a specified chunk to io.Writer. | ||
func (p *Chunk) LoadChunkData(w io.Writer) error { | ||
f, err := os.Open(p.Path) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
loaded, err := io.CopyN(w, f, p.Size) | ||
log.V(2).Infof("loaded:%d\n", loaded) | ||
return err | ||
} | ||
|
||
// SaveChunkData save data from io.Reader. | ||
func (p *Chunk) SaveChunkData(r io.Reader) error { | ||
f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600) | ||
if err != nil { | ||
return err | ||
} | ||
defer Close(f) | ||
|
||
_, err = f.Seek(p.Offset, 0) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
writen, err := io.CopyN(f, r, p.Size) | ||
log.V(2).Infof("chunksize:%d writen:%d\n", p.Size, writen) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
package pfsmodules | ||
|
||
import ( | ||
"crypto/md5" | ||
"encoding/hex" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"sort" | ||
"strconv" | ||
) | ||
|
||
const ( | ||
defaultMaxChunkSize = 4 * 1024 * 1024 | ||
defaultMinChunkSize = 4 * 1024 | ||
) | ||
const ( | ||
ChunkMetaCmdName = "GetChunkMeta" | ||
) | ||
|
||
// ChunkMeta holds the chunk meta's info. | ||
type ChunkMeta struct { | ||
Offset int64 `json:"offset"` | ||
Checksum string `json:"checksum"` | ||
Len int64 `json:"len"` | ||
} | ||
|
||
// ChunkMetaCmd is a command. | ||
type ChunkMetaCmd struct { | ||
Method string `json:"method"` | ||
FilePath string `json:"path"` | ||
ChunkSize int64 `json:"chunksize"` | ||
} | ||
|
||
// ToURLParam encodes ChunkMetaCmd to URL encoding string. | ||
func (p *ChunkMetaCmd) ToURLParam() string { | ||
parameters := url.Values{} | ||
parameters.Add("method", p.Method) | ||
parameters.Add("path", p.FilePath) | ||
|
||
str := fmt.Sprint(p.ChunkSize) | ||
parameters.Add("chunksize", str) | ||
|
||
return parameters.Encode() | ||
} | ||
|
||
// ToJSON encodes ChunkMetaCmd to JSON string. | ||
func (p *ChunkMetaCmd) ToJSON() ([]byte, error) { | ||
return json.Marshal(p) | ||
} | ||
|
||
// Run is a functions which run ChunkMetaCmd. | ||
func (p *ChunkMetaCmd) Run() (interface{}, error) { | ||
return GetChunkMeta(p.FilePath, p.ChunkSize) | ||
} | ||
|
||
func (p *ChunkMetaCmd) checkChunkSize() error { | ||
if p.ChunkSize < defaultMinChunkSize || | ||
p.ChunkSize > defaultMaxChunkSize { | ||
return errors.New(StatusBadChunkSize) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// CloudCheck checks the conditions when running on cloud. | ||
func (p *ChunkMetaCmd) CloudCheck() error { | ||
if !IsCloudPath(p.FilePath) { | ||
return errors.New(StatusShouldBePfsPath + ": p.FilePath") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 惭愧,搞错了。 |
||
} | ||
if !CheckUser(p.FilePath) { | ||
return errors.New(StatusUnAuthorized + ":" + p.FilePath) | ||
} | ||
return p.checkChunkSize() | ||
} | ||
|
||
// LocalCheck checks the conditions when running locally. | ||
func (p *ChunkMetaCmd) LocalCheck() error { | ||
return p.checkChunkSize() | ||
} | ||
|
||
// NewChunkMetaCmdFromURLParams get a new ChunkMetaCmd. | ||
func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) { | ||
method := r.URL.Query().Get("method") | ||
path := r.URL.Query().Get("path") | ||
chunkStr := r.URL.Query().Get("chunksize") | ||
|
||
if len(method) == 0 || | ||
method != ChunkMetaCmdName || | ||
len(path) == 0 || | ||
len(chunkStr) == 0 { | ||
return nil, errors.New(http.StatusText(http.StatusBadRequest)) | ||
} | ||
|
||
chunkSize, err := strconv.ParseInt(chunkStr, 10, 64) | ||
if err != nil { | ||
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
|
||
return &ChunkMetaCmd{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is using struct literal ( To reduce complexity, I recommend unless there is some other initialization that a struct literal can not do, don't use a new function for it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个当时主要考虑可以少写一部分,如 |
||
Method: method, | ||
FilePath: path, | ||
ChunkSize: chunkSize, | ||
}, nil | ||
} | ||
|
||
type metaSlice []ChunkMeta | ||
|
||
func (a metaSlice) Len() int { return len(a) } | ||
func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } | ||
func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset } | ||
|
||
// GetDiffChunkMeta gets difference between srcMeta and dstMeta. | ||
func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function needs a test case. Btw, there is no test case in this PR. Conservatively, We need to aim for 70% test coverage. Anyway, this PR is getting big, other test cases can be submitted by another PR. (but I think this function need a test case for this PR.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 好的 |
||
if len(dstMeta) == 0 || len(srcMeta) == 0 { | ||
return srcMeta, nil | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(srcMeta)) { | ||
sort.Sort(metaSlice(srcMeta)) | ||
} | ||
|
||
if !sort.IsSorted(metaSlice(dstMeta)) { | ||
sort.Sort(metaSlice(dstMeta)) | ||
} | ||
|
||
dstIdx := 0 | ||
srcIdx := 0 | ||
diff := make([]ChunkMeta, 0, len(srcMeta)) | ||
|
||
for { | ||
if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
srcIdx++ | ||
} else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset { | ||
dstIdx++ | ||
} else { | ||
if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum { | ||
diff = append(diff, srcMeta[srcIdx]) | ||
} | ||
|
||
dstIdx++ | ||
srcIdx++ | ||
} | ||
|
||
if dstIdx >= len(dstMeta) { | ||
break | ||
} | ||
|
||
if srcIdx >= len(srcMeta) { | ||
break | ||
} | ||
} | ||
|
||
if srcIdx < len(srcMeta) { | ||
diff = append(diff, srcMeta[srcIdx:]...) | ||
} | ||
|
||
return diff, nil | ||
} | ||
|
||
// GetChunkMeta gets chunk metas from path of file. | ||
func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) { | ||
f, err := os.Open(path) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer Close(f) | ||
|
||
if len > defaultMaxChunkSize || len < defaultMinChunkSize { | ||
return nil, errors.New(StatusBadChunkSize) | ||
} | ||
|
||
fi, err := f.Stat() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
metas := make([]ChunkMeta, 0, fi.Size()/len+1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I don't understand why there is a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
data := make([]byte, len) | ||
offset := int64(0) | ||
|
||
for { | ||
n, err := f.Read(data) | ||
if err != nil && err != io.EOF { | ||
return nil, err | ||
} | ||
|
||
if err == io.EOF { | ||
break | ||
} | ||
|
||
m := ChunkMeta{} | ||
m.Offset = offset | ||
sum := md5.Sum(data[:n]) | ||
m.Checksum = hex.EncodeToString(sum[:]) | ||
m.Len = int64(n) | ||
|
||
metas = append(metas, m) | ||
|
||
offset += int64(n) | ||
} | ||
|
||
return metas, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package pfsmodules | ||
|
||
import ( | ||
"io" | ||
"strings" | ||
|
||
log "github.com/golang/glog" | ||
) | ||
|
||
const ( | ||
// DefaultMultiPartBoundary is the default multipart form boudary. | ||
DefaultMultiPartBoundary = "8d7b0e5709d756e21e971ff4d9ac3b20" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in general const block can be put together: const (
DefaultMultiPartBoundary = "8d7b0e5709d756e21e971ff4d9ac3b20"
MaxJSONRequestSize = 2048 If you want to write them separately, you can do: const DefaultMultiPartBoundary = "8d7b0e5709d756e21e971ff4d9ac3b20"
const MaxJSONRequestSize = 2048 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
// MaxJSONRequestSize is the max body size when server receives a request. | ||
MaxJSONRequestSize = 2048 | ||
) | ||
|
||
// Command is a interface of all commands. | ||
type Command interface { | ||
ToURLParam() string | ||
ToJSON() ([]byte, error) | ||
Run() (interface{}, error) | ||
LocalCheck() error | ||
CloudCheck() error | ||
} | ||
|
||
// IsCloudPath returns whether a path is a pfspath. | ||
func IsCloudPath(path string) bool { | ||
return strings.HasPrefix(path, "/pfs/") | ||
} | ||
|
||
// CheckUser checks if a user has authority to access a path. | ||
func CheckUser(path string) bool { | ||
// TODO | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this function still a TODO? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return true | ||
} | ||
|
||
// Close closes c and log it. | ||
func Close(c io.Closer) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Close should not be visible from outside of the package. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 我的思路是这样的:有两个package中用到这个函数,为了防止两个包相互调用,所以定义了一个pfsmodules作为中间的公共package,然后把公共函数放到了它里边。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这种简单的函数,就直接放在package内吧。另外,防止相互调用往往会用interface(好像挺少因为这个再去加一个新的package,创建新package的目的可能是为了模块化,而不是解决循环依赖),比如: package a
import "b"
var c = b.Foo()
type T int
func (t T) Bar() {
} package b
type interface Bar {
Bar()
}
// can't import "a" and use a.T as an argument, using a interface.
func Baz(b Bar) {
}
func Foo() {
} package main
func main() {
b.Baz(a.T(0))
} 往往必须有循环依赖的时候还是比较少的。 |
||
err := c.Close() | ||
if err != nil { | ||
log.Error(err) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use "github.com/namsral/flag" (a drop in replacement of "flag") instead of "flag", since the former can parse env variable as input.
Please see this discussion: PaddlePaddle/Paddle#2245 (comment)
Usage of "github.com/namsral/flag" see: https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/go/cmd/pserver/pserver.go#L9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@helinwang @typhoonzero
github.com/golang/glog
里边判断了而它用的是go原生的
flag
,所以用了github.com/namsral/flag
,每一行打印中都会出现类似ERROR: logging before flag.Parse: I0531 11:34:47.868818 3294 main.go:21]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
鉴于需要使用
glog
的level日志的功能,同时flag
的功能已经足够用了。先用flag
。There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. It's fine as long as we don't need to parse environment variable for file server.