Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Rate limit concurrent cache misses (#87)
Browse files Browse the repository at this point in the history
* rate limit concurrent cache misses and update dagstore

* update go.sum
  • Loading branch information
aarshkshah1992 authored Sep 30, 2022
1 parent fe72d8e commit cc15534
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 23 deletions.
100 changes: 80 additions & 20 deletions carstore/carstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ var (
maxConcurrentReadyFetches = 3
secondMissDuration = 24 * time.Hour
maxRecoverAttempts = uint64(1)
defaultDownloadTimeout = 20 * time.Minute
defaultDownloadTimeout = 45 * time.Minute
nConcurrentDownloads = 3
nMaxCacheMissBuffer = 20
)

var (
Expand All @@ -59,6 +61,11 @@ type Config struct {
DownloadTimeout time.Duration
}

// cacheMissReq is a single entry on the CarStore cache miss buffer: it records
// which request missed and which root should be handed to executeCacheMiss
// once the request is dequeued by cacheMissLoop.
type cacheMissReq struct {
// reqId is the ID of the request that caused the miss; it is passed to the
// logger and to executeCacheMiss when the entry is dequeued.
reqId uuid.UUID
// root is the root CID that was requested but could not be served locally.
root cid.Cid
}

type CarStore struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -78,6 +85,9 @@ type CarStore struct {

transientsDir string
downloadTimeout time.Duration

cacheMissBuffer chan cacheMissReq
cacheMissSemaphore chan struct{}
}

func New(rootDir string, gwAPI GatewayAPI, cfg Config, logger *logs.SaturnLogger) (*CarStore, error) {
Expand Down Expand Up @@ -153,6 +163,9 @@ func New(rootDir string, gwAPI GatewayAPI, cfg Config, logger *logs.SaturnLogger
logger: logger.Subsystem("car-store"),
transientsDir: transientsDir,
downloadTimeout: downloadTimeout,

cacheMissBuffer: make(chan cacheMissReq, nMaxCacheMissBuffer),
cacheMissSemaphore: make(chan struct{}, nConcurrentDownloads),
}, nil
}

Expand All @@ -169,6 +182,9 @@ func (cs *CarStore) Start(ctx context.Context) error {
cs.wg.Add(1)
go cs.traceLoop()

cs.wg.Add(1)
go cs.cacheMissLoop()

err := cs.dagst.Start(ctx)
if err == nil {
log.Info("successfully started car store")
Expand Down Expand Up @@ -206,8 +222,9 @@ func (cs *CarStore) gcTraceLoop() {
for {
select {
case res := <-cs.gcCh:
log.Debugw("shard reclaimed by automated gc", "shard", res.ReclaimedShard,
"disk-size-after-reclaim", res.TransientsDirSizeAfterReclaim, "disk-size-before-reclaim", res.TransientsDirSizeBeforeReclaim)
log.Infow("shard reclaimed by automated gc", "shard", res.ReclaimedShard,
"disk-size-after-reclaim", res.TransientsDirSizeAfterReclaim, "disk-size-before-reclaim", res.TransientsDirSizeBeforeReclaim,
"accounted-before", res.TransientsAccountingBeforeReclaim, "accounted-after", res.TransientsAccountingAfterReclaim)
case <-cs.ctx.Done():
return
}
Expand Down Expand Up @@ -245,6 +262,7 @@ func (cs *CarStore) FetchAndWriteCAR(reqID uuid.UUID, root cid.Cid, writer func(
}

if err == nil && len(sks) != 0 {
cs.logger.Infow(reqID, "found shards containing the requested cid")
var sa *dagstore.ShardAccessor

// among all the shards that have the requested root, select the first shard that we already have the CAR for locally.
Expand All @@ -263,7 +281,15 @@ func (cs *CarStore) FetchAndWriteCAR(reqID uuid.UUID, root cid.Cid, writer func(
// and return not found here.
if sa == nil {
cs.logger.Infow(reqID, "failed to acquire shard with nodownload=true, will execute the cache miss code", "err", err)
cs.executeCacheMiss(reqID, root)
// try to queue the cache miss without blocking: if the cache miss buffer is full, the request is dropped (see the default case below)
select {
case cs.cacheMissBuffer <- cacheMissReq{reqId: reqID, root: root}:
cs.logger.Infow(reqID, "queued to cache miss buffer")
case <-cs.ctx.Done():
return cs.ctx.Err()
default:
cs.logger.Infow(reqID, "dropping cache miss request as no space in buffer")
}
return ErrNotFound
}
defer sa.Close()
Expand All @@ -279,12 +305,33 @@ func (cs *CarStore) FetchAndWriteCAR(reqID uuid.UUID, root cid.Cid, writer func(
return writer(&blockstore{bs})
}

// we don't have the requested CAR -> apply "cache on second miss" rule
cs.executeCacheMiss(reqID, root)
// do not execute the cache miss if we don't have the capacity to do so
select {
case cs.cacheMissBuffer <- cacheMissReq{reqId: reqID, root: root}:
cs.logger.Infow(reqID, "queued to cache miss buffer")
case <-cs.ctx.Done():
return cs.ctx.Err()
default:
cs.logger.Infow(reqID, "dropping cache miss request as no space in buffer")
}

return ErrNotFound
}

// cacheMissLoop drains cs.cacheMissBuffer and hands each queued request to
// executeCacheMiss. The call is made inline, so queued requests are handled
// one after another by this goroutine. The loop exits when cs.ctx is done and
// marks itself finished on cs.wg on the way out.
func (cs *CarStore) cacheMissLoop() {
	defer cs.wg.Done()

	for {
		select {
		case <-cs.ctx.Done():
			// store is shutting down; stop consuming the buffer.
			return
		case miss := <-cs.cacheMissBuffer:
			id, root := miss.reqId, miss.root
			cs.logger.Infow(id, "dequeued request from cache miss buffer")
			cs.executeCacheMiss(id, root)
		}
	}
}

func (cs *CarStore) executeCacheMiss(reqID uuid.UUID, root cid.Cid) {
cs.mu.Lock()
defer cs.mu.Unlock()
Expand Down Expand Up @@ -313,21 +360,34 @@ func (cs *CarStore) executeCacheMiss(reqID uuid.UUID, root cid.Cid) {
mnt := &GatewayMount{RootCID: root, API: cs.gwAPI}
cs.downloading[mhkey] = struct{}{}

go func(mhkey string) {
ctx, cancel := context.WithDeadline(cs.ctx, time.Now().Add(cs.downloadTimeout))
defer cancel()
sa, err := helpers.RegisterAndAcquireSync(ctx, cs.dagst, keyFromCIDMultihash(root), mnt, dagstore.RegisterOpts{}, dagstore.AcquireOpts{})
if err == nil {
cs.logger.Infow(reqID, "successfully downloaded and cached given root")
sa.Close()
} else {
cs.logger.LogError(reqID, "download failed as failed to register/acquire shard", err)
}
select {
case cs.cacheMissSemaphore <- struct{}{}:
cs.logger.Infow(reqID, "acquired cache miss semaphore")
cs.wg.Add(1)
go func(mhkey string) {
defer func() {
<-cs.cacheMissSemaphore
cs.logger.Infow(reqID, "released cache miss semaphore")
cs.mu.Lock()
delete(cs.downloading, mhkey)
cs.mu.Unlock()
cs.wg.Done()
}()

ctx, cancel := context.WithDeadline(cs.ctx, time.Now().Add(cs.downloadTimeout))
defer cancel()
sa, err := helpers.RegisterAndAcquireSync(ctx, cs.dagst, keyFromCIDMultihash(root), mnt, dagstore.RegisterOpts{}, dagstore.AcquireOpts{})
if err == nil {
cs.logger.Infow(reqID, "successfully downloaded and cached given root")
sa.Close()
} else {
cs.logger.LogError(reqID, "download failed as failed to register/acquire shard", err)
}

cs.mu.Lock()
delete(cs.downloading, mhkey)
cs.mu.Unlock()
}(mhkey)
}(mhkey)
case <-cs.ctx.Done():
return
}
}

func (cs *CarStore) Stat() (station.StorageStats, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/filecoin-project/saturn-l2
go 1.18

require (
github.com/filecoin-project/dagstore v0.5.3-0.20220920122009-a359291f19ff
github.com/filecoin-project/dagstore v0.5.3-0.20220930091127-95a7d39bc17e
github.com/filecoin-project/go-address v1.0.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.7.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/filecoin-project/dagstore v0.5.3-0.20220920122009-a359291f19ff h1:m1+NsnqCJg4KKaJ0Zkq81B8oFsk8m9CPIeu9AVzb0+A=
github.com/filecoin-project/dagstore v0.5.3-0.20220920122009-a359291f19ff/go.mod h1:AIh49K94YHHrJZsHD9Puza4VciBmvG+Y9jqZtE4ie8Y=
github.com/filecoin-project/dagstore v0.5.3-0.20220930091127-95a7d39bc17e h1:mdetZehqskz1eUcoUTk5Z/LgchijQRZ6pz45HgSWW5s=
github.com/filecoin-project/dagstore v0.5.3-0.20220930091127-95a7d39bc17e/go.mod h1:AIh49K94YHHrJZsHD9Puza4VciBmvG+Y9jqZtE4ie8Y=
github.com/filecoin-project/go-address v1.0.0 h1:IrexI0kpADLaPP+CdmU3CVAUqnW/FQC0KTmz4lVKiFU=
github.com/filecoin-project/go-address v1.0.0/go.mod h1:5t3z6qPmIADZBtuE9EIzi0EwzcRy2nVhpo0I/c1r0OA=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
Expand Down

0 comments on commit cc15534

Please sign in to comment.