diff --git a/dagstore.go b/dagstore.go index cd1f83a..775df18 100644 --- a/dagstore.go +++ b/dagstore.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/dagstore/index" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/shard" + "github.com/filecoin-project/dagstore/throttle" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" @@ -97,8 +98,8 @@ type DAGStore struct { // Throttling. // - throttleFetch Throttler - throttleIndex Throttler + throttleFetch throttle.Throttler + throttleIndex throttle.Throttler // Lifecycle. // @@ -164,6 +165,8 @@ type Config struct { // MaxConcurrentFetch is the maximum fetching jobs that can // run concurrently. 0 (default) disables throttling. + // + // Mounts can also have throttling mechanisms. MaxConcurrentFetch int // RecoverOnStart specifies whether failed shards should be recovered @@ -214,18 +217,18 @@ func NewDAGStore(cfg Config) (*DAGStore, error) { gcCh: make(chan chan *GCResult, 8), traceCh: cfg.TraceCh, failureCh: cfg.FailureCh, - throttleFetch: noopThrottler{}, - throttleIndex: noopThrottler{}, + throttleFetch: throttle.Noop(), + throttleIndex: throttle.Noop(), ctx: ctx, cancelFn: cancel, } if max := cfg.MaxConcurrentFetch; max > 0 { - dagst.throttleFetch = NewThrottler(max) + dagst.throttleFetch = throttle.Fixed(max) } if max := cfg.MaxConcurrentIndex; max > 0 { - dagst.throttleIndex = NewThrottler(max) + dagst.throttleIndex = throttle.Fixed(max) } if err := dagst.restoreState(); err != nil { diff --git a/go.mod b/go.mod index 9c3e4c7..0d2a5f5 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/filecoin-project/dagstore go 1.16 require ( - github.com/hashicorp/go-multierror v1.0.0 github.com/ipfs/go-block-format v0.0.3 github.com/ipfs/go-cid v0.0.8-0.20210716091050-de6c03deae1c github.com/ipfs/go-datastore v0.4.5 diff --git a/go.sum b/go.sum index 5579b33..a4cf490 100644 --- a/go.sum +++ b/go.sum @@ -131,12 +131,10 @@ github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfm github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= -github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= diff --git a/throttle/doc.go b/throttle/doc.go new file mode 100644 index 0000000..d73b59a --- /dev/null +++ b/throttle/doc.go @@ -0,0 +1,3 @@ +// Package throttle includes throttlers for composing in various contexts, such +// as inside Mounts for costly operations, and within the DAG store itself. +package throttle diff --git a/throttler.go b/throttle/throttler.go similarity index 82% rename from throttler.go rename to throttle/throttler.go index 08f20fe..00d7647 100644 --- a/throttler.go +++ b/throttle/throttler.go @@ -1,4 +1,4 @@ -package dagstore +package throttle import "context" @@ -18,9 +18,9 @@ type throttler struct { ch chan struct{} } -// NewThrottler creates a new throttler that allows the specified concurrency +// Fixed creates a new throttler that allows the specified fixed concurrency // at most. -func NewThrottler(maxConcurrency int) Throttler { +func Fixed(maxConcurrency int) Throttler { ch := make(chan struct{}, maxConcurrency) for i := 0; i < maxConcurrency; i++ { ch <- struct{}{} @@ -38,6 +38,11 @@ func (t *throttler) Do(ctx context.Context, fn func(ctx context.Context) error) return fn(ctx) } +// Noop returns a noop throttler. +func Noop() Throttler { + return noopThrottler{} +} + type noopThrottler struct{} func (noopThrottler) Do(ctx context.Context, fn func(ctx context.Context) error) error { diff --git a/throttler_test.go b/throttle/throttler_test.go similarity index 97% rename from throttler_test.go rename to throttle/throttler_test.go index 8d1ad9a..f143798 100644 --- a/throttler_test.go +++ b/throttle/throttler_test.go @@ -1,4 +1,4 @@ -package dagstore +package throttle import ( "context" @@ -11,7 +11,7 @@ import ( ) func TestThrottler(t *testing.T) { - tt := NewThrottler(5) + tt := Fixed(5) var cnt int32 ch := make(chan struct{}, 16)