Skip to content

Commit

Permalink
Merge pull request #567 from textileio/sander/pprof
Browse files Browse the repository at this point in the history
PushPaths pre-verification
  • Loading branch information
sanderpick authored Sep 2, 2021
2 parents 4ed7549 + 8001733 commit 63eefd3
Show file tree
Hide file tree
Showing 10 changed files with 471 additions and 402 deletions.
2 changes: 2 additions & 0 deletions api/billingd/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -82,6 +83,7 @@ func (g *Gateway) Start() {
}

router := gin.Default()
pprof.Register(router)
router.GET("/health", func(c *gin.Context) {
c.Writer.WriteHeader(http.StatusOK)
})
Expand Down
89 changes: 55 additions & 34 deletions api/bucketsd/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ type PushPathsQueue struct {

q []pushPath
len int
startCh chan []string
inCh chan pushPath
inWaitCh chan struct{}
outCh chan PushPathsResult
Expand Down Expand Up @@ -375,6 +376,15 @@ func (c *PushPathsQueue) Next() (ok bool) {
func (c *PushPathsQueue) start() {
go func() {
defer close(c.inWaitCh)

var paths []string
c.lk.Lock()
for _, p := range c.q {
paths = append(paths, p.path)
}
c.lk.Unlock()
c.startCh <- paths

for {
c.lk.Lock()
if c.closed {
Expand All @@ -391,7 +401,6 @@ func (c *PushPathsQueue) start() {
c.lk.Unlock()
c.inCh <- p
}

}()
}

Expand Down Expand Up @@ -452,18 +461,8 @@ func (c *Client) PushPaths(ctx context.Context, key string, opts ...Option) (*Pu
xr = args.root.String()
}

if err := stream.Send(&pb.PushPathsRequest{
Payload: &pb.PushPathsRequest_Header_{
Header: &pb.PushPathsRequest_Header{
Key: key,
Root: xr,
},
},
}); err != nil {
return nil, err
}

q := &PushPathsQueue{
startCh: make(chan []string),
inCh: make(chan pushPath),
inWaitCh: make(chan struct{}),
outCh: make(chan PushPathsResult),
Expand Down Expand Up @@ -543,33 +542,55 @@ func (c *Client) PushPaths(ctx context.Context, key string, opts ...Option) (*Pu
}

go func() {
for p := range q.inCh {
r, err := p.r()
if err != nil {
q.outCh <- PushPathsResult{err: err}
break
}
buf := make([]byte, chunkSize)
for {
n, err := r.Read(buf)
c := &pb.PushPathsRequest_Chunk{
Path: p.path,
loop:
for {
select {
case paths := <-q.startCh:
if len(paths) > 0 {
if err := stream.Send(&pb.PushPathsRequest{
Payload: &pb.PushPathsRequest_Header_{
Header: &pb.PushPathsRequest_Header{
Key: key,
Root: xr,
Paths: paths,
},
},
}); err != nil {
q.outCh <- PushPathsResult{err: err}
break loop
}
}
case p, ok := <-q.inCh:
if !ok {
break loop
}
if n > 0 {
c.Data = make([]byte, n)
copy(c.Data, buf[:n])
if ok := sendChunk(c); !ok {
r, err := p.r()
if err != nil {
q.outCh <- PushPathsResult{err: err}
break loop
}
buf := make([]byte, chunkSize)
for {
n, err := r.Read(buf)
c := &pb.PushPathsRequest_Chunk{
Path: p.path,
}
if n > 0 {
c.Data = make([]byte, n)
copy(c.Data, buf[:n])
if ok := sendChunk(c); !ok {
break
}
} else if err == io.EOF {
sendChunk(c)
break
} else if err != nil {
q.outCh <- PushPathsResult{err: err}
break
}
} else if err == io.EOF {
sendChunk(c)
break
} else if err != nil {
q.outCh <- PushPathsResult{err: err}
break
}
r.Close()
}
r.Close()
}
}()

Expand Down
704 changes: 357 additions & 347 deletions api/bucketsd/pb/bucketsd.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/bucketsd/pb/bucketsd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ message PushPathsRequest {
message Header {
string key = 1;
string root = 2;
repeated string paths = 3;
}

message Chunk {
Expand Down
50 changes: 38 additions & 12 deletions api/bucketsd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,9 @@ func (s *Service) PushPath(server pb.APIService_PushPathServer) (err error) {
dbToken, _ := thread.TokenFromContext(server.Context())

req, err := server.Recv()
if err != nil {
if err == io.EOF {
return nil
} else if err != nil {
return err
}
var buckKey, headerPath, root string
Expand Down Expand Up @@ -1946,14 +1948,18 @@ func (s *Service) PushPaths(server pb.APIService_PushPathsServer) error {
dbToken, _ := thread.TokenFromContext(ctx)

req, err := server.Recv()
if err != nil {
if err == io.EOF {
return nil
} else if err != nil {
return fmt.Errorf("on receive: %v", err)
}
var buckKey, buckRoot string
var buckPaths []string
switch payload := req.Payload.(type) {
case *pb.PushPathsRequest_Header_:
buckKey = payload.Header.Key
buckRoot = payload.Header.Root
buckPaths = payload.Header.Paths
default:
return fmt.Errorf("push bucket path header is required")
}
Expand All @@ -1963,17 +1969,31 @@ func (s *Service) PushPaths(server pb.APIService_PushPathsServer) error {
defer lck.Release()

buck := &tdb.Bucket{}
err = s.Buckets.GetSafe(ctx, dbID, buckKey, buck, tdb.WithToken(dbToken))
if err != nil {
if err := s.Buckets.GetSafe(ctx, dbID, buckKey, buck, tdb.WithToken(dbToken)); err != nil {
return fmt.Errorf("getting bucket: %v", err)
}
if buckRoot != "" && buckRoot != buck.Path {
return status.Error(codes.FailedPrecondition, buckets.ErrNonFastForward.Error())
}

readOnlyBuck := buck.Copy()
preVerified := make(map[string]struct{})
if len(buckPaths) > 0 {
readOnlyBuck.UpdatedAt = time.Now().UnixNano()
for _, p := range buckPaths {
readOnlyBuck.SetMetadataAtPath(p, tdb.Metadata{
UpdatedAt: readOnlyBuck.UpdatedAt,
})
readOnlyBuck.UnsetMetadataWithPrefix(p + "/")
preVerified[p] = struct{}{}
}
if err := s.Buckets.Verify(ctx, dbID, readOnlyBuck, tdb.WithToken(dbToken)); err != nil {
return status.Error(codes.PermissionDenied, fmt.Sprintf("verifying bucket update: %v", err))
}
}

var wg sync.WaitGroup
var ctxLock sync.RWMutex
var ctxLock, verifiedLock sync.RWMutex
addedCh := make(chan addedFile)
doneCh := make(chan struct{})
errCh := make(chan error)
Expand Down Expand Up @@ -2001,13 +2021,19 @@ func (s *Service) PushPaths(server pb.APIService_PushPathsServer) error {
ctxLock.RUnlock()
fa, err := queue.add(ctx, s.IPFSClient.Unixfs(), pth, func() ([]byte, error) {
wg.Add(1)
readOnlyBuck.UpdatedAt = time.Now().UnixNano()
readOnlyBuck.SetMetadataAtPath(pth, tdb.Metadata{
UpdatedAt: readOnlyBuck.UpdatedAt,
})
readOnlyBuck.UnsetMetadataWithPrefix(pth + "/")
if err = s.Buckets.Verify(ctx, dbID, readOnlyBuck, tdb.WithToken(dbToken)); err != nil {
return nil, fmt.Errorf("verifying bucket update: %v", err)
// Support requests that don't send all paths for pre-verification (pre v2.6.11)
verifiedLock.Lock()
_, ok := preVerified[pth]
verifiedLock.Unlock()
if !ok {
readOnlyBuck.UpdatedAt = time.Now().UnixNano()
readOnlyBuck.SetMetadataAtPath(pth, tdb.Metadata{
UpdatedAt: readOnlyBuck.UpdatedAt,
})
readOnlyBuck.UnsetMetadataWithPrefix(pth + "/")
if err = s.Buckets.Verify(ctx, dbID, readOnlyBuck, tdb.WithToken(dbToken)); err != nil {
return nil, fmt.Errorf("verifying bucket update: %v", err)
}
}
key, err := readOnlyBuck.GetFileEncryptionKeyForPath(pth)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
uc "github.com/textileio/textile/v2/api/usersd/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

var (
Expand Down Expand Up @@ -102,6 +103,11 @@ func NewClients(hubTarget string, isHub bool, minerIndexTarget string) *Clients
hubOpts = append(hubOpts, grpc.WithInsecure())
}
hubOpts = append(hubOpts, grpc.WithPerRPCCredentials(auth))
hubOpts = append(hubOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 20,
Timeout: time.Second * 10,
PermitWithoutStream: true,
}))

c := &Clients{}
var err error
Expand Down
9 changes: 3 additions & 6 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,14 +475,11 @@ func NewTextile(ctx context.Context, conf Config, opts ...Option) (*Textile, err
grpcm.WithUnaryServerChain(auth.UnaryServerInterceptor(t.noAuthFunc)),
grpcm.WithStreamServerChain(auth.StreamServerInterceptor(t.noAuthFunc)),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: time.Hour * 24,
MaxConnectionAge: time.Hour * 24,
MaxConnectionAgeGrace: time.Hour * 24,
Time: time.Hour * 2,
Timeout: time.Hour * 2,
Time: time.Second * 20,
Timeout: time.Second * 10,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Minute * 5,
MinTime: time.Second * 20,
PermitWithoutStream: true,
}),
}
Expand Down
2 changes: 2 additions & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/gin-contrib/location"
"github.com/gin-contrib/pprof"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -148,6 +149,7 @@ func (g *Gateway) Start() {
host: g.bucketsDomain,
}))
router.Use(gincors.New(cors.Options{}))
pprof.Register(router)

router.GET("/health", func(c *gin.Context) {
c.Writer.WriteHeader(http.StatusOK)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/filecoin-project/go-fil-markets v1.1.9
github.com/gin-contrib/location v0.0.2
github.com/gin-contrib/pprof v1.3.0
github.com/gin-contrib/static v0.0.1
github.com/gin-gonic/gin v1.7.4
github.com/gogo/status v1.1.0
Expand Down Expand Up @@ -74,7 +75,7 @@ require (
github.com/textileio/dcrypto v0.0.1
github.com/textileio/go-assets v0.0.0-20200430191519-b341e634e2b7
github.com/textileio/go-ds-mongo v0.1.5
github.com/textileio/go-threads v1.1.1
github.com/textileio/go-threads v1.1.2-0.20210828004955-e1f6dc25311a
github.com/textileio/powergate/v2 v2.3.0
github.com/textileio/swagger-ui v0.3.29-0.20210224180244-7d73a7a32fe7
github.com/xakep666/mongo-migrate v0.2.1
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,15 @@ github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgoo
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/location v0.0.2 h1:QZKh1+K/LLR4KG/61eIO3b7MLuKi8tytQhV6texLgP4=
github.com/gin-contrib/location v0.0.2/go.mod h1:NGoidiRlf0BlA/VKSVp+g3cuSMeTmip/63PhEjRhUAc=
github.com/gin-contrib/pprof v1.3.0 h1:G9eK6HnbkSqDZBYbzG4wrjCsA4e+cvYAHUZw6W+W9K0=
github.com/gin-contrib/pprof v1.3.0/go.mod h1:waMjT1H9b179t3CxuG1cV3DHpga6ybizwfBaM5OXaB0=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-contrib/static v0.0.0-20191128031702-f81c604d8ac2/go.mod h1:VhW/Ch/3FhimwZb8Oj+qJmdMmoB8r7lmJ5auRjm50oQ=
github.com/gin-contrib/static v0.0.1 h1:JVxuvHPuUfkoul12N7dtQw7KRn/pSMq7Ue1Va9Swm1U=
github.com/gin-contrib/static v0.0.1/go.mod h1:CSxeF+wep05e0kCOsqWdAWbSszmc31zTIbD8TvWl7Hs=
github.com/gin-gonic/gin v1.5.0/go.mod h1:Nd6IXA8m5kNZdNEHMBd93KT+mdY3+bewLgRvmCsR2Do=
github.com/gin-gonic/gin v1.6.2/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.7.4 h1:QmUZXrvJ9qZ3GfWvQ+2wnW/1ePrTEJqPKMYEU3lD/DM=
github.com/gin-gonic/gin v1.7.4/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY=
Expand Down Expand Up @@ -1881,8 +1884,8 @@ github.com/textileio/go-libp2p-pubsub-rpc v0.0.5 h1:De54sqNpQocJebf7P+4RrwtuUw8s
github.com/textileio/go-libp2p-pubsub-rpc v0.0.5/go.mod h1:MlOMOz3KZxexobvUuFXT/QY9Vjh9eKJpZPr48hDUdVo=
github.com/textileio/go-log/v2 v2.1.3-gke-1 h1:7e3xSUXQB8hn4uUe5fp41kLThW1o9T65gSM7qjS323g=
github.com/textileio/go-log/v2 v2.1.3-gke-1/go.mod h1:DwACkjFS3kjZZR/4Spx3aPfSsciyslwUe5bxV8CEU2w=
github.com/textileio/go-threads v1.1.1 h1:cAYayq8nrYpMzF/+oWZ0QKhbmcHNEHpRHjWKSC4vPzE=
github.com/textileio/go-threads v1.1.1/go.mod h1:xIuk2P+WXIwzvN2NlHZerPMZb1DkwxeWGlu5HkX9j3c=
github.com/textileio/go-threads v1.1.2-0.20210828004955-e1f6dc25311a h1:2WHvzu8r6GPnA5QC0ShjzflyxGMO7BltoW8Mv9g/1sM=
github.com/textileio/go-threads v1.1.2-0.20210828004955-e1f6dc25311a/go.mod h1:xIuk2P+WXIwzvN2NlHZerPMZb1DkwxeWGlu5HkX9j3c=
github.com/textileio/powergate/v2 v2.3.0 h1:kelYh+ZWDQao1rL5YiMznQscd6CsDjgt6P/D1S5UYwQ=
github.com/textileio/powergate/v2 v2.3.0/go.mod h1:2j2NL1oevaVdrI6MpKfHnfgUOy1D4L7eP3I+1czxDjw=
github.com/textileio/swagger-ui v0.3.29-0.20210224180244-7d73a7a32fe7 h1:qUEurT6kJF+nFkiNjUPMJJ7hgg9OIDnb8iLn6VtBukE=
Expand Down

0 comments on commit 63eefd3

Please sign in to comment.