Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Dec 26, 2023
1 parent d5ce304 commit ff1c242
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 160 deletions.
2 changes: 1 addition & 1 deletion cmd/neofs-node/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func initAccountingService(c *cfg) {
initMorphComponents(c)
}

balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.shared.basics.balanceSH, 0)

Check warning on line 16 in cmd/neofs-node/accounting.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/accounting.go#L16

Added line #L16 was not covered by tests
fatalOnErr(err)

server := accountingTransportGRPC.New(
Expand Down
192 changes: 153 additions & 39 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
utilneogo "github.com/nspcc-dev/neo-go/pkg/util"
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient"
Expand Down Expand Up @@ -46,6 +47,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
Expand Down Expand Up @@ -120,6 +122,14 @@ type applicationConfiguration struct {
reconnectionRetriesNumber int
reconnectionRetriesDelay time.Duration
}

ContractsCfg struct {
netmap utilneogo.Uint160
balance utilneogo.Uint160
container utilneogo.Uint160
reputation utilneogo.Uint160
proxy utilneogo.Uint160
}
}

// readConfig fills applicationConfiguration with raw configuration values
Expand Down Expand Up @@ -166,6 +176,14 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.MorphCfg.reconnectionRetriesNumber = morphconfig.ReconnectionRetriesNumber(c)
a.MorphCfg.reconnectionRetriesDelay = morphconfig.ReconnectionRetriesDelay(c)

Check warning on line 177 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L173-L177

Added lines #L173 - L177 were not covered by tests

// Contracts

a.ContractsCfg.balance = contractsconfig.Balance(c)
a.ContractsCfg.container = contractsconfig.Container(c)
a.ContractsCfg.netmap = contractsconfig.Netmap(c)
a.ContractsCfg.proxy = contractsconfig.Proxy(c)
a.ContractsCfg.reputation = contractsconfig.Reputation(c)

Check warning on line 185 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L181-L185

Added lines #L181 - L185 were not covered by tests

return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
var sh storage.ShardCfg

Expand Down Expand Up @@ -301,9 +319,41 @@ func (c *internals) IsMaintenance() bool {
return c.isMaintenance.Load()
}

type basics struct {
networkState *networkState
netMapSource netmapCore.Source

stateStorage *state.PersistentStorage

Check failure on line 326 in cmd/neofs-node/config.go

View workflow job for this annotation

GitHub Actions / lint

field `stateStorage` is unused (unused)

key *keys.PrivateKey
binPublicKey []byte

cli *client.Client
nCli *nmClient.Client
cCli *containerClient.Client

ttl time.Duration

// caches are non-nil only if ttl > 0
containerCache *ttlContainerStorage
eaclCache *ttlEACLStorage
containerListCache *ttlContainerLister
netmapCache *lruNetmapSource

balanceSH utilneogo.Uint160
containerSH utilneogo.Uint160
netmapSH utilneogo.Uint160
reputationSH utilneogo.Uint160
proxySH utilneogo.Uint160
}

