Skip to content

Commit

Permalink
node: support reloading node attributes with SIGHUP (#3005)
Browse files Browse the repository at this point in the history
Closes #1870.

Also fix bug from #2998, that incorrectly checked when to reconnect.
  • Loading branch information
roman-khimov authored Nov 21, 2024
2 parents f1b6982 + d430daa commit 1092989
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- `logger.encoding` config option (#2999)
- Reloading morph endpoints with SIGHUP (#2998)
- New `peapod-to-fstree` tool providing peapod-to-fstree data migration (#3013)
- Reloading node attributes with SIGHUP (#3005)

### Fixed
- Do not search for tombstones when handling their expiration, use local indexes instead (#2929)
Expand Down
29 changes: 29 additions & 0 deletions cmd/neofs-node/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/nspcc-dev/locode-db/pkg/locodedb"
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
"github.com/nspcc-dev/neofs-node/pkg/util/attributes"
"go.uber.org/zap"
Expand Down Expand Up @@ -97,3 +98,31 @@ func setIfNotEmpty(setter func(string), value string) {
setter(value)
}
}

func nodeAttrsEqual(arr1, arr2 [][2]string) bool {
if len(arr1) != len(arr2) {
return false
}

elements := make(map[string]string, len(arr1))

for _, item := range arr1 {
elements[item[0]] = item[1]
}

for _, item := range arr2 {
if value, exists := elements[item[0]]; !exists || value != item[1] {
return false
}
}

return true
}

func nodeAttrsToSlice(attrs []netmapV2.Attribute) [][2]string {
res := make([][2]string, len(attrs))
for i := range attrs {
res[i] = [2]string{attrs[i].GetKey(), attrs[i].GetValue()}
}
return res
}
60 changes: 60 additions & 0 deletions cmd/neofs-node/attributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"testing"
)

func TestAttrsEqual(t *testing.T) {
tests := []struct {
name string
arr1 [][2]string
arr2 [][2]string
expected bool
}{
{
name: "Equal attributes",
arr1: [][2]string{{"key1", "value1"}, {"key2", "value2"}},
arr2: [][2]string{{"key1", "value1"}, {"key2", "value2"}},
expected: true,
},
{
name: "Different lengths",
arr1: [][2]string{{"key1", "value1"}},
arr2: [][2]string{{"key1", "value1"}, {"key2", "value2"}},
expected: false,
},
{
name: "Different values",
arr1: [][2]string{{"key1", "value1"}, {"key2", "value2"}},
arr2: [][2]string{{"key1", "value1"}, {"key2", "value3"}},
expected: false,
},
{
name: "Different keys",
arr1: [][2]string{{"key1", "value1"}, {"key2", "value2"}},
arr2: [][2]string{{"key3", "value1"}, {"key2", "value2"}},
expected: false,
},
{
name: "Equal attributes, different order",
arr1: [][2]string{{"key2", "value2"}, {"key1", "value1"}},
arr2: [][2]string{{"key1", "value1"}, {"key2", "value2"}},
expected: true,
},
{
name: "Empty arrays",
arr1: [][2]string{},
arr2: [][2]string{},
expected: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := nodeAttrsEqual(tt.arr1, tt.arr2)
if result != tt.expected {
t.Errorf("nodeAttrsEqual() = %v, expected %v", result, tt.expected)
}
})
}
}
20 changes: 18 additions & 2 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,9 @@ type cfgNetmap struct {
}

type cfgNodeInfo struct {
// values from config; NOT MODIFY AFTER APP INITIALIZATION
localInfo netmap.NodeInfo
// values from config; NOT MODIFY AFTER APP INITIALIZATION OR CONFIG RELOAD
localInfoLock sync.RWMutex
localInfo netmap.NodeInfo
}

type cfgObject struct {
Expand Down Expand Up @@ -524,10 +525,12 @@ func initCfg(appCfg *config.Config) *cfg {
panic(fmt.Errorf("config reading: %w", err))
}

c.cfgNodeInfo.localInfoLock.Lock()
// filling system attributes; do not move it anywhere
// below applying the other attributes since a user
// should be able to overwrite it.
err = writeSystemAttributes(c)
c.cfgNodeInfo.localInfoLock.Unlock()
fatalOnErr(err)

key := nodeconfig.Wallet(appCfg)
Expand Down Expand Up @@ -771,6 +774,9 @@ func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
}

func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
c.cfgNodeInfo.localInfoLock.RLock()
defer c.cfgNodeInfo.localInfoLock.RUnlock()

var res netmapV2.NodeInfo
c.cfgNodeInfo.localInfo.WriteToV2(&res)

Expand All @@ -789,7 +795,9 @@ func (c *cfg) handleLocalNodeInfoFromNetwork(ni *netmap.NodeInfo) {
// with the binary-encoded information from the current node's configuration.
// The state is set using the provided setter which MUST NOT be nil.
func (c *cfg) bootstrapWithState(stateSetter func(*netmap.NodeInfo)) error {
c.cfgNodeInfo.localInfoLock.RLock()
ni := c.cfgNodeInfo.localInfo
c.cfgNodeInfo.localInfoLock.RUnlock()
stateSetter(&ni)

prm := nmClient.AddPeerPrm{}
Expand Down Expand Up @@ -877,6 +885,14 @@ func (c *cfg) configWatcher(ctx context.Context) {

c.cli.Reload(client.WithEndpoints(c.morph.endpoints))

// Node

err = c.reloadNodeAttributes()
if err != nil {
c.log.Error("invalid node attributes configuration", zap.Error(err))
continue
}

c.log.Info("configuration has been reloaded successfully")
case <-ctx.Done():
return
Expand Down
3 changes: 3 additions & 0 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ func (c *cfg) NumberOfAddresses() int {
}

func (c *cfg) ExternalAddresses() []string {
c.cfgNodeInfo.localInfoLock.RLock()
defer c.cfgNodeInfo.localInfoLock.RUnlock()

return c.cfgNodeInfo.localInfo.ExternalAddresses()
}

Expand Down
44 changes: 44 additions & 0 deletions cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync/atomic"

netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
Expand Down Expand Up @@ -122,15 +123,22 @@ func (c *cfg) iterateNetworkAddresses(f func(string) bool) {
}

func (c *cfg) addressNum() int {
c.cfgNodeInfo.localInfoLock.RLock()
defer c.cfgNodeInfo.localInfoLock.RUnlock()

return c.cfgNodeInfo.localInfo.NumberOfNetworkEndpoints()
}

func initNetmapService(c *cfg) {
c.cfgNodeInfo.localInfoLock.Lock()

network.WriteToNodeInfo(c.localAddr, &c.cfgNodeInfo.localInfo)
c.cfgNodeInfo.localInfo.SetPublicKey(c.key.PublicKey().Bytes())
parseAttributes(c)
c.cfgNodeInfo.localInfo.SetOffline()

c.cfgNodeInfo.localInfoLock.Unlock()

if c.cfgMorph.client == nil {
initMorphComponents(c)
}
Expand Down Expand Up @@ -445,3 +453,39 @@ func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) {

return &ni, nil
}

func (c *cfg) reloadNodeAttributes() error {
c.cfgNodeInfo.localInfoLock.Lock()

// TODO(@End-rey): after updating SDK, rewrite with w/o api netmap. See #3005, neofs-sdk-go#635.
var ni2 netmapV2.NodeInfo
c.cfgNodeInfo.localInfo.WriteToV2(&ni2)

oldAttrs := ni2.GetAttributes()

ni2.SetAttributes(nil)

err := c.cfgNodeInfo.localInfo.ReadFromV2(ni2)
if err != nil {
c.cfgNodeInfo.localInfoLock.Unlock()
return err
}

err = writeSystemAttributes(c)
if err != nil {
c.cfgNodeInfo.localInfoLock.Unlock()
return err
}
parseAttributes(c)

c.cfgNodeInfo.localInfo.WriteToV2(&ni2)

newAttrs := ni2.GetAttributes()
c.cfgNodeInfo.localInfoLock.Unlock()

if nodeAttrsEqual(nodeAttrsToSlice(oldAttrs), nodeAttrsToSlice(newAttrs)) {
return nil
}

return c.bootstrap()
}
6 changes: 6 additions & 0 deletions docs/sighup.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ comparing paths from `shard.blobstor` section. After this we have 3 sets:
| Changed section | Actions |
|-----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `endpoints` | Updates N3 endpoints.<br/>If new `endpoints` do not contain the endpoint client is connected to, it will reconnect to another endpoint from the new list. Node service can be interrupted in this case. |

### Node

| Changed section | Actions |
|-----------------|--------------------------|
| `attribute_*` | Updates node attributes. |
3 changes: 3 additions & 0 deletions pkg/core/object/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func (v *FormatValidator) validateSignatureKey(obj *object.Object) error {
return nil
}

if sig.PublicKey() == nil {
return errors.New("missing public key")
}
if !token.AssertAuthKey(sig.PublicKey()) {
return errors.New("session token is not for object's signer")
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/local_object_storage/blobstor/put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
"github.com/nspcc-dev/neofs-sdk-go/object"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -45,11 +47,22 @@ func TestBlobStor_Put_Overflow(t *testing.T) {
},
))

_, err := bs.Put(common.PutPrm{})
addr := oidtest.Address()

obj := objecttest.Object()
obj.SetContainerID(addr.Container())
obj.SetID(addr.Object())

prm := common.PutPrm{
Address: addr,
Object: &obj,
}

_, err := bs.Put(prm)
require.NoError(t, err)

sub2.full = true

_, err = bs.Put(common.PutPrm{})
_, err = bs.Put(prm)
require.ErrorIs(t, err, common.ErrNoSpace)
}
2 changes: 1 addition & 1 deletion pkg/morph/client/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *Client) Reload(opts ...Option) {
// Close current connection and attempt to reconnect, if there is no endpoint
// in the config to which the client is connected.
// Node service can be interrupted in this case.
if slices.Contains(cfg.endpoints, conn.client.Endpoint()) {
if !slices.Contains(cfg.endpoints, conn.client.Endpoint()) {
conn.client.Close()
}
}
4 changes: 2 additions & 2 deletions pkg/services/object/tombstone/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestVerifier_VerifyTomb(t *testing.T) {
},
}

require.NoError(t, v.VerifyTomb(ctx, cidtest.ID(), tomb))
require.NoError(t, v.VerifyTomb(ctx, addr.Container(), tomb))
})
}

Expand All @@ -246,7 +246,7 @@ func childrenResMap(cnr cid.ID, heads []object.Object) map[oid.Address]headRes {
}

func objectsToOIDs(oo []object.Object) []oid.ID {
res := make([]oid.ID, len(oo))
res := make([]oid.ID, 0, len(oo))
for _, obj := range oo {
oID := obj.GetID()
res = append(res, oID)
Expand Down

0 comments on commit 1092989

Please sign in to comment.