Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic Retrieval pricing #6175

Merged
merged 38 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2a40c80
bypass task scheduler for reading unsealed pieces
aarshkshah1992 May 18, 2021
73613ee
docs, logs and green ci
aarshkshah1992 May 18, 2021
d33d426
Apply suggestions from code review
aarshkshah1992 May 19, 2021
c58048d
address review comments
aarshkshah1992 May 19, 2021
dd9c9fd
test http handler
aarshkshah1992 May 19, 2021
8c4c26c
fix linting problems
aarshkshah1992 May 19, 2021
ec85a97
feat: TestPieceProviderReadPiece
dirkmc May 19, 2021
31a5f68
use an actual worker in the integration tests
aarshkshah1992 May 20, 2021
9b34494
use mockgen
aarshkshah1992 May 20, 2021
bd99590
unit tests for the remote store Reader
aarshkshah1992 May 20, 2021
50d7acf
fix go mod
aarshkshah1992 May 20, 2021
c17300d
remove read task type and run gen and docsgen
aarshkshah1992 May 20, 2021
78a0458
finish integration tests
aarshkshah1992 May 21, 2021
85f2ac8
more logging
aarshkshah1992 May 21, 2021
40642b2
better logging
aarshkshah1992 May 21, 2021
536d7c4
clean up logging
aarshkshah1992 May 21, 2021
fb29f78
integration test should remove unsealed files
aarshkshah1992 May 21, 2021
8ff5bce
logs to debug read and unseal
aarshkshah1992 May 21, 2021
2c9f592
logs to debug read & unseal
aarshkshah1992 May 21, 2021
50e023e
changes as per review
aarshkshah1992 May 21, 2021
dc6dbc9
dpr changes and test based on new unsealing PR
aarshkshah1992 May 22, 2021
78255ea
test unsealing prices for default pricing strategy
aarshkshah1992 May 24, 2021
ac443cd
go mod tidy
aarshkshah1992 May 24, 2021
0f3ff9e
fix typo
aarshkshah1992 May 24, 2021
1b5c1f0
test unsealed ask
aarshkshah1992 May 24, 2021
43453bb
fix broken test
aarshkshah1992 Jun 3, 2021
05ba3de
changes as per review
aarshkshah1992 Jun 4, 2021
303ef15
fix ci
aarshkshah1992 Jun 4, 2021
1616650
merged master
aarshkshah1992 Jun 8, 2021
e4e60d7
fix mock
aarshkshah1992 Jun 8, 2021
212d0fc
fix remote store diff
aarshkshah1992 Jun 8, 2021
4e9bb16
changes as per review
aarshkshah1992 Jun 8, 2021
fed5afa
merge master
aarshkshah1992 Jun 11, 2021
ea9bed2
undo ffi change
aarshkshah1992 Jun 11, 2021
3d086df
changes as per review
aarshkshah1992 Jun 14, 2021
bed2bc7
Update node/config/def.go
aarshkshah1992 Jun 14, 2021
4419205
fix compilation
aarshkshah1992 Jun 14, 2021
9002fcb
local unsealed file wont have the unsealed piece
aarshkshah1992 Jun 17, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/lotus-storage-miner/allinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ func TestMinerAllInfo(t *testing.T) {
t.Run("pre-info-all", run)

dh := kit.NewDealHarness(t, client, miner)
dh.MakeFullDeal(context.Background(), 6, false, false, 0)
_, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{
Ctx: context.Background(),
Rseed: 6,
CarExport: false,
FastRet: false,
StartEpoch: 0,
DoRetrieval: true,
})

