diff --git a/cmd/neofs-node/accounting.go b/cmd/neofs-node/accounting.go index 21cac4e410e..4f5c35a110c 100644 --- a/cmd/neofs-node/accounting.go +++ b/cmd/neofs-node/accounting.go @@ -13,7 +13,7 @@ func initAccountingService(c *cfg) { initMorphComponents(c) } - balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0) + balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.shared.basics.balanceSH, 0) fatalOnErr(err) server := accountingTransportGRPC.New( diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 66d2ba5d29a..9ef463aba72 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -17,6 +17,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" neogoutil "github.com/nspcc-dev/neo-go/pkg/util" + utilneogo "github.com/nspcc-dev/neo-go/pkg/util" netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient" @@ -46,6 +47,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" + cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event" @@ -120,6 +122,14 @@ type applicationConfiguration struct { reconnectionRetriesNumber int reconnectionRetriesDelay time.Duration } + + ContractsCfg struct { + netmap utilneogo.Uint160 + balance utilneogo.Uint160 + container utilneogo.Uint160 + reputation utilneogo.Uint160 + proxy utilneogo.Uint160 + } } // readConfig fills applicationConfiguration with raw configuration values @@ -166,6 +176,14 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.MorphCfg.reconnectionRetriesNumber = morphconfig.ReconnectionRetriesNumber(c) a.MorphCfg.reconnectionRetriesDelay = morphconfig.ReconnectionRetriesDelay(c) + // Contracts + + a.ContractsCfg.balance = contractsconfig.Balance(c) + a.ContractsCfg.container = contractsconfig.Container(c) + a.ContractsCfg.netmap = contractsconfig.Netmap(c) + a.ContractsCfg.proxy = contractsconfig.Proxy(c) + a.ContractsCfg.reputation = contractsconfig.Reputation(c) + return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { var sh storage.ShardCfg @@ -301,9 +319,41 @@ func (c *internals) IsMaintenance() bool { return c.isMaintenance.Load() } +type basics struct { + networkState *networkState + netMapSource netmapCore.Source + + stateStorage *state.PersistentStorage + + key *keys.PrivateKey + binPublicKey []byte + + cli *client.Client + nCli *nmClient.Client + cCli *containerClient.Client + + ttl time.Duration + + // caches are non-nil only if ttl > 0 + containerCache *ttlContainerStorage + eaclCache *ttlEACLStorage + containerListCache *ttlContainerLister + netmapCache *lruNetmapSource + + balanceSH utilneogo.Uint160 + containerSH utilneogo.Uint160 + netmapSH utilneogo.Uint160 + reputationSH utilneogo.Uint160 + proxySH utilneogo.Uint160 +} + // shared contains component-specific structs/helpers that should // be shared during initialization of the application. type shared struct { + // shared b/w logical components but does not + // depend on them, should be inited first + basics + privateTokenStore sessionStorage persistate *state.PersistentStorage @@ -312,24 +362,14 @@ type shared struct { putClientCache *cache.ClientCache localAddr network.AddressGroup - containerCache *ttlContainerStorage - eaclCache *ttlEACLStorage - containerListCache *ttlContainerLister - netmapCache *lruNetmapSource - - key *keys.PrivateKey - binPublicKey []byte ownerIDFromKey user.ID // user ID calculated from key // current network map - netMap atomicstd.Value // type netmap.NetMap - netMapSource netmapCore.Source + netMap atomicstd.Value // type netmap.NetMap // whether the local node is in the netMap localNodeInNetmap atomic.Bool - cnrClient *containerClient.Client - respSvc *response.Service policer *policer.Policer @@ -367,7 +407,6 @@ type cfg struct { // services cfgGRPC cfgGRPC cfgMorph cfgMorph - cfgAccounting cfgAccounting cfgContainer cfgContainer cfgNodeInfo cfgNodeInfo cfgNetmap cfgNetmap @@ -413,21 +452,14 @@ type cfgMorph struct { proxyScriptHash neogoutil.Uint160 } -type cfgAccounting struct { - scriptHash neogoutil.Uint160 -} - type cfgContainer struct { - scriptHash neogoutil.Uint160 - parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers } type cfgNetmap struct { - scriptHash neogoutil.Uint160 - wrapper *nmClient.Client + wrapper *nmClient.Client parsers map[event.Type]event.NotificationParser @@ -491,8 +523,6 @@ type cfgReputation struct { localTrustStorage *truststorage.Storage localTrustCtrl *trustcontroller.Controller - - scriptHash neogoutil.Uint160 } var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block") @@ -523,8 +553,6 @@ func initCfg(appCfg *config.Config) *cfg { maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes - netState := newNetworkState() - persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path()) fatalOnErr(err) @@ -573,26 +601,21 @@ func initCfg(appCfg *config.Config) *cfg { Buffers: &buffers, Logger: c.internals.log, } + basics := initBasics(c, key, persistate) c.shared = shared{ - key: key, - binPublicKey: key.PublicKey().Bytes(), + basics: basics, localAddr: netAddr, - respSvc: response.NewService(response.WithNetworkState(netState)), + respSvc: response.NewService(response.WithNetworkState(basics.networkState)), clientCache: cache.NewSDKClientCache(cacheOpts), bgClientCache: cache.NewSDKClientCache(cacheOpts), putClientCache: cache.NewSDKClientCache(cacheOpts), persistate: persistate, } - c.cfgAccounting = cfgAccounting{ - scriptHash: contractsconfig.Balance(appCfg), - } c.cfgContainer = cfgContainer{ - scriptHash: contractsconfig.Container(appCfg), workerPool: containerWorkerPool, } c.cfgNetmap = cfgNetmap{ - scriptHash: contractsconfig.Netmap(appCfg), - state: netState, + state: c.basics.networkState, workerPool: netmapWorkerPool, needBootstrap: !relayOnly, } @@ -608,7 +631,6 @@ func initCfg(appCfg *config.Config) *cfg { tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg), } c.cfgReputation = cfgReputation{ - scriptHash: contractsconfig.Reputation(appCfg), workerPool: reputationWorkerPool, } @@ -618,7 +640,7 @@ func initCfg(appCfg *config.Config) *cfg { if metricsconfig.Enabled(c.cfgReader) { c.metricsCollector = metrics.NewNodeMetrics(misc.Version) - netState.metrics = c.metricsCollector + c.basics.networkState.metrics = c.metricsCollector } c.onShutdown(c.clientCache.CloseAll) // clean up connections @@ -629,6 +651,96 @@ func initCfg(appCfg *config.Config) *cfg { return c } +func initBasics(c *cfg, key *keys.PrivateKey, stateStorage *state.PersistentStorage) basics { + b := basics{} + + addresses := c.applicationConfiguration.MorphCfg.endpoints + + fromSideChainBlock, err := stateStorage.UInt32(persistateSideChainLastBlockKey) + if err != nil { + fromSideChainBlock = 0 + c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) + } + + cli, err := client.New(key, + client.WithDialTimeout(c.applicationConfiguration.MorphCfg.dialTimout), + client.WithLogger(c.log), + client.WithAutoSidechainScope(), + client.WithEndpoints(addresses), + client.WithReconnectionRetries(c.applicationConfiguration.MorphCfg.reconnectionRetriesNumber), + client.WithReconnectionsDelay(c.applicationConfiguration.MorphCfg.reconnectionRetriesDelay), + client.WithConnSwitchCallback(func() { + err = c.restartMorph() + if err != nil { + c.internalErr <- fmt.Errorf("restarting after morph connection was lost: %w", err) + } + }), + client.WithConnLostCallback(func() { + c.internalErr <- errors.New("morph connection has been lost") + }), + client.WithMinRequiredBlockHeight(fromSideChainBlock), + ) + if err != nil { + c.log.Info("failed to create neo RPC client", + zap.Any("endpoints", addresses), + zap.String("error", err.Error()), + ) + + fatalOnErr(err) + } + + lookupScriptHashesInNNS(cli, &b) + + nState := newNetworkState() + + // container wrapper that invokes notary + // requests with the (empty) Alphabet signature + cnrWrap, err := cntClient.NewFromMorph(cli, b.containerSH, 0) + fatalOnErr(err) + + cnrSrc := cntClient.AsContainerSource(cnrWrap) + + eACLFetcher := &morphEACLFetcher{ + w: cnrWrap, + } + + nmWrap, err := nmClient.NewFromMorph(cli, b.netmapSH, 0) + fatalOnErr(err) + + ttl := c.applicationConfiguration.MorphCfg.cacheTTL + + var netmapSource netmapCore.Source + if ttl < 0 { + netmapSource = nmWrap + } else { + b.netmapCache = newCachedNetmapStorage(nState, nmWrap) + b.containerCache = newCachedContainerStorage(cnrSrc, ttl) + b.eaclCache = newCachedEACLStorage(eACLFetcher, ttl) + b.containerListCache = newCachedContainerLister(cnrWrap, ttl) + + // use RPC node as source of netmap (with caching) + netmapSource = b.netmapCache + } + + if ttl == 0 { + msPerBlock, err := cli.MsPerBlock() + fatalOnErr(err) + ttl = time.Duration(msPerBlock) * time.Millisecond + c.log.Debug("morph.cache_ttl fetched from network", zap.Duration("value", ttl)) + } + + b.netMapSource = netmapSource + b.networkState = nState + b.key = key + b.binPublicKey = key.PublicKey().Bytes() + b.cli = cli + b.nCli = nmWrap + b.cCli = cnrWrap + b.ttl = ttl + + return b +} + func (c *cfg) engineOpts() []engine.Option { opts := make([]engine.Option, 0, 4) @@ -636,10 +748,15 @@ func (c *cfg) engineOpts() []engine.Option { engine.WithShardPoolSize(c.EngineCfg.shardPoolSize), engine.WithErrorThreshold(c.EngineCfg.errorThreshold), - engine.WithContainersSource(c.shared.containerCache), engine.WithLogger(c.log), ) + if c.shared.basics.ttl > 0 { + opts = append(opts, engine.WithContainersSource(c.shared.basics.containerCache)) + } else { + opts = append(opts, engine.WithContainersSource(cntClient.AsContainerSource(c.shared.basics.cCli))) + } + if c.metricsCollector != nil { opts = append(opts, engine.WithMetrics(c.metricsCollector)) } @@ -769,9 +886,6 @@ func (c *cfg) LocalAddress() network.AddressGroup { } func initLocalStorage(c *cfg) { - // storage needs container, container needs storage, dirty sharing - c.shared.cnrClient = new(containerClient.Client) - ls := engine.New(c.engineOpts()...) addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index bfd6c8ce3b2..3c8ab7d43d1 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -46,45 +46,26 @@ func initContainerService(c *cfg) { initMorphComponents(c) } - // container wrapper that invokes notary - // requests with the (empty) Alphabet signature - wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0) - fatalOnErr(err) - - *c.shared.cnrClient = *wrap - - // container wrapper that always sends non-notary - // requests - wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.DisableNotarySigning()) - fatalOnErr(err) - - cnrSrc := cntClient.AsContainerSource(wrap) - - eACLFetcher := &morphEACLFetcher{ - w: wrap, - } - cnrRdr := new(morphContainerReader) cnrWrt := &morphContainerWriter{ - neoClient: wrap, + neoClient: c.shared.basics.cCli, } - if c.cfgMorph.cacheTTL <= 0 { - c.cfgObject.eaclSource = eACLFetcher - cnrRdr.eacl = eACLFetcher + cnrCli := c.shared.basics.cCli + cnrSrc := cntClient.AsContainerSource(cnrCli) + eaclFetcher := &morphEACLFetcher{cnrCli} + + if c.shared.basics.ttl <= 0 { + c.cfgObject.eaclSource = eaclFetcher + cnrRdr.eacl = eaclFetcher c.cfgObject.cnrSource = cnrSrc cnrRdr.get = cnrSrc - cnrRdr.lister = wrap + cnrRdr.lister = cnrCli } else { - // use RPC node as source of Container contract items (with caching) - cachedContainerStorage := newCachedContainerStorage(cnrSrc, c.cfgMorph.cacheTTL) - cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL) - cachedContainerLister := newCachedContainerLister(wrap, c.cfgMorph.cacheTTL) - - c.shared.containerCache = cachedContainerStorage - c.shared.eaclCache = cachedEACLStorage - c.shared.containerListCache = cachedContainerLister + cnrCache := c.shared.basics.containerCache + cnrListCache := c.shared.basics.containerListCache + eaclCache := c.shared.basics.eaclCache subscribeToContainerCreation(c, func(e event.Event) { ev := e.(containerEvent.PutSuccess) @@ -93,9 +74,9 @@ func initContainerService(c *cfg) { // TODO: use owner directly from the event after neofs-contract#256 will become resolved // but don't forget about the profit of reading the new container and caching it: // creation success are most commonly tracked by polling GET op. - cnr, err := cachedContainerStorage.Get(ev.ID) + cnr, err := cnrCache.Get(ev.ID) if err == nil { - cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true) + cnrListCache.update(cnr.Value.Owner(), ev.ID, true) } else { // unlike removal, we expect successful receive of the container // after successful creation, so logging can be useful @@ -117,27 +98,27 @@ func initContainerService(c *cfg) { // It's strange to read already removed container, but we can successfully hit // the cache. // TODO: use owner directly from the event after neofs-contract#256 will become resolved - cnr, err := cachedContainerStorage.Get(ev.ID) + cnr, err := cnrCache.Get(ev.ID) if err == nil { - cachedContainerLister.update(cnr.Value.Owner(), ev.ID, false) + cnrListCache.update(cnr.Value.Owner(), ev.ID, false) } - cachedContainerStorage.handleRemoval(ev.ID) + cnrCache.handleRemoval(ev.ID) c.log.Debug("container removal event's receipt", zap.Stringer("id", ev.ID), ) }) - c.cfgObject.eaclSource = cachedEACLStorage - c.cfgObject.cnrSource = cachedContainerStorage + c.cfgObject.eaclSource = eaclCache + c.cfgObject.cnrSource = cnrCache - cnrRdr.lister = cachedContainerLister + cnrRdr.lister = cnrListCache cnrRdr.eacl = c.cfgObject.eaclSource cnrRdr.get = c.cfgObject.cnrSource cnrWrt.cacheEnabled = true - cnrWrt.eacls = cachedEACLStorage + cnrWrt.eacls = eaclCache } estimationsLogger := c.log.With(zap.String("component", "container_estimations")) @@ -151,7 +132,7 @@ func initContainerService(c *cfg) { resultWriter := &morphLoadWriter{ log: estimationsLogger, - cnrMorphClient: wrapperNoNotary, + cnrMorphClient: cnrCli, key: pubKey, } diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index 1548f383095..72687813a0c 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -4,14 +4,11 @@ import ( "context" "errors" "fmt" - "time" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/client" - nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "go.uber.org/zap" @@ -27,79 +24,19 @@ const ( func initMorphComponents(c *cfg) { var err error - addresses := c.applicationConfiguration.MorphCfg.endpoints + morphCli := c.shared.basics.cli + c.cfgMorph.client = morphCli - fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey) - if err != nil { - fromSideChainBlock = 0 - c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) - } - - cli, err := client.New(c.key, - client.WithDialTimeout(c.applicationConfiguration.MorphCfg.dialTimout), - client.WithLogger(c.log), - client.WithAutoSidechainScope(), - client.WithEndpoints(addresses), - client.WithReconnectionRetries(c.applicationConfiguration.MorphCfg.reconnectionRetriesNumber), - client.WithReconnectionsDelay(c.applicationConfiguration.MorphCfg.reconnectionRetriesDelay), - client.WithConnSwitchCallback(func() { - err = c.restartMorph() - if err != nil { - c.internalErr <- fmt.Errorf("restarting after morph connection was lost: %w", err) - } - }), - client.WithConnLostCallback(func() { - c.internalErr <- errors.New("morph connection has been lost") - }), - client.WithMinRequiredBlockHeight(fromSideChainBlock), - ) - if err != nil { - c.log.Info("failed to create neo RPC client", - zap.Any("endpoints", addresses), - zap.String("error", err.Error()), - ) - - fatalOnErr(err) - } - - c.onShutdown(cli.Close) - - c.cfgMorph.client = cli + c.onShutdown(morphCli.Close) - lookupScriptHashesInNNS(c) // smart contract auto negotiation - - err = c.cfgMorph.client.EnableNotarySupport( + err = morphCli.EnableNotarySupport( client.WithProxyContract( c.cfgMorph.proxyScriptHash, ), ) fatalOnErr(err) - wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0) - fatalOnErr(err) - - var netmapSource netmap.Source - - c.cfgMorph.cacheTTL = c.applicationConfiguration.MorphCfg.cacheTTL - - if c.cfgMorph.cacheTTL == 0 { - msPerBlock, err := c.cfgMorph.client.MsPerBlock() - fatalOnErr(err) - c.cfgMorph.cacheTTL = time.Duration(msPerBlock) * time.Millisecond - c.log.Debug("morph.cache_ttl fetched from network", zap.Duration("value", c.cfgMorph.cacheTTL)) - } - - if c.cfgMorph.cacheTTL < 0 { - netmapSource = wrap - } else { - c.shared.netmapCache = newCachedNetmapStorage(c.cfgNetmap.state, wrap) - - // use RPC node as source of netmap (with caching) - netmapSource = c.shared.netmapCache - } - - c.netMapSource = netmapSource - c.cfgNetmap.wrapper = wrap + c.cfgNetmap.wrapper = c.shared.basics.nCli } func makeAndWaitNotaryDeposit(c *cfg) { @@ -193,8 +130,8 @@ func listenMorphNotifications(c *cfg) { return res, err }) - registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers) - registerNotificationHandlers(c.cfgContainer.scriptHash, lis, c.cfgContainer.parsers, c.cfgContainer.subscribers) + registerNotificationHandlers(c.shared.basics.netmapSH, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers) + registerNotificationHandlers(c.shared.basics.containerSH, lis, c.cfgContainer.parsers, c.cfgContainer.subscribers) registerBlockHandler(lis, func(block *block.Block) { c.log.Debug("new block", zap.Uint32("index", block.Index)) @@ -243,7 +180,7 @@ func registerBlockHandler(lis event.Listener, handler event.BlockHandler) { // lookupScriptHashesInNNS looks up for contract script hashes in NNS contract of side // chain if they were not specified in config file. -func lookupScriptHashesInNNS(c *cfg) { +func lookupScriptHashesInNNS(morphCli *client.Client, b *basics) { var ( err error @@ -252,17 +189,17 @@ func lookupScriptHashesInNNS(c *cfg) { h *util.Uint160 nnsName string }{ - {&c.cfgNetmap.scriptHash, client.NNSNetmapContractName}, - {&c.cfgAccounting.scriptHash, client.NNSBalanceContractName}, - {&c.cfgContainer.scriptHash, client.NNSContainerContractName}, - {&c.cfgReputation.scriptHash, client.NNSReputationContractName}, - {&c.cfgMorph.proxyScriptHash, client.NNSProxyContractName}, + {&b.balanceSH, client.NNSBalanceContractName}, + {&b.containerSH, client.NNSContainerContractName}, + {&b.netmapSH, client.NNSNetmapContractName}, + {&b.reputationSH, client.NNSReputationContractName}, + {&b.proxySH, client.NNSProxyContractName}, } ) for _, t := range targets { if emptyHash.Equals(*t.h) { - *t.h, err = c.cfgMorph.client.NNSContractAddress(t.nnsName) + *t.h, err = morphCli.NNSContractAddress(t.nnsName) fatalOnErrDetails(fmt.Sprintf("can't resolve %s in NNS", t.nnsName), err) } } diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 2d5205aed63..37e2e9fd845 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -33,7 +33,7 @@ import ( ) func initReputationService(c *cfg) { - wrap, err := repClient.NewFromMorph(c.cfgMorph.client, c.cfgReputation.scriptHash, 0) + wrap, err := repClient.NewFromMorph(c.cfgMorph.client, c.shared.basics.reputationSH, 0) fatalOnErr(err) localKey := c.key.PublicKey().Bytes() diff --git a/cmd/neofs-node/tree.go b/cmd/neofs-node/tree.go index e0734c456cb..60b9b343ad9 100644 --- a/cmd/neofs-node/tree.go +++ b/cmd/neofs-node/tree.go @@ -44,7 +44,7 @@ func initTreeService(c *cfg) { c.treeService = tree.New( tree.WithContainerSource(cnrSource{ src: c.cfgObject.cnrSource, - cli: c.shared.cnrClient, + cli: c.shared.basics.cCli, }), tree.WithEACLSource(c.cfgObject.eaclSource), tree.WithNetmapSource(c.netMapSource),