Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileManager #112

Merged
merged 43 commits into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
a53328f
init add
gongweibao May 26, 2017
e589e33
add some
gongweibao May 26, 2017
4b8572b
add some
gongweibao May 26, 2017
24c303f
add touch
gongweibao May 26, 2017
2081766
fix some bugs
gongweibao May 27, 2017
c09e0cf
fix some bugs
gongweibao May 27, 2017
23c1177
compile ok
gongweibao May 27, 2017
380b942
rm not need
gongweibao May 27, 2017
bac9cc8
fix ls bugs
gongweibao May 27, 2017
b96c270
ls ok
gongweibao May 28, 2017
c3dbe34
prepare add stat
gongweibao May 29, 2017
f756307
fix bugs
gongweibao May 29, 2017
4ab2a7f
upload ok
gongweibao May 29, 2017
463df28
download ok
gongweibao May 29, 2017
ff95f05
rm ok
gongweibao May 29, 2017
b2d919f
mkdir ok
gongweibao May 29, 2017
419a678
rename files
gongweibao May 29, 2017
cb75cac
fix by gometalinter
gongweibao May 30, 2017
737f7a3
del not need code
gongweibao May 30, 2017
3636d04
rm not need codes
gongweibao May 30, 2017
9c41efc
fix by comments
gongweibao May 31, 2017
ef53907
rename ChunkCmd to Chunk
gongweibao May 31, 2017
34ce72c
rm resp.go
gongweibao May 31, 2017
97938cb
rename pfsmod => pfsmodules
gongweibao May 31, 2017
fb19caf
fix pfsmod bugs
gongweibao May 31, 2017
42b0890
fix by helin's comments
gongweibao Jun 1, 2017
026669a
Merge remote-tracking branch 'upstream/develop' into filemanager2
gongweibao Jun 2, 2017
7e5f9df
fix by comments
gongweibao Jun 2, 2017
db395cd
prepar use restclient
gongweibao Jun 2, 2017
6f219e9
prepare use restclient
gongweibao Jun 2, 2017
d6b2bd0
Merge remote-tracking branch 'upstream/develop' into filemanager2
gongweibao Jun 5, 2017
bbee71e
fix restclient and compile ok
gongweibao Jun 5, 2017
20d13a4
test commands ok
gongweibao Jun 5, 2017
cfd5767
fix by comments and add check user
gongweibao Jun 5, 2017
2750df9
fix by gometa
gongweibao Jun 5, 2017
1b3aa33
rm no need code
gongweibao Jun 5, 2017
7a32638
fix visit to return error
gongweibao Jun 6, 2017
a17e0b1
Errorf to Errorln
gongweibao Jun 6, 2017
8966691
rename some funtions that not need export out package
gongweibao Jun 7, 2017
2485891
rm not need code
gongweibao Jun 7, 2017
4c7cf4e
fix by wuyi comments
gongweibao Jun 7, 2017
930b8cd
add comments of Command interface
gongweibao Jun 7, 2017
00880ae
Merge branch 'develop' into filemanager2
gongweibao Jun 7, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/cmd/paddlecloud/paddlecloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func main() {
subcommands.Register(&paddlecloud.LogsCommand{}, "")
subcommands.Register(&paddlecloud.GetCommand{}, "")
subcommands.Register(&paddlecloud.KillCommand{}, "")
subcommands.Register(&paddlecloud.LsCommand{}, "")
subcommands.Register(&paddlecloud.CpCommand{}, "")
subcommands.Register(&paddlecloud.RmCommand{}, "")
subcommands.Register(&paddlecloud.MkdirCommand{}, "")

flag.Parse()
ctx := context.Background()
Expand Down
24 changes: 24 additions & 0 deletions go/cmd/pfsserver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"flag"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use "github.com/namsral/flag" (a drop in replacement of "flag") instead of "flag", since the former can parse env variable as input.
Please see this discussion: PaddlePaddle/Paddle#2245 (comment)

Usage of "github.com/namsral/flag" see: https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/go/cmd/pserver/pserver.go#L9

Copy link
Collaborator Author

@gongweibao gongweibao May 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@helinwang @typhoonzero github.com/golang/glog里边判断了

679     if !flag.Parsed() {
680         os.Stderr.Write([]byte("ERROR: logging before flag.Parse: "))
681         os.Stderr.Write(data)
682     }

而它用的是go原生的flag,所以用了github.com/namsral/flag,每一行打印中都会出现类似
ERROR: logging before flag.Parse: I0531 11:34:47.868818 3294 main.go:21]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

