Skip to content

Commit

Permalink
Merge pull request #89 from filecoin-project/feat/throttler-expose
Browse files Browse the repository at this point in the history
expose throttler so it can be embedded in mounts.
  • Loading branch information
raulk authored Jul 27, 2021
2 parents 187c97e + 5db1798 commit a63a3aa
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 14 deletions.
15 changes: 9 additions & 6 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/dagstore/throttle"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -97,8 +98,8 @@ type DAGStore struct {

// Throttling.
//
throttleFetch Throttler
throttleIndex Throttler
throttleFetch throttle.Throttler
throttleIndex throttle.Throttler

// Lifecycle.
//
Expand Down Expand Up @@ -164,6 +165,8 @@ type Config struct {

// MaxConcurrentFetch is the maximum fetching jobs that can
// run concurrently. 0 (default) disables throttling.
//
// Mounts can also have throttling mechanisms.
MaxConcurrentFetch int

// RecoverOnStart specifies whether failed shards should be recovered
Expand Down Expand Up @@ -214,18 +217,18 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
gcCh: make(chan chan *GCResult, 8),
traceCh: cfg.TraceCh,
failureCh: cfg.FailureCh,
throttleFetch: noopThrottler{},
throttleIndex: noopThrottler{},
throttleFetch: throttle.Noop(),
throttleIndex: throttle.Noop(),
ctx: ctx,
cancelFn: cancel,
}

if max := cfg.MaxConcurrentFetch; max > 0 {
dagst.throttleFetch = NewThrottler(max)
dagst.throttleFetch = throttle.Fixed(max)
}

if max := cfg.MaxConcurrentIndex; max > 0 {
dagst.throttleIndex = NewThrottler(max)
dagst.throttleIndex = throttle.Fixed(max)
}

if err := dagst.restoreState(); err != nil {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/hashicorp/go-multierror v1.0.0
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.0.8-0.20210716091050-de6c03deae1c
github.com/ipfs/go-datastore v0.4.5
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,10 @@ github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfm
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
Expand Down
3 changes: 3 additions & 0 deletions throttle/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package throttle includes throttlers for composing in various contexts, such
// as inside Mounts for costly operations, and within the DAG store itself.
package throttle
11 changes: 8 additions & 3 deletions throttler.go → throttle/throttler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dagstore
package throttle

import "context"

Expand All @@ -18,9 +18,9 @@ type throttler struct {
ch chan struct{}
}

// NewThrottler creates a new throttler that allows the specified concurrency
// Fixed creates a new throttler that allows the specified fixed concurrency
// at most.
func NewThrottler(maxConcurrency int) Throttler {
func Fixed(maxConcurrency int) Throttler {
ch := make(chan struct{}, maxConcurrency)
for i := 0; i < maxConcurrency; i++ {
ch <- struct{}{}
Expand All @@ -38,6 +38,11 @@ func (t *throttler) Do(ctx context.Context, fn func(ctx context.Context) error)
return fn(ctx)
}

// Noop returns a noop throttler.
func Noop() Throttler {
return noopThrottler{}
}

type noopThrottler struct{}

func (noopThrottler) Do(ctx context.Context, fn func(ctx context.Context) error) error {
Expand Down
4 changes: 2 additions & 2 deletions throttler_test.go → throttle/throttler_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dagstore
package throttle

import (
"context"
Expand All @@ -11,7 +11,7 @@ import (
)

func TestThrottler(t *testing.T) {
tt := NewThrottler(5)
tt := Fixed(5)

var cnt int32
ch := make(chan struct{}, 16)
Expand Down

0 comments on commit a63a3aa

Please sign in to comment.