t.Run("post-info-all", run)
}
2 changes: 1 addition & 1 deletion cmd/lotus-storage-miner/retrieval-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ var retrievalSetAskCmd = &cli.Command{

var retrievalGetAskCmd = &cli.Command{
Name: "get-ask",
Usage: "Get the provider's current retrieval ask",
Usage: "Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
ctx := lcli.DaemonContext(cctx)
Expand Down
4 changes: 2 additions & 2 deletions documentation/en/cli-lotus-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ COMMANDS:
selection Configure acceptance criteria for retrieval deal proposals
list List all active retrieval deals for this miner
set-ask Configure the provider's retrieval ask
get-ask Get the provider's current retrieval ask
get-ask Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command
help, h Shows a list of commands or help for one command

OPTIONS:
Expand Down Expand Up @@ -848,7 +848,7 @@ OPTIONS:
### lotus-miner retrieval-deals get-ask
```
NAME:
lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask
lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command

USAGE:
lotus-miner retrieval-deals get-ask [command options] [arguments...]
Expand Down
4 changes: 4 additions & 0 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (mgr *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
return id, nil
}

func (mgr *SectorMgr) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
return false, nil
}

func (mgr *SectorMgr) ForceState(sid storage.SectorRef, st int) error {
mgr.lk.Lock()
ss, ok := mgr.sectors[sid.ID]
Expand Down
23 changes: 23 additions & 0 deletions extern/sector-storage/piece_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ type Unsealer interface {
type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

var _ PieceProvider = &pieceProvider{}

type pieceProvider struct {
storage *stores.Remote
index stores.SectorIndex
Expand All @@ -40,6 +43,26 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse
}
}

// IsUnsealed checks if we have the unsealed piece at the given offset in an already
// existing unsealed file either locally or on any of the workers.
func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
if err := offset.Valid(); err != nil {
return false, xerrors.Errorf("offset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return false, xerrors.Errorf("size is not a valid piece size: %w", err)
}

ctxLock, cancel := context.WithCancel(ctx)
defer cancel()

if err := p.index.StorageLock(ctxLock, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
return false, xerrors.Errorf("acquiring read sector lock: %w", err)
}

return p.storage.CheckIsUnsealed(ctxLock, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
}

// tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it.
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
Expand Down
29 changes: 29 additions & 0 deletions extern/sector-storage/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,32 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// pre-commit 1
preCommit1 := ppt.preCommit1(t)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)

// pre-commit 2
ppt.preCommit2(t, preCommit1)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)

// finalize -> nil here will remove unsealed file
ppt.finalizeSector(t, nil)

// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// Read the piece -> will have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
true, pieceData)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
// read the piece -> will not have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)
Expand Down Expand Up @@ -118,12 +126,18 @@ func TestReadPieceRemoteWorkers(t *testing.T) {

// pre-commit 1
pC1 := ppt.preCommit1(t)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1)

// pre-commit 2
ppt.preCommit2(t, pC1)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1)
Expand All @@ -133,6 +147,8 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
// sending nil here will remove all unsealed files after sector is finalized.
ppt.finalizeSector(t, nil)

// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> have to unseal since we removed the file.
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1)
Expand All @@ -142,14 +158,21 @@ func TestReadPieceRemoteWorkers(t *testing.T) {

// remove the unsealed file and read again -> will have to unseal.
ppt.removeAllUnsealedSectorFiles(t)
// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1)

// check if IsUnsealed -> true
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
// Read Piece 2 -> no unsealing as it got unsealed above.
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2)

// remove all unseal files -> Read Piece 2 -> will have to Unseal.
ppt.removeAllUnsealedSectorFiles(t)

// check if IsUnsealed -> false
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2)
}

Expand Down Expand Up @@ -306,6 +329,12 @@ func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreC
p.commD = commD
}

func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) bool {
b, err := p.pp.IsUnsealed(p.ctx, p.sector, offset, size)
require.NoError(t, err)
return b
}

func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
Expand Down
100 changes: 90 additions & 10 deletions extern/sector-storage/stores/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,87 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa
return resp.Body, nil
}

// CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece
// either locally or on any of the workers.
// Returns true if we have the unsealed piece, false otherwise.
func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (bool, error) {
ft := storiface.FTUnsealed

paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return false, xerrors.Errorf("acquire local: %w", err)
}

path := storiface.PathByType(paths, ft)
if path != "" {
// if we have the unsealed file locally, check if it has the unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size)
nonsense marked this conversation as resolved.
Show resolved Hide resolved
raulk marked this conversation as resolved.
Show resolved Hide resolved
ssize, err := s.ProofType.SectorSize()
if err != nil {
return false, err
}

// open the unsealed sector file for the given sector size located at the given path.
pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return false, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file opened %s (+%d,%d)", path, offset, size)

// even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece
// in the unsealed sector file. That is what `HasAllocated` checks for.
has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
if err != nil {
return false, xerrors.Errorf("has allocated: %w", err)
}

// close the local unsealed file.
if err := r.pfHandler.Close(pf); err != nil {
return false, xerrors.Errorf("failed to close partial file: %s", err)
}
log.Debugf("checked if local partial file has the piece %s (+%d,%d), returning answer=%t", path, offset, size, has)

// Sector files can technically not have a piece unsealed locally, but have it unsealed in remote storage, so we probably
// want to return only if has is true
if has {
return has, nil
}
}

// --- We don't have the unsealed piece in an unsealed sector file locally
// Check if we have it in a remote cluster.

si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return false, xerrors.Errorf("StorageFindSector: %s", err)
}

if len(si) == 0 {
return false, nil
}

sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})

for _, info := range si {
for _, url := range info.URLs {
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
continue
}
if !ok {
continue
}

return true, nil
}
}

return false, nil
}

// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy.
// If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file
Expand All @@ -507,7 +588,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
if path != "" {
// if we have the unsealed file locally, return a reader that can be used to read the contents of the
// unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size)
log.Debugf("Check local %s (+%d,%d)", path, offset, size)
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
ssize, err := s.ProofType.SectorSize()
if err != nil {
return nil, err
Expand All @@ -529,19 +610,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
}
log.Debugf("check if partial file is allocated %s (+%d,%d)", path, offset, size)

if !has {
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
if err := r.pfHandler.Close(pf); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
return nil, nil
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
}

log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
if err := r.pfHandler.Close(pf); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
}

// --- We don't have the unsealed sector file locally
// --- We don't have the unsealed piece in an unsealed sector file locally

// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
// to determine which workers have the unsealed file and then query those workers to know
Expand Down
Loading