diff --git a/app/peerinfo/metrics.go b/app/peerinfo/metrics.go new file mode 100644 index 000000000..e008f0f8e --- /dev/null +++ b/app/peerinfo/metrics.go @@ -0,0 +1,40 @@ +// Copyright © 2022 Obol Labs Inc. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . + +package peerinfo + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/obolnetwork/charon/app/promauto" +) + +var ( + peerClockOffset = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "app", + Subsystem: "peerinfo", + Name: "clock_offset_seconds", + Help: "Peer clock offset in seconds", + ConstLabels: nil, + }, []string{"peer"}) + + peerVersion = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "app", + Subsystem: "peerinfo", + Name: "version", + Help: "Constant gauge with version label set to peer's charon version.", + ConstLabels: nil, + }, []string{"peer", "version"}) +) diff --git a/app/peerinfo/peerinfo.go b/app/peerinfo/peerinfo.go new file mode 100644 index 000000000..60ff6b1f9 --- /dev/null +++ b/app/peerinfo/peerinfo.go @@ -0,0 +1,165 @@ +// Copyright © 2022 Obol Labs Inc. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . + +package peerinfo + +import ( + "bytes" + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/obolnetwork/charon/app/log" + pbv1 "github.com/obolnetwork/charon/app/peerinfo/peerinfopb/v1" + "github.com/obolnetwork/charon/app/z" + "github.com/obolnetwork/charon/p2p" +) + +const period = time.Minute + +var protocolID protocol.ID = "/charon/peerinfo/1.0.0" + +type ( + tickerProvider func() (<-chan time.Time, func()) + nowFunc func() time.Time +) + +func New(tcpNode host.Host, peers []peer.ID, version string, lockHash []byte, + sendFunc p2p.SendReceiveFunc, +) *PeerInfo { + tickerProvider := func() (<-chan time.Time, func()) { + ticker := time.NewTicker(period) + return ticker.C, ticker.Stop + } + + return newInternal(tcpNode, peers, version, lockHash, sendFunc, p2p.RegisterHandler, + tickerProvider, time.Now) +} + +func NewForT(_ *testing.T, tcpNode host.Host, peers []peer.ID, version string, lockHash []byte, + sendFunc p2p.SendReceiveFunc, registerHandler p2p.RegisterHandlerFunc, + tickerProvider tickerProvider, nowFunc nowFunc, +) *PeerInfo { + return newInternal(tcpNode, peers, version, lockHash, sendFunc, registerHandler, + tickerProvider, nowFunc) +} + +func newInternal(tcpNode host.Host, peers []peer.ID, version string, lockHash []byte, + sendFunc p2p.SendReceiveFunc, registerHandler p2p.RegisterHandlerFunc, + tickerProvider tickerProvider, nowFunc nowFunc, +) *PeerInfo { + // Register a simple handler that returns our info and ignores the request. + registerHandler("peerinfo", tcpNode, protocolID, + func() proto.Message { return new(pbv1.PeerInfo) }, + func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) { + return &pbv1.PeerInfo{ + CharonVersion: version, + LockHash: lockHash, + SentAt: timestamppb.New(nowFunc()), + }, true, nil + }, + ) + + return &PeerInfo{ + sendFunc: sendFunc, + tcpNode: tcpNode, + peers: peers, + version: version, + lockHash: lockHash, + tickerProvider: tickerProvider, + loggedLocks: new(sync.Map), + } +} + +type PeerInfo struct { + sendFunc p2p.SendReceiveFunc + tcpNode host.Host + peers []peer.ID + version string + lockHash []byte + tickerProvider tickerProvider + loggedLocks *sync.Map // map[peer.ID]lockHash +} + +func (p *PeerInfo) Run(ctx context.Context) { + ctx = log.WithTopic(ctx, "peerinfo") + + ticks, cancel := p.tickerProvider() + defer cancel() + + for { + select { + case <-ctx.Done(): + return + case now := <-ticks: + p.sendOnce(ctx, now) + } + } +} + +func (p *PeerInfo) sendOnce(ctx context.Context, now time.Time) { + for _, peerID := range p.peers { + if peerID == p.tcpNode.ID() { + continue // Do not send to self. + } + + req := &pbv1.PeerInfo{ + CharonVersion: p.version, + LockHash: p.lockHash, + SentAt: timestamppb.New(now), + } + + go func(peerID peer.ID) { + resp := new(pbv1.PeerInfo) + err := p.sendFunc(ctx, p.tcpNode, peerID, req, resp, protocolID) + if err != nil { + return // Logging handled by send func. + } + + rtt := time.Since(now) + expectSentAt := now.Add(rtt / 2) + clockOffset := resp.SentAt.AsTime().Sub(expectSentAt) + + peerName := p2p.PeerName(peerID) + peerClockOffset.WithLabelValues(peerName).Set(clockOffset.Seconds()) + peerVersion.WithLabelValues(peerName, resp.CharonVersion).Set(1) + + // Log unexpected lock hash + if !bytes.Equal(resp.LockHash, p.lockHash) { + // TODO(corver): Think about escalating this error when we are clear + // on how to handle lock file migrations. + + prevHash, ok := p.loggedLocks.Load(peerID) + if !ok || !bytes.Equal(prevHash.([]byte), resp.LockHash) { + // Only log once when we see a new lock hash + log.Warn(ctx, "Mismatching peer lock hash", nil, + z.Str("peer", peerName), + z.Str("lock_hash", fmt.Sprintf("%#x", resp.LockHash)), + ) + + p.loggedLocks.Store(peerID, resp.LockHash) + } + } + }(peerID) + } +} diff --git a/app/peerinfo/peerinfopb/v1/appinfo.pb.go b/app/peerinfo/peerinfopb/v1/appinfo.pb.go new file mode 100644 index 000000000..2417a9269 --- /dev/null +++ b/app/peerinfo/peerinfopb/v1/appinfo.pb.go @@ -0,0 +1,175 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc (unknown) +// source: app/peerinfo/peerinfopb/v1/appinfo.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PeerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CharonVersion string `protobuf:"bytes,1,opt,name=charon_version,json=charonVersion,proto3" json:"charon_version,omitempty"` + LockHash []byte `protobuf:"bytes,2,opt,name=lock_hash,json=lockHash,proto3" json:"lock_hash,omitempty"` + SentAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=sent_at,json=sentAt,proto3" json:"sent_at,omitempty"` +} + +func (x *PeerInfo) Reset() { + *x = PeerInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_app_peerinfo_peerinfopb_v1_appinfo_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PeerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeerInfo) ProtoMessage() {} + +func (x *PeerInfo) ProtoReflect() protoreflect.Message { + mi := &file_app_peerinfo_peerinfopb_v1_appinfo_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeerInfo.ProtoReflect.Descriptor instead. +func (*PeerInfo) Descriptor() ([]byte, []int) { + return file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescGZIP(), []int{0} +} + +func (x *PeerInfo) GetCharonVersion() string { + if x != nil { + return x.CharonVersion + } + return "" +} + +func (x *PeerInfo) GetLockHash() []byte { + if x != nil { + return x.LockHash + } + return nil +} + +func (x *PeerInfo) GetSentAt() *timestamppb.Timestamp { + if x != nil { + return x.SentAt + } + return nil +} + +var File_app_peerinfo_peerinfopb_v1_appinfo_proto protoreflect.FileDescriptor + +var file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDesc = []byte{ + 0x0a, 0x28, 0x61, 0x70, 0x70, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x70, + 0x65, 0x65, 0x72, 0x69, 0x6e, 0x66, 0x6f, 0x70, 0x62, 0x2f, 0x76, 0x31, 0x2f, 0x61, 0x70, 0x70, + 0x69, 0x6e, 0x66, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1a, 0x61, 0x70, 0x70, 0x2e, + 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x66, 0x6f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x66, + 0x6f, 0x70, 0x62, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x83, 0x01, 0x0a, 0x08, 0x50, 0x65, 0x65, 0x72, + 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x68, 0x61, 0x72, 0x6f, 0x6e, 0x5f, 0x76, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x68, + 0x61, 0x72, 0x6f, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x6c, + 0x6f, 0x63, 0x6b, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, + 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x12, 0x33, 0x0a, 0x07, 0x73, 0x65, 0x6e, 0x74, + 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x06, 0x73, 0x65, 0x6e, 0x74, 0x41, 0x74, 0x42, 0x3a, 0x5a, + 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x62, 0x6f, 0x6c, + 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x63, 0x68, 0x61, 0x72, 0x6f, 0x6e, 0x2f, 0x61, + 0x70, 0x70, 0x2f, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x66, 0x6f, 0x2f, 0x70, 0x65, 0x65, 0x72, + 0x69, 0x6e, 0x66, 0x6f, 0x70, 0x62, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescOnce sync.Once + file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescData = file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDesc +) + +func file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescGZIP() []byte { + file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescOnce.Do(func() { + file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescData) + }) + return file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDescData +} + +var file_app_peerinfo_peerinfopb_v1_appinfo_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_app_peerinfo_peerinfopb_v1_appinfo_proto_goTypes = []interface{}{ + (*PeerInfo)(nil), // 0: app.peerinfo.peerinfopb.v1.PeerInfo + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp +} +var file_app_peerinfo_peerinfopb_v1_appinfo_proto_depIdxs = []int32{ + 1, // 0: app.peerinfo.peerinfopb.v1.PeerInfo.sent_at:type_name -> google.protobuf.Timestamp + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_app_peerinfo_peerinfopb_v1_appinfo_proto_init() } +func file_app_peerinfo_peerinfopb_v1_appinfo_proto_init() { + if File_app_peerinfo_peerinfopb_v1_appinfo_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_app_peerinfo_peerinfopb_v1_appinfo_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PeerInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_app_peerinfo_peerinfopb_v1_appinfo_proto_goTypes, + DependencyIndexes: file_app_peerinfo_peerinfopb_v1_appinfo_proto_depIdxs, + MessageInfos: file_app_peerinfo_peerinfopb_v1_appinfo_proto_msgTypes, + }.Build() + File_app_peerinfo_peerinfopb_v1_appinfo_proto = out.File + file_app_peerinfo_peerinfopb_v1_appinfo_proto_rawDesc = nil + file_app_peerinfo_peerinfopb_v1_appinfo_proto_goTypes = nil + file_app_peerinfo_peerinfopb_v1_appinfo_proto_depIdxs = nil +} diff --git a/app/peerinfo/peerinfopb/v1/appinfo.proto b/app/peerinfo/peerinfopb/v1/appinfo.proto new file mode 100644 index 000000000..14dbd7afa --- /dev/null +++ b/app/peerinfo/peerinfopb/v1/appinfo.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package app.peerinfo.peerinfopb.v1; + +option go_package = "github.com/obolnetwork/charon/app/peerinfo/peerinfopb/v1"; + +import "google/protobuf/timestamp.proto"; + +message PeerInfo { + string charon_version = 1; + bytes lock_hash = 2; + google.protobuf.Timestamp sent_at = 3; +}