// shared contains component-specific structs/helpers that should
// be shared during initialization of the application.
type shared struct {
// shared b/w logical components but does not
// depend on them, should be inited first
basics

privateTokenStore sessionStorage
persistate *state.PersistentStorage

Expand All @@ -312,24 +362,14 @@ type shared struct {
putClientCache *cache.ClientCache
localAddr network.AddressGroup

containerCache *ttlContainerStorage
eaclCache *ttlEACLStorage
containerListCache *ttlContainerLister
netmapCache *lruNetmapSource

key *keys.PrivateKey
binPublicKey []byte
ownerIDFromKey user.ID // user ID calculated from key

// current network map
netMap atomicstd.Value // type netmap.NetMap
netMapSource netmapCore.Source
netMap atomicstd.Value // type netmap.NetMap

// whether the local node is in the netMap
localNodeInNetmap atomic.Bool

cnrClient *containerClient.Client

respSvc *response.Service

policer *policer.Policer
Expand Down Expand Up @@ -367,7 +407,6 @@ type cfg struct {
// services
cfgGRPC cfgGRPC
cfgMorph cfgMorph
cfgAccounting cfgAccounting
cfgContainer cfgContainer
cfgNodeInfo cfgNodeInfo
cfgNetmap cfgNetmap
Expand Down Expand Up @@ -413,21 +452,14 @@ type cfgMorph struct {
proxyScriptHash neogoutil.Uint160
}

type cfgAccounting struct {
scriptHash neogoutil.Uint160
}

type cfgContainer struct {
scriptHash neogoutil.Uint160

parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers
}

type cfgNetmap struct {
scriptHash neogoutil.Uint160
wrapper *nmClient.Client
wrapper *nmClient.Client

parsers map[event.Type]event.NotificationParser

Expand Down Expand Up @@ -491,8 +523,6 @@ type cfgReputation struct {
localTrustStorage *truststorage.Storage

localTrustCtrl *trustcontroller.Controller

scriptHash neogoutil.Uint160
}

var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
Expand Down Expand Up @@ -523,8 +553,6 @@ func initCfg(appCfg *config.Config) *cfg {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes

netState := newNetworkState()

persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
fatalOnErr(err)

Expand Down Expand Up @@ -573,26 +601,21 @@ func initCfg(appCfg *config.Config) *cfg {
Buffers: &buffers,
Logger: c.internals.log,
}
basics := initBasics(c, key, persistate)

Check warning on line 604 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L604

Added line #L604 was not covered by tests
c.shared = shared{
key: key,
binPublicKey: key.PublicKey().Bytes(),
basics: basics,

Check warning on line 606 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L606

Added line #L606 was not covered by tests
localAddr: netAddr,
respSvc: response.NewService(response.WithNetworkState(netState)),
respSvc: response.NewService(response.WithNetworkState(basics.networkState)),

Check warning on line 608 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L608

Added line #L608 was not covered by tests
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),
persistate: persistate,
}
c.cfgAccounting = cfgAccounting{
scriptHash: contractsconfig.Balance(appCfg),
}
c.cfgContainer = cfgContainer{
scriptHash: contractsconfig.Container(appCfg),
workerPool: containerWorkerPool,
}
c.cfgNetmap = cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg),
state: netState,
state: c.basics.networkState,

Check warning on line 618 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L618

Added line #L618 was not covered by tests
workerPool: netmapWorkerPool,
needBootstrap: !relayOnly,
}
Expand All @@ -608,7 +631,6 @@ func initCfg(appCfg *config.Config) *cfg {
tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg),
}
c.cfgReputation = cfgReputation{
scriptHash: contractsconfig.Reputation(appCfg),
workerPool: reputationWorkerPool,
}

Expand All @@ -618,7 +640,7 @@ func initCfg(appCfg *config.Config) *cfg {

if metricsconfig.Enabled(c.cfgReader) {

Check warning on line 641 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L641

Added line #L641 was not covered by tests
c.metricsCollector = metrics.NewNodeMetrics(misc.Version)
netState.metrics = c.metricsCollector
c.basics.networkState.metrics = c.metricsCollector

Check warning on line 643 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L643

Added line #L643 was not covered by tests
}

c.onShutdown(c.clientCache.CloseAll) // clean up connections
Expand All @@ -629,17 +651,112 @@ func initCfg(appCfg *config.Config) *cfg {
return c
}

func initBasics(c *cfg, key *keys.PrivateKey, stateStorage *state.PersistentStorage) basics {
b := basics{}

Check warning on line 655 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L654-L655

Added lines #L654 - L655 were not covered by tests

addresses := c.applicationConfiguration.MorphCfg.endpoints

Check warning on line 657 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L657

Added line #L657 was not covered by tests

fromSideChainBlock, err := stateStorage.UInt32(persistateSideChainLastBlockKey)
if err != nil {
fromSideChainBlock = 0
c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))

Check warning on line 662 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L659-L662

Added lines #L659 - L662 were not covered by tests
}

cli, err := client.New(key,
client.WithDialTimeout(c.applicationConfiguration.MorphCfg.dialTimout),
client.WithLogger(c.log),
client.WithAutoSidechainScope(),
client.WithEndpoints(addresses),
client.WithReconnectionRetries(c.applicationConfiguration.MorphCfg.reconnectionRetriesNumber),
client.WithReconnectionsDelay(c.applicationConfiguration.MorphCfg.reconnectionRetriesDelay),
client.WithConnSwitchCallback(func() {
err = c.restartMorph()
if err != nil {
c.internalErr <- fmt.Errorf("restarting after morph connection was lost: %w", err)

Check warning on line 675 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L665-L675

Added lines #L665 - L675 were not covered by tests
}
}),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),

Check warning on line 680 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L678-L680

Added lines #L678 - L680 were not covered by tests
client.WithMinRequiredBlockHeight(fromSideChainBlock),
)
if err != nil {
c.log.Info("failed to create neo RPC client",
zap.Any("endpoints", addresses),
zap.String("error", err.Error()),
)

Check warning on line 687 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L683-L687

Added lines #L683 - L687 were not covered by tests

fatalOnErr(err)

Check warning on line 689 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L689

Added line #L689 was not covered by tests
}

