diff --git a/ClientSideServiceDiscovery/README.md b/ClientSideServiceDiscovery/README.md
index 91299a3..0e1c045 100644
--- a/ClientSideServiceDiscovery/README.md
+++ b/ClientSideServiceDiscovery/README.md
@@ -1,13 +1,5 @@
## Discover Servers and Load Balance from the Client
-### Prerequisites
-
-#### Raft
-
-```shell
-go install github.com/hashicorp/raft@latest
-```
-
### Tests
```shell
diff --git a/ClientSideServiceDiscovery/api/v1/log.pb.go b/ClientSideServiceDiscovery/api/v1/log.pb.go
index 5a89dd8..6fa7372 100644
--- a/ClientSideServiceDiscovery/api/v1/log.pb.go
+++ b/ClientSideServiceDiscovery/api/v1/log.pb.go
@@ -279,6 +279,154 @@ func (x *Record) GetType() uint32 {
return 0
}
+type GetServersRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *GetServersRequest) Reset() {
+ *x = GetServersRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_api_v1_log_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *GetServersRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*GetServersRequest) ProtoMessage() {}
+
+func (x *GetServersRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_api_v1_log_proto_msgTypes[5]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use GetServersRequest.ProtoReflect.Descriptor instead.
+func (*GetServersRequest) Descriptor() ([]byte, []int) {
+ return file_api_v1_log_proto_rawDescGZIP(), []int{5}
+}
+
+type GetServersResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Servers []*Server `protobuf:"bytes,1,rep,name=servers,proto3" json:"servers,omitempty"`
+}
+
+func (x *GetServersResponse) Reset() {
+ *x = GetServersResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_api_v1_log_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *GetServersResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*GetServersResponse) ProtoMessage() {}
+
+func (x *GetServersResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_api_v1_log_proto_msgTypes[6]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use GetServersResponse.ProtoReflect.Descriptor instead.
+func (*GetServersResponse) Descriptor() ([]byte, []int) {
+ return file_api_v1_log_proto_rawDescGZIP(), []int{6}
+}
+
+func (x *GetServersResponse) GetServers() []*Server {
+ if x != nil {
+ return x.Servers
+ }
+ return nil
+}
+
+type Server struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+ RpcAddr string `protobuf:"bytes,2,opt,name=rpc_addr,json=rpcAddr,proto3" json:"rpc_addr,omitempty"`
+ IsLeader bool `protobuf:"varint,3,opt,name=is_leader,json=isLeader,proto3" json:"is_leader,omitempty"`
+}
+
+func (x *Server) Reset() {
+ *x = Server{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_api_v1_log_proto_msgTypes[7]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Server) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Server) ProtoMessage() {}
+
+func (x *Server) ProtoReflect() protoreflect.Message {
+ mi := &file_api_v1_log_proto_msgTypes[7]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Server.ProtoReflect.Descriptor instead.
+func (*Server) Descriptor() ([]byte, []int) {
+ return file_api_v1_log_proto_rawDescGZIP(), []int{7}
+}
+
+func (x *Server) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+func (x *Server) GetRpcAddr() string {
+ if x != nil {
+ return x.RpcAddr
+ }
+ return ""
+}
+
+func (x *Server) GetIsLeader() bool {
+ if x != nil {
+ return x.IsLeader
+ }
+ return false
+}
+
var File_api_v1_log_proto protoreflect.FileDescriptor
var file_api_v1_log_proto_rawDesc = []byte{
@@ -302,27 +450,42 @@ var file_api_v1_log_proto_rawDesc = []byte{
0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04,
0x74, 0x65, 0x72, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d,
0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04,
- 0x74, 0x79, 0x70, 0x65, 0x32, 0x8f, 0x02, 0x0a, 0x03, 0x4c, 0x6f, 0x67, 0x12, 0x3c, 0x0a, 0x07,
- 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x12, 0x16, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31,
- 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
- 0x17, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65,
- 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x07, 0x43, 0x6f,
- 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43,
- 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e,
- 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65,
- 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x44, 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x73,
- 0x75, 0x6d, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x6c, 0x6f, 0x67, 0x2e,
- 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x1a, 0x17, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75,
- 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x46,
- 0x0a, 0x0d, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12,
- 0x16, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65,
- 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31,
- 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
- 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x69, 0x67, 0x6f, 0x72, 0x2d, 0x62, 0x61, 0x69, 0x62, 0x6f, 0x72,
- 0x6f, 0x64, 0x69, 0x6e, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6c, 0x6f, 0x67, 0x5f, 0x76, 0x31,
- 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x74, 0x79, 0x70, 0x65, 0x22, 0x13, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65,
+ 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x3e, 0x0a, 0x12, 0x47, 0x65, 0x74,
+ 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
+ 0x28, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b,
+ 0x32, 0x0e, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72,
+ 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x22, 0x50, 0x0a, 0x06, 0x53, 0x65, 0x72,
+ 0x76, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x02, 0x69, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x72, 0x70, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x72, 0x70, 0x63, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1b,
+ 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28,
+ 0x08, 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x32, 0xd6, 0x02, 0x0a, 0x03,
+ 0x4c, 0x6f, 0x67, 0x12, 0x3c, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x12, 0x16,
+ 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e,
+ 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+ 0x00, 0x12, 0x3c, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x12, 0x16, 0x2e, 0x6c,
+ 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f,
+ 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
+ 0x44, 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
+ 0x12, 0x16, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d,
+ 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76,
+ 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x0d, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65,
+ 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e,
+ 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17,
+ 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x45, 0x0a,
+ 0x0a, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x19, 0x2e, 0x6c, 0x6f,
+ 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e,
+ 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+ 0x73, 0x65, 0x22, 0x00, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
+ 0x6f, 0x6d, 0x2f, 0x69, 0x67, 0x6f, 0x72, 0x2d, 0x62, 0x61, 0x69, 0x62, 0x6f, 0x72, 0x6f, 0x64,
+ 0x69, 0x6e, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6c, 0x6f, 0x67, 0x5f, 0x76, 0x31, 0x62, 0x06,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -337,30 +500,36 @@ func file_api_v1_log_proto_rawDescGZIP() []byte {
return file_api_v1_log_proto_rawDescData
}
-var file_api_v1_log_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
+var file_api_v1_log_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_api_v1_log_proto_goTypes = []interface{}{
- (*ProduceRequest)(nil), // 0: log.v1.ProduceRequest
- (*ProduceResponse)(nil), // 1: log.v1.ProduceResponse
- (*ConsumeRequest)(nil), // 2: log.v1.ConsumeRequest
- (*ConsumeResponse)(nil), // 3: log.v1.ConsumeResponse
- (*Record)(nil), // 4: log.v1.Record
+ (*ProduceRequest)(nil), // 0: log.v1.ProduceRequest
+ (*ProduceResponse)(nil), // 1: log.v1.ProduceResponse
+ (*ConsumeRequest)(nil), // 2: log.v1.ConsumeRequest
+ (*ConsumeResponse)(nil), // 3: log.v1.ConsumeResponse
+ (*Record)(nil), // 4: log.v1.Record
+ (*GetServersRequest)(nil), // 5: log.v1.GetServersRequest
+ (*GetServersResponse)(nil), // 6: log.v1.GetServersResponse
+ (*Server)(nil), // 7: log.v1.Server
}
var file_api_v1_log_proto_depIdxs = []int32{
4, // 0: log.v1.ProduceRequest.record:type_name -> log.v1.Record
4, // 1: log.v1.ConsumeResponse.record:type_name -> log.v1.Record
- 0, // 2: log.v1.Log.Produce:input_type -> log.v1.ProduceRequest
- 2, // 3: log.v1.Log.Consume:input_type -> log.v1.ConsumeRequest
- 2, // 4: log.v1.Log.ConsumeStream:input_type -> log.v1.ConsumeRequest
- 0, // 5: log.v1.Log.ProduceStream:input_type -> log.v1.ProduceRequest
- 1, // 6: log.v1.Log.Produce:output_type -> log.v1.ProduceResponse
- 3, // 7: log.v1.Log.Consume:output_type -> log.v1.ConsumeResponse
- 3, // 8: log.v1.Log.ConsumeStream:output_type -> log.v1.ConsumeResponse
- 1, // 9: log.v1.Log.ProduceStream:output_type -> log.v1.ProduceResponse
- 6, // [6:10] is the sub-list for method output_type
- 2, // [2:6] is the sub-list for method input_type
- 2, // [2:2] is the sub-list for extension type_name
- 2, // [2:2] is the sub-list for extension extendee
- 0, // [0:2] is the sub-list for field type_name
+ 7, // 2: log.v1.GetServersResponse.servers:type_name -> log.v1.Server
+ 0, // 3: log.v1.Log.Produce:input_type -> log.v1.ProduceRequest
+ 2, // 4: log.v1.Log.Consume:input_type -> log.v1.ConsumeRequest
+ 2, // 5: log.v1.Log.ConsumeStream:input_type -> log.v1.ConsumeRequest
+ 0, // 6: log.v1.Log.ProduceStream:input_type -> log.v1.ProduceRequest
+ 5, // 7: log.v1.Log.GetServers:input_type -> log.v1.GetServersRequest
+ 1, // 8: log.v1.Log.Produce:output_type -> log.v1.ProduceResponse
+ 3, // 9: log.v1.Log.Consume:output_type -> log.v1.ConsumeResponse
+ 3, // 10: log.v1.Log.ConsumeStream:output_type -> log.v1.ConsumeResponse
+ 1, // 11: log.v1.Log.ProduceStream:output_type -> log.v1.ProduceResponse
+ 6, // 12: log.v1.Log.GetServers:output_type -> log.v1.GetServersResponse
+ 8, // [8:13] is the sub-list for method output_type
+ 3, // [3:8] is the sub-list for method input_type
+ 3, // [3:3] is the sub-list for extension type_name
+ 3, // [3:3] is the sub-list for extension extendee
+ 0, // [0:3] is the sub-list for field type_name
}
func init() { file_api_v1_log_proto_init() }
@@ -429,6 +598,42 @@ func file_api_v1_log_proto_init() {
return nil
}
}
+ file_api_v1_log_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*GetServersRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_api_v1_log_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*GetServersResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_api_v1_log_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Server); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -436,7 +641,7 @@ func file_api_v1_log_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_api_v1_log_proto_rawDesc,
NumEnums: 0,
- NumMessages: 5,
+ NumMessages: 8,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/ClientSideServiceDiscovery/api/v1/log.proto b/ClientSideServiceDiscovery/api/v1/log.proto
index 16980b2..6f479f9 100644
--- a/ClientSideServiceDiscovery/api/v1/log.proto
+++ b/ClientSideServiceDiscovery/api/v1/log.proto
@@ -8,7 +8,9 @@ service Log {
rpc Produce(ProduceRequest) returns (ProduceResponse) {}
rpc Consume(ConsumeRequest) returns (ConsumeResponse) {}
rpc ConsumeStream(ConsumeRequest) returns (stream ConsumeResponse) {}
- rpc ProduceStream(stream ProduceRequest) returns (stream ProduceResponse) {}
+ rpc ProduceStream(stream ProduceRequest) returns (stream ProduceResponse)
+ {}
+ rpc GetServers(GetServersRequest) returns (GetServersResponse) {}
}
message ProduceRequest {
@@ -28,8 +30,20 @@ message ConsumeResponse {
}
message Record {
- bytes value = 1;
+ bytes value = 1;
uint64 offset = 2;
uint64 term = 3;
uint32 type = 4;
}
+
+message GetServersRequest {}
+
+message GetServersResponse {
+ repeated Server servers = 1;
+}
+
+message Server {
+ string id = 1;
+ string rpc_addr = 2;
+ bool is_leader = 3;
+}
diff --git a/ClientSideServiceDiscovery/api/v1/log_grpc.pb.go b/ClientSideServiceDiscovery/api/v1/log_grpc.pb.go
index 3486665..05d98cc 100644
--- a/ClientSideServiceDiscovery/api/v1/log_grpc.pb.go
+++ b/ClientSideServiceDiscovery/api/v1/log_grpc.pb.go
@@ -26,6 +26,7 @@ type LogClient interface {
Consume(ctx context.Context, in *ConsumeRequest, opts ...grpc.CallOption) (*ConsumeResponse, error)
ConsumeStream(ctx context.Context, in *ConsumeRequest, opts ...grpc.CallOption) (Log_ConsumeStreamClient, error)
ProduceStream(ctx context.Context, opts ...grpc.CallOption) (Log_ProduceStreamClient, error)
+ GetServers(ctx context.Context, in *GetServersRequest, opts ...grpc.CallOption) (*GetServersResponse, error)
}
type logClient struct {
@@ -117,6 +118,15 @@ func (x *logProduceStreamClient) Recv() (*ProduceResponse, error) {
return m, nil
}
+func (c *logClient) GetServers(ctx context.Context, in *GetServersRequest, opts ...grpc.CallOption) (*GetServersResponse, error) {
+ out := new(GetServersResponse)
+ err := c.cc.Invoke(ctx, "/log.v1.Log/GetServers", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// LogServer is the server API for Log service.
// All implementations must embed UnimplementedLogServer
// for forward compatibility
@@ -125,6 +135,7 @@ type LogServer interface {
Consume(context.Context, *ConsumeRequest) (*ConsumeResponse, error)
ConsumeStream(*ConsumeRequest, Log_ConsumeStreamServer) error
ProduceStream(Log_ProduceStreamServer) error
+ GetServers(context.Context, *GetServersRequest) (*GetServersResponse, error)
mustEmbedUnimplementedLogServer()
}
@@ -144,6 +155,9 @@ func (UnimplementedLogServer) ConsumeStream(*ConsumeRequest, Log_ConsumeStreamSe
func (UnimplementedLogServer) ProduceStream(Log_ProduceStreamServer) error {
return status.Errorf(codes.Unimplemented, "method ProduceStream not implemented")
}
+func (UnimplementedLogServer) GetServers(context.Context, *GetServersRequest) (*GetServersResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method GetServers not implemented")
+}
func (UnimplementedLogServer) mustEmbedUnimplementedLogServer() {}
// UnsafeLogServer may be embedded to opt out of forward compatibility for this service.
@@ -240,6 +254,24 @@ func (x *logProduceStreamServer) Recv() (*ProduceRequest, error) {
return m, nil
}
+func _Log_GetServers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(GetServersRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(LogServer).GetServers(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/log.v1.Log/GetServers",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(LogServer).GetServers(ctx, req.(*GetServersRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
// Log_ServiceDesc is the grpc.ServiceDesc for Log service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -255,6 +287,10 @@ var Log_ServiceDesc = grpc.ServiceDesc{
MethodName: "Consume",
Handler: _Log_Consume_Handler,
},
+ {
+ MethodName: "GetServers",
+ Handler: _Log_GetServers_Handler,
+ },
},
Streams: []grpc.StreamDesc{
{
diff --git a/ClientSideServiceDiscovery/internal/agent/agent.go b/ClientSideServiceDiscovery/internal/agent/agent.go
index d2640f3..c148e29 100644
--- a/ClientSideServiceDiscovery/internal/agent/agent.go
+++ b/ClientSideServiceDiscovery/internal/agent/agent.go
@@ -130,8 +130,9 @@ func (a *Agent) setupServer() error {
a.Config.ACLPolicyFile,
)
serverConfig := &server.Config{
- CommitLog: a.log,
- Authorizer: authorizer,
+ CommitLog: a.log,
+ Authorizer: authorizer,
+ GetServerer: a.log,
}
var opts []grpc.ServerOption
if a.Config.ServerTLSConfig != nil {
diff --git a/ClientSideServiceDiscovery/internal/agent/agent_test.go b/ClientSideServiceDiscovery/internal/agent/agent_test.go
index 06c42ea..1400137 100644
--- a/ClientSideServiceDiscovery/internal/agent/agent_test.go
+++ b/ClientSideServiceDiscovery/internal/agent/agent_test.go
@@ -12,11 +12,11 @@ import (
"github.com/travisjeffery/go-dynaport"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
- "google.golang.org/grpc/status"
api "github.com/igor-baiborodine/proglog/api/v1"
"github.com/igor-baiborodine/proglog/internal/agent"
"github.com/igor-baiborodine/proglog/internal/config"
+ "github.com/igor-baiborodine/proglog/internal/loadbalance"
)
func TestAgent(t *testing.T) {
@@ -50,10 +50,7 @@ func TestAgent(t *testing.T) {
var startJoinAddrs []string
if i != 0 {
- startJoinAddrs = append(
- startJoinAddrs,
- agents[0].Config.BindAddr,
- )
+ startJoinAddrs = append(startJoinAddrs, agents[0].Config.BindAddr)
}
a, err := agent.New(agent.Config{
@@ -75,9 +72,7 @@ func TestAgent(t *testing.T) {
defer func() {
for _, a := range agents {
_ = a.Shutdown()
- require.NoError(t,
- os.RemoveAll(a.Config.DataDir),
- )
+ require.NoError(t, os.RemoveAll(a.Config.DataDir))
}
}()
@@ -94,7 +89,10 @@ func TestAgent(t *testing.T) {
},
)
require.NoError(t, err)
- consumeResponse, err := leaderClient.Consume(
+
+ time.Sleep(3 * time.Second)
+
+ consumeResponse, err := leaderClient.Consume( //
context.Background(),
&api.ConsumeRequest{
Offset: produceResponse.Offset,
@@ -103,11 +101,8 @@ func TestAgent(t *testing.T) {
require.NoError(t, err)
require.Equal(t, consumeResponse.Record.Value, []byte("foo"))
- // wait until replication has finished
- time.Sleep(3 * time.Second)
-
followerClient := client(t, agents[1], peerTLSConfig)
- consumeResponse, err = followerClient.Consume(
+ consumeResponse, err = followerClient.Consume( //
context.Background(),
&api.ConsumeRequest{
Offset: produceResponse.Offset,
@@ -115,18 +110,6 @@ func TestAgent(t *testing.T) {
)
require.NoError(t, err)
require.Equal(t, consumeResponse.Record.Value, []byte("foo"))
-
- consumeResponse, err = leaderClient.Consume(
- context.Background(),
- &api.ConsumeRequest{
- Offset: produceResponse.Offset + 1,
- },
- )
- require.Nil(t, consumeResponse)
- require.Error(t, err)
- got := status.Code(err)
- want := status.Code(api.ErrOffsetOutOfRange{}.GRPCStatus().Err())
- require.Equal(t, got, want)
}
func client(t *testing.T, agent *agent.Agent, tlsConfig *tls.Config) api.LogClient {
@@ -134,7 +117,11 @@ func client(t *testing.T, agent *agent.Agent, tlsConfig *tls.Config) api.LogClie
opts := []grpc.DialOption{grpc.WithTransportCredentials(tlsCreds)}
rpcAddr, err := agent.Config.RPCAddr()
require.NoError(t, err)
- conn, err := grpc.Dial(rpcAddr, opts...)
+ conn, err := grpc.Dial(fmt.Sprintf(
+ "%s:///%s",
+ loadbalance.Name,
+ rpcAddr,
+ ), opts...)
require.NoError(t, err)
client := api.NewLogClient(conn)
return client
diff --git a/ClientSideServiceDiscovery/internal/loadbalance/picker.go b/ClientSideServiceDiscovery/internal/loadbalance/picker.go
new file mode 100644
index 0000000..693c264
--- /dev/null
+++ b/ClientSideServiceDiscovery/internal/loadbalance/picker.go
@@ -0,0 +1,70 @@
+package loadbalance
+
+import (
+ "strings"
+ "sync"
+ "sync/atomic"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/balancer/base"
+)
+
+var _ base.PickerBuilder = (*Picker)(nil)
+
+type Picker struct {
+ mu sync.RWMutex
+ leader balancer.SubConn
+ followers []balancer.SubConn
+ current uint64
+}
+
+func (p *Picker) Build(buildInfo base.PickerBuildInfo) balancer.Picker {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ var followers []balancer.SubConn
+ for sc, scInfo := range buildInfo.ReadySCs {
+ isLeader := scInfo.
+ Address.
+ Attributes.
+ Value("is_leader").(bool)
+ if isLeader {
+ p.leader = sc
+ continue
+ }
+ followers = append(followers, sc)
+ }
+ p.followers = followers
+ return p
+}
+
+var _ balancer.Picker = (*Picker)(nil)
+
+func (p *Picker) Pick(info balancer.PickInfo) (
+ balancer.PickResult, error) {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+ var result balancer.PickResult
+ if strings.Contains(info.FullMethodName, "Produce") ||
+ len(p.followers) == 0 {
+ result.SubConn = p.leader
+ } else if strings.Contains(info.FullMethodName, "Consume") {
+ result.SubConn = p.nextFollower()
+ }
+ if result.SubConn == nil {
+ return result, balancer.ErrNoSubConnAvailable
+ }
+ return result, nil
+}
+
+func (p *Picker) nextFollower() balancer.SubConn {
+ cur := atomic.AddUint64(&p.current, uint64(1))
+ len := uint64(len(p.followers))
+ idx := int(cur % len)
+ return p.followers[idx]
+}
+
+func init() {
+ balancer.Register(
+ base.NewBalancerBuilder(Name, &Picker{}, base.Config{}),
+ )
+}
diff --git a/ClientSideServiceDiscovery/internal/loadbalance/picker_test.go b/ClientSideServiceDiscovery/internal/loadbalance/picker_test.go
new file mode 100644
index 0000000..1c438a1
--- /dev/null
+++ b/ClientSideServiceDiscovery/internal/loadbalance/picker_test.go
@@ -0,0 +1,91 @@
+package loadbalance_test
+
+import (
+ "testing"
+
+ "google.golang.org/grpc/attributes"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/balancer/base"
+ "google.golang.org/grpc/resolver"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/igor-baiborodine/proglog/internal/loadbalance"
+)
+
+func TestPickerNoSubConnAvailable(t *testing.T) {
+ picker := &loadbalance.Picker{}
+ for _, method := range []string{
+ "/log.vX.Log/Produce",
+ "/log.vX.Log/Consume",
+ } {
+ info := balancer.PickInfo{
+ FullMethodName: method,
+ }
+ result, err := picker.Pick(info)
+ require.Equal(t, balancer.ErrNoSubConnAvailable, err)
+ require.Nil(t, result.SubConn)
+ }
+}
+
+func TestPickerProducesToLeader(t *testing.T) {
+ picker, subConns := setupTest()
+ info := balancer.PickInfo{
+ FullMethodName: "/log.vX.Log/Produce",
+ }
+ for i := 0; i < 5; i++ {
+ gotPick, err := picker.Pick(info)
+ require.NoError(t, err)
+ require.Equal(t, subConns[0], gotPick.SubConn)
+ }
+}
+
+func TestPickerConsumesFromFollowers(t *testing.T) {
+ picker, subConns := setupTest()
+ info := balancer.PickInfo{
+ FullMethodName: "/log.vX.Log/Consume",
+ }
+ for i := 0; i < 5; i++ {
+ pick, err := picker.Pick(info)
+ require.NoError(t, err)
+ require.Equal(t, subConns[i%2+1], pick.SubConn)
+ }
+}
+
+func setupTest() (*loadbalance.Picker, []*subConn) {
+ var subConns []*subConn
+ buildInfo := base.PickerBuildInfo{
+ ReadySCs: make(map[balancer.SubConn]base.SubConnInfo),
+ }
+ for i := 0; i < 3; i++ {
+ sc := &subConn{}
+ addr := resolver.Address{
+ Attributes: attributes.New("is_leader", i == 0),
+ }
+ // 0th sub conn is the leader
+ sc.UpdateAddresses([]resolver.Address{addr})
+ buildInfo.ReadySCs[sc] = base.SubConnInfo{Address: addr}
+ subConns = append(subConns, sc)
+ }
+ picker := &loadbalance.Picker{}
+ picker.Build(buildInfo)
+ return picker, subConns
+}
+
+// subConn implements balancer.SubConn.
+type subConn struct {
+ addrs []resolver.Address
+}
+
+func (s *subConn) GetOrBuildProducer(builder balancer.ProducerBuilder) (
+ p balancer.Producer, close func(),
+) {
+ // TODO implement me
+ panic("implement me")
+}
+
+func (s *subConn) UpdateAddresses(addrs []resolver.Address) {
+ s.addrs = addrs
+}
+
+func (s *subConn) Connect() {}
diff --git a/ClientSideServiceDiscovery/internal/loadbalance/resolver.go b/ClientSideServiceDiscovery/internal/loadbalance/resolver.go
new file mode 100644
index 0000000..08b820e
--- /dev/null
+++ b/ClientSideServiceDiscovery/internal/loadbalance/resolver.go
@@ -0,0 +1,102 @@
+package loadbalance
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/attributes"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+
+ api "github.com/igor-baiborodine/proglog/api/v1"
+)
+
+type Resolver struct {
+ mu sync.Mutex
+ clientConn resolver.ClientConn
+ resolverConn *grpc.ClientConn
+ serviceConfig *serviceconfig.ParseResult
+ logger *zap.Logger
+}
+
+var _ resolver.Builder = (*Resolver)(nil)
+
+func (r *Resolver) Build(
+ target resolver.Target,
+ cc resolver.ClientConn,
+ opts resolver.BuildOptions,
+) (resolver.Resolver, error) {
+ r.logger = zap.L().Named("resolver")
+ r.clientConn = cc
+ var dialOpts []grpc.DialOption
+ if opts.DialCreds != nil {
+ dialOpts = append(
+ dialOpts,
+ grpc.WithTransportCredentials(opts.DialCreds),
+ )
+ }
+ r.serviceConfig = r.clientConn.ParseServiceConfig(
+ fmt.Sprintf(`{"loadBalancingConfig":[{"%s":{}}]}`, Name),
+ )
+ var err error
+ r.resolverConn, err = grpc.Dial(target.Endpoint(), dialOpts...)
+ if err != nil {
+ return nil, err
+ }
+ r.ResolveNow(resolver.ResolveNowOptions{})
+ return r, nil
+}
+
+const Name = "proglog"
+
+func (r *Resolver) Scheme() string {
+ return Name
+}
+
+func init() {
+ resolver.Register(&Resolver{})
+}
+
+var _ resolver.Resolver = (*Resolver)(nil)
+
+func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ client := api.NewLogClient(r.resolverConn)
+ // get cluster and then set on cc attributes
+ ctx := context.Background()
+ res, err := client.GetServers(ctx, &api.GetServersRequest{})
+ if err != nil {
+ r.logger.Error(
+ "failed to resolve server",
+ zap.Error(err),
+ )
+ return
+ }
+ var addrs []resolver.Address
+ for _, server := range res.Servers {
+ addrs = append(addrs, resolver.Address{
+ Addr: server.RpcAddr,
+ Attributes: attributes.New(
+ "is_leader",
+ server.IsLeader,
+ ),
+ })
+ }
+ r.clientConn.UpdateState(resolver.State{
+ Addresses: addrs,
+ ServiceConfig: r.serviceConfig,
+ })
+}
+
+func (r *Resolver) Close() {
+ if err := r.resolverConn.Close(); err != nil {
+ r.logger.Error(
+ "failed to close conn",
+ zap.Error(err),
+ )
+ }
+}
diff --git a/ClientSideServiceDiscovery/internal/loadbalance/resolver_test.go b/ClientSideServiceDiscovery/internal/loadbalance/resolver_test.go
new file mode 100644
index 0000000..e6e05e1
--- /dev/null
+++ b/ClientSideServiceDiscovery/internal/loadbalance/resolver_test.go
@@ -0,0 +1,114 @@
+package loadbalance_test
+
+import (
+ "net"
+ "net/url"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/attributes"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+
+ api "github.com/igor-baiborodine/proglog/api/v1"
+ "github.com/igor-baiborodine/proglog/internal/config"
+ "github.com/igor-baiborodine/proglog/internal/loadbalance"
+ "github.com/igor-baiborodine/proglog/internal/server"
+)
+
+func TestResolver(t *testing.T) {
+ l, err := net.Listen("tcp", "127.0.0.1:0")
+ require.NoError(t, err)
+
+ tlsConfig, err := config.SetupTLSConfig(config.TLSConfig{
+ CertFile: config.ServerCertFile,
+ KeyFile: config.ServerKeyFile,
+ CAFile: config.CAFile,
+ Server: true,
+ ServerAddress: "127.0.0.1",
+ })
+ require.NoError(t, err)
+ serverCreds := credentials.NewTLS(tlsConfig)
+
+ srv, err := server.NewGRPCServer(&server.Config{
+ GetServerer: &getServers{}, //
+ }, grpc.Creds(serverCreds))
+ require.NoError(t, err)
+
+ go srv.Serve(l)
+
+ conn := &clientConn{}
+ tlsConfig, err = config.SetupTLSConfig(config.TLSConfig{
+ CertFile: config.RootClientCertFile,
+ KeyFile: config.RootClientKeyFile,
+ CAFile: config.CAFile,
+ Server: false,
+ ServerAddress: "127.0.0.1",
+ })
+ require.NoError(t, err)
+ clientCreds := credentials.NewTLS(tlsConfig)
+ opts := resolver.BuildOptions{
+ DialCreds: clientCreds,
+ }
+ r := &loadbalance.Resolver{}
+ _, err = r.Build(
+ resolver.Target{
+ URL: url.URL{Path: l.Addr().String()},
+ },
+ conn,
+ opts,
+ )
+ require.NoError(t, err)
+
+ wantState := resolver.State{
+ Addresses: []resolver.Address{{
+ Addr: "localhost:9001",
+ Attributes: attributes.New("is_leader", true),
+ }, {
+ Addr: "localhost:9002",
+ Attributes: attributes.New("is_leader", false),
+ }},
+ }
+ require.Equal(t, wantState, conn.state)
+
+ conn.state.Addresses = nil
+ r.ResolveNow(resolver.ResolveNowOptions{})
+ require.Equal(t, wantState, conn.state)
+}
+
+type getServers struct{}
+
+func (s *getServers) GetServers() ([]*api.Server, error) {
+ return []*api.Server{{
+ Id: "leader",
+ RpcAddr: "localhost:9001",
+ IsLeader: true,
+ }, {
+ Id: "follower",
+ RpcAddr: "localhost:9002",
+ }}, nil
+}
+
+type clientConn struct {
+ resolver.ClientConn
+ state resolver.State
+}
+
+func (c *clientConn) UpdateState(state resolver.State) error {
+ c.state = state
+ return nil
+}
+
+func (c *clientConn) ReportError(err error) {}
+
+func (c *clientConn) NewAddress(addrs []resolver.Address) {}
+
+func (c *clientConn) NewServiceConfig(config string) {}
+
+func (c *clientConn) ParseServiceConfig(
+ config string,
+) *serviceconfig.ParseResult {
+ return nil
+}
diff --git a/ClientSideServiceDiscovery/internal/log/distributed.go b/ClientSideServiceDiscovery/internal/log/distributed.go
index 9d7ad81..ec1f9ab 100644
--- a/ClientSideServiceDiscovery/internal/log/distributed.go
+++ b/ClientSideServiceDiscovery/internal/log/distributed.go
@@ -228,6 +228,22 @@ func (l *DistributedLog) Close() error {
return l.log.Close()
}
+func (l *DistributedLog) GetServers() ([]*api.Server, error) {
+ future := l.raft.GetConfiguration()
+ if err := future.Error(); err != nil {
+ return nil, err
+ }
+ var servers []*api.Server
+ for _, server := range future.Configuration().Servers {
+ servers = append(servers, &api.Server{
+ Id: string(server.ID),
+ RpcAddr: string(server.Address),
+ IsLeader: l.raft.Leader() == server.Address,
+ })
+ }
+ return servers, nil
+}
+
var _ raft.FSM = (*fsm)(nil)
type fsm struct {
@@ -240,31 +256,31 @@ const (
AppendRequestType RequestType = 0
)
-func (l *fsm) Apply(record *raft.Log) interface{} {
+func (f *fsm) Apply(record *raft.Log) interface{} {
buf := record.Data
reqType := RequestType(buf[0])
switch reqType {
case AppendRequestType:
- return l.applyAppend(buf[1:])
+ return f.applyAppend(buf[1:])
}
return nil
}
-func (l *fsm) applyAppend(b []byte) interface{} {
+func (f *fsm) applyAppend(b []byte) interface{} {
var req api.ProduceRequest
err := proto.Unmarshal(b, &req)
if err != nil {
return err
}
- offset, err := l.log.Append(req.Record)
+ offset, err := f.log.Append(req.Record)
if err != nil {
return err
}
return &api.ProduceResponse{Offset: offset}
}
-func (l *fsm) Snapshot() (raft.FSMSnapshot, error) {
- r := l.log.Reader()
+func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
+ r := f.log.Reader()
return &snapshot{reader: r}, nil
}
@@ -284,7 +300,7 @@ func (s *snapshot) Persist(sink raft.SnapshotSink) error {
func (s *snapshot) Release() {}
-func (l *fsm) Restore(r io.ReadCloser) error {
+func (f *fsm) Restore(r io.ReadCloser) error {
b := make([]byte, lenWidth)
var buf bytes.Buffer
for i := 0; ; i++ {
@@ -303,12 +319,12 @@ func (l *fsm) Restore(r io.ReadCloser) error {
return err
}
if i == 0 {
- l.log.Config.Segment.InitialOffset = record.Offset
- if err := l.log.Reset(); err != nil {
+ f.log.Config.Segment.InitialOffset = record.Offset
+ if err := f.log.Reset(); err != nil {
return err
}
}
- if _, err = l.log.Append(record); err != nil {
+ if _, err = f.log.Append(record); err != nil {
return err
}
buf.Reset()
@@ -399,7 +415,6 @@ func (s *StreamLayer) Dial(
) (net.Conn, error) {
dialer := &net.Dialer{Timeout: timeout}
var conn, err = dialer.Dial("tcp", string(addr))
-
if err != nil {
return nil, err
}
diff --git a/ClientSideServiceDiscovery/internal/log/distributed_test.go b/ClientSideServiceDiscovery/internal/log/distributed_test.go
index ee8bd33..d32a8d0 100644
--- a/ClientSideServiceDiscovery/internal/log/distributed_test.go
+++ b/ClientSideServiceDiscovery/internal/log/distributed_test.go
@@ -28,10 +28,7 @@ func TestMultipleNodes(t *testing.T) {
_ = os.RemoveAll(dir)
}(dataDir)
- ln, err := net.Listen(
- "tcp",
- fmt.Sprintf("127.0.0.1:%d", ports[i]),
- )
+ ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", ports[i]))
require.NoError(t, err)
config := log.Config{}
@@ -85,10 +82,24 @@ func TestMultipleNodes(t *testing.T) {
}, 500*time.Millisecond, 50*time.Millisecond)
}
- err := logs[0].Leave("1")
+ servers, err := logs[0].GetServers()
require.NoError(t, err)
+ require.Equal(t, 3, len(servers))
+ require.True(t, servers[0].IsLeader)
+ require.False(t, servers[1].IsLeader)
+ require.False(t, servers[2].IsLeader)
- time.Sleep(50 * time.Millisecond)
+ // START: distributed_log_test_leave
+ err = logs[0].Leave("1") //
+ require.NoError(t, err) //
+
+ time.Sleep(50 * time.Millisecond) //
+
+ servers, err = logs[0].GetServers()
+ require.NoError(t, err)
+ require.Equal(t, 2, len(servers))
+ require.True(t, servers[0].IsLeader)
+ require.False(t, servers[1].IsLeader)
off, err := logs[0].Append(&api.Record{
Value: []byte("third"),
@@ -104,5 +115,5 @@ func TestMultipleNodes(t *testing.T) {
record, err = logs[2].Read(off)
require.NoError(t, err)
require.Equal(t, []byte("third"), record.Value)
- require.Equal(t, off, record.Offset)
+ require.Equal(t, off, record.Offset) //
}
diff --git a/ClientSideServiceDiscovery/internal/server/server.go b/ClientSideServiceDiscovery/internal/server/server.go
index f73cc4b..aca5270 100644
--- a/ClientSideServiceDiscovery/internal/server/server.go
+++ b/ClientSideServiceDiscovery/internal/server/server.go
@@ -24,8 +24,9 @@ import (
)
type Config struct {
- CommitLog CommitLog
- Authorizer Authorizer
+ CommitLog CommitLog
+ Authorizer Authorizer
+ GetServerer GetServerer
}
const (
@@ -162,6 +163,21 @@ func (s *grpcServer) ConsumeStream(
}
}
+func (s *grpcServer) GetServers(
+ ctx context.Context, req *api.GetServersRequest,
+) (
+ *api.GetServersResponse, error) {
+ servers, err := s.GetServerer.GetServers()
+ if err != nil {
+ return nil, err
+ }
+ return &api.GetServersResponse{Servers: servers}, nil
+}
+
+type GetServerer interface {
+ GetServers() ([]*api.Server, error)
+}
+
type CommitLog interface {
Append(*api.Record) (uint64, error)
Read(uint64) (*api.Record, error)