From c6df13eba8cf716327f3f0adaec03452c6f4a3d5 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 21 Oct 2022 17:43:13 +0200 Subject: [PATCH 1/7] cleanup --- app/log/loki/lokipb/v1/loki.pb.go | 312 ++++++++++++++++++++++++++++++ 1 file changed, 312 insertions(+) create mode 100644 app/log/loki/lokipb/v1/loki.pb.go diff --git a/app/log/loki/lokipb/v1/loki.pb.go b/app/log/loki/lokipb/v1/loki.pb.go new file mode 100644 index 000000000..6296b5875 --- /dev/null +++ b/app/log/loki/lokipb/v1/loki.pb.go @@ -0,0 +1,312 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc (unknown) +// source: app/log/loki/lokipb/v1/loki.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PushRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Streams []*Stream `protobuf:"bytes,1,rep,name=streams,proto3" json:"streams,omitempty"` +} + +func (x *PushRequest) Reset() { + *x = PushRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PushRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PushRequest) ProtoMessage() {} + +func (x *PushRequest) ProtoReflect() protoreflect.Message { + mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PushRequest.ProtoReflect.Descriptor instead. +func (*PushRequest) Descriptor() ([]byte, []int) { + return file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP(), []int{0} +} + +func (x *PushRequest) GetStreams() []*Stream { + if x != nil { + return x.Streams + } + return nil +} + +type Stream struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels,omitempty"` + Entries []*Entry `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries,omitempty"` + Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"hash,omitempty"` +} + +func (x *Stream) Reset() { + *x = Stream{} + if protoimpl.UnsafeEnabled { + mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Stream) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Stream) ProtoMessage() {} + +func (x *Stream) ProtoReflect() protoreflect.Message { + mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Stream.ProtoReflect.Descriptor instead. +func (*Stream) Descriptor() ([]byte, []int) { + return file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP(), []int{1} +} + +func (x *Stream) GetLabels() string { + if x != nil { + return x.Labels + } + return "" +} + +func (x *Stream) GetEntries() []*Entry { + if x != nil { + return x.Entries + } + return nil +} + +func (x *Stream) GetHash() uint64 { + if x != nil { + return x.Hash + } + return 0 +} + +type Entry struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line,omitempty"` +} + +func (x *Entry) Reset() { + *x = Entry{} + if protoimpl.UnsafeEnabled { + mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Entry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Entry) ProtoMessage() {} + +func (x *Entry) ProtoReflect() protoreflect.Message { + mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Entry.ProtoReflect.Descriptor instead. +func (*Entry) Descriptor() ([]byte, []int) { + return file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP(), []int{2} +} + +func (x *Entry) GetTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.Timestamp + } + return nil +} + +func (x *Entry) GetLine() string { + if x != nil { + return x.Line + } + return "" +} + +var File_app_log_loki_lokipb_v1_loki_proto protoreflect.FileDescriptor + +var file_app_log_loki_lokipb_v1_loki_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x61, 0x70, 0x70, 0x2f, 0x6c, 0x6f, 0x67, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x2f, 0x6c, + 0x6f, 0x6b, 0x69, 0x70, 0x62, 0x2f, 0x76, 0x31, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x16, 0x61, 0x70, 0x70, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x6c, 0x6f, 0x6b, + 0x69, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x70, 0x62, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x47, 0x0a, 0x0b, + 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x07, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x61, + 0x70, 0x70, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, + 0x70, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x07, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x73, 0x22, 0x6d, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, + 0x16, 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x12, 0x37, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, + 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6c, + 0x6f, 0x67, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x70, 0x62, 0x2e, 0x76, + 0x31, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, + 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, + 0x68, 0x61, 0x73, 0x68, 0x22, 0x55, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x38, 0x0a, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x42, 0x36, 0x5a, 0x34, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x62, 0x6f, 0x6c, 0x6e, 0x65, + 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x63, 0x68, 0x61, 0x72, 0x6f, 0x6e, 0x2f, 0x61, 0x70, 0x70, + 0x2f, 0x6c, 0x6f, 0x67, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x70, 0x62, + 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_app_log_loki_lokipb_v1_loki_proto_rawDescOnce sync.Once + file_app_log_loki_lokipb_v1_loki_proto_rawDescData = file_app_log_loki_lokipb_v1_loki_proto_rawDesc +) + +func file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP() []byte { + file_app_log_loki_lokipb_v1_loki_proto_rawDescOnce.Do(func() { + file_app_log_loki_lokipb_v1_loki_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_log_loki_lokipb_v1_loki_proto_rawDescData) + }) + return file_app_log_loki_lokipb_v1_loki_proto_rawDescData +} + +var file_app_log_loki_lokipb_v1_loki_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_app_log_loki_lokipb_v1_loki_proto_goTypes = []interface{}{ + (*PushRequest)(nil), // 0: app.log.loki.lokipb.v1.PushRequest + (*Stream)(nil), // 1: app.log.loki.lokipb.v1.Stream + (*Entry)(nil), // 2: app.log.loki.lokipb.v1.Entry + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp +} +var file_app_log_loki_lokipb_v1_loki_proto_depIdxs = []int32{ + 1, // 0: app.log.loki.lokipb.v1.PushRequest.streams:type_name -> app.log.loki.lokipb.v1.Stream + 2, // 1: app.log.loki.lokipb.v1.Stream.entries:type_name -> app.log.loki.lokipb.v1.Entry + 3, // 2: app.log.loki.lokipb.v1.Entry.timestamp:type_name -> google.protobuf.Timestamp + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_app_log_loki_lokipb_v1_loki_proto_init() } +func file_app_log_loki_lokipb_v1_loki_proto_init() { + if File_app_log_loki_lokipb_v1_loki_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_app_log_loki_lokipb_v1_loki_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PushRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_log_loki_lokipb_v1_loki_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Stream); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_log_loki_lokipb_v1_loki_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Entry); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_app_log_loki_lokipb_v1_loki_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_app_log_loki_lokipb_v1_loki_proto_goTypes, + DependencyIndexes: file_app_log_loki_lokipb_v1_loki_proto_depIdxs, + MessageInfos: file_app_log_loki_lokipb_v1_loki_proto_msgTypes, + }.Build() + File_app_log_loki_lokipb_v1_loki_proto = out.File + file_app_log_loki_lokipb_v1_loki_proto_rawDesc = nil + file_app_log_loki_lokipb_v1_loki_proto_goTypes = nil + file_app_log_loki_lokipb_v1_loki_proto_depIdxs = nil +} From 70014badec530ec672e5e9c2dd6dd20932700e95 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 11 Nov 2022 09:27:38 +0200 Subject: [PATCH 2/7] core/priority: fix deadlock --- core/priority/prioritiser.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/core/priority/prioritiser.go b/core/priority/prioritiser.go index 64859cdb9..0106569ae 100644 --- a/core/priority/prioritiser.go +++ b/core/priority/prioritiser.go @@ -231,8 +231,8 @@ func (p *Prioritiser) Run(ctx context.Context) error { Timeout: time.Now().Add(p.consensusTimeout), } } - sendResponse(data.Msgs, recv) data.Msgs[recv.Msg.PeerId] = recv + sendResponse(data.Msgs) instances[key] = data if len(data.Msgs) == len(p.peers) { @@ -392,28 +392,31 @@ func hashProto(msg proto.Message) ([32]byte, error) { } // sendResponse sends own response to any awaiting peers. -func sendResponse(msgs map[string]received, recv received) { - if recv.Own { // Send our message to all waiting peers. - for _, other := range msgs { - if other.Response == nil { - continue - } - other.Response <- recv.Msg +func sendResponse(msgs map[string]received) { + // Get own message + var own *pbv1.PriorityMsg + for _, recv := range msgs { + if !recv.Own { + continue } - return + own = recv.Msg + + break } - if recv.Response == nil { - // This peer doesn't need a response + if own == nil { + // Own message not received yet return } - // Send own response to this peer. - for _, other := range msgs { - if !other.Own { + // Send own to any awaiting peers + for _, recv := range msgs { + if recv.Response == nil { continue } - recv.Response <- other.Msg + + recv.Response <- own // Response channel has capacity of 1. + recv.Response = nil // Mark channel as responded.. } } From 37033aff720a1bd46fc154f13b5684ebd284a076 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 11 Nov 2022 09:29:34 +0200 Subject: [PATCH 3/7] cleanup --- app/log/loki/lokipb/v1/loki.pb.go | 312 ------------------------------ 1 file changed, 312 deletions(-) delete mode 100644 app/log/loki/lokipb/v1/loki.pb.go diff --git a/app/log/loki/lokipb/v1/loki.pb.go b/app/log/loki/lokipb/v1/loki.pb.go deleted file mode 100644 index 6296b5875..000000000 --- a/app/log/loki/lokipb/v1/loki.pb.go +++ /dev/null @@ -1,312 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.28.1 -// protoc (unknown) -// source: app/log/loki/lokipb/v1/loki.proto - -package v1 - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type PushRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Streams []*Stream `protobuf:"bytes,1,rep,name=streams,proto3" json:"streams,omitempty"` -} - -func (x *PushRequest) Reset() { - *x = PushRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PushRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PushRequest) ProtoMessage() {} - -func (x *PushRequest) ProtoReflect() protoreflect.Message { - mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PushRequest.ProtoReflect.Descriptor instead. -func (*PushRequest) Descriptor() ([]byte, []int) { - return file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP(), []int{0} -} - -func (x *PushRequest) GetStreams() []*Stream { - if x != nil { - return x.Streams - } - return nil -} - -type Stream struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels,omitempty"` - Entries []*Entry `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries,omitempty"` - Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"hash,omitempty"` -} - -func (x *Stream) Reset() { - *x = Stream{} - if protoimpl.UnsafeEnabled { - mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Stream) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Stream) ProtoMessage() {} - -func (x *Stream) ProtoReflect() protoreflect.Message { - mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Stream.ProtoReflect.Descriptor instead. -func (*Stream) Descriptor() ([]byte, []int) { - return file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP(), []int{1} -} - -func (x *Stream) GetLabels() string { - if x != nil { - return x.Labels - } - return "" -} - -func (x *Stream) GetEntries() []*Entry { - if x != nil { - return x.Entries - } - return nil -} - -func (x *Stream) GetHash() uint64 { - if x != nil { - return x.Hash - } - return 0 -} - -type Entry struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line,omitempty"` -} - -func (x *Entry) Reset() { - *x = Entry{} - if protoimpl.UnsafeEnabled { - mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Entry) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Entry) ProtoMessage() {} - -func (x *Entry) ProtoReflect() protoreflect.Message { - mi := &file_app_log_loki_lokipb_v1_loki_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Entry.ProtoReflect.Descriptor instead. -func (*Entry) Descriptor() ([]byte, []int) { - return file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP(), []int{2} -} - -func (x *Entry) GetTimestamp() *timestamppb.Timestamp { - if x != nil { - return x.Timestamp - } - return nil -} - -func (x *Entry) GetLine() string { - if x != nil { - return x.Line - } - return "" -} - -var File_app_log_loki_lokipb_v1_loki_proto protoreflect.FileDescriptor - -var file_app_log_loki_lokipb_v1_loki_proto_rawDesc = []byte{ - 0x0a, 0x21, 0x61, 0x70, 0x70, 0x2f, 0x6c, 0x6f, 0x67, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x2f, 0x6c, - 0x6f, 0x6b, 0x69, 0x70, 0x62, 0x2f, 0x76, 0x31, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x12, 0x16, 0x61, 0x70, 0x70, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x6c, 0x6f, 0x6b, - 0x69, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x70, 0x62, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x47, 0x0a, 0x0b, - 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x07, 0x73, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x61, - 0x70, 0x70, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, - 0x70, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x07, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x73, 0x22, 0x6d, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, - 0x16, 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x12, 0x37, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, - 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6c, - 0x6f, 0x67, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x2e, 0x6c, 0x6f, 0x6b, 0x69, 0x70, 0x62, 0x2e, 0x76, - 0x31, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, - 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, - 0x68, 0x61, 0x73, 0x68, 0x22, 0x55, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x38, 0x0a, - 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x42, 0x36, 0x5a, 0x34, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x62, 0x6f, 0x6c, 0x6e, 0x65, - 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x63, 0x68, 0x61, 0x72, 0x6f, 0x6e, 0x2f, 0x61, 0x70, 0x70, - 0x2f, 0x6c, 0x6f, 0x67, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x2f, 0x6c, 0x6f, 0x6b, 0x69, 0x70, 0x62, - 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_app_log_loki_lokipb_v1_loki_proto_rawDescOnce sync.Once - file_app_log_loki_lokipb_v1_loki_proto_rawDescData = file_app_log_loki_lokipb_v1_loki_proto_rawDesc -) - -func file_app_log_loki_lokipb_v1_loki_proto_rawDescGZIP() []byte { - file_app_log_loki_lokipb_v1_loki_proto_rawDescOnce.Do(func() { - file_app_log_loki_lokipb_v1_loki_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_log_loki_lokipb_v1_loki_proto_rawDescData) - }) - return file_app_log_loki_lokipb_v1_loki_proto_rawDescData -} - -var file_app_log_loki_lokipb_v1_loki_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_app_log_loki_lokipb_v1_loki_proto_goTypes = []interface{}{ - (*PushRequest)(nil), // 0: app.log.loki.lokipb.v1.PushRequest - (*Stream)(nil), // 1: app.log.loki.lokipb.v1.Stream - (*Entry)(nil), // 2: app.log.loki.lokipb.v1.Entry - (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp -} -var file_app_log_loki_lokipb_v1_loki_proto_depIdxs = []int32{ - 1, // 0: app.log.loki.lokipb.v1.PushRequest.streams:type_name -> app.log.loki.lokipb.v1.Stream - 2, // 1: app.log.loki.lokipb.v1.Stream.entries:type_name -> app.log.loki.lokipb.v1.Entry - 3, // 2: app.log.loki.lokipb.v1.Entry.timestamp:type_name -> google.protobuf.Timestamp - 3, // [3:3] is the sub-list for method output_type - 3, // [3:3] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name -} - -func init() { file_app_log_loki_lokipb_v1_loki_proto_init() } -func file_app_log_loki_lokipb_v1_loki_proto_init() { - if File_app_log_loki_lokipb_v1_loki_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_app_log_loki_lokipb_v1_loki_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PushRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_app_log_loki_lokipb_v1_loki_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Stream); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_app_log_loki_lokipb_v1_loki_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Entry); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_app_log_loki_lokipb_v1_loki_proto_rawDesc, - NumEnums: 0, - NumMessages: 3, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_app_log_loki_lokipb_v1_loki_proto_goTypes, - DependencyIndexes: file_app_log_loki_lokipb_v1_loki_proto_depIdxs, - MessageInfos: file_app_log_loki_lokipb_v1_loki_proto_msgTypes, - }.Build() - File_app_log_loki_lokipb_v1_loki_proto = out.File - file_app_log_loki_lokipb_v1_loki_proto_rawDesc = nil - file_app_log_loki_lokipb_v1_loki_proto_goTypes = nil - file_app_log_loki_lokipb_v1_loki_proto_depIdxs = nil -} From 5ab4c87eb8600fd38c68626200e50a7d74e90cd0 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 11 Nov 2022 12:47:30 +0200 Subject: [PATCH 4/7] cleanup --- core/priority/prioritiser.go | 186 ++++++++++++++++-------------- core/priority/prioritiser_test.go | 76 ++++++++---- 2 files changed, 154 insertions(+), 108 deletions(-) diff --git a/core/priority/prioritiser.go b/core/priority/prioritiser.go index 0106569ae..8b8c507b6 100644 --- a/core/priority/prioritiser.go +++ b/core/priority/prioritiser.go @@ -25,6 +25,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" @@ -46,9 +47,12 @@ type Priority proto.Message // instanceData contains an Instance and its data. type instanceData struct { + OwnID string Instance Instance - Msgs map[string]received // map[peerID]msg - Timeout time.Time + Key [32]byte // Hash of instance + Pending []chan<- *pbv1.PriorityMsg // Pending exchange requests from peers + Msgs map[string]*pbv1.PriorityMsg // Received messages by peers (including own) + Timeout time.Time // Timeout starts consensus even if all messages not received } type Consensus interface { @@ -65,9 +69,8 @@ type tickerProvider func() (<-chan time.Time, func()) // subscriber abstracts the output subscriber callbacks of Prioritiser. type subscriber func(context.Context, Instance, *pbv1.PriorityResult) error -// received contains a received peer message and a channel to provide response. -type received struct { - Own bool +// request contains a received peer request and a channel to provide response. +type request struct { Msg *pbv1.PriorityMsg Response chan<- *pbv1.PriorityMsg } @@ -100,8 +103,9 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int, sendFunc p msgValidator: msgValidator, consensusTimeout: consensusTimeout, tickerProvider: tickerProvider, - proposals: make(chan *pbv1.PriorityMsg), - receives: make(chan received), + own: make(chan *pbv1.PriorityMsg), + responses: make(chan *pbv1.PriorityMsg), + requests: make(chan request), quit: make(chan struct{}), noSupportFilters: noSupportFilters, skipAllFilter: log.Filter(), @@ -137,9 +141,12 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int, sendFunc p // Prioritiser resolves cluster wide priorities. type Prioritiser struct { + // All state immutable wrt Run. + quit chan struct{} - proposals chan *pbv1.PriorityMsg - receives chan received + own chan *pbv1.PriorityMsg // Own proposed messages to exchange + requests chan request // Other peers requesting to exchange messages. + responses chan *pbv1.PriorityMsg // Responses from exchanging with peers. minRequired int consensusTimeout time.Duration tcpNode host.Host @@ -162,7 +169,7 @@ func (p *Prioritiser) Subscribe(fn subscriber) { // Prioritise starts a new prioritisation instance for the provided message or returns an error. func (p *Prioritiser) Prioritise(ctx context.Context, msg *pbv1.PriorityMsg) error { select { - case p.proposals <- msg: + case p.own <- msg: return nil case <-ctx.Done(): return ctx.Err() @@ -180,14 +187,14 @@ func (p *Prioritiser) Run(ctx context.Context) error { ticker, stopTicker := p.tickerProvider() defer stopTicker() + // Mutable state instances := make(map[[32]byte]instanceData) - startConsensus := func(key [32]byte) { - data := instances[key] - + // startConsensus starts consensus and deletes it. + startConsensus := func(data instanceData) { var msgs []*pbv1.PriorityMsg for _, msg := range data.Msgs { - msgs = append(msgs, msg.Msg) + msgs = append(msgs, msg) } result, err := calculateResult(msgs, p.minRequired) if err != nil { @@ -201,51 +208,78 @@ func (p *Prioritiser) Run(ctx context.Context) error { return } - delete(instances, key) + delete(instances, data.Key) } - for { - select { - case <-ctx.Done(): - return ctx.Err() - case msg := <-p.proposals: - p.prioritiseOnce(ctx, msg) + // processInstance calls the callback with new or existing instance data and + // stores the result after processing any pending requests. It also starts consensus + // if all messages were received. + processInstance := func(instance *anypb.Any, callback func(instanceData) (instanceData, error)) { + // TODO(corver): Instance needs a duty/slot so we can filter out unexpected instances. + instancePB, err := instance.UnmarshalNew() + if err != nil { + log.Error(ctx, "Priority unmarshal any", err) + return + } + key, err := hashProto(instancePB) + if err != nil { + log.Error(ctx, "Priority hash proto", err) + return + } - case recv := <-p.receives: - instance, err := recv.Msg.Instance.UnmarshalNew() - if err != nil { - log.Error(ctx, "Priority instance from any proto", err) - continue - } - key, err := hashProto(instance) - if err != nil { - log.Error(ctx, "Priority instance key", err) - continue + data, ok := instances[key] + if !ok { + data = instanceData{ + OwnID: p.tcpNode.ID().String(), + Instance: instancePB, + Key: key, + Msgs: make(map[string]*pbv1.PriorityMsg), + Timeout: time.Now().Add(p.consensusTimeout), } + } - data, ok := instances[key] - if !ok { - data = instanceData{ - Instance: instance, - Msgs: make(map[string]received), - Timeout: time.Now().Add(p.consensusTimeout), - } - } - data.Msgs[recv.Msg.PeerId] = recv - sendResponse(data.Msgs) - instances[key] = data + data, err = callback(data) + if err != nil { + log.Error(ctx, "Priority instance error", err) + return + } - if len(data.Msgs) == len(p.peers) { - // All messages received before timeout - startConsensus(key) - } + instances[key] = processPending(data) + + if len(data.Msgs) == len(p.peers) { + // All messages received before timeout + startConsensus(data) + } + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case msg := <-p.own: + log.Debug(ctx, "Priority protocol triggered") + processInstance(msg.Instance, func(data instanceData) (instanceData, error) { + data.Msgs[msg.PeerId] = msg + return data, nil + }) + p.exchangeOnce(ctx, msg) + case req := <-p.requests: + processInstance(req.Msg.Instance, func(data instanceData) (instanceData, error) { + data.Pending = append(data.Pending, req.Response) + return data, nil + }) + case msg := <-p.responses: + processInstance(msg.Instance, func(data instanceData) (instanceData, error) { + data.Msgs[msg.PeerId] = msg + return data, nil + }) case now := <-ticker: - for key, data := range instances { + for _, data := range instances { if now.Before(data.Timeout) { continue } - startConsensus(key) // Note that iterating and deleting from a map from a single goroutine is fine. + startConsensus(data) } } } @@ -260,13 +294,13 @@ func (p *Prioritiser) handleRequest(ctx context.Context, pID peer.ID, msg *pbv1. } response := make(chan *pbv1.PriorityMsg, 1) // Ensure responding goroutine never blocks. - recv := received{ + req := request{ Msg: msg, Response: response, } select { - case p.receives <- recv: + case p.requests <- req: case <-ctx.Done(): return nil, ctx.Err() case <-p.quit: @@ -283,24 +317,12 @@ func (p *Prioritiser) handleRequest(ctx context.Context, pID peer.ID, msg *pbv1. } } -// prioritiseOnce initiates a priority message exchange with all peers. -func (p *Prioritiser) prioritiseOnce(ctx context.Context, msg *pbv1.PriorityMsg) { +// exchangeOnce initiates a priority message exchange with all peers. +func (p *Prioritiser) exchangeOnce(ctx context.Context, msg *pbv1.PriorityMsg) { if !p.quorumSupported(ctx) { log.Warn(ctx, "Skipping non-critical priority protocol not supported by quorum peers", nil, p.skipAllFilter) return } - log.Debug(ctx, "Priority protocol triggered") - - // Send our own message first to start consensus timeout. - go func() { // Async since unbuffered - select { - case <-ctx.Done(): - case p.receives <- received{ - Own: true, - Msg: msg, - }: - } - }() for _, pID := range p.peers { if pID == p.tcpNode.ID() { @@ -327,7 +349,7 @@ func (p *Prioritiser) prioritiseOnce(ctx context.Context, msg *pbv1.PriorityMsg) select { case <-ctx.Done(): - case p.receives <- received{Msg: response}: + case p.responses <- response: } }(pID) } @@ -391,32 +413,22 @@ func hashProto(msg proto.Message) ([32]byte, error) { return hash, nil } -// sendResponse sends own response to any awaiting peers. -func sendResponse(msgs map[string]received) { +// processPending sends own proposed msg to any awaiting/pending peers removing them from the returned instance. +func processPending(data instanceData) instanceData { // Get own message - var own *pbv1.PriorityMsg - for _, recv := range msgs { - if !recv.Own { - continue - } - - own = recv.Msg - - break - } - - if own == nil { + own, ok := data.Msgs[data.OwnID] + if !ok { // Own message not received yet - return + return data } // Send own to any awaiting peers - for _, recv := range msgs { - if recv.Response == nil { - continue - } - - recv.Response <- own // Response channel has capacity of 1. - recv.Response = nil // Mark channel as responded.. + for _, ch := range data.Pending { + ch <- own } + + // Clear pending + data.Pending = nil + + return data } diff --git a/core/priority/prioritiser_test.go b/core/priority/prioritiser_test.go index a56f5f99e..4f1312480 100644 --- a/core/priority/prioritiser_test.go +++ b/core/priority/prioritiser_test.go @@ -17,7 +17,7 @@ package priority_test import ( "context" - "reflect" + "strings" "sync" "testing" "time" @@ -26,10 +26,10 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" - "github.com/obolnetwork/charon/app/errors" pbv1 "github.com/obolnetwork/charon/core/corepb/v1" "github.com/obolnetwork/charon/core/priority" "github.com/obolnetwork/charon/p2p" @@ -40,10 +40,10 @@ func TestPrioritiser(t *testing.T) { var ( ctx, cancel = context.WithCancel(context.Background()) n = 3 - instance = &pbv1.Duty{Slot: 99} + instances = []*pbv1.Duty{{Slot: 97}, {Slot: 98}, {Slot: 99}} tcpNodes []host.Host peers []peer.ID - consensus = new(testConsensus) + consensus = &testConsensus{t: t} msgValidator = func(*pbv1.PriorityMsg) error { return nil } noTicks = func() (<-chan time.Time, func()) { return nil, func() {} } results = make(chan []*pbv1.PriorityScoredResult, n) @@ -81,29 +81,30 @@ func TestPrioritiser(t *testing.T) { resTopic, err := result.Topics[0].Topic.UnmarshalNew() require.NoError(t, err) - requireProtoEqual(t, instance, resInstance) + requireAnyDuty(t, instances, resInstance) requireProtoEqual(t, topic, resTopic) results <- result.Topics[0].Priorities return nil }) - msg := &pbv1.PriorityMsg{ - Topics: []*pbv1.PriorityTopicProposal{{Topic: mustAny(topic), Priorities: priorities}}, - Instance: mustAny(instance), - PeerId: tcpNode.ID().String(), - } - go func() { require.ErrorIs(t, prio.Run(ctx), context.Canceled) }() - go func() { - require.NoError(t, prio.Prioritise(ctx, msg)) - }() + for _, instance := range instances { + msg := &pbv1.PriorityMsg{ + Topics: []*pbv1.PriorityTopicProposal{{Topic: mustAny(topic), Priorities: priorities}}, + Instance: mustAny(instance), + PeerId: tcpNode.ID().String(), + } + go func() { + require.NoError(t, prio.Prioritise(ctx, msg)) + }() + } } - for i := 0; i < n; i++ { + for i := 0; i < n*len(instances); i++ { res := <-results require.Len(t, res, 1) require.EqualValues(t, n*1000, res[0].Score) @@ -116,8 +117,9 @@ func TestPrioritiser(t *testing.T) { // testConsensus is a mock consensus implementation that "decides" on the first proposal. // It also expects all proposals to be identical. type testConsensus struct { + t *testing.T mu sync.Mutex - proposed *pbv1.PriorityResult + proposed map[int64]*pbv1.PriorityResult subs []func(ctx context.Context, instance priority.Instance, result *pbv1.PriorityResult) error } @@ -125,10 +127,12 @@ func (t *testConsensus) ProposePriority(ctx context.Context, instance priority.I t.mu.Lock() defer t.mu.Unlock() - if t.proposed != nil { - if !reflect.DeepEqual(t.proposed, result) { - return errors.New("mismatching proposals") - } + slot := instance.(*pbv1.Duty).Slot + + if t.proposed[slot] != nil { + prev := mustResultsToText(t.proposed[slot].Topics) + this := mustResultsToText(result.Topics) + require.Equal(t.t, prev, this) return nil } @@ -139,7 +143,11 @@ func (t *testConsensus) ProposePriority(ctx context.Context, instance priority.I return err } } - t.proposed = result + + if t.proposed == nil { + t.proposed = make(map[int64]*pbv1.PriorityResult) + } + t.proposed[slot] = result return nil } @@ -161,6 +169,32 @@ func prioToAny(prio int) *anypb.Any { return mustAny(&pbv1.Duty{Slot: int64(prio)}) } +func requireAnyDuty(t *testing.T, anyOf []*pbv1.Duty, actual proto.Message) { + t.Helper() + for _, msg := range anyOf { + if proto.Equal(msg, actual) { + return + } + } + require.Fail(t, "not anyOf: %#v\nactual: %#v\n", anyOf, actual) +} + +func mustResultsToText(msgs []*pbv1.PriorityTopicResult) string { + var resp []string + for _, msg := range msgs { + b, err := prototext.MarshalOptions{ + Multiline: true, + }.Marshal(msg) + if err != nil { + panic(err) + } + + resp = append(resp, string(b)) + } + + return strings.Join(resp, ",") +} + func requireProtoEqual(t *testing.T, expect, actual proto.Message) { t.Helper() require.True(t, proto.Equal(expect, actual), "expected: %#v\nactual: %#v\n", expect, actual) From 3bb855d39232bf394e636e449ff85d30bebc6aec Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 11 Nov 2022 13:52:15 +0200 Subject: [PATCH 5/7] cleanup --- app/simnet_test.go | 2 +- app/vmock.go | 27 +++++++++++------- core/priority/prioritiser.go | 55 ++++++++++++++++++++++++++++-------- p2p/receive.go | 2 +- testutil/trackpr/track.go | 4 +-- 5 files changed, 64 insertions(+), 26 deletions(-) diff --git a/app/simnet_test.go b/app/simnet_test.go index 359f3186e..0ad0faf95 100644 --- a/app/simnet_test.go +++ b/app/simnet_test.go @@ -260,7 +260,7 @@ func (e simnetExpect) Assert(t *testing.T, typ core.DutyType) { e.counts[typ]-- } -// Done returns true if all duties have been asserted sufficient number of times. +// ConsStarted returns true if all duties have been asserted sufficient number of times. func (e simnetExpect) Done() bool { for _, v := range e.counts { if v > 0 { diff --git a/app/vmock.go b/app/vmock.go index 795bd68df..e00e6f8e2 100644 --- a/app/vmock.go +++ b/app/vmock.go @@ -29,6 +29,7 @@ import ( "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" + "github.com/obolnetwork/charon/eth2util" "github.com/obolnetwork/charon/eth2util/keystore" "github.com/obolnetwork/charon/testutil/validatormock" ) @@ -48,13 +49,13 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch onStartup := true sched.SubscribeSlots(func(ctx context.Context, slot core.Slot) error { // Prepare attestations when slots tick. - vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { return state.Attester.Prepare(ctx) }) // Prepare sync committee message when epoch tick. if onStartup || slot.FirstInEpoch() { - vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { // Either call if it is first slot in epoch or on charon startup. return state.SyncCommMember.PrepareEpoch(ctx) }) @@ -63,13 +64,13 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch onStartup = false // Prepare sync committee selections when slots tick. - vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { // Either call if it is first slot in epoch or on charon startup. return state.SyncCommMember.PrepareSlot(ctx, eth2p0.Slot(slot.Slot)) }) // Submit sync committee message 1/3 into the slot. - vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { thirdDuration := slot.SlotDuration / 3 thirdTime := slot.Time.Add(thirdDuration) @@ -86,7 +87,7 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch // Handle duties when triggered. sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { - vMockWrap(ctx, duty.Slot, 0, func(ctx context.Context, state vMockState) error { + vMockWrap(ctx, duty.Slot, func(ctx context.Context, state vMockState) error { return handleVMockDuty(ctx, duty, state.Eth2Cl, state.SignFunc, pubshares, state.Attester, state.SyncCommMember) }) @@ -96,7 +97,7 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch go func() { // TODO(corver): Improve registrations to use lock file and trigger on epoch transitions. for registration := range conf.TestConfig.BuilderRegistration { - vMockWrap(context.Background(), 0, 0, func(ctx context.Context, state vMockState) error { + vMockWrap(context.Background(), 0, func(ctx context.Context, state vMockState) error { return validatormock.Register(ctx, state.Eth2Cl, state.SignFunc, registration, pubshares[0]) }) } @@ -117,7 +118,7 @@ type vMockState struct { type vMockCallback func(context.Context, vMockState) error // newVMockWrapper returns a stateful validator mock wrapper function. -func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx context.Context, slot int64, epoch int64, callback vMockCallback), error) { +func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx context.Context, slot int64, callback vMockCallback), error) { // Immutable state and providers. signFunc, err := newVMockSigner(conf, pubshares) if err != nil { @@ -133,7 +134,7 @@ func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx contex syncCommMem = new(validatormock.SyncCommMember) ) - return func(ctx context.Context, slot, epoch int64, fn vMockCallback) { + return func(ctx context.Context, slot int64, fn vMockCallback) { mu.Lock() defer mu.Unlock() @@ -145,12 +146,18 @@ func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx contex return } + epoch, err := eth2util.EpochFromSlot(ctx, eth2Cl, eth2p0.Slot(slot)) + if err != nil { + log.Error(ctx, "Epoch from slot", err) + return + } + // Create new slot attester on new slots if slot != 0 && attester.Slot() != eth2p0.Slot(slot) { attester = validatormock.NewSlotAttester(eth2Cl, eth2p0.Slot(slot), signFunc, pubshares) } - if epoch != 0 && syncCommMem.Epoch() != eth2p0.Epoch(epoch) { - syncCommMem = validatormock.NewSyncCommMember(eth2Cl, eth2p0.Epoch(epoch), signFunc, pubshares) + if epoch != 0 && syncCommMem.Epoch() != epoch { + syncCommMem = validatormock.NewSyncCommMember(eth2Cl, epoch, signFunc, pubshares) } state := vMockState{ diff --git a/core/priority/prioritiser.go b/core/priority/prioritiser.go index 8b8c507b6..e80ca3837 100644 --- a/core/priority/prioritiser.go +++ b/core/priority/prioritiser.go @@ -14,6 +14,19 @@ // this program. If not, see . // Package priority implements the priority protocol that resolves arbitrary cluster wide priorities. +// +// Protocol overview: +// - Priorities are arbitrary protobufs (data). +// - Priorities are grouped by a topic (also arbitrary protobuf data). +// - Peers in the cluster participate in a priority protocol instances. +// - The protocol consists of two steps: priority exchange followed by priority consensus. +// - All peers propose their own set of priorities for an instance. +// - These are exchanged with all other peers. +// - All peers also respond with their priorities. +// - The exchange step is complete when the priorities of all peers have been received or on timeout. +// - Each peer calculates what they consider as the cluster wide priorities based on the priorities available to them at the point. +// - Each peer then starts a consensus instance proposing this deterministic calculated result. +// - Consensus is reached if quorum peers propose the same value. package priority import ( @@ -34,7 +47,10 @@ import ( "github.com/obolnetwork/charon/p2p" ) -const ProtocolID = "charon/priority/1.0.0" +const ( + ProtocolID = "charon/priority/1.0.0" + deleteAfter = time.Minute +) // Instance identifies an instance of the priority protocol. type Instance proto.Message @@ -47,12 +63,13 @@ type Priority proto.Message // instanceData contains an Instance and its data. type instanceData struct { - OwnID string - Instance Instance - Key [32]byte // Hash of instance - Pending []chan<- *pbv1.PriorityMsg // Pending exchange requests from peers - Msgs map[string]*pbv1.PriorityMsg // Received messages by peers (including own) - Timeout time.Time // Timeout starts consensus even if all messages not received + OwnID string + Instance Instance + Key [32]byte // Hash of instance + Pending []chan<- *pbv1.PriorityMsg // Pending exchange requests from peers + Msgs map[string]*pbv1.PriorityMsg // Received messages by peers (including own) + Timeout time.Time // Timeout starts consensus even if all messages not received + ConsStarted bool // Whether consensus was started } type Consensus interface { @@ -190,7 +207,7 @@ func (p *Prioritiser) Run(ctx context.Context) error { // Mutable state instances := make(map[[32]byte]instanceData) - // startConsensus starts consensus and deletes it. + // startConsensus starts consensus and marks the instance as such. startConsensus := func(data instanceData) { var msgs []*pbv1.PriorityMsg for _, msg := range data.Msgs { @@ -208,7 +225,8 @@ func (p *Prioritiser) Run(ctx context.Context) error { return } - delete(instances, data.Key) + data.ConsStarted = true + instances[data.Key] = data } // processInstance calls the callback with new or existing instance data and @@ -244,10 +262,12 @@ func (p *Prioritiser) Run(ctx context.Context) error { return } - instances[key] = processPending(data) + data = processPending(data) + instances[key] = data - if len(data.Msgs) == len(p.peers) { + if !data.ConsStarted && len(data.Msgs) == len(p.peers) { // All messages received before timeout + log.Debug(ctx, "Priority instance received all messages, starting consensus") startConsensus(data) } } @@ -265,7 +285,9 @@ func (p *Prioritiser) Run(ctx context.Context) error { p.exchangeOnce(ctx, msg) case req := <-p.requests: processInstance(req.Msg.Instance, func(data instanceData) (instanceData, error) { + data.Msgs[req.Msg.PeerId] = req.Msg data.Pending = append(data.Pending, req.Response) + return data, nil }) case msg := <-p.responses: @@ -276,10 +298,19 @@ func (p *Prioritiser) Run(ctx context.Context) error { case now := <-ticker: for _, data := range instances { if now.Before(data.Timeout) { + continue // Not timed out yet. + } + if !data.ConsStarted { // Timed out and consensus not started yet. + log.Debug(ctx, "Priority instance timeout, starting consensus") + startConsensus(data) + continue } + if now.Before(data.Timeout.Add(deleteAfter)) { + continue // Not deletable yet + } - startConsensus(data) + delete(instances, data.Key) } } } diff --git a/p2p/receive.go b/p2p/receive.go index d5f7e56d0..dabf17afa 100644 --- a/p2p/receive.go +++ b/p2p/receive.go @@ -69,7 +69,7 @@ func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID, resp, ok, err := handlerFunc(ctx, s.Conn().RemotePeer(), req) if err != nil { - log.Error(ctx, "LibP2P handler error", err) + log.Error(ctx, "LibP2P handle stream error", err, z.Str("protocol", string(protocol))) return } diff --git a/testutil/trackpr/track.go b/testutil/trackpr/track.go index 5a36098dc..1285dcd07 100644 --- a/testutil/trackpr/track.go +++ b/testutil/trackpr/track.go @@ -143,7 +143,7 @@ func getProjectData(ctx context.Context, client *gh.Client, organization string, data.statusFieldID = statusField.ID for _, opt := range statusField.Options { - if opt.Name == "Done" { + if opt.Name == "ConsStarted" { data.doneOptionID = opt.ID } } @@ -212,7 +212,7 @@ func setSize(ctx context.Context, client *gh.Client, projectID, itemID, sizeFiel return err } -// setStatus sets the status field (ex: "Done", "In Progress" etc.) of the project item. +// setStatus sets the status field (ex: "ConsStarted", "In Progress" etc.) of the project item. func setStatus(ctx context.Context, client *gh.Client, projectID, itemID, statusFieldID gh.ID, doneOptionID gh.String) error { m := new(setFieldMutation) input := UpdateProjectV2ItemFieldValueInput{ From b2b575a7f2877c25ba1046d3e33dfaa1864493e6 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 11 Nov 2022 13:54:27 +0200 Subject: [PATCH 6/7] cleanup --- app/simnet_test.go | 2 +- testutil/trackpr/track.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/simnet_test.go b/app/simnet_test.go index 0ad0faf95..359f3186e 100644 --- a/app/simnet_test.go +++ b/app/simnet_test.go @@ -260,7 +260,7 @@ func (e simnetExpect) Assert(t *testing.T, typ core.DutyType) { e.counts[typ]-- } -// ConsStarted returns true if all duties have been asserted sufficient number of times. +// Done returns true if all duties have been asserted sufficient number of times. func (e simnetExpect) Done() bool { for _, v := range e.counts { if v > 0 { diff --git a/testutil/trackpr/track.go b/testutil/trackpr/track.go index 1285dcd07..5a36098dc 100644 --- a/testutil/trackpr/track.go +++ b/testutil/trackpr/track.go @@ -143,7 +143,7 @@ func getProjectData(ctx context.Context, client *gh.Client, organization string, data.statusFieldID = statusField.ID for _, opt := range statusField.Options { - if opt.Name == "ConsStarted" { + if opt.Name == "Done" { data.doneOptionID = opt.ID } } @@ -212,7 +212,7 @@ func setSize(ctx context.Context, client *gh.Client, projectID, itemID, sizeFiel return err } -// setStatus sets the status field (ex: "ConsStarted", "In Progress" etc.) of the project item. +// setStatus sets the status field (ex: "Done", "In Progress" etc.) of the project item. func setStatus(ctx context.Context, client *gh.Client, projectID, itemID, statusFieldID gh.ID, doneOptionID gh.String) error { m := new(setFieldMutation) input := UpdateProjectV2ItemFieldValueInput{ From 03b038114729c9e99296c04914a50d69afebd8b6 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 11 Nov 2022 14:25:56 +0200 Subject: [PATCH 7/7] cleanup --- core/priority/prioritiser.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/priority/prioritiser.go b/core/priority/prioritiser.go index e80ca3837..0994820a7 100644 --- a/core/priority/prioritiser.go +++ b/core/priority/prioritiser.go @@ -219,11 +219,13 @@ func (p *Prioritiser) Run(ctx context.Context) error { return } - err = p.consensus.ProposePriority(ctx, data.Instance, result) - if err != nil { - log.Warn(ctx, "Propose priority consensus", err) // Unexpected - return - } + go func() { + err = p.consensus.ProposePriority(ctx, data.Instance, result) + if err != nil { + log.Warn(ctx, "Propose priority consensus", err) // Unexpected + return + } + }() data.ConsStarted = true instances[data.Key] = data