Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
carstore tests complete
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Jul 13, 2022
1 parent 6979b36 commit 570af5a
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 83 deletions.
5 changes: 4 additions & 1 deletion carstore/carstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
secondMissDuration = 24 * time.Hour
maxRecoverAttempts = uint64(1)
defaultMaxSize = int64(200 * 1073741824) // 200 Gib
defaultDownloadTimeout = 20 * time.Minute
)

var (
Expand Down Expand Up @@ -297,7 +298,9 @@ func (cs *CarStore) executeCacheMiss(reqID uuid.UUID, root cid.Cid) {
cs.downloading[mhkey] = struct{}{}

go func(mhkey string) {
sa, err := helpers.RegisterAndAcquireSync(cs.ctx, cs.dagst, keyFromCIDMultihash(root), mnt, dagstore.RegisterOpts{}, dagstore.AcquireOpts{})
ctx, cancel := context.WithDeadline(cs.ctx, time.Now().Add(defaultDownloadTimeout))
defer cancel()
sa, err := helpers.RegisterAndAcquireSync(ctx, cs.dagst, keyFromCIDMultihash(root), mnt, dagstore.RegisterOpts{}, dagstore.AcquireOpts{})
if err == nil {
cs.logger.Infow(reqID, "successfully downloaded and cached given root")
sa.Close()
Expand Down
204 changes: 125 additions & 79 deletions carstore/carstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

cid "github.com/ipfs/go-cid"

"golang.org/x/sync/errgroup"

"github.com/filecoin-project/dagstore"
Expand All @@ -23,149 +25,148 @@ import (
)

func TestPersistentCache(t *testing.T) {
lg := logs.NewSaturnLogger()
cfg := Config{}

ctx := context.Background()
temp := t.TempDir()

carv1File := "../testdata/files/sample-v1.car"
rootcid, bz := testutils.ParseCar(t, carv1File)

svc := testutils.GetTestServer(t, rootcid.String(), bz)
defer svc.Close()
cs, err := New(temp, &gatewayAPI{
baseURL: svc.URL,
}, cfg, lg)
require.NoError(t, err)
require.NoError(t, cs.Start(ctx))

csh := newCarStoreHarness(t, svc.URL)
reqID := uuid.New()

// first hit -> not found
err = cs.FetchAndWriteCAR(reqID, rootcid, func(_ bstore.Blockstore) error {
return nil
})
require.EqualValues(t, err, ErrNotFound)
csh.fetchAndAssertNotFound(reqID, rootcid)

// second hit -> not found
err = cs.FetchAndWriteCAR(reqID, rootcid, func(_ bstore.Blockstore) error {
return nil
})
require.EqualValues(t, err, ErrNotFound)
csh.fetchAndAssertNotFound(reqID, rootcid)

require.Eventually(t, func() bool {
ks, err := cs.dagst.ShardsContainingMultihash(ctx, rootcid.Hash())
ks, err := csh.cs.dagst.ShardsContainingMultihash(ctx, rootcid.Hash())
return err == nil && len(ks) == 1
}, 50*time.Second, 200*time.Millisecond)

// third hit -> found
err = cs.FetchAndWriteCAR(reqID, rootcid, func(bs bstore.Blockstore) error {
blk, err := bs.Get(ctx, rootcid)
if err != nil {
return err
}
if blk == nil {
return errors.New("empty root")
}
return nil
})
require.NoError(t, err)
csh.fetchAndAssertFound(ctx, reqID, rootcid)

// fourth hit -> found
err = cs.FetchAndWriteCAR(reqID, rootcid, func(bs bstore.Blockstore) error {
blk, err := bs.Get(ctx, rootcid)
if err != nil {
return err
}
if blk == nil {
return errors.New("empty root")
}
return nil
})
require.NoError(t, err)
csh.fetchAndAssertFound(ctx, reqID, rootcid)

// wait for shard to become reclaimable again
require.Eventually(t, func() bool {
si, err := cs.dagst.GetShardInfo(keyFromCIDMultihash(rootcid))
si, err := csh.cs.dagst.GetShardInfo(keyFromCIDMultihash(rootcid))
return err == nil && si.ShardState == dagstore.ShardStateAvailable
}, 50*time.Second, 200*time.Millisecond)

// run dagstore GC -> CAR file is removed
res, err := cs.dagst.GC(ctx)
res, err := csh.cs.dagst.GC(ctx)
require.NoError(t, err)
require.Len(t, res.Shards, 1)

// fetch car -> fails as we do not have it but will trigger a fetch again
err = cs.FetchAndWriteCAR(reqID, rootcid, func(_ bstore.Blockstore) error {
return nil
})
require.EqualValues(t, err, ErrNotFound)
csh.fetchAndAssertNotFound(reqID, rootcid)

// fetch car -> works now as car file was downloaded in the previous fetch
require.Eventually(t, func() bool {
err = cs.FetchAndWriteCAR(reqID, rootcid, func(_ bstore.Blockstore) error {
err = csh.cs.FetchAndWriteCAR(reqID, rootcid, func(_ bstore.Blockstore) error {
return nil
})
return err == nil
}, 50*time.Second, 200*time.Millisecond)

require.NoError(t, cs.Close())
require.NoError(t, csh.cs.Close())
}

func TestPersistentCacheConcurrent(t *testing.T) {
lg := logs.NewSaturnLogger()
cfg := Config{}

ctx := context.Background()
temp := t.TempDir()

carv1File := "../testdata/files/sample-v1.car"
rootcid, bz := testutils.ParseCar(t, carv1File)

svc := testutils.GetTestServer(t, rootcid.String(), bz)
defer svc.Close()
cs, err := New(temp, &gatewayAPI{
baseURL: svc.URL,
}, cfg, lg)
require.NoError(t, err)
require.NoError(t, cs.Start(ctx))
csh := newCarStoreHarness(t, svc.URL)

// send 100 concurrent requests
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cs.FetchAndWriteCAR(uuid.New(), rootcid, func(_ bstore.Blockstore) error {
return nil
})
}()
}
wg.Wait()
csh.fetchNAsyNC(rootcid, 100)

require.Eventually(t, func() bool {
ks, err := cs.dagst.ShardsContainingMultihash(ctx, rootcid.Hash())
ks, err := csh.cs.dagst.ShardsContainingMultihash(ctx, rootcid.Hash())
return err == nil && len(ks) == 1
}, 50*time.Second, 200*time.Millisecond)

// fetch shard 100 times -> should work
var errg errgroup.Group
for i := 0; i < 100; i++ {
errg.Go(func() error {

return cs.FetchAndWriteCAR(uuid.New(), rootcid, func(_ bstore.Blockstore) error {
return csh.cs.FetchAndWriteCAR(uuid.New(), rootcid, func(_ bstore.Blockstore) error {
return nil
})
})

}
require.NoError(t, errg.Wait())
}

// dagstore should only have one copy
sz, err := transientDirSize(filepath.Join(temp, "transients"))
require.NoError(t, err)
require.EqualValues(t, len(bz), sz)
func TestMountFetchErrorConcurrent(t *testing.T) {
carv1File := "../testdata/files/sample-v1.car"
rootcid, _ := testutils.ParseCar(t, carv1File)
svc := testutils.GetTestErrorServer(t)
defer svc.Close()
csh := newCarStoreHarness(t, svc.URL)

// send 100 concurrent requests
csh.fetchNAsyNC(rootcid, 100)

// fetch 100 times -> all fail and no panic
errCh := make(chan error, 100)

for i := 0; i < 100; i++ {
go func() {
errCh <- csh.cs.FetchAndWriteCAR(uuid.New(), rootcid, func(_ bstore.Blockstore) error {
return nil
})
}()
}

for i := 0; i < 100; i++ {
err := <-errCh
require.EqualError(t, err, ErrNotFound.Error())
}
}

func TestDownloadTimeout(t *testing.T) {
carv1File := "../testdata/files/sample-v1.car"
rootcid, _ := testutils.ParseCar(t, carv1File)
x := defaultDownloadTimeout
defer func() {
defaultDownloadTimeout = x
}()
defaultDownloadTimeout = 100 * time.Millisecond
svc := testutils.GetTestHangingServer(t)
csh := newCarStoreHarness(t, svc.URL)

reqID := uuid.New()
// first try -> not found
csh.fetchAndAssertNotFound(reqID, rootcid)

// second try -> not found
csh.fetchAndAssertNotFound(reqID, rootcid)
time.Sleep(1 * time.Second)

// still errors out
csh.fetchAndAssertNotFound(reqID, rootcid)
}

func (csh *carstoreHarness) fetchNAsyNC(rootCid cid.Cid, n int) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
csh.cs.FetchAndWriteCAR(uuid.New(), rootCid, func(_ bstore.Blockstore) error {
return nil
})
}()
}
wg.Wait()
}

func transientDirSize(dir string) (int64, error) {
Expand All @@ -181,3 +182,48 @@ func transientDirSize(dir string) (int64, error) {
})
return size, err
}

type carstoreHarness struct {
t *testing.T
cs *CarStore
}

func newCarStoreHarness(t *testing.T, apiurl string) *carstoreHarness {
lg := logs.NewSaturnLogger()
cfg := Config{}

ctx := context.Background()
temp := t.TempDir()

cs, err := New(temp, &gatewayAPI{
baseURL: apiurl,
}, cfg, lg)
require.NoError(t, err)
require.NoError(t, cs.Start(ctx))

return &carstoreHarness{
cs: cs,
t: t,
}
}

func (csh *carstoreHarness) fetchAndAssertNotFound(reqID uuid.UUID, rootCid cid.Cid) {
err := csh.cs.FetchAndWriteCAR(reqID, rootCid, func(_ bstore.Blockstore) error {
return nil
})
require.EqualValues(csh.t, err, ErrNotFound)
}

func (csh *carstoreHarness) fetchAndAssertFound(ctx context.Context, reqID uuid.UUID, rootCid cid.Cid) {
err := csh.cs.FetchAndWriteCAR(reqID, rootCid, func(bs bstore.Blockstore) error {
blk, err := bs.Get(ctx, rootCid)
if err != nil {
return err
}
if blk == nil {
return errors.New("empty root")
}
return nil
})
require.NoError(csh.t, err)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/filecoin-project/dagstore v0.5.3-0.20220713123034-28520b6998ac
github.com/filecoin-project/dagstore v0.5.3-0.20220713170614-1cd6e2f9a6bd
github.com/golang/snappy v0.0.3 // indirect
github.com/google/uuid v1.3.0
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/filecoin-project/dagstore v0.5.3-0.20220713123034-28520b6998ac h1:sD6egfpgKWH85lmpOfCxHCmRiKmP010sYvjFEaGg020=
github.com/filecoin-project/dagstore v0.5.3-0.20220713123034-28520b6998ac/go.mod h1:AIh49K94YHHrJZsHD9Puza4VciBmvG+Y9jqZtE4ie8Y=
github.com/filecoin-project/dagstore v0.5.3-0.20220713170614-1cd6e2f9a6bd h1:Ku0nAmbYIwTt56KQXdDvSEovPDtLlYp8YdBpRMI755Y=
github.com/filecoin-project/dagstore v0.5.3-0.20220713170614-1cd6e2f9a6bd/go.mod h1:AIh49K94YHHrJZsHD9Puza4VciBmvG+Y9jqZtE4ie8Y=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
Expand Down
14 changes: 14 additions & 0 deletions testutils/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ func GetTestServerFor(t *testing.T, path string) (cid.Cid, []byte, *httptest.Ser
return root, contents, GetTestServer(t, root.String(), contents)
}

func GetTestHangingServer(t *testing.T) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for {

}
}))
}

func GetTestServer(t *testing.T, root string, out []byte) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
Expand All @@ -41,3 +49,9 @@ func GetTestServer(t *testing.T, root string, out []byte) *httptest.Server {
w.Write(out)
}))
}

func GetTestErrorServer(t *testing.T) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bad req", http.StatusInternalServerError)
}))
}

0 comments on commit 570af5a

Please sign in to comment.