lookupScriptHashesInNNS(cli, &b)

Check warning on line 692 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L692

Added line #L692 was not covered by tests

nState := newNetworkState()

Check warning on line 694 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L694

Added line #L694 was not covered by tests

// container wrapper that invokes notary
// requests with the (empty) Alphabet signature
cnrWrap, err := cntClient.NewFromMorph(cli, b.containerSH, 0)
fatalOnErr(err)

Check warning on line 699 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L698-L699

Added lines #L698 - L699 were not covered by tests

cnrSrc := cntClient.AsContainerSource(cnrWrap)

Check warning on line 701 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L701

Added line #L701 was not covered by tests

eACLFetcher := &morphEACLFetcher{
w: cnrWrap,

Check warning on line 704 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L703-L704

Added lines #L703 - L704 were not covered by tests
}

nmWrap, err := nmClient.NewFromMorph(cli, b.netmapSH, 0)
fatalOnErr(err)

Check warning on line 708 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L707-L708

Added lines #L707 - L708 were not covered by tests

ttl := c.applicationConfiguration.MorphCfg.cacheTTL

Check warning on line 710 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L710

Added line #L710 was not covered by tests

var netmapSource netmapCore.Source
if ttl < 0 {
netmapSource = nmWrap
} else {
b.netmapCache = newCachedNetmapStorage(nState, nmWrap)
b.containerCache = newCachedContainerStorage(cnrSrc, ttl)
b.eaclCache = newCachedEACLStorage(eACLFetcher, ttl)
b.containerListCache = newCachedContainerLister(cnrWrap, ttl)

Check warning on line 719 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L712-L719

Added lines #L712 - L719 were not covered by tests

// use RPC node as source of netmap (with caching)
netmapSource = b.netmapCache

Check warning on line 722 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L722

Added line #L722 was not covered by tests
}

if ttl == 0 {
msPerBlock, err := cli.MsPerBlock()
fatalOnErr(err)
ttl = time.Duration(msPerBlock) * time.Millisecond
c.log.Debug("morph.cache_ttl fetched from network", zap.Duration("value", ttl))

Check warning on line 729 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L725-L729

Added lines #L725 - L729 were not covered by tests
}

b.netMapSource = netmapSource
b.networkState = nState
b.key = key
b.binPublicKey = key.PublicKey().Bytes()
b.cli = cli
b.nCli = nmWrap
b.cCli = cnrWrap
b.ttl = ttl

Check warning on line 739 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L732-L739

Added lines #L732 - L739 were not covered by tests

return b

Check warning on line 741 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L741

Added line #L741 was not covered by tests
}

func (c *cfg) engineOpts() []engine.Option {
opts := make([]engine.Option, 0, 4)

opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),

engine.WithContainersSource(c.shared.containerCache),
engine.WithLogger(c.log),
)

if c.shared.basics.ttl > 0 {
opts = append(opts, engine.WithContainersSource(c.shared.basics.containerCache))
} else {
opts = append(opts, engine.WithContainersSource(cntClient.AsContainerSource(c.shared.basics.cCli)))

Check warning on line 757 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L754-L757

Added lines #L754 - L757 were not covered by tests
}

if c.metricsCollector != nil {
opts = append(opts, engine.WithMetrics(c.metricsCollector))
}
Expand Down Expand Up @@ -769,9 +886,6 @@ func (c *cfg) LocalAddress() network.AddressGroup {
}

func initLocalStorage(c *cfg) {
// storage needs container, container needs storage, dirty sharing
c.shared.cnrClient = new(containerClient.Client)

ls := engine.New(c.engineOpts()...)

addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
Expand Down
Loading

0 comments on commit ff1c242

Please sign in to comment.