Add background goroutine to retrieve missing child shards in large batches

License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
kevina committed Apr 29, 2018
1 parent afdec28 commit f78d58b
Showing 2 changed files with 302 additions and 15 deletions.
254 changes: 254 additions & 0 deletions unixfs/hamt/fetcher.go
@@ -0,0 +1,254 @@
package hamt

import (
"context"
//"fmt"
//"os"

logging "gx/ipfs/QmRb5jh8z2E8hMGN2tkvs1yHynUanqnZ3UeKwgN1i9P1F8/go-log"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

var log = logging.Logger("hamt")

// fetcher implements a background fetcher to retrieve missing child
// shards in large batches. It attempts to retrieve the missing
// shards in an order that allows streaming of the complete HAMT
// directory, assuming a depth-first traversal.
type fetcher struct {
// note: the fields in this structure should only be accessed by the
// 'mainLoop' goroutine; all communication should be done via
// channels

ctx context.Context
dserv ipld.DAGService

newJob chan *job
reqRes chan *Shard
result chan result

idle bool

done chan struct{}

todoFirst *job // do this job first since we are waiting for its results
todo jobStack // stack of jobs that still need to be done
jobs map[*Shard]*job // map of all jobs in which the results have not been collected yet

// stats relevant for streaming the complete HAMT directory
hits int // job's result already ready, no delay
nearMisses int // job currently being worked on, small delay
misses int // job on todo stack but will be done in the next batch, larger delay
}

// batchSize must be at least as large as the largest number of cids
// requested in a single job. For best performance it should likely
// be slightly larger, as jobs are popped from the todo stack in
// order and a job close to the batchSize could force a very small
// batch to run.
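//
// For example, assuming a HAMT fanout of 256 (the go-ipfs default,
// and an assumption about the caller rather than something the
// fetcher enforces beyond the panic check in mainLoop), the largest
// possible job is 256 cids, leaving 64 cids of headroom for smaller
// jobs to share the batch.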
const batchSize = 320

//
// fetcher public interface
//

// startFetcher starts a new fetcher in the background
func startFetcher(ctx context.Context, dserv ipld.DAGService) *fetcher {
log.Infof("fetcher: starting...")
f := &fetcher{
ctx: ctx,
dserv: dserv,
newJob: make(chan *job),
reqRes: make(chan *Shard),
result: make(chan result),
idle: true,
done: make(chan struct{}),
jobs: make(map[*Shard]*job),
}
go f.mainLoop()
return f
}

// addJob adds a job to retrieve the missing child shards for the
// provided shard
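// Note that newJob is unbuffered, so addJob blocks until mainLoop
// accepts the job; it is called from ForEachLink in hamt.go and from
// the batch goroutine in launch, never from mainLoop itself.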
func (f *fetcher) addJob(hamt *Shard) bool {
children := hamt.missingChildShards()
if children == nil {
return false
}
j := &job{id: hamt, cids: children}
f.newJob <- j
return true
}

// result contains the result of a job, see getResult
type result struct {
vals map[string]*Shard
errs []error
}

// getResult returns the result of the job. The result covers the
// whole batch request, not just the single job: if the 'errs' field
// is empty, 'vals' is guaranteed to contain all of the job's missing
// child shards, but the map may also contain child shards of other
// jobs in the batch.
func (f *fetcher) getResult(hamt *Shard) result {
f.reqRes <- hamt
res := <-f.result
return res
}
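
// A sketch of the intended addJob/getResult pairing (a hypothetical
// caller; in practice walkTrie in hamt.go drives the fetcher):
//
//   f := startFetcher(ctx, dserv)
//   f.addJob(shard)           // enqueue the shard's missing children
//   // ... descend into children that are already loaded ...
//   res := f.getResult(shard) // block until a batch with them is done
//   if len(res.errs) > 0 {
//       return res.errs[0]
//   }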

//
// fetcher internals
//

type job struct {
id *Shard
idx int
cids []*cid.Cid
res result
}

type jobStack struct {
c []*job
}

func (f *fetcher) mainLoop() {
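// want holds a shard whose caller is blocked in getResult waiting on
// an in-flight batch; the done case below answers it as soon as the
// batch carrying its result completes.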
var want *Shard
for {
select {
case j := <-f.newJob:
if len(j.cids) > batchSize {
panic("job size larger than batchSize")
}
f.todo.push(j)
f.jobs[j.id] = j
if f.idle {
f.launch()
}
case id := <-f.reqRes:
j := f.jobs[id]
if j.res.vals != nil {
f.hits++
delete(f.jobs, id)
f.result <- j.res
} else {
if j.idx != -1 {
f.misses++
// move job to todoFirst so that it will be done on the
// next batch job
f.todo.remove(j)
f.todoFirst = j
} else {
f.nearMisses++
}
want = id
}
case <-f.done:
f.launch()
log.Infof("fetcher: batch job done")
log.Infof("fetcher stats (hits, nearMisses, misses): %d %d %d", f.hits, f.nearMisses, f.misses)
if want != nil {
j := f.jobs[want]
if j.res.vals != nil {
delete(f.jobs, want)
f.result <- j.res
want = nil
}
}
case <-f.ctx.Done():
log.Infof("fetcher: exiting")
log.Infof("fetcher stats (hits, nearMisses, misses): %d %d %d", f.hits, f.nearMisses, f.misses)
return
}
}
}

type batchJob struct {
cids []*cid.Cid
jobs []*job
}

func (b *batchJob) add(j *job) {
b.cids = append(b.cids, j.cids...)
b.jobs = append(b.jobs, j)
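// idx == -1 marks the job as in flight (no longer on the todo
// stack); mainLoop uses this to tell a near miss from a miss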
j.idx = -1
}

func (f *fetcher) launch() {
bj := batchJob{}

// always do todoFirst
if f.todoFirst != nil {
bj.add(f.todoFirst)
f.todoFirst = nil
}

// pop requests from the todo stack until adding the next job would
// exceed the batchSize
for !f.todo.empty() && len(bj.cids)+len(f.todo.top().cids) <= batchSize {
j := f.todo.pop()
bj.add(j)
}
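// e.g. with batchSize = 320: a todoFirst job of 300 cids plus a
// 21-cid job on top of the stack launches a 300-cid batch; the
// 21-cid job stays queued for the next launch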

if len(bj.cids) == 0 {
log.Infof("fetcher: entering idle state: no more jobs")
f.idle = true
return
}

// launch job
log.Infof("fetcher: starting batch job, size = %d", len(bj.cids))
f.idle = false
go func() {
ch := f.dserv.GetMany(f.ctx, bj.cids)
fetched := result{vals: make(map[string]*Shard)}
for no := range ch {
if no.Err != nil {
fetched.errs = append(fetched.errs, no.Err)
continue
}
hamt, err := NewHamtFromDag(f.dserv, no.Node)
if err != nil {
fetched.errs = append(fetched.errs, err)
continue
}
f.addJob(hamt)
fetched.vals[string(no.Node.Cid().Bytes())] = hamt
}
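// every job in the batch shares the same result map, which is why
// getResult documents that vals may contain shards of other jobs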
for _, job := range bj.jobs {
job.res = fetched
}
f.done <- struct{}{}
}()
}

func (js *jobStack) empty() bool {
return len(js.c) == 0
}

func (js *jobStack) top() *job {
return js.c[len(js.c)-1]
}

func (js *jobStack) push(j *job) {
j.idx = len(js.c)
js.c = append(js.c, j)
}

func (js *jobStack) pop() *job {
j := js.top()
js.remove(j)
return j
}

func (js *jobStack) remove(j *job) {
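// nil out the slot rather than splicing the slice so that the idx
// of every other queued job stays valid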
js.c[j.idx] = nil
j.idx = -1
js.popEmpty()
}

func (js *jobStack) popEmpty() {
for len(js.c) > 0 && js.c[len(js.c)-1] == nil {
js.c = js.c[:len(js.c)-1]
}
}
63 changes: 48 additions & 15 deletions unixfs/hamt/hamt.go
@@ -306,6 +306,23 @@ func (ds *Shard) loadChild(ctx context.Context, i int) (child, error) {
return c, nil
}

func (ds *Shard) missingChildShards() []*cid.Cid {
if len(ds.children) != len(ds.nd.Links()) {
panic("inconsistent lengths between children array and Links array")
}
// res stays nil when no shards are missing; addJob relies on the
// nil return to detect that there is nothing to fetch
var res []*cid.Cid
for i, c := range ds.children {
if c != nil {
continue
}
lnk := ds.nd.Links()[i]
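// child shards are linked with a name of exactly the padded-prefix
// length; longer names carry an entry name and point at values, so
// only the former are candidates for fetching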
if len(lnk.Name) == ds.maxpadlen {
res = append(res, lnk.Cid)
}
}
return res
}

// loadChildValue returns the i'th child node if it is a 'shardValue',
// otherwise it returns nil. If neither a child nor an error is
// returned it is safe to assume the child is a HAMT shard.
@@ -332,7 +349,7 @@ func (ds *Shard) loadChildValue(i int) (child, error) {

// preloadChildren populates the 'children' array; child shards that
// are not already loaded are fetched in parallel using GetMany, or
// collected from the background fetcher when one is provided
func (ds *Shard) preloadChildren(ctx context.Context, f *fetcher) error {
if len(ds.children) != len(ds.nd.Links()) {
return fmt.Errorf("inconsistent lengths between children array and Links array")
}
Expand All @@ -353,18 +370,29 @@ func (ds *Shard) preloadChildren(ctx context.Context) error {
toFetch = append(toFetch, lnk.Cid)
}

if len(toFetch) == 0 {
return nil
}

fetched := make(map[string]*Shard)
if f == nil {
ch := ds.dserv.GetMany(ctx, toFetch)
for no := range ch {
if no.Err != nil {
return no.Err
}
c, err := NewHamtFromDag(ds.dserv, no.Node)
if err != nil {
return err
}
fetched[string(no.Node.Cid().Bytes())] = c
}
} else {
res := f.getResult(ds)
if len(res.errs) > 0 {
return res.errs[0]
}
fetched = res.vals
}

for i, c0 := range ds.children {
@@ -463,17 +491,22 @@ func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) {
}

// ForEachLink walks the Shard and calls the given function.
func (ds *Shard) ForEachLink(ctx0 context.Context, f func(*ipld.Link) error) error {
ctx, cancel := context.WithCancel(ctx0)
fetcher := startFetcher(ctx, ds.dserv)
fetcher.addJob(ds)
err := ds.walkTrie(ctx, func(sv *shardValue) error {
lnk := sv.val
lnk.Name = sv.key

return f(lnk)
}, fetcher)
cancel()
return err
}
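
// A sketch of a hypothetical caller listing a sharded directory with
// the background fetcher active (dserv and dirNode are assumed):
//
//   shard, err := NewHamtFromDag(dserv, dirNode)
//   if err != nil {
//       return err
//   }
//   err = shard.ForEachLink(ctx, func(l *ipld.Link) error {
//       fmt.Println(l.Name, l.Cid)
//       return nil
//   })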

func (ds *Shard) walkTrie(ctx context.Context, cb func(*shardValue) error, f *fetcher) error {
err := ds.preloadChildren(ctx, f)
if err != nil {
return err
}
Expand All @@ -485,7 +518,7 @@ func (ds *Shard) walkTrie(ctx context.Context, cb func(*shardValue) error) error
}

case *Shard:
if err := c.walkTrie(ctx, cb, f); err != nil {
return err
}
default:
