Skip to content

Commit

Permalink
Merge pull request #91 from filecoin-project/feat/dagstore-start
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored Jul 29, 2021
2 parents 044f5cf + 416a8c8 commit 2e0ceed
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 17 deletions.
41 changes: 24 additions & 17 deletions dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type Config struct {
}

// NewDAGStore constructs a new DAG store with the supplied configuration.
//
// You must call Start for processing to begin.
func NewDAGStore(cfg Config) (*DAGStore, error) {
// validate and manage scratch root directory.
if cfg.TransientsDir == "" {
Expand Down Expand Up @@ -241,12 +243,17 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
dagst.throttleCopy = throttle.Fixed(max)
}

if err := dagst.restoreState(); err != nil {
return dagst, nil
}

// Start starts a DAG store.
func (d *DAGStore) Start(ctx context.Context) error {
if err := d.restoreState(); err != nil {
// TODO add a lenient mode.
return nil, fmt.Errorf("failed to restore dagstore state: %w", err)
return fmt.Errorf("failed to restore dagstore state: %w", err)
}

if err := dagst.clearOrphaned(); err != nil {
if err := d.clearOrphaned(); err != nil {
log.Warnf("failed to clear orphaned files on startup: %s", err)
}

Expand All @@ -257,10 +264,10 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
// in this state than the externalCh buffer size would exceed the channel
// buffer, and we'd block forever.
var toRegister, toRecover []*Shard
for _, s := range dagst.shards {
for _, s := range d.shards {
switch s.state {
case ShardStateErrored:
switch cfg.RecoverOnStart {
switch d.config.RecoverOnStart {
case DoNotRecover:
log.Infow("start: skipping recovery of shard in errored state", "shard", s.key, "error", s.err)
case RecoverOnAcquire:
Expand All @@ -280,7 +287,7 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
case ShardStateInitializing:
// handle shards that were initializing when we shut down.
// if we already have the index for the shard, there's nothing else to do.
if istat, err := dagst.indices.StatFullIndex(s.key); err == nil && istat.Exists {
if istat, err := d.indices.StatFullIndex(s.key); err == nil && istat.Exists {
s.state = ShardStateAvailable
} else {
// reset back to new, and queue the OpShardRegister.
Expand All @@ -291,32 +298,32 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {
}

// spawn the control goroutine.
dagst.wg.Add(1)
go dagst.control()
d.wg.Add(1)
go d.control()

// spawn the dispatcher goroutine for responses, responsible for pumping
// async results back to the caller.
dagst.wg.Add(1)
go dagst.dispatcher(dagst.dispatchResultsCh)
d.wg.Add(1)
go d.dispatcher(d.dispatchResultsCh)

// application has provided a failure channel; spawn the dispatcher.
if dagst.failureCh != nil {
dagst.dispatchFailuresCh = make(chan *dispatch, 128) // len=128, same as externalCh.
dagst.wg.Add(1)
go dagst.dispatcher(dagst.dispatchFailuresCh)
if d.failureCh != nil {
d.dispatchFailuresCh = make(chan *dispatch, 128) // len=128, same as externalCh.
d.wg.Add(1)
go d.dispatcher(d.dispatchFailuresCh)
}

// release the queued registrations before we return.
for _, s := range toRegister {
_ = dagst.queueTask(&task{op: OpShardRegister, shard: s, waiter: &waiter{ctx: ctx}}, dagst.externalCh)
_ = d.queueTask(&task{op: OpShardRegister, shard: s, waiter: &waiter{ctx: ctx}}, d.externalCh)
}

// queue shard recovery for shards in the errored state before we return.
for _, s := range toRecover {
_ = dagst.queueTask(&task{op: OpShardRecover, shard: s, waiter: &waiter{ctx: ctx}}, dagst.externalCh)
_ = d.queueTask(&task{op: OpShardRecover, shard: s, waiter: &waiter{ctx: ctx}}, d.externalCh)
}

return dagst, nil
return nil
}

type RegisterOpts struct {
Expand Down
79 changes: 79 additions & 0 deletions dagstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func TestRegisterUsingExistingTransient(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
// even though the fs mount has an empty path, the existing transient will get us through registration.
Expand All @@ -64,6 +67,9 @@ func TestRegisterWithaNilResponseChannel(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

k := shard.KeyFromString("foo")
// we pass a nil response channel to Register Shard here
err = dagst.RegisterShard(context.Background(), k, &mount.FSMount{FS: testdata.FS, Path: testdata.FSPathCarV1}, nil, RegisterOpts{})
Expand Down Expand Up @@ -96,6 +102,9 @@ func TestRegisterCarV1(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
err = dagst.RegisterShard(context.Background(), k, &mount.FSMount{FS: testdata.FS, Path: testdata.FSPathCarV1}, ch, RegisterOpts{})
Expand Down Expand Up @@ -127,6 +136,9 @@ func TestRegisterCarV2(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
err = dagst.RegisterShard(context.Background(), k, carv2mnt, ch, RegisterOpts{})
Expand Down Expand Up @@ -158,6 +170,9 @@ func TestRegisterConcurrentShards(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

registerShards(t, dagst, n, carv2mnt, RegisterOpts{})
}

Expand All @@ -180,6 +195,9 @@ func TestAcquireInexistentShard(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
err = dagst.AcquireShard(context.Background(), k, ch, AcquireOpts{})
Expand All @@ -194,6 +212,9 @@ func TestAcquireAfterRegisterWait(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
err = dagst.RegisterShard(context.Background(), k, carv2mnt, ch, RegisterOpts{})
Expand All @@ -220,6 +241,9 @@ func TestConcurrentAcquires(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
err = dagst.RegisterShard(context.Background(), k, carv2mnt, ch, RegisterOpts{})
Expand Down Expand Up @@ -265,6 +289,9 @@ func TestRestartRestoresState(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

keys := registerShards(t, dagst, 100, carv2mnt, RegisterOpts{})
for _, k := range keys[0:20] { // acquire the first 20 keys.
_ = acquireShard(t, dagst, k, 4)
Expand All @@ -288,6 +315,10 @@ func TestRestartRestoresState(t *testing.T) {
IndexRepo: idx,
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

info := dagst.AllShardsInfo()
require.Len(t, info, 100)

Expand Down Expand Up @@ -328,6 +359,9 @@ func TestRestartResumesRegistration(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// start registering a shard -> registration will not complete as mount.Fetch will hang.
k := shard.KeyFromString("test")
ch := make(chan ShardResult, 1)
Expand Down Expand Up @@ -381,6 +415,9 @@ func TestRestartResumesRegistration(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// this time we will receive three traces; OpShardInitialize, and OpShardMakeAvailable.
n, timedOut = sink.Read(traces, 1*time.Second)
require.Equal(t, 3, n)
Expand Down Expand Up @@ -429,6 +466,9 @@ func TestGC(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// register 100 shards
// acquire 25 with 5 acquirers, release 2 acquirers (refcount 3); non reclaimable
// acquire another 25, release them all, they're reclaimable
Expand Down Expand Up @@ -485,6 +525,9 @@ func TestOrphansRemovedOnStartup(t *testing.T) {
require.NoError(t, err)
defer dagst.Close()

err = dagst.Start(context.Background())
require.NoError(t, err)

// orphaned files are gone
for _, p := range orphaned {
_, err := os.Stat(p)
Expand All @@ -506,6 +549,9 @@ func TestLazyInitialization(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
counting := &mount.Counting{Mount: carv2mnt}
Expand Down Expand Up @@ -556,6 +602,9 @@ func TestThrottleFetch(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// register 16 shards with lazy init, against the blocking mount.
// we don't register with eager, because we would block due to the throttle.
mnt := newBlockingMount(carv2mnt)
Expand Down Expand Up @@ -616,6 +665,9 @@ func TestIndexingFailure(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// register 16 shards with junk in them, so they will fail indexing.
resCh := make(chan ShardResult, 16)
junkmnt := *junkmnt // take a copy
Expand Down Expand Up @@ -822,6 +874,9 @@ func TestFailureRecovery(t *testing.T) {
go RecoverImmediately(context.Background(), dagst, failures, 10, nil) // 10 max attempts.
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// register 16 shards with junk in them, so they will fail indexing.
resCh := make(chan ShardResult, 16)
junkmnt := *junkmnt // take a copy
Expand Down Expand Up @@ -873,6 +928,9 @@ func TestRecoveryOnStart(t *testing.T) {
dagst, err := NewDAGStore(config)
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// register 16 shards with junk in them, so they will fail indexing.
resCh := make(chan ShardResult, 16)
junkmnt := *junkmnt // take a copy
Expand Down Expand Up @@ -905,6 +963,9 @@ func TestRecoveryOnStart(t *testing.T) {
dagst, err = NewDAGStore(config)
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// no events.
evts := make([]Trace, 16)
n, timedOut := sink.Read(evts, 1*time.Second)
Expand All @@ -925,6 +986,9 @@ func TestRecoveryOnStart(t *testing.T) {
dagst, err = NewDAGStore(config)
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// 32 events: recovery and failure.
evts := make([]Trace, 32)
n, timedOut := sink.Read(evts, 1*time.Second)
Expand Down Expand Up @@ -952,6 +1016,9 @@ func TestRecoveryOnStart(t *testing.T) {
dagst, err = NewDAGStore(config)
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// 0 events.
evts := make([]Trace, 32)
n, timedOut := sink.Read(evts, 500*time.Millisecond)
Expand Down Expand Up @@ -1035,6 +1102,9 @@ func TestFailingAcquireErrorPropagates(t *testing.T) {
dagst, err := NewDAGStore(config)
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// create a junk mount and front it with a blocking mount.
junkmnt := *junkmnt
mnt := newBlockingMount(&junkmnt)
Expand Down Expand Up @@ -1086,6 +1156,9 @@ func TestTransientReusedOnRestart(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
err = dagst.RegisterShard(context.Background(), k, carv2mnt, ch, RegisterOpts{})
Expand All @@ -1107,6 +1180,9 @@ func TestTransientReusedOnRestart(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// make sure the transient is populated, and it exists.
path := dagst.shards[k].mount.TransientPath()
require.NotEmpty(t, path)
Expand Down Expand Up @@ -1137,6 +1213,9 @@ func TestAcquireFailsWhenIndexGone(t *testing.T) {
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

ch := make(chan ShardResult, 1)
k := shard.KeyFromString("foo")
err = dagst.RegisterShard(context.Background(), k, carv2mnt, ch, RegisterOpts{})
Expand Down

0 comments on commit 2e0ceed

Please sign in to comment.