diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 1074bd11702..3cb8977b290 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -108,11 +108,18 @@ #IpfsUseForRetrieval = false # The maximum number of simultaneous data transfers between the client - # and storage providers + # and storage providers for storage deals # # type: uint64 - # env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERS - #SimultaneousTransfers = 20 + # env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORSTORAGE + #SimultaneousTransfersForStorage = 20 + + # The maximum number of simultaneous data transfers between the client + # and storage providers for retrieval deals + # + # type: uint64 + # env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORRETRIEVAL + #SimultaneousTransfersForRetrieval = 20 [Wallet] diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 7748d94c46c..9394d585852 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -201,11 +201,17 @@ # env var: LOTUS_DEALMAKING_MAXSTAGINGDEALSBYTES #MaxStagingDealsBytes = 0 - # The maximum number of parallel online data transfers (storage+retrieval) + # The maximum number of parallel online data transfers for storage deals # # type: uint64 - # env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERS - #SimultaneousTransfers = 20 + # env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGE + #SimultaneousTransfersForStorage = 20 + + # The maximum number of parallel online data transfers for retrieval deals + # + # type: uint64 + # env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORRETRIEVAL + #SimultaneousTransfersForRetrieval = 20 # Minimum start epoch buffer to give time for sealing of sector with deal. # diff --git a/go.mod b/go.mod index 43d53e0e759..3c7ca85f334 100644 --- a/go.mod +++ b/go.mod @@ -33,10 +33,10 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 - github.com/filecoin-project/go-data-transfer v1.10.1 + github.com/filecoin-project/go-data-transfer v1.11.1 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.12.0 + github.com/filecoin-project/go-fil-markets v1.13.1 github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1 github.com/filecoin-project/go-paramfetch v0.0.2 @@ -78,7 +78,7 @@ require ( github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459 github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.6 - github.com/ipfs/go-graphsync v0.9.3 + github.com/ipfs/go-graphsync v0.10.0 github.com/ipfs/go-ipfs-blockstore v1.0.4 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 @@ -98,9 +98,9 @@ require ( github.com/ipfs/go-path v0.0.7 github.com/ipfs/go-unixfs v0.2.6 github.com/ipfs/interface-go-ipfs-core v0.4.0 - github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e + github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823 github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7 - github.com/ipld/go-ipld-prime v0.12.0 + github.com/ipld/go-ipld-prime v0.12.3 github.com/kelseyhightower/envconfig v1.4.0 github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-eventbus v0.2.1 diff --git a/go.sum b/go.sum index 147eeff573e..67a1de7c9fb 100644 --- a/go.sum +++ b/go.sum @@ -302,9 +302,8 @@ github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7/ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo= -github.com/filecoin-project/go-data-transfer v1.10.0/go.mod h1:uQtqy6vUAY5v70ZHdkF5mJ8CjVtjj/JA3aOoaqzWTVw= -github.com/filecoin-project/go-data-transfer v1.10.1 h1:YQNLwhizxkdfFxegAyrnn3l7WjgMjqDlqFzr18iWiYI= -github.com/filecoin-project/go-data-transfer v1.10.1/go.mod h1:CSDMCrPK2lVGodNB1wPEogjFvM9nVGyiL1GNbBRTSdw= +github.com/filecoin-project/go-data-transfer v1.11.1 h1:fiw2FHDVSDrt427cGp7+Ax3TTZk0e6HvF9Odcl2etBM= +github.com/filecoin-project/go-data-transfer v1.11.1/go.mod h1:2MitLI0ebCkLlPKM7NRggP/t9d+gCcREUKkCKqWRCwU= github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ= github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= @@ -314,8 +313,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.12.0 h1:RpU5bLaMADVrU4CgLxKMGHC2ZUocNV35uINxogQCf00= -github.com/filecoin-project/go-fil-markets v1.12.0/go.mod h1:XuuZFaFujI47nrgfQJiq7jWB+6rRya6nm7Sj6uXQ80U= +github.com/filecoin-project/go-fil-markets v1.13.1 h1:KjarxgKp/RN4iYXT2pMcMq6veIa1guGJMoVtnwru4BQ= +github.com/filecoin-project/go-fil-markets v1.13.1/go.mod h1:58OjtsWtDt3xlN1QLmgDQxtfCDtDS4RIyHepIUbqXhM= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= @@ -687,10 +686,8 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28 github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0= github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= -github.com/ipfs/go-graphsync v0.9.0/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw= -github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw= -github.com/ipfs/go-graphsync v0.9.3 h1:oWqUuN3OYqLwu669fxYbgymBrIodB0fD7vFZfF//X7Y= -github.com/ipfs/go-graphsync v0.9.3/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw= +github.com/ipfs/go-graphsync v0.10.0 h1:VXljS1ETYp1GmAJ6N45hlcKO+tlvPFUzz3xzEQ0jMbM= +github.com/ipfs/go-graphsync v0.10.0/go.mod h1:cKIshzTaa5rCZjryH5xmSKZVGX9uk1wvwGvz2WEha5Y= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= @@ -801,8 +798,8 @@ github.com/ipfs/iptb-plugins v0.3.0/go.mod h1:5QtOvckeIw4bY86gSH4fgh3p3gCSMn3FmI github.com/ipld/go-car v0.1.0/go.mod h1:RCWzaUh2i4mOEkB3W45Vc+9jnS/M6Qay5ooytiBHl3g= github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4/go.mod h1:xrMEcuSq+D1vEwl+YAXsg/JfA98XGpXDwnkIL4Aimqw= github.com/ipld/go-car v0.1.1-0.20201119040415-11b6074b6d4d/go.mod h1:2Gys8L8MJ6zkh1gktTSXreY63t4UbyvNp5JaudTyxHQ= -github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e h1:iTAJWTWEMe0Lx8JwRaIYrYgDuI9bVzkgogGz43Yv9Eo= -github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e/go.mod h1:wUxBdwOLA9/0HZBi3fnTBzla0MuwlqgJLyrhOg1XaKI= +github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823 h1:8JMSJ0k71fU9lIUrpVwEdoX4KoxiTEX8cZG97v/hTDw= +github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823/go.mod h1:jSlTph+i/q1jLFoiKKeN69KGG0fXpwrcD0izu5C1Tpo= github.com/ipld/go-car/v2 v2.0.0-beta1.0.20210721090610-5a9d1b217d25/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU= github.com/ipld/go-car/v2 v2.0.2/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU= github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7 h1:6Z0beJSZNsRY+7udoqUl4gQ/tqtrPuRvDySrlsvbqZA= @@ -816,8 +813,9 @@ github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xE github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8= -github.com/ipld/go-ipld-prime v0.12.0 h1:JapyKWTsJgmhrPI7hfx4V798c/RClr85sXfBZnH1VIw= -github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc= +github.com/ipld/go-ipld-prime v0.12.3-0.20210930132912-0b3aef3ca569/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8= +github.com/ipld/go-ipld-prime v0.12.3 h1:furVobw7UBLQZwlEwfE26tYORy3PAK8VYSgZOSr3JMQ= +github.com/ipld/go-ipld-prime v0.12.3/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8= github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU= github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs= github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0= diff --git a/itests/deals_concurrent_test.go b/itests/deals_concurrent_test.go index ff8bab25720..c0458e8d1dd 100644 --- a/itests/deals_concurrent_test.go +++ b/itests/deals_concurrent_test.go @@ -139,8 +139,8 @@ func TestSimultanenousTransferLimit(t *testing.T) { ) runTest := func(t *testing.T) { client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts( - node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle))), - node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle)), + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, graphsyncThrottle))), + node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)), )) ens.InterconnectAll().BeginMining(250 * time.Millisecond) dh := kit.NewDealHarness(t, client, miner, miner) diff --git a/node/builder_chain.go b/node/builder_chain.go index 4137e324823..11283ec3a09 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -100,7 +100,7 @@ var ChainNode = Options( Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), // Shared graphsync (markets, serving chain) - Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)), + Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfersForStorage, config.DefaultFullNode().Client.SimultaneousTransfersForRetrieval)), // Service: Wallet Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner), @@ -219,7 +219,7 @@ func ConfigFullNode(c interface{}) Option { Override(new(retrievalmarket.BlockstoreAccessor), modules.IpfsRetrievalBlockstoreAccessor), ), ), - Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)), + Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfersForStorage, cfg.Client.SimultaneousTransfersForRetrieval)), If(cfg.Wallet.RemoteBackend != "", Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)), diff --git a/node/builder_miner.go b/node/builder_miner.go index ae0a61875c7..3447eb3e6dc 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -136,7 +136,7 @@ func ConfigStorageMiner(c interface{}) Option { If(cfg.Subsystems.EnableMarkets, // Markets Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore), - Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)), + Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForRetrieval)), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), diff --git a/node/config/def.go b/node/config/def.go index 20444346aa4..8dafbbfb2a6 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -2,6 +2,8 @@ package config import ( "encoding" + "os" + "strconv" "time" "github.com/ipfs/go-cid" @@ -24,6 +26,16 @@ const ( RetrievalPricingExternalMode = "external" ) +// MaxTraversalLinks configures the maximum number of links to traverse in a DAG while calculating +// CommP and traversing a DAG with graphsync; invokes a budget on DAG depth and density. +var MaxTraversalLinks uint64 = 32 * (1 << 20) + +func init() { + if envMaxTraversal, err := strconv.ParseUint(os.Getenv("LOTUS_MAX_TRAVERSAL_LINKS"), 10, 64); err == nil { + MaxTraversalLinks = envMaxTraversal + } +} + func (b *BatchFeeConfig) FeeForSectors(nSectors int) abi.TokenAmount { return big.Add(big.Int(b.Base), big.Mul(big.NewInt(int64(nSectors)), big.Int(b.PerSector))) } @@ -65,7 +77,8 @@ func DefaultFullNode() *FullNode { DefaultMaxFee: DefaultDefaultMaxFee, }, Client: Client{ - SimultaneousTransfers: DefaultSimultaneousTransfers, + SimultaneousTransfersForStorage: DefaultSimultaneousTransfers, + SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers, }, Chainstore: Chainstore{ EnableSplitstore: false, @@ -146,7 +159,8 @@ func DefaultStorageMiner() *StorageMiner { MaxDealsPerPublishMsg: 8, MaxProviderCollateralMultiplier: 2, - SimultaneousTransfers: DefaultSimultaneousTransfers, + SimultaneousTransfersForStorage: DefaultSimultaneousTransfers, + SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers, StartEpochSealingBuffer: 480, // 480 epochs buffer == 4 hours from adding deal to sector to sector being sealed diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index c8ac3f28f73..dd44966c7a7 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -92,11 +92,18 @@ your node if metadata log is disabled`, Comment: ``, }, { - Name: "SimultaneousTransfers", + Name: "SimultaneousTransfersForStorage", Type: "uint64", Comment: `The maximum number of simultaneous data transfers between the client -and storage providers`, +and storage providers for storage deals`, + }, + { + Name: "SimultaneousTransfersForRetrieval", + Type: "uint64", + + Comment: `The maximum number of simultaneous data transfers between the client +and storage providers for retrieval deals`, }, }, "Common": []DocField{ @@ -260,10 +267,16 @@ as a multiplier of the minimum collateral bound`, passed to the sealing node by the markets service. 0 is unlimited.`, }, { - Name: "SimultaneousTransfers", + Name: "SimultaneousTransfersForStorage", + Type: "uint64", + + Comment: `The maximum number of parallel online data transfers for storage deals`, + }, + { + Name: "SimultaneousTransfersForRetrieval", Type: "uint64", - Comment: `The maximum number of parallel online data transfers (storage+retrieval)`, + Comment: `The maximum number of parallel online data transfers for retrieval deals`, }, { Name: "StartEpochSealingBuffer", diff --git a/node/config/types.go b/node/config/types.go index aeaefd8cee5..5f1c66b4247 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -129,8 +129,10 @@ type DealmakingConfig struct { // The maximum allowed disk usage size in bytes of staging deals not yet // passed to the sealing node by the markets service. 0 is unlimited. MaxStagingDealsBytes int64 - // The maximum number of parallel online data transfers (storage+retrieval) - SimultaneousTransfers uint64 + // The maximum number of parallel online data transfers for storage deals + SimultaneousTransfersForStorage uint64 + // The maximum number of parallel online data transfers for retrieval deals + SimultaneousTransfersForRetrieval uint64 // Minimum start epoch buffer to give time for sealing of sector with deal. StartEpochSealingBuffer uint64 @@ -360,8 +362,11 @@ type Client struct { IpfsMAddr string IpfsUseForRetrieval bool // The maximum number of simultaneous data transfers between the client - // and storage providers - SimultaneousTransfers uint64 + // and storage providers for storage deals + SimultaneousTransfersForStorage uint64 + // The maximum number of simultaneous data transfers between the client + // and storage providers for retrieval deals + SimultaneousTransfersForRetrieval uint64 } type Wallet struct { diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 5f08f93cbc6..7ed0a683c93 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -54,6 +54,7 @@ import ( "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" marketevents "github.com/filecoin-project/lotus/markets/loggers" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo/imports" "github.com/filecoin-project/lotus/api" @@ -981,6 +982,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref Root: order.Root, Selector: sel, }}, + car.MaxTraversalLinks(config.MaxTraversalLinks), ).Write(f) if err != nil { finish(err) @@ -1232,7 +1234,11 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri allSelector := ssb.ExploreRecursive( selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() - sc := car.NewSelectiveCar(ctx, fs, []car.Dag{{Root: root, Selector: allSelector}}) + sc := car.NewSelectiveCar(ctx, + fs, + []car.Dag{{Root: root, Selector: allSelector}}, + car.MaxTraversalLinks(config.MaxTraversalLinks), + ) f, err := os.Create(outputPath) if err != nil { return err diff --git a/node/modules/client.go b/node/modules/client.go index 7ca97f0e0da..4d988d98afc 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -36,6 +36,7 @@ import ( "github.com/filecoin-project/lotus/markets" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/markets/retrievaladapter" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/impl/full" payapi "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -112,7 +113,7 @@ func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Grap net := dtnet.NewFromLibp2pHost(h, dtRetryParams) dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/client/transfers")) - transport := dtgstransport.NewTransport(h.ID(), gs) + transport := dtgstransport.NewTransport(h.ID(), gs, net) err := os.MkdirAll(filepath.Join(r.Path(), "data-transfer"), 0755) //nolint: gosec if err != nil && !os.IsExist(err) { return nil, err @@ -182,7 +183,7 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT marketsRetryParams := smnet.RetryParameters(time.Second, 5*time.Minute, 15, 5) net := smnet.NewFromLibp2pHost(h, marketsRetryParams) - c, err := storageimpl.NewClient(net, dataTransfer, discovery, deals, scn, accessor, storageimpl.DealPollingInterval(time.Second)) + c, err := storageimpl.NewClient(net, dataTransfer, discovery, deals, scn, accessor, storageimpl.DealPollingInterval(time.Second), storageimpl.MaxTraversalLinks(config.MaxTraversalLinks)) if err != nil { return nil, err } diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index f0f4400b8fe..839508900cf 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -9,18 +9,26 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" ) // Graphsync creates a graphsync instance from the given loader and storer -func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) { +func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) { graphsyncNetwork := gsnet.NewFromLibp2pHost(h) lsys := storeutil.LinkSystemForBlockstore(clientBs) - gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, lsys, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(parallelTransfers)) + gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), + graphsyncNetwork, + lsys, + graphsyncimpl.RejectAllRequestsByDefault(), + graphsyncimpl.MaxInProgressIncomingRequests(parallelTransfersForStorage), + graphsyncimpl.MaxInProgressOutgoingRequests(parallelTransfersForRetrieval), + graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks), + graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks)) chainLinkSystem := storeutil.LinkSystemForBlockstore(chainBs) err := gs.RegisterPersistenceOption("chainstore", chainLinkSystem) if err != nil { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 9e3bd95ab46..0f74a8d583d 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -38,6 +38,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" graphsync "github.com/ipfs/go-graphsync/impl" + graphsyncimpl "github.com/ipfs/go-graphsync/impl" gsnet "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/storeutil" "github.com/libp2p/go-libp2p-core/host" @@ -340,7 +341,7 @@ func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.S net := dtnet.NewFromLibp2pHost(h) dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/provider/transfers")) - transport := dtgstransport.NewTransport(h.ID(), gs) + transport := dtgstransport.NewTransport(h.ID(), gs, net) err := os.MkdirAll(filepath.Join(r.Path(), "data-transfer"), 0755) //nolint: gosec if err != nil && !os.IsExist(err) { return nil, err @@ -394,11 +395,18 @@ func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRe // StagingGraphsync creates a graphsync instance which reads and writes blocks // to the StagingBlockstore -func StagingGraphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync { +func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync { graphsyncNetwork := gsnet.NewFromLibp2pHost(h) lsys := storeutil.LinkSystemForBlockstore(ibs) - gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, lsys, graphsync.RejectAllRequestsByDefault(), graphsync.MaxInProgressRequests(parallelTransfers)) + gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), + graphsyncNetwork, + lsys, + graphsync.RejectAllRequestsByDefault(), + graphsync.MaxInProgressIncomingRequests(parallelTransfersForRetrieval), + graphsync.MaxInProgressOutgoingRequests(parallelTransfersForStorage), + graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks), + graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks)) return gs } diff --git a/tools/packer/repo/config.toml b/tools/packer/repo/config.toml index 4a4fb510ef4..900dad218fa 100644 --- a/tools/packer/repo/config.toml +++ b/tools/packer/repo/config.toml @@ -20,7 +20,8 @@ ListenAddresses = ["/ip4/0.0.0.0/tcp/5678", "/ip6/::/tcp/5678"] # IpfsOnlineMode = false # IpfsMAddr = "" # IpfsUseForRetrieval = false -# SimultaneousTransfers = 20 +# SimultaneousTransfersForStorage = 20 +# SimultaneousTransfersForRetrieval = 20 # [Metrics] # Nickname = ""