Skip to content

Commit

Permalink
Merge pull request #4424 from filecoin-project/feat/retrieval-cli-dea…
Browse files Browse the repository at this point in the history
…l-filter

Custom filters for retrieval deals
  • Loading branch information
magik6k authored Oct 16, 2020
2 parents 40e34b3 + d0e4150 commit 111942b
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 51 deletions.
60 changes: 41 additions & 19 deletions markets/dealfilter/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,57 @@ import (
"encoding/json"
"os/exec"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"

"github.com/filecoin-project/lotus/node/modules/dtypes"
)

func CliDealFilter(cmd string) dtypes.DealFilter {
// TODO: run some checks on the cmd string

func CliStorageDealFilter(cmd string) dtypes.StorageDealFilter {
return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
j, err := json.MarshalIndent(deal, "", " ")
if err != nil {
return false, "", err
d := struct {
storagemarket.MinerDeal
DealType string
}{
MinerDeal: deal,
DealType: "storage",
}
return runDealFilter(ctx, cmd, d)
}
}

var out bytes.Buffer
func CliRetrievalDealFilter(cmd string) dtypes.RetrievalDealFilter {
return func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) {
d := struct {
retrievalmarket.ProviderDealState
DealType string
}{
ProviderDealState: deal,
DealType: "retrieval",
}
return runDealFilter(ctx, cmd, d)
}
}

func runDealFilter(ctx context.Context, cmd string, deal interface{}) (bool, string, error) {
j, err := json.MarshalIndent(deal, "", " ")
if err != nil {
return false, "", err
}

c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j)
c.Stdout = &out
c.Stderr = &out
var out bytes.Buffer

switch err := c.Run().(type) {
case nil:
return true, "", nil
case *exec.ExitError:
return false, out.String(), nil
default:
return false, "filter cmd run error", err
}
c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j)
c.Stdout = &out
c.Stderr = &out

switch err := c.Run().(type) {
case nil:
return true, "", nil
case *exec.ExitError:
return false, out.String(), nil
default:
return false, "filter cmd run error", err
}
}
9 changes: 7 additions & 2 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ func Online() Option {
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
Expand Down Expand Up @@ -484,7 +485,11 @@ func ConfigStorageMiner(c interface{}) Option {
ConfigCommon(&cfg.Common),

If(cfg.Dealmaking.Filter != "",
Override(new(dtypes.DealFilter), modules.BasicDealFilter(dealfilter.CliDealFilter(cfg.Dealmaking.Filter))),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(dealfilter.CliStorageDealFilter(cfg.Dealmaking.Filter))),
),

If(cfg.Dealmaking.RetrievalFilter != "",
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))),
),

Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)),
Expand Down
3 changes: 2 additions & 1 deletion node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type DealmakingConfig struct {
PieceCidBlocklist []cid.Cid
ExpectedSealDuration Duration

Filter string
Filter string
RetrievalFilter string
}

type SealingConfig struct {
Expand Down
4 changes: 3 additions & 1 deletion node/modules/dtypes/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"

Expand Down Expand Up @@ -71,4 +72,5 @@ type SetExpectedSealDurationFunc func(time.Duration) error
// too determine how long sealing is expected to take
type GetExpectedSealDurationFunc func() (time.Duration, error)

type DealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)
type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)
type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error)
78 changes: 50 additions & 28 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,16 @@ func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider"))
}

func BasicDealFilter(user dtypes.DealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
spn storagemarket.StorageProviderNode) dtypes.DealFilter {
spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {
return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
spn storagemarket.StorageProviderNode) dtypes.DealFilter {
spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {

return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
b, err := onlineOk()
Expand Down Expand Up @@ -497,7 +497,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
pieceStore dtypes.ProviderPieceStore,
dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode,
df dtypes.DealFilter,
df dtypes.StorageDealFilter,
funds ProviderDealFunds,
) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h)
Expand All @@ -511,8 +511,52 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt)
}

func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
return func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
return func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
b, err := onlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client")
return false, "miner is not accepting online retrieval deals", nil
}

b, err = offlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Info("offline retrieval has not been implemented yet")
}

if userFilter != nil {
return userFilter(ctx, state)
}

return true, "", nil
}
}
}

// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) {
func RetrievalProvider(h host.Host,
miner *storage.Miner,
sealer sectorstorage.SectorManager,
full lapi.FullNode,
ds dtypes.MetadataDS,
pieceStore dtypes.ProviderPieceStore,
mds dtypes.StagingMultiDstore,
dt dtypes.ProviderDataTransfer,
onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc,
userFilter dtypes.RetrievalDealFilter,
) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)

maddr, err := minerAddrFromDS(ds)
Expand All @@ -521,29 +565,7 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
}

netwk := rmnet.NewFromLibp2pHost(h)

opt := retrievalimpl.DealDeciderOpt(func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
b, err := onlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client")
return false, "miner is not accepting online retrieval deals", nil
}

b, err = offlineOk()
if err != nil {
return false, "miner error", err
}

if !b {
log.Info("offline retrieval has not been implemented yet")
}

return true, "", nil
})
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))

return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
}
Expand Down

0 comments on commit 111942b

Please sign in to comment.