鉴于需要使用glog的level日志的功能,同时flag的功能已经足够用了。先用flag

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator

@helinwang helinwang May 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. It's fine as long as we don't need to parse environment variable for file server.

"fmt"
"net/http"

"github.com/PaddlePaddle/cloud/go/filemanager/pfsserver"
log "github.com/golang/glog"
)

func main() {
port := flag.Int("port", 8080, "port of server")
ip := flag.String("ip", "0.0.0.0", "ip of server")
tokenUri := flag.String("tokenuri", "http://cloud.paddlepaddle.org", "uri of token server")
flag.Parse()

router := pfsserver.NewRouter()
addr := fmt.Sprintf("%s:%d", *ip, *port)
pfsserver.TokenUri = *tokenUri

log.Infof("server on:%s and tokenuri:%s\n", addr, *tokenUri)
log.Fatal(http.ListenAndServe(addr, router))
}
98 changes: 98 additions & 0 deletions go/filemanager/pfsmodules/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package pfsmodules

import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strconv"

log "github.com/golang/glog"
)

// Chunk respresents a chunk info.
type Chunk struct {
Path string
Offset int64
Size int64
}

// ToURLParam encodes variables to url encoding parameters.
func (p *Chunk) ToURLParam() url.Values {
parameters := url.Values{}
parameters.Add("path", p.Path)

str := fmt.Sprint(p.Offset)
parameters.Add("offset", str)

str = fmt.Sprint(p.Size)
parameters.Add("chunksize", str)

return parameters
}

// ParseChunk get a Chunk struct from path.
// path example:
// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096
func ParseChunk(path string) (*Chunk, error) {
cmd := Chunk{}

m, err := url.ParseQuery(path)
if err != nil ||
len(m["path"]) == 0 ||
len(m["offset"]) == 0 ||
len(m["chunksize"]) == 0 {
return nil, errors.New(StatusJSONErr)
}

cmd.Path = m["path"][0]
cmd.Offset, err = strconv.ParseInt(m["offset"][0], 10, 64)
if err != nil {
return nil, errors.New(StatusJSONErr)
}

chunkSize, err := strconv.ParseInt(m["chunksize"][0], 10, 64)
if err != nil {
return nil, errors.New(StatusBadChunkSize)
}
cmd.Size = chunkSize

return &cmd, nil
}

// LoadChunkData loads a specified chunk to io.Writer.
func (p *Chunk) LoadChunkData(w io.Writer) error {
f, err := os.Open(p.Path)
if err != nil {
return err
}
defer Close(f)

_, err = f.Seek(p.Offset, 0)
if err != nil {
return err
}

loaded, err := io.CopyN(w, f, p.Size)
log.V(2).Infof("loaded:%d\n", loaded)
return err
}

// SaveChunkData save data from io.Reader.
func (p *Chunk) SaveChunkData(r io.Reader) error {
f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600)
if err != nil {
return err
}
defer Close(f)

_, err = f.Seek(p.Offset, 0)
if err != nil {
return err
}

writen, err := io.CopyN(f, r, p.Size)
log.V(2).Infof("chunksize:%d writen:%d\n", p.Size, writen)
return err
}
203 changes: 203 additions & 0 deletions go/filemanager/pfsmodules/chunkmeta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package pfsmodules

import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"sort"
"strconv"
)

const (
defaultMaxChunkSize = 4 * 1024 * 1024
defaultMinChunkSize = 4 * 1024
)
const (
ChunkMetaCmdName = "GetChunkMeta"
)

// ChunkMeta holds the chunk meta's info.
type ChunkMeta struct {
Offset int64 `json:"offset"`
Checksum string `json:"checksum"`
Len int64 `json:"len"`
}

// ChunkMetaCmd is a command.
type ChunkMetaCmd struct {
Method string `json:"method"`
FilePath string `json:"path"`
ChunkSize int64 `json:"chunksize"`
}

// ToURLParam encodes ChunkMetaCmd to URL encoding string.
func (p *ChunkMetaCmd) ToURLParam() url.Values {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These code are repetitive, a good candidate for automatically generating them. See: https://blog.golang.org/generate

Not required, just want to let you know :)

Copy link
Collaborator Author

@gongweibao gongweibao Jun 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

棒。我正要模仿JSON的Marshal函数自己搞一个那。多谢helin。
后续改进它。

parameters := url.Values{}
parameters.Add("method", p.Method)
parameters.Add("path", p.FilePath)

str := fmt.Sprint(p.ChunkSize)
parameters.Add("chunksize", str)

return parameters
}

// ToJSON encodes ChunkMetaCmd to JSON string.
func (p *ChunkMetaCmd) ToJSON() ([]byte, error) {
return json.Marshal(p)
}

// Run is a functions which run ChunkMetaCmd.
func (p *ChunkMetaCmd) Run() (interface{}, error) {
return GetChunkMeta(p.FilePath, p.ChunkSize)
}

func (p *ChunkMetaCmd) checkChunkSize() error {
if p.ChunkSize < defaultMinChunkSize ||
p.ChunkSize > defaultMaxChunkSize {
return errors.New(StatusBadChunkSize)
}

return nil
}

// CloudCheck checks the conditions when running on cloud.
func (p *ChunkMetaCmd) ValidateCloudArgs(userName string) error {
if err := ValidatePfsPath([]string{p.FilePath}, userName); err != nil {
return err
}

return p.checkChunkSize()
}

// LocalCheck checks the conditions when running locally.
func (p *ChunkMetaCmd) ValidateLocalArgs() error {
return p.checkChunkSize()
}

// NewChunkMetaCmdFromURLParams get a new ChunkMetaCmd.
func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) {
method := r.URL.Query().Get("method")
path := r.URL.Query().Get("path")
chunkStr := r.URL.Query().Get("chunksize")

if len(method) == 0 ||
method != ChunkMetaCmdName ||
len(path) == 0 ||
len(chunkStr) == 0 {
return nil, errors.New(http.StatusText(http.StatusBadRequest))
}

chunkSize, err := strconv.ParseInt(chunkStr, 10, 64)
if err != nil {
return nil, errors.New(StatusBadChunkSize)
}

return &ChunkMetaCmd{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is using struct literal (&ChunkMetaCmd{...}), but there is a function for it below NewChunkMetaCmd which does exactly the same thing. I suggest removing function NewChunkMetaCmd and NewChunk, and just use the struct literal way.

To reduce complexity, I recommend unless there is some other initialization that a struct literal can not do, don't use a new function for it.

Copy link
Collaborator Author

@gongweibao gongweibao Jun 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个当时主要考虑可以少写一部分,如Method
Done

Method: method,
FilePath: path,
ChunkSize: chunkSize,
}, nil
}

type metaSlice []ChunkMeta

func (a metaSlice) Len() int { return len(a) }
func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset }

// GetDiffChunkMeta gets difference between srcMeta and dstMeta.
func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) {
if len(dstMeta) == 0 || len(srcMeta) == 0 {
return srcMeta, nil
}

if !sort.IsSorted(metaSlice(srcMeta)) {
sort.Sort(metaSlice(srcMeta))
}

if !sort.IsSorted(metaSlice(dstMeta)) {
sort.Sort(metaSlice(dstMeta))
}

dstIdx := 0
srcIdx := 0
diff := make([]ChunkMeta, 0, len(srcMeta))

for {
if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset {
diff = append(diff, srcMeta[srcIdx])
srcIdx++
} else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset {
dstIdx++
} else {
if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum {
diff = append(diff, srcMeta[srcIdx])
}

dstIdx++
srcIdx++
}

if dstIdx >= len(dstMeta) {
break
}

if srcIdx >= len(srcMeta) {
break
}
}

if srcIdx < len(srcMeta) {
diff = append(diff, srcMeta[srcIdx:]...)
}

return diff, nil
}

// GetChunkMeta gets chunk metas from path of file.
func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer Close(f)

if len > defaultMaxChunkSize || len < defaultMinChunkSize {
return nil, errors.New(StatusBadChunkSize)
}

var metas []ChunkMeta

data := make([]byte, len)
offset := int64(0)

for {
n, err := f.Read(data)
if err != nil && err != io.EOF {
return nil, err
}

if err == io.EOF {
break
}

m := ChunkMeta{}
m.Offset = offset
sum := md5.Sum(data[:n])
m.Checksum = hex.EncodeToString(sum[:])
m.Len = int64(n)

metas = append(metas, m)

offset += int64(n)
}

return metas, nil
}
Loading