Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dkg/sync: enforce version #1901

Merged
merged 5 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions app/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package version
import (
"context"
"runtime/debug"
"strings"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)
Expand Down Expand Up @@ -53,3 +55,13 @@ func LogInfo(ctx context.Context, msg string) {
z.Str("git_commit_time", gitTimestamp),
)
}

// Minor returns the minor version of the provided version string.
func Minor(version string) (string, error) {
split := strings.Split(version, ".")
if len(split) < 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be

Suggested change
if len(split) < 2 {
if len(split) < 3 {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it supports v0.1 as input and just returns it.

return "", errors.New("invalid version string")
}

return strings.Join(split[:2], "."), nil
}
39 changes: 39 additions & 0 deletions app/version/version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package version_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/app/version"
)

func TestMinor(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add a case to assert an error from Minor function with error: invalid version string

minor, err := version.Minor("v0.1.2")
require.NoError(t, err)
require.Equal(t, "v0.1", minor)

minor, err = version.Minor("1.2.3")
require.NoError(t, err)
require.Equal(t, "1.2", minor)

minor, err = version.Minor("version 1000.2000.3000")
require.NoError(t, err)
require.Equal(t, "version 1000.2000", minor)

minor, err = version.Minor("v0.1")
require.NoError(t, err)
require.Equal(t, "v0.1", minor)

minor, err = version.Minor("v0.1.2.3")
require.NoError(t, err)
require.Equal(t, "v0.1", minor)

_, err = version.Minor("0")
require.ErrorContains(t, err, "invalid version string")

_, err = version.Minor("foo")
require.ErrorContains(t, err, "invalid version string")
}
15 changes: 11 additions & 4 deletions dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,24 @@ func setupP2P(ctx context.Context, key *k1.PrivateKey, p2pConf p2p.Config, peers

// startSyncProtocol sets up a sync protocol server and clients for each peer and returns a shutdown function
// when all peers are connected.
func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKey, defHash []byte, peerIDs []peer.ID,
onFailure func(), testCallback func(connected int, id peer.ID),
func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKey, defHash []byte,
peerIDs []peer.ID, onFailure func(), testCallback func(connected int, id peer.ID),
) (func(context.Context) error, error) {
// Sign definition hash with charon-enr-private-key
// Note: libp2p signing does another hash of the defHash.

hashSig, err := ((*libp2pcrypto.Secp256k1PrivateKey)(key)).Sign(defHash)
if err != nil {
return nil, errors.Wrap(err, "sign definition hash")
}

server := sync.NewServer(tcpNode, len(peerIDs)-1, defHash)
// DKG compatibility is minor version dependent.
minorVersion, err := version.Minor(version.Version)
if err != nil {
return nil, errors.Wrap(err, "get version")
}

server := sync.NewServer(tcpNode, len(peerIDs)-1, defHash, minorVersion)
server.Start(ctx)

var clients []*sync.Client
Expand All @@ -290,7 +297,7 @@ func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKe
}

ctx := log.WithCtx(ctx, z.Str("peer", p2p.PeerName(pID)))
client := sync.NewClient(tcpNode, pID, hashSig)
client := sync.NewClient(tcpNode, pID, hashSig, minorVersion)
clients = append(clients, client)

go func() {
Expand Down
33 changes: 21 additions & 12 deletions dkg/dkgpb/v1/sync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dkg/dkgpb/v1/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message MsgSync {
google.protobuf.Timestamp timestamp = 1;
bytes hash_signature = 2;
bool shutdown = 3;
string version = 4;
}

message MsgSyncResponse {
Expand Down
5 changes: 4 additions & 1 deletion dkg/sync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
)

// NewClient returns a new Client instance.
func NewClient(tcpNode host.Host, peer peer.ID, hashSig []byte) *Client {
func NewClient(tcpNode host.Host, peer peer.ID, hashSig []byte, version string) *Client {
return &Client{
tcpNode: tcpNode,
peer: peer,
hashSig: hashSig,
shutdown: make(chan struct{}),
done: make(chan struct{}),
reconnect: true,
version: version,
}
}

Expand All @@ -45,6 +46,7 @@ type Client struct {

// Immutable state
hashSig []byte
version string
tcpNode host.Host
peer peer.ID
}
Expand Down Expand Up @@ -167,6 +169,7 @@ func (c *Client) sendMsg(stream network.Stream, shutdown bool) (*pb.MsgSyncRespo
Timestamp: timestamppb.Now(),
HashSignature: c.hashSig,
Shutdown: shutdown,
Version: c.version,
}

if err := writeSizedProto(stream, msg); err != nil {
Expand Down
58 changes: 43 additions & 15 deletions dkg/sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -25,18 +26,20 @@ import (
)

const (
protocolID = "/charon/dkg/sync/1.0.0/"
errInvalidSig = "invalid signature"
protocolID = "/charon/dkg/sync/1.0.0/"
errInvalidSig = "invalid signature"
errInvalidVersion = "invalid version"
)

// NewServer returns a new Server instance.
func NewServer(tcpNode host.Host, allCount int, defHash []byte) *Server {
func NewServer(tcpNode host.Host, allCount int, defHash []byte, version string) *Server {
return &Server{
defHash: defHash,
tcpNode: tcpNode,
allCount: allCount,
shutdown: make(map[peer.ID]struct{}),
connected: make(map[peer.ID]struct{}),
version: version,
}
}

Expand All @@ -47,6 +50,7 @@ type Server struct {
shutdown map[peer.ID]struct{}
connected map[peer.ID]struct{}
defHash []byte
version string
allCount int // Excluding self
tcpNode host.Host
errResponse bool // To return error and exit anywhere in the server flow
Expand All @@ -73,6 +77,14 @@ func (s *Server) AwaitAllConnected(ctx context.Context) error {
}
}

// setError sets the shared error state for the server.
func (s *Server) setError() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

godoc incorrect

s.mu.Lock()
defer s.mu.Unlock()

s.errResponse = true
}

// isError checks if there was any error in between the server flow.
func (s *Server) isError() bool {
s.mu.Lock()
Expand Down Expand Up @@ -179,20 +191,13 @@ func (s *Server) handleStream(ctx context.Context, stream network.Stream) error
SyncTimestamp: msg.Timestamp,
}

// Verify definition hash
// Note: libp2p verify does another hash of defHash.
ok, err := pubkey.Verify(s.defHash, msg.HashSignature)
var ok bool
resp.Error, ok, err = s.validReq(ctx, pubkey, msg)
if err != nil {
return errors.Wrap(err, "verify sig hash")
return err
} else if !ok {
resp.Error = errInvalidSig

s.mu.Lock()
s.errResponse = true
s.mu.Unlock()

log.Error(ctx, "Received mismatching cluster definition hash from peer", nil)
} else if ok && !s.isConnected(pID) {
s.setError()
} else if !s.isConnected(pID) {
count := s.setConnected(pID)
log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount))
}
Expand All @@ -209,6 +214,29 @@ func (s *Server) handleStream(ctx context.Context, stream network.Stream) error
}
}

// validReq returns an error message and false if the request version or definition hash are invalid.
// Else it returns true or an error.
func (s *Server) validReq(ctx context.Context, pubkey crypto.PubKey, msg *pb.MsgSync) (string, bool, error) {
Copy link
Contributor

@dB2510 dB2510 Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are reading s.version and s.defHash in this method without locking mutex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are immutable fields, no need to lock

Copy link
Contributor Author

@corverroos corverroos Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grouped server fields into immutable and mutable groups

if msg.Version != s.version {
log.Error(ctx, "Received mismatching charon version from peer", nil,
z.Str("expect", s.version),
z.Str("got", msg.Version),
)

return errInvalidVersion, false, nil
}

ok, err := pubkey.Verify(s.defHash, msg.HashSignature)
if err != nil { // Note: libp2p verify does another hash of defHash.
return "", false, errors.Wrap(err, "verify sig hash")
} else if !ok {
log.Error(ctx, "Received mismatching cluster definition hash from peer", nil)
return errInvalidSig, false, nil
}

return "", true, nil
}

// Start registers sync protocol with the libp2p host.
func (s *Server) Start(ctx context.Context) {
s.tcpNode.SetStreamHandler(protocolID, func(stream network.Stream) {
Expand Down
Loading