Skip to content

Commit

Permalink
Less dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu committed Mar 4, 2022
1 parent 5a67604 commit 43d8276
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 64 deletions.
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
module github.com/linxGnu/goseaweedfs

require github.com/stretchr/testify v1.7.0

require (
github.com/linxGnu/gumble v1.0.0
github.com/stretchr/testify v1.4.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/linxGnu/gumble v1.0.0 h1:OAJud8Hy4rmV9I5p/KTRiVpwwklMTd9Ankza3Mz7a4M=
github.com/linxGnu/gumble v1.0.0/go.mod h1:iyhNJpBHvJ0q2Hr41iiZRJyj6LLF47i2a9C9zLiucVY=
github.com/linxGnu/gumble v1.0.6 h1:7b3EJ37cIlLzG+c4z08jEtmploKjPVARDNJY0mnx1q8=
github.com/linxGnu/gumble v1.0.6/go.mod h1:ZXh0GRzKc0oCCMbvnzfRx9aI5VEY2CoxFLrXSKVnmyM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/scryner/lfreequeue v0.0.0-20121212074822-473f33702129/go.mod h1:0OrdloYlIayHGsgKYlwEnmdrPWmuYtbdS6Dm71PprFM=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
24 changes: 7 additions & 17 deletions http_client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package goseaweedfs

import (
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -11,26 +10,18 @@ import (
"net/textproto"
"path/filepath"
"strings"

workerpool "github.com/linxGnu/gumble/worker-pool"
)

type httpClient struct {
client *http.Client
workers *workerpool.Pool
client *http.Client
}

func newHTTPClient(client *http.Client) *httpClient {
c := &httpClient{
client: client,
workers: createWorkerPool(),
}
c.workers.Start()
c := &httpClient{client: client}
return c
}

func (c *httpClient) Close() (err error) {
c.workers.Stop()
return
}

Expand Down Expand Up @@ -117,7 +108,8 @@ func (c *httpClient) upload(url string, filename string, fileReader io.Reader, m
// create multipart writer
mw := multipart.NewWriter(w)

task := workerpool.NewTask(context.Background(), func(ctx context.Context) (interface{}, error) {
result := make(chan error, 1)
go func() {
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, normalizeName(filename)))
if mtype == "" {
Expand All @@ -143,9 +135,8 @@ func (c *httpClient) upload(url string, filename string, fileReader io.Reader, m
_ = w.Close()
}

return nil, err
})
c.workers.Do(task)
result <- err
}()

var resp *http.Response
resp, err = c.client.Post(url, mw.FormDataContentType(), r)
Expand All @@ -156,8 +147,7 @@ func (c *httpClient) upload(url string, filename string, fileReader io.Reader, m

if err == nil {
if respBody, statusCode, err = readAll(resp); err == nil {
result := <-task.Result()
err = result.Err
err = <-result
}
}

Expand Down
60 changes: 25 additions & 35 deletions seaweed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package goseaweedfs

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,8 +12,6 @@ import (
"path"
"strconv"
"strings"

workerpool "github.com/linxGnu/gumble/worker-pool"
)

var (
Expand Down Expand Up @@ -91,7 +88,6 @@ type Seaweed struct {
filers []*Filer
chunkSize int64
client *httpClient
workers *workerpool.Pool
}

// NewSeaweed create new seaweed client. Master url must be a valid uri (which includes scheme).
Expand Down Expand Up @@ -120,18 +116,11 @@ func NewSeaweed(masterURL string, filers []string, chunkSize int64, client *http
}
}

// start underlying workers
c.workers = createWorkerPool()
c.workers.Start()

return
}

// Close underlying daemons.
func (c *Seaweed) Close() (err error) {
if c.workers != nil {
c.workers.Stop()
}
if c.client != nil {
err = c.client.Close()
}
Expand Down Expand Up @@ -401,7 +390,9 @@ func (c *Seaweed) BatchUploadFileParts(files []*FilePart, collection string, ttl
return results, err
}

tasks := make([]*workerpool.Task, 0, len(files))
n := len(files)
result := make(chan taskResult, 1)

for i, file := range files {
file.FileID = assigned.FileID
if i > 0 {
Expand All @@ -415,26 +406,27 @@ func (c *Seaweed) BatchUploadFileParts(files []*FilePart, collection string, ttl
results[i].FileID = file.FileID
results[i].FileURL = assigned.PublicURL + "/" + file.FileID

task := c.uploadTask(file)
c.workers.Do(task)
tasks = append(tasks, task)
go c.uploadTask(file, i, result)
}

for i := range tasks {
r := <-tasks[i].Result()
if r.Err != nil {
results[i].Error = r.Err.Error()
for i := 0; i < n; i++ {
r := <-result
if r.err != nil {
results[r.meta.(int)].Error = r.err.Error()
}
}

return results, nil
}

func (c *Seaweed) uploadTask(file *FilePart) *workerpool.Task {
return workerpool.NewTask(context.Background(), func(ctx context.Context) (res interface{}, err error) {
_, err = c.UploadFilePart(file)
return
})
func (c *Seaweed) uploadTask(file *FilePart, meta interface{}, result chan taskResult) {
_, err := c.UploadFilePart(file)
result <- taskResult{err: err, meta: meta}
}

type taskResult struct {
err error
meta interface{}
}

// Replace file content with new one.
Expand Down Expand Up @@ -528,27 +520,25 @@ func (c *Seaweed) DeleteChunks(cm *ChunkManifest, args url.Values) (err error) {
return nil
}

tasks := make([]*workerpool.Task, 0, len(cm.Chunks))
n := len(cm.Chunks)
result := make(chan error, n)

for _, ci := range cm.Chunks {
task := c.deleteFileTask(ci.Fid, args)
c.workers.Do(task)
tasks = append(tasks, task)
go c.deleteFileTask(ci.Fid, args, result)
}

for i := range tasks {
if r := <-tasks[i].Result(); r.Err != nil {
err = r.Err
for i := 0; i < n; i++ {
if e := <-result; e != nil {
err = e
return
}
}

return
}

func (c *Seaweed) deleteFileTask(fileID string, args url.Values) *workerpool.Task {
return workerpool.NewTask(context.Background(), func(ctx context.Context) (interface{}, error) {
return nil, c.DeleteFile(fileID, args)
})
func (c *Seaweed) deleteFileTask(fileID string, args url.Values, result chan error) {
result <- c.DeleteFile(fileID, args)
}

// DeleteFile by id.
Expand Down
10 changes: 0 additions & 10 deletions utils.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
package goseaweedfs

import (
"context"
"io"
"io/ioutil"
"net/http"
"net/url"
"runtime"
"strings"

workerpool "github.com/linxGnu/gumble/worker-pool"
)

func createWorkerPool() *workerpool.Pool {
return workerpool.NewPool(context.Background(), workerpool.Option{
NumberWorker: runtime.NumCPU() << 1,
})
}

func parseURI(uri string) (u *url.URL, err error) {
u, err = url.Parse(uri)
if err == nil && u.Scheme == "" {
Expand Down

0 comments on commit 43d8276

Please sign in to comment.