From 18150a5619b6ef61fc048169160d9d8e1d0c37b3 Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Tue, 13 Dec 2022 08:59:51 +0100 Subject: [PATCH] feat: intercp communication with catalog (#5445) Signed-off-by: Jakub Dyszkiewicz --- api/system/v1alpha1/inter_cp_ping.pb.go | 247 ++++++++++++++++++ api/system/v1alpha1/inter_cp_ping.proto | 16 ++ api/system/v1alpha1/inter_cp_ping_grpc.pb.go | 101 +++++++ app/kuma-cp/cmd/run.go | 5 + ...tall-control-plane.cni-enabled.golden.yaml | 4 + ...plane.cni-experimental-enabled.golden.yaml | 4 + ...install-control-plane.defaults.golden.yaml | 4 + ...plane.gateway-api-present-not-enabled.yaml | 4 + ...all-control-plane.gateway-api-present.yaml | 4 + .../install-control-plane.global.golden.yaml | 4 + ...ontrol-plane.override-env-vars.golden.yaml | 4 + ...nstall-control-plane.overrides.golden.yaml | 4 + ...install-control-plane.registry.golden.yaml | 4 + ...roxy-ebpf-experimental-enabled.golden.yaml | 4 + ...tall-control-plane.with-egress.golden.yaml | 4 + .../install-control-plane.with-helm-set.yaml | 4 + ...nstall-control-plane.with-helm-values.yaml | 4 + ...all-control-plane.with-ingress.golden.yaml | 4 + .../install-control-plane.zone.golden.yaml | 4 + .../install-cp-helm/empty.golden.yaml | 4 + .../install-cp-helm/fix4485.golden.yaml | 4 + .../install-cp-helm/fix4496.golden.yaml | 4 + .../install-cp-helm/fix4935.golden.yaml | 4 + .../charts/kuma/templates/cp-deployment.yaml | 4 + pkg/config/app/kuma-cp/config.go | 9 +- pkg/config/app/kuma-cp/kuma-cp.defaults.yaml | 22 ++ pkg/config/intercp/config.go | 87 ++++++ pkg/config/loader_test.go | 25 ++ pkg/core/bootstrap/autoconfig.go | 24 ++ pkg/core/bootstrap/autoconfig_test.go | 12 + pkg/envoy/admin/tls/pki.go | 4 +- pkg/intercp/catalog/catalog.go | 48 ++++ pkg/intercp/catalog/catalog_test.go | 197 ++++++++++++++ pkg/intercp/catalog/client.go | 26 ++ pkg/intercp/catalog/config_catalog.go | 102 ++++++++ pkg/intercp/catalog/discovery_suite_test.go | 11 + pkg/intercp/catalog/heartbeat_component.go | 127 +++++++++ .../catalog/heartbeat_component_test.go | 189 ++++++++++++++ pkg/intercp/catalog/heartbeats.go | 40 +++ pkg/intercp/catalog/heartbeats_test.go | 39 +++ pkg/intercp/catalog/server.go | 45 ++++ pkg/intercp/catalog/server_test.go | 91 +++++++ pkg/intercp/catalog/writer.go | 65 +++++ pkg/intercp/catalog/writer_test.go | 95 +++++++ pkg/intercp/client/client.go | 43 +++ pkg/intercp/components.go | 119 +++++++++ pkg/intercp/server/server.go | 118 +++++++++ pkg/intercp/tls/defaults.go | 62 +++++ pkg/intercp/tls/defaults_test.go | 57 ++++ pkg/intercp/tls/pki.go | 75 ++++++ pkg/intercp/tls/pki_test.go | 53 ++++ pkg/intercp/tls/tls_suite_test.go | 11 + pkg/util/net/ips.go | 19 +- test/e2e/helm/kuma_helm_deploy_global_zone.go | 53 +++- 54 files changed, 2311 insertions(+), 6 deletions(-) create mode 100644 api/system/v1alpha1/inter_cp_ping.pb.go create mode 100644 api/system/v1alpha1/inter_cp_ping.proto create mode 100644 api/system/v1alpha1/inter_cp_ping_grpc.pb.go create mode 100644 pkg/config/intercp/config.go create mode 100644 pkg/intercp/catalog/catalog.go create mode 100644 pkg/intercp/catalog/catalog_test.go create mode 100644 pkg/intercp/catalog/client.go create mode 100644 pkg/intercp/catalog/config_catalog.go create mode 100644 pkg/intercp/catalog/discovery_suite_test.go create mode 100644 pkg/intercp/catalog/heartbeat_component.go create mode 100644 pkg/intercp/catalog/heartbeat_component_test.go create mode 100644 pkg/intercp/catalog/heartbeats.go create mode 100644 pkg/intercp/catalog/heartbeats_test.go create mode 100644 pkg/intercp/catalog/server.go create mode 100644 pkg/intercp/catalog/server_test.go create mode 100644 pkg/intercp/catalog/writer.go create mode 100644 pkg/intercp/catalog/writer_test.go create mode 100644 pkg/intercp/client/client.go create mode 100644 pkg/intercp/components.go create mode 100644 pkg/intercp/server/server.go create mode 100644 pkg/intercp/tls/defaults.go create mode 100644 pkg/intercp/tls/defaults_test.go create mode 100644 pkg/intercp/tls/pki.go create mode 100644 pkg/intercp/tls/pki_test.go create mode 100644 pkg/intercp/tls/tls_suite_test.go diff --git a/api/system/v1alpha1/inter_cp_ping.pb.go b/api/system/v1alpha1/inter_cp_ping.pb.go new file mode 100644 index 000000000000..90a753d0f363 --- /dev/null +++ b/api/system/v1alpha1/inter_cp_ping.pb.go @@ -0,0 +1,247 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.20.0 +// source: system/v1alpha1/inter_cp_ping.proto + +package v1alpha1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PingRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InstanceId string `protobuf:"bytes,1,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"` + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + InterCpPort uint32 `protobuf:"varint,3,opt,name=inter_cp_port,json=interCpPort,proto3" json:"inter_cp_port,omitempty"` + Ready bool `protobuf:"varint,4,opt,name=ready,proto3" json:"ready,omitempty"` +} + +func (x *PingRequest) Reset() { + *x = PingRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_system_v1alpha1_inter_cp_ping_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingRequest) ProtoMessage() {} + +func (x *PingRequest) ProtoReflect() protoreflect.Message { + mi := &file_system_v1alpha1_inter_cp_ping_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead. +func (*PingRequest) Descriptor() ([]byte, []int) { + return file_system_v1alpha1_inter_cp_ping_proto_rawDescGZIP(), []int{0} +} + +func (x *PingRequest) GetInstanceId() string { + if x != nil { + return x.InstanceId + } + return "" +} + +func (x *PingRequest) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *PingRequest) GetInterCpPort() uint32 { + if x != nil { + return x.InterCpPort + } + return 0 +} + +func (x *PingRequest) GetReady() bool { + if x != nil { + return x.Ready + } + return false +} + +type PingResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Leader bool `protobuf:"varint,1,opt,name=leader,proto3" json:"leader,omitempty"` +} + +func (x *PingResponse) Reset() { + *x = PingResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_system_v1alpha1_inter_cp_ping_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingResponse) ProtoMessage() {} + +func (x *PingResponse) ProtoReflect() protoreflect.Message { + mi := &file_system_v1alpha1_inter_cp_ping_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. +func (*PingResponse) Descriptor() ([]byte, []int) { + return file_system_v1alpha1_inter_cp_ping_proto_rawDescGZIP(), []int{1} +} + +func (x *PingResponse) GetLeader() bool { + if x != nil { + return x.Leader + } + return false +} + +var File_system_v1alpha1_inter_cp_ping_proto protoreflect.FileDescriptor + +var file_system_v1alpha1_inter_cp_ping_proto_rawDesc = []byte{ + 0x0a, 0x23, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, + 0x31, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x5f, 0x63, 0x70, 0x5f, 0x70, 0x69, 0x6e, 0x67, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x14, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, 0x73, 0x74, + 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x22, 0x82, 0x01, 0x0a, 0x0b, + 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x69, + 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, + 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, + 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x22, 0x0a, 0x0d, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x5f, + 0x63, 0x70, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x43, 0x70, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, + 0x61, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, + 0x22, 0x26, 0x0a, 0x0c, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x16, 0x0a, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x06, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x32, 0x63, 0x0a, 0x12, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x43, 0x70, 0x50, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4d, + 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, + 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x50, 0x69, + 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x6b, 0x75, 0x6d, 0x61, + 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, + 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2c, 0x5a, + 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x6d, 0x61, + 0x68, 0x71, 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x79, 0x73, 0x74, + 0x65, 0x6d, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_system_v1alpha1_inter_cp_ping_proto_rawDescOnce sync.Once + file_system_v1alpha1_inter_cp_ping_proto_rawDescData = file_system_v1alpha1_inter_cp_ping_proto_rawDesc +) + +func file_system_v1alpha1_inter_cp_ping_proto_rawDescGZIP() []byte { + file_system_v1alpha1_inter_cp_ping_proto_rawDescOnce.Do(func() { + file_system_v1alpha1_inter_cp_ping_proto_rawDescData = protoimpl.X.CompressGZIP(file_system_v1alpha1_inter_cp_ping_proto_rawDescData) + }) + return file_system_v1alpha1_inter_cp_ping_proto_rawDescData +} + +var file_system_v1alpha1_inter_cp_ping_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_system_v1alpha1_inter_cp_ping_proto_goTypes = []interface{}{ + (*PingRequest)(nil), // 0: kuma.system.v1alpha1.PingRequest + (*PingResponse)(nil), // 1: kuma.system.v1alpha1.PingResponse +} +var file_system_v1alpha1_inter_cp_ping_proto_depIdxs = []int32{ + 0, // 0: kuma.system.v1alpha1.InterCpPingService.Ping:input_type -> kuma.system.v1alpha1.PingRequest + 1, // 1: kuma.system.v1alpha1.InterCpPingService.Ping:output_type -> kuma.system.v1alpha1.PingResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_system_v1alpha1_inter_cp_ping_proto_init() } +func file_system_v1alpha1_inter_cp_ping_proto_init() { + if File_system_v1alpha1_inter_cp_ping_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_system_v1alpha1_inter_cp_ping_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_system_v1alpha1_inter_cp_ping_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_system_v1alpha1_inter_cp_ping_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_system_v1alpha1_inter_cp_ping_proto_goTypes, + DependencyIndexes: file_system_v1alpha1_inter_cp_ping_proto_depIdxs, + MessageInfos: file_system_v1alpha1_inter_cp_ping_proto_msgTypes, + }.Build() + File_system_v1alpha1_inter_cp_ping_proto = out.File + file_system_v1alpha1_inter_cp_ping_proto_rawDesc = nil + file_system_v1alpha1_inter_cp_ping_proto_goTypes = nil + file_system_v1alpha1_inter_cp_ping_proto_depIdxs = nil +} diff --git a/api/system/v1alpha1/inter_cp_ping.proto b/api/system/v1alpha1/inter_cp_ping.proto new file mode 100644 index 000000000000..7d1f9fbe13de --- /dev/null +++ b/api/system/v1alpha1/inter_cp_ping.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package kuma.system.v1alpha1; + +option go_package = "github.com/kumahq/kuma/api/system/v1alpha1"; + +message PingRequest { + string instance_id = 1; + string address = 2; + uint32 inter_cp_port = 3; + bool ready = 4; +} + +message PingResponse { bool leader = 1; } + +service InterCpPingService { rpc Ping(PingRequest) returns (PingResponse); } diff --git a/api/system/v1alpha1/inter_cp_ping_grpc.pb.go b/api/system/v1alpha1/inter_cp_ping_grpc.pb.go new file mode 100644 index 000000000000..b5bbafd04647 --- /dev/null +++ b/api/system/v1alpha1/inter_cp_ping_grpc.pb.go @@ -0,0 +1,101 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package v1alpha1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// InterCpPingServiceClient is the client API for InterCpPingService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type InterCpPingServiceClient interface { + Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) +} + +type interCpPingServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewInterCpPingServiceClient(cc grpc.ClientConnInterface) InterCpPingServiceClient { + return &interCpPingServiceClient{cc} +} + +func (c *interCpPingServiceClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { + out := new(PingResponse) + err := c.cc.Invoke(ctx, "/kuma.system.v1alpha1.InterCpPingService/Ping", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// InterCpPingServiceServer is the server API for InterCpPingService service. +// All implementations must embed UnimplementedInterCpPingServiceServer +// for forward compatibility +type InterCpPingServiceServer interface { + Ping(context.Context, *PingRequest) (*PingResponse, error) + mustEmbedUnimplementedInterCpPingServiceServer() +} + +// UnimplementedInterCpPingServiceServer must be embedded to have forward compatible implementations. +type UnimplementedInterCpPingServiceServer struct { +} + +func (UnimplementedInterCpPingServiceServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedInterCpPingServiceServer) mustEmbedUnimplementedInterCpPingServiceServer() {} + +// UnsafeInterCpPingServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to InterCpPingServiceServer will +// result in compilation errors. +type UnsafeInterCpPingServiceServer interface { + mustEmbedUnimplementedInterCpPingServiceServer() +} + +func RegisterInterCpPingServiceServer(s grpc.ServiceRegistrar, srv InterCpPingServiceServer) { + s.RegisterService(&InterCpPingService_ServiceDesc, srv) +} + +func _InterCpPingService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InterCpPingServiceServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/kuma.system.v1alpha1.InterCpPingService/Ping", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InterCpPingServiceServer).Ping(ctx, req.(*PingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// InterCpPingService_ServiceDesc is the grpc.ServiceDesc for InterCpPingService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var InterCpPingService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "kuma.system.v1alpha1.InterCpPingService", + HandlerType: (*InterCpPingServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Ping", + Handler: _InterCpPingService_Ping_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "system/v1alpha1/inter_cp_ping.proto", +} diff --git a/app/kuma-cp/cmd/run.go b/app/kuma-cp/cmd/run.go index c9e56a320de3..899722dbb406 100644 --- a/app/kuma-cp/cmd/run.go +++ b/app/kuma-cp/cmd/run.go @@ -18,6 +18,7 @@ import ( "github.com/kumahq/kuma/pkg/gc" "github.com/kumahq/kuma/pkg/hds" "github.com/kumahq/kuma/pkg/insights" + "github.com/kumahq/kuma/pkg/intercp" kds_global "github.com/kumahq/kuma/pkg/kds/global" kds_zone "github.com/kumahq/kuma/pkg/kds/zone" mads_server "github.com/kumahq/kuma/pkg/mads/server" @@ -135,6 +136,10 @@ func newRunCmdWithOpts(opts kuma_cmd.RunCmdOpts) *cobra.Command { runLog.Error(err, "unable to set up GC") return err } + if err := intercp.Setup(rt); err != nil { + runLog.Error(err, "unable to set up Control Plane Intercommunication") + return err + } runLog.Info("starting Control Plane", "version", kuma_version.Build.Version) if err := rt.Start(gracefulCtx.Done()); err != nil { diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.cni-enabled.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.cni-enabled.golden.yaml index 2d60251d3dee..98068dc923d9 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.cni-enabled.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.cni-enabled.golden.yaml @@ -581,6 +581,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.cni-experimental-enabled.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.cni-experimental-enabled.golden.yaml index e6fe96487c02..f137839ad17f 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.cni-experimental-enabled.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.cni-experimental-enabled.golden.yaml @@ -603,6 +603,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.defaults.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.defaults.golden.yaml index 2b1ebabf44fd..feafb18a0a76 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.defaults.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.defaults.golden.yaml @@ -2933,6 +2933,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present-not-enabled.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present-not-enabled.yaml index 2b1ebabf44fd..feafb18a0a76 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present-not-enabled.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present-not-enabled.yaml @@ -2933,6 +2933,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present.yaml index 53fb13ae49fd..261aba5d6357 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.gateway-api-present.yaml @@ -3069,6 +3069,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.global.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.global.golden.yaml index cea179090aae..39274b632f4f 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.global.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.global.golden.yaml @@ -420,6 +420,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.override-env-vars.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.override-env-vars.golden.yaml index 08fc0a26d78f..ea26e4870988 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.override-env-vars.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.override-env-vars.golden.yaml @@ -403,6 +403,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.overrides.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.overrides.golden.yaml index 634f170263d6..ed0e55bd02b5 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.overrides.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.overrides.golden.yaml @@ -419,6 +419,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.registry.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.registry.golden.yaml index 66ae4eac62be..0d5e88f5867a 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.registry.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.registry.golden.yaml @@ -403,6 +403,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.tproxy-ebpf-experimental-enabled.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.tproxy-ebpf-experimental-enabled.golden.yaml index 6fcc862078b9..d770259f35fc 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.tproxy-ebpf-experimental-enabled.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.tproxy-ebpf-experimental-enabled.golden.yaml @@ -413,6 +413,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.with-egress.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.with-egress.golden.yaml index 2cbd6c1afdaa..2605cff7cf53 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.with-egress.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.with-egress.golden.yaml @@ -434,6 +434,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-set.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-set.yaml index cf90708a24a8..d070fd45143e 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-set.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-set.yaml @@ -2999,6 +2999,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-values.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-values.yaml index bea4ed0ec0f7..d77de18f831c 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-values.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.with-helm-values.yaml @@ -403,6 +403,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.with-ingress.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.with-ingress.golden.yaml index fe1897599e7a..7b6c2a6c0b59 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.with-ingress.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.with-ingress.golden.yaml @@ -438,6 +438,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-control-plane.zone.golden.yaml b/app/kumactl/cmd/install/testdata/install-control-plane.zone.golden.yaml index e350bfe95a12..fd894de04fe1 100644 --- a/app/kumactl/cmd/install/testdata/install-control-plane.zone.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-control-plane.zone.golden.yaml @@ -407,6 +407,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-cp-helm/empty.golden.yaml b/app/kumactl/cmd/install/testdata/install-cp-helm/empty.golden.yaml index bb90e01a723f..a7f9e6d3177e 100644 --- a/app/kumactl/cmd/install/testdata/install-cp-helm/empty.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-cp-helm/empty.golden.yaml @@ -403,6 +403,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-cp-helm/fix4485.golden.yaml b/app/kumactl/cmd/install/testdata/install-cp-helm/fix4485.golden.yaml index 8f2ec991dfe7..d3046862dd58 100644 --- a/app/kumactl/cmd/install/testdata/install-cp-helm/fix4485.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-cp-helm/fix4485.golden.yaml @@ -426,6 +426,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-cp-helm/fix4496.golden.yaml b/app/kumactl/cmd/install/testdata/install-cp-helm/fix4496.golden.yaml index 39dfa3f6ab40..fad6ce8d7ac7 100644 --- a/app/kumactl/cmd/install/testdata/install-cp-helm/fix4496.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-cp-helm/fix4496.golden.yaml @@ -449,6 +449,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/app/kumactl/cmd/install/testdata/install-cp-helm/fix4935.golden.yaml b/app/kumactl/cmd/install/testdata/install-cp-helm/fix4935.golden.yaml index de3a5d42cc44..33529ef89123 100644 --- a/app/kumactl/cmd/install/testdata/install-cp-helm/fix4935.golden.yaml +++ b/app/kumactl/cmd/install/testdata/install-cp-helm/fix4935.golden.yaml @@ -657,6 +657,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level=info diff --git a/deployments/charts/kuma/templates/cp-deployment.yaml b/deployments/charts/kuma/templates/cp-deployment.yaml index ef39087bdb18..475a6878be4e 100644 --- a/deployments/charts/kuma/templates/cp-deployment.yaml +++ b/deployments/charts/kuma/templates/cp-deployment.yaml @@ -92,6 +92,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP args: - run - --log-level={{ .Values.controlPlane.logLevel }} diff --git a/pkg/config/app/kuma-cp/config.go b/pkg/config/app/kuma-cp/config.go index 04df567fbe0f..3e0bdc4e901c 100644 --- a/pkg/config/app/kuma-cp/config.go +++ b/pkg/config/app/kuma-cp/config.go @@ -14,6 +14,7 @@ import ( "github.com/kumahq/kuma/pkg/config/diagnostics" dns_server "github.com/kumahq/kuma/pkg/config/dns-server" dp_server "github.com/kumahq/kuma/pkg/config/dp-server" + "github.com/kumahq/kuma/pkg/config/intercp" "github.com/kumahq/kuma/pkg/config/mads" "github.com/kumahq/kuma/pkg/config/multizone" "github.com/kumahq/kuma/pkg/config/plugins/runtime" @@ -150,6 +151,8 @@ type Config struct { Experimental ExperimentalConfig `json:"experimental"` // Proxy holds configuration for proxies Proxy xds.Proxy `json:"proxy"` + // Intercommunication CP configuration + InterCp intercp.InterCpConfig `json:"interCp"` } func (c *Config) Sanitize() { @@ -208,7 +211,8 @@ var DefaultConfig = func() Config { GatewayAPI: false, KubeOutboundsAsVIPs: false, }, - Proxy: xds.DefaultProxyConfig(), + Proxy: xds.DefaultProxyConfig(), + InterCp: intercp.DefaultInterCpConfig(), } } @@ -281,6 +285,9 @@ func (c *Config) Validate() error { if err := c.Experimental.Validate(); err != nil { return errors.Wrap(err, "Experimental validation failed") } + if err := c.InterCp.Validate(); err != nil { + return errors.Wrap(err, "InterCp validation failed") + } return nil } diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index fd1bf72cc0f3..b2bc58987b58 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -490,6 +490,28 @@ dpServer: # UnhealthyThreshold is a number of unhealthy health checks required before a host is marked unhealthy unhealthyThreshold: 1 # ENV: KUMA_DP_SERVER_HDS_CHECK_UNHEALTHY_THRESHOLD +# Intercommunication CP configuration +interCp: + # Catalog configuration. Catalog keeps a record of all live CP instances in the zone. + catalog: + # Indicates an address on which other control planes can communicate with this CP. + # If empty then it's autoconfigured by taking the first IP of the nonloopback network interface. + instanceAddress: "" # ENV: KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS + # Interval on which CP will send heartbeat to a leader. + heartbeatInterval: 5s # ENV: KUMA_INTER_CP_CATALOG_HEARTBEAT_INTERVAL + # Interval on which CP will write all instances to a catalog. + writerInterval: 15s # ENV: KUMA_INTER_CP_CATALOG_WRITER_INTERVAL + # Intercommunication CP server configuration + server: + # Port of the inter-cp server + port: 5683 # ENV: KUMA_INTER_CP_SERVER_PORT + # TlsMinVersion the minimum version of TLS + tlsMinVersion: "TLSv1_2" # ENV: KUMA_INTER_CP_SERVER_TLS_MIN_VERSION + # TlsMaxVersion the maximum version of TLS + tlsMaxVersion: # ENV: KUMA_INTER_CP_SERVER_TLS_MAX_VERSION + # TlsCipherSuites the list of cipher suites + tlsCipherSuites: [ ] # ENV: KUMA_INTER_CP_SERVER_TLS_CIPHER_SUITES + # Access Control configuration access: # Type of access strategy (available values: "static") diff --git a/pkg/config/intercp/config.go b/pkg/config/intercp/config.go new file mode 100644 index 000000000000..c93dd2c86205 --- /dev/null +++ b/pkg/config/intercp/config.go @@ -0,0 +1,87 @@ +package intercp + +import ( + "time" + + "github.com/asaskevich/govalidator" + "github.com/pkg/errors" + "go.uber.org/multierr" + + config_types "github.com/kumahq/kuma/pkg/config/types" +) + +func DefaultInterCpConfig() InterCpConfig { + return InterCpConfig{ + Catalog: CatalogConfig{ + InstanceAddress: "", // autoconfigured + HeartbeatInterval: config_types.Duration{Duration: 5 * time.Second}, + WriterInterval: config_types.Duration{Duration: 15 * time.Second}, + }, + Server: InterCpServerConfig{ + Port: 5683, + TlsMinVersion: "TLSv1_2", + TlsCipherSuites: []string{}, + }, + } +} + +type InterCpConfig struct { + // Catalog configuration. Catalog keeps a record of all live CP instances in the zone. + Catalog CatalogConfig `json:"catalog"` + // Intercommunication CP server configuration + Server InterCpServerConfig `json:"server"` +} + +func (i *InterCpConfig) Validate() error { + if err := i.Server.Validate(); err != nil { + return errors.Wrap(err, ".Server validation failed") + } + if err := i.Catalog.Validate(); err != nil { + return errors.Wrap(err, ".Catalog validation failed") + } + return nil +} + +type CatalogConfig struct { + // InstanceAddress indicates an address on which other control planes can communicate with this CP + // If empty then it's autoconfigured by taking the first IP of the nonloopback network interface. + InstanceAddress string `json:"instanceAddress" envconfig:"kuma_inter_cp_catalog_instance_address"` + // Interval on which CP will send heartbeat to a leader. + HeartbeatInterval config_types.Duration `json:"heartbeatInterval" envconfig:"kuma_inter_cp_catalog_heartbeat_interval"` + // Interval on which CP will write all instances to a catalog. + WriterInterval config_types.Duration `json:"writerInterval" envconfig:"kuma_inter_cp_catalog_writer_interval"` +} + +func (i *CatalogConfig) Validate() error { + if i.InstanceAddress != "" && !govalidator.IsDNSName(i.InstanceAddress) && !govalidator.IsIP(i.InstanceAddress) { + return errors.New(".InstanceAddress has to be valid IP or DNS address") + } + return nil +} + +type InterCpServerConfig struct { + // Port on which Intercommunication CP server will listen + Port uint16 `json:"port" envconfig:"kuma_inter_cp_server_port"` + // TlsMinVersion defines the minimum TLS version to be used + TlsMinVersion string `json:"tlsMinVersion" envconfig:"kuma_inter_cp_server_tls_min_version"` + // TlsMaxVersion defines the maximum TLS version to be used + TlsMaxVersion string `json:"tlsMaxVersion" envconfig:"kuma_inter_cp_server_tls_max_version"` + // TlsCipherSuites defines the list of ciphers to use + TlsCipherSuites []string `json:"tlsCipherSuites" envconfig:"kuma_inter_cp_server_tls_cipher_suites"` +} + +func (i *InterCpServerConfig) Validate() (errs error) { + if i.Port == 0 { + errs = multierr.Append(errs, errors.New(".Port cannot be zero")) + } + if _, err := config_types.TLSVersion(i.TlsMinVersion); err != nil { + errs = multierr.Append(errs, errors.New(".TlsMinVersion "+err.Error())) + } + if _, err := config_types.TLSVersion(i.TlsMaxVersion); err != nil { + errs = multierr.Append(errs, errors.New(".TlsMaxVersion "+err.Error())) + } + if _, err := config_types.TLSCiphers(i.TlsCipherSuites); err != nil { + errs = multierr.Append(errs, errors.New(".TlsCipherSuites "+err.Error())) + } + return +} diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 85cc0f1c5c53..81ba36bd3331 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -280,6 +280,14 @@ var _ = Describe("Config loader", func() { Expect(cfg.DpServer.Hds.CheckDefaults.HealthyThreshold).To(Equal(uint32(8))) Expect(cfg.DpServer.Hds.CheckDefaults.UnhealthyThreshold).To(Equal(uint32(9))) + Expect(cfg.InterCp.Catalog.InstanceAddress).To(Equal("192.168.0.1")) + Expect(cfg.InterCp.Catalog.HeartbeatInterval.Duration).To(Equal(time.Second)) + Expect(cfg.InterCp.Catalog.WriterInterval.Duration).To(Equal(2 * time.Second)) + Expect(cfg.InterCp.Server.Port).To(Equal(uint16(15683))) + Expect(cfg.InterCp.Server.TlsMinVersion).To(Equal("TLSv1_3")) + Expect(cfg.InterCp.Server.TlsMaxVersion).To(Equal("TLSv1_3")) + Expect(cfg.InterCp.Server.TlsCipherSuites).To(Equal([]string{"TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_AES_256_GCM_SHA384"})) + Expect(cfg.Access.Type).To(Equal("custom-rbac")) Expect(cfg.Access.Static.AdminResources.Users).To(Equal([]string{"ar-admin1", "ar-admin2"})) Expect(cfg.Access.Static.AdminResources.Groups).To(Equal([]string{"ar-group1", "ar-group2"})) @@ -540,6 +548,16 @@ dpServer: noTrafficInterval: 7s healthyThreshold: 8 unhealthyThreshold: 9 +interCp: + catalog: + instanceAddress: "192.168.0.1" + heartbeatInterval: 1s + writerInterval: 2s + server: + port: 15683 + tlsMinVersion: "TLSv1_3" + tlsMaxVersion: "TLSv1_3" + tlsCipherSuites: ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_AES_256_GCM_SHA384"] access: type: custom-rbac static: @@ -750,6 +768,13 @@ proxy: "KUMA_DP_SERVER_HDS_CHECK_NO_TRAFFIC_INTERVAL": "7s", "KUMA_DP_SERVER_HDS_CHECK_HEALTHY_THRESHOLD": "8", "KUMA_DP_SERVER_HDS_CHECK_UNHEALTHY_THRESHOLD": "9", + "KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS": "192.168.0.1", + "KUMA_INTER_CP_CATALOG_HEARTBEAT_INTERVAL": "1s", + "KUMA_INTER_CP_CATALOG_WRITER_INTERVAL": "2s", + "KUMA_INTER_CP_SERVER_PORT": "15683", + "KUMA_INTER_CP_SERVER_TLS_MIN_VERSION": "TLSv1_3", + "KUMA_INTER_CP_SERVER_TLS_MAX_VERSION": "TLSv1_3", + "KUMA_INTER_CP_SERVER_TLS_CIPHER_SUITES": "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_AES_256_GCM_SHA384", "KUMA_ACCESS_TYPE": "custom-rbac", "KUMA_ACCESS_STATIC_ADMIN_RESOURCES_USERS": "ar-admin1,ar-admin2", "KUMA_ACCESS_STATIC_ADMIN_RESOURCES_GROUPS": "ar-group1,ar-group2", diff --git a/pkg/core/bootstrap/autoconfig.go b/pkg/core/bootstrap/autoconfig.go index 2191f95713dd..36c2d78c630a 100644 --- a/pkg/core/bootstrap/autoconfig.go +++ b/pkg/core/bootstrap/autoconfig.go @@ -33,6 +33,9 @@ func autoconfigure(cfg *kuma_cp.Config) error { } autoconfigureServersTLS(cfg) autoconfigBootstrapXdsParams(cfg) + if err := autoconfigureInterCp(cfg); err != nil { + return errors.Wrap(err, "could not autoconfigure Inter CP config") + } return nil } @@ -86,6 +89,7 @@ func autoconfigureServersTLS(cfg *kuma_cp.Config) { cfg.DpServer.TlsMinVersion = cfg.General.TlsMinVersion cfg.ApiServer.HTTPS.TlsMinVersion = cfg.General.TlsMinVersion cfg.MonitoringAssignmentServer.TlsMinVersion = cfg.General.TlsMinVersion + cfg.InterCp.Server.TlsMinVersion = cfg.General.TlsMinVersion } if cfg.General.TlsMaxVersion != "" { cfg.Diagnostics.TlsMaxVersion = cfg.General.TlsMaxVersion @@ -93,6 +97,7 @@ func autoconfigureServersTLS(cfg *kuma_cp.Config) { cfg.DpServer.TlsMaxVersion = cfg.General.TlsMaxVersion cfg.ApiServer.HTTPS.TlsMaxVersion = cfg.General.TlsMaxVersion cfg.MonitoringAssignmentServer.TlsMaxVersion = cfg.General.TlsMaxVersion + cfg.InterCp.Server.TlsMaxVersion = cfg.General.TlsMaxVersion } if len(cfg.General.TlsCipherSuites) > 0 { cfg.Diagnostics.TlsCipherSuites = cfg.General.TlsCipherSuites @@ -100,6 +105,7 @@ func autoconfigureServersTLS(cfg *kuma_cp.Config) { cfg.DpServer.TlsCipherSuites = cfg.General.TlsCipherSuites cfg.ApiServer.HTTPS.TlsCipherSuites = cfg.General.TlsCipherSuites cfg.MonitoringAssignmentServer.TlsCipherSuites = cfg.General.TlsCipherSuites + cfg.InterCp.Server.TlsCipherSuites = cfg.General.TlsCipherSuites } } @@ -223,3 +229,21 @@ func saveKeyPair(pair tls.KeyPair, dir workDir) (string, string, error) { return crtFile.Name(), keyFile.Name(), nil } + +func autoconfigureInterCp(cfg *kuma_cp.Config) error { + if cfg.InterCp.Catalog.InstanceAddress != "" { + return nil + } + ips, err := util_net.GetAllIPs(util_net.NonLoopback) + if err != nil { + return errors.Wrap(err, "could not list all IPs of the machine") + } + if len(ips) == 0 { + return errors.New("there is 0 non-loopback interfaces on the machine. Set KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS explicitly.") + } + if len(ips) > 1 { + log.Info("there are multiple non-loopback interfaces on the machine. It is recommended to set KUMA_INTER_CP_CATALOG_INSTANCE_ADDRESS explicitly to set IP on which other control plane instances in the cluster can communicate with this instance.") + } + cfg.InterCp.Catalog.InstanceAddress = ips[0] + return nil +} diff --git a/pkg/core/bootstrap/autoconfig_test.go b/pkg/core/bootstrap/autoconfig_test.go index f7b4196f0905..7d3c9bbd60bc 100644 --- a/pkg/core/bootstrap/autoconfig_test.go +++ b/pkg/core/bootstrap/autoconfig_test.go @@ -23,4 +23,16 @@ var _ = Describe("Auto configuration", func() { // and Expect(cfg.BootstrapServer.Params.XdsPort).To(Equal(uint32(1234))) }) + + It("should fill intercp instance address", func() { + // given + cfg := kuma_cp.DefaultConfig() + + // when + err := autoconfigure(&cfg) + + // then + Expect(err).ToNot(HaveOccurred()) + Expect(cfg.InterCp.Catalog.InstanceAddress).ToNot(BeEmpty()) + }) }) diff --git a/pkg/envoy/admin/tls/pki.go b/pkg/envoy/admin/tls/pki.go index 39f64f8beb96..d1181981a9f9 100644 --- a/pkg/envoy/admin/tls/pki.go +++ b/pkg/envoy/admin/tls/pki.go @@ -68,7 +68,7 @@ func CreateCA(ctx context.Context, keyPair util_tls.KeyPair, resManager manager. func GenerateClientCert(ca tls.Certificate) (util_tls.KeyPair, error) { rootCert, err := x509.ParseCertificate(ca.Certificate[0]) if err != nil { - return util_tls.KeyPair{}, nil + return util_tls.KeyPair{}, err } return util_tls.NewCert(*rootCert, ca.PrivateKey.(*rsa.PrivateKey), ClientCertSAN, util_tls.ClientCertType, util_tls.DefaultKeyType, ClientCertSAN) } @@ -76,7 +76,7 @@ func GenerateClientCert(ca tls.Certificate) (util_tls.KeyPair, error) { func GenerateServerCert(ca tls.Certificate, ip string) (util_tls.KeyPair, error) { rootCert, err := x509.ParseCertificate(ca.Certificate[0]) if err != nil { - return util_tls.KeyPair{}, nil + return util_tls.KeyPair{}, err } return util_tls.NewCert(*rootCert, ca.PrivateKey.(*rsa.PrivateKey), ip, util_tls.ServerCertType, util_tls.DefaultKeyType, ip) } diff --git a/pkg/intercp/catalog/catalog.go b/pkg/intercp/catalog/catalog.go new file mode 100644 index 000000000000..a6a2b6decb55 --- /dev/null +++ b/pkg/intercp/catalog/catalog.go @@ -0,0 +1,48 @@ +package catalog + +import ( + "context" + "fmt" + + "github.com/pkg/errors" +) + +type Instance struct { + Id string `json:"id"` + Address string `json:"address"` + InterCpPort uint16 `json:"interCpPort"` + Leader bool `json:"leader"` +} + +func (i Instance) InterCpURL() string { + return fmt.Sprintf("grpcs://%s:%d", i.Address, i.InterCpPort) +} + +type Catalog interface { + Instances(context.Context) ([]Instance, error) + Replace(context.Context, []Instance) (bool, error) + ReplaceLeader(context.Context, Instance) error +} + +var ErrNoLeader = errors.New("leader not found") + +func Leader(ctx context.Context, catalog Catalog) (Instance, error) { + instances, err := catalog.Instances(ctx) + if err != nil { + return Instance{}, err + } + for _, instance := range instances { + if instance.Leader { + return instance, nil + } + } + return Instance{}, ErrNoLeader +} + +type InstancesByID []Instance + +func (a InstancesByID) Len() int { return len(a) } +func (a InstancesByID) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a InstancesByID) Less(i, j int) bool { + return a[i].Id < a[j].Id +} diff --git a/pkg/intercp/catalog/catalog_test.go b/pkg/intercp/catalog/catalog_test.go new file mode 100644 index 000000000000..38ea95ab81e2 --- /dev/null +++ b/pkg/intercp/catalog/catalog_test.go @@ -0,0 +1,197 @@ +package catalog_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/intercp/catalog" + "github.com/kumahq/kuma/pkg/plugins/resources/memory" +) + +var _ = Describe("Catalog", func() { + + var c catalog.Catalog + + BeforeEach(func() { + store := memory.NewStore() + resManager := manager.NewResourceManager(store) + c = catalog.NewConfigCatalog(resManager) + }) + + Context("Replace", func() { + instances := []catalog.Instance{ + { + Id: "instance-1", + Leader: true, + }, + } + + BeforeEach(func() { + _, err := c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should replace existing instances", func() { + // when + instances := []catalog.Instance{ + { + Id: "instance-1", + Leader: false, + }, + { + Id: "instance-2", + Leader: true, + }, + } + updated, err := c.Replace(context.Background(), instances) + + // then + Expect(updated).To(BeTrue()) + Expect(err).ToNot(HaveOccurred()) + + readInstances, err := c.Instances(context.Background()) + Expect(readInstances).To(Equal(instances)) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should return false if replace did not replace instances", func() { + // when + updated, err := c.Replace(context.Background(), instances) + + // then + Expect(updated).To(BeFalse()) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("ReplaceLeader", func() { + leader := catalog.Instance{ + Id: "leader-1", + Leader: true, + } + + It("should replace leader when catalog is empty", func() { + // when + err := c.ReplaceLeader(context.Background(), leader) + + // then + Expect(err).ToNot(HaveOccurred()) + + readInstances, err := c.Instances(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(readInstances).To(HaveLen(1)) + Expect(readInstances[0]).To(Equal(leader)) + }) + + It("should replace leader when there is another leader", func() { + // given + instances := []catalog.Instance{ + { + Id: "instance-1", + Leader: true, + }, + { + Id: "leader-1", + Leader: false, + }, + } + _, err := c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + + // when + err = c.ReplaceLeader(context.Background(), leader) + + // then + Expect(err).ToNot(HaveOccurred()) + + readInstances, err := c.Instances(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(readInstances).To(HaveLen(2)) + Expect(readInstances[0].Id).To(Equal("instance-1")) + Expect(readInstances[0].Leader).To(BeFalse()) + Expect(readInstances[1].Id).To(Equal("leader-1")) + Expect(readInstances[1].Leader).To(BeTrue()) + }) + + It("should replace leader when the new leader is not on the list", func() { + // given + instances := []catalog.Instance{ + { + Id: "instance-1", + Leader: true, + }, + } + _, err := c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + + // when + err = c.ReplaceLeader(context.Background(), leader) + + // then + Expect(err).ToNot(HaveOccurred()) + + readInstances, err := c.Instances(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(readInstances).To(HaveLen(2)) + Expect(readInstances[0].Id).To(Equal("instance-1")) + Expect(readInstances[0].Leader).To(BeFalse()) + Expect(readInstances[1].Id).To(Equal("leader-1")) + Expect(readInstances[1].Leader).To(BeTrue()) + }) + }) + + Context("Leader", func() { + It("should return a leader if there is a leader in the list", func() { + // given + instances := []catalog.Instance{ + { + Id: "instance-1", + Leader: false, + }, + { + Id: "instance-2", + Leader: true, + }, + } + _, err := c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + + // when + leader, err := catalog.Leader(context.Background(), c) + + // then + Expect(err).ToNot(HaveOccurred()) + Expect(leader.Id).To(Equal("instance-2")) + }) + + It("should return an error if there is no leader", func() { + // given + instances := []catalog.Instance{ + { + Id: "instance-1", + Leader: false, + }, + } + _, err := c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + + // when + _, err = catalog.Leader(context.Background(), c) + + // then + Expect(err).To(Equal(catalog.ErrNoLeader)) + }) + }) + + It("should return empty instances if the catalog was never updated", func() { + // when + instances, err := c.Instances(context.Background()) + + // then + Expect(err).ToNot(HaveOccurred()) + Expect(instances).To(HaveLen(0)) + }) +}) diff --git a/pkg/intercp/catalog/client.go b/pkg/intercp/catalog/client.go new file mode 100644 index 000000000000..0b961b143202 --- /dev/null +++ b/pkg/intercp/catalog/client.go @@ -0,0 +1,26 @@ +package catalog + +import ( + "google.golang.org/grpc" + + "github.com/kumahq/kuma/api/system/v1alpha1" +) + +type Client interface { + Close() error + v1alpha1.InterCpPingServiceClient +} + +type grpcClient struct { + v1alpha1.InterCpPingServiceClient + *grpc.ClientConn +} + +var _ Client = &grpcClient{} + +func NewGRPCClient(conn *grpc.ClientConn) Client { + return &grpcClient{ + InterCpPingServiceClient: v1alpha1.NewInterCpPingServiceClient(conn), + ClientConn: conn, + } +} diff --git a/pkg/intercp/catalog/config_catalog.go b/pkg/intercp/catalog/config_catalog.go new file mode 100644 index 000000000000..ab0febc0519e --- /dev/null +++ b/pkg/intercp/catalog/config_catalog.go @@ -0,0 +1,102 @@ +package catalog + +import ( + "context" + "encoding/json" + "sort" + + system_proto "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/resources/model" + "github.com/kumahq/kuma/pkg/core/resources/store" +) + +type ConfigInstances struct { + Instances []Instance `json:"instances"` +} + +var CatalogKey = model.ResourceKey{ + Name: "cp-catalog", +} + +type ConfigCatalog struct { + resManager manager.ResourceManager +} + +var _ Catalog = &ConfigCatalog{} + +func NewConfigCatalog(resManager manager.ResourceManager) Catalog { + return &ConfigCatalog{ + resManager: resManager, + } +} + +func (c *ConfigCatalog) Instances(ctx context.Context) ([]Instance, error) { + cfg := system.NewConfigResource() + if err := c.resManager.Get(ctx, cfg, store.GetBy(CatalogKey)); err != nil { + if store.IsResourceNotFound(err) { + return []Instance{}, nil + } + return nil, err + } + var instances ConfigInstances + if err := json.Unmarshal([]byte(cfg.Spec.Config), &instances); err != nil { + return nil, err + } + return instances.Instances, nil +} + +func (c *ConfigCatalog) Replace(ctx context.Context, instances []Instance) (bool, error) { + sort.Stable(InstancesByID(instances)) + bytes, err := json.Marshal(ConfigInstances{ + Instances: instances, + }) + if err != nil { + return false, nil + } + var newConfig = string(bytes) + var updated bool + err = manager.Upsert(ctx, c.resManager, CatalogKey, system.NewConfigResource(), func(resource model.Resource) error { + if resource.(*system.ConfigResource).Spec.GetConfig() != newConfig { + resource.(*system.ConfigResource).Spec = &system_proto.Config{ + Config: newConfig, + } + updated = true + } + return nil + }) + return updated, err +} + +func (c *ConfigCatalog) ReplaceLeader(ctx context.Context, leader Instance) error { + return manager.Upsert(ctx, c.resManager, CatalogKey, system.NewConfigResource(), func(resource model.Resource) error { + instances := &ConfigInstances{} + if cfg := resource.(*system.ConfigResource).Spec.GetConfig(); cfg != "" { + if err := json.Unmarshal([]byte(cfg), instances); err != nil { + return err + } + } + leaderFound := false + for i, instance := range instances.Instances { + instance.Leader = false + if instance.Id == leader.Id { + instance.Leader = true + leaderFound = true + } + instances.Instances[i] = instance + } + if !leaderFound { + instances.Instances = append(instances.Instances, leader) + sort.Stable(InstancesByID(instances.Instances)) + } + bytes, err := json.Marshal(instances) + if err != nil { + return err + } + resource.(*system.ConfigResource).Spec = &system_proto.Config{ + Config: string(bytes), + } + return nil + }) +} diff --git a/pkg/intercp/catalog/discovery_suite_test.go b/pkg/intercp/catalog/discovery_suite_test.go new file mode 100644 index 000000000000..aeb4c7754010 --- /dev/null +++ b/pkg/intercp/catalog/discovery_suite_test.go @@ -0,0 +1,11 @@ +package catalog + +import ( + "testing" + + "github.com/kumahq/kuma/pkg/test" +) + +func TestCatalog(t *testing.T) { + test.RunSpecs(t, "Catalog Suite") +} diff --git a/pkg/intercp/catalog/heartbeat_component.go b/pkg/intercp/catalog/heartbeat_component.go new file mode 100644 index 000000000000..351b6340c8f9 --- /dev/null +++ b/pkg/intercp/catalog/heartbeat_component.go @@ -0,0 +1,127 @@ +package catalog + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/core/user" +) + +var heartbeatLog = core.Log.WithName("intercp").WithName("catalog").WithName("heartbeat") + +type heartbeatComponent struct { + catalog Catalog + newClientFn NewClientFn + request *v1alpha1.PingRequest + interval time.Duration + + leader *Instance + client Client +} + +var _ component.Component = &heartbeatComponent{} + +type NewClientFn = func(url string) (Client, error) + +func NewHeartbeatComponent( + catalog Catalog, + instance Instance, + interval time.Duration, + newClientFn NewClientFn, +) component.Component { + return &heartbeatComponent{ + catalog: catalog, + request: &v1alpha1.PingRequest{ + InstanceId: instance.Id, + Address: instance.Address, + InterCpPort: uint32(instance.InterCpPort), + }, + newClientFn: newClientFn, + interval: interval, + } +} + +func (h *heartbeatComponent) Start(stop <-chan struct{}) error { + heartbeatLog.Info("starting heartbeats to a leader") + ctx := user.Ctx(context.Background(), user.ControlPlane) + ticker := time.NewTicker(h.interval) + + for { + select { + case <-ticker.C: + if err := h.heartbeat(ctx, true); err != nil { + h.leader = nil + heartbeatLog.Error(err, "could not heartbeat the leader") + } + case <-stop: + // send final heartbeat to gracefully signal that the instance is going down + if err := h.heartbeat(ctx, false); err != nil { + h.leader = nil + heartbeatLog.Error(err, "could not heartbeat the leader") + } + return nil + } + } +} + +func (h *heartbeatComponent) heartbeat(ctx context.Context, ready bool) error { + if h.leader == nil { + if err := h.connectToLeader(ctx); err != nil { + return err + } + } + if h.leader.Id == h.request.InstanceId { + heartbeatLog.V(1).Info("this instance is a leader. No need to send a heartbeat.") + return nil + } + heartbeatLog.Info("sending a heartbeat to a leader", + "instanceId", h.request.InstanceId, + "leaderAddress", h.leader.Address, + "ready", ready, + ) + h.request.Ready = ready + resp, err := h.client.Ping(ctx, h.request) + if err != nil { + return errors.Wrap(err, "could not send a heartbeat to a leader") + } + if !resp.Leader { + heartbeatLog.Info("instance responded that it is no longer a leader") + h.leader = nil + } + return nil +} + +func (h *heartbeatComponent) connectToLeader(ctx context.Context) error { + newLeader, err := Leader(ctx, h.catalog) + if err != nil { + return err + } + h.leader = &newLeader + if h.leader.Id == h.request.InstanceId { + return nil + } + heartbeatLog.Info("leader has changed. Closing connection to the old leader. Creating connection to the new leader.", + "previousLeaderAddress", h.leader.Address, + "newLeaderAddress", newLeader.Leader, + ) + if h.client != nil { + if err := h.client.Close(); err != nil { + heartbeatLog.Error(err, "could not close a client to a previous leader") + // continue anyway. This is a better fallback of this error, because the component would restart and create new client anyways. + } + } + h.client, err = h.newClientFn(h.leader.InterCpURL()) + if err != nil { + return errors.Wrap(err, "could not create a client to a leader") + } + return nil +} + +func (h *heartbeatComponent) NeedLeaderElection() bool { + return false +} diff --git a/pkg/intercp/catalog/heartbeat_component_test.go b/pkg/intercp/catalog/heartbeat_component_test.go new file mode 100644 index 000000000000..a43dcd345563 --- /dev/null +++ b/pkg/intercp/catalog/heartbeat_component_test.go @@ -0,0 +1,189 @@ +package catalog_test + +import ( + "context" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "google.golang.org/grpc" + + system_proto "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/intercp/catalog" + "github.com/kumahq/kuma/pkg/plugins/resources/memory" +) + +var _ = Describe("Heartbeats", func() { + + var heartbeatComponent component.Component + var stopCh chan struct{} + + var pingClient *staticPingClient + var c catalog.Catalog + + currentInstance := catalog.Instance{ + Id: "instance-1", + Address: "10.10.10.1", + InterCpPort: 5679, + Leader: false, + } + + BeforeEach(func() { + store := memory.NewStore() + resManager := manager.NewResourceManager(store) + c = catalog.NewConfigCatalog(resManager) + pingClient = &staticPingClient{ + leader: true, + } + + heartbeatComponent = catalog.NewHeartbeatComponent( + c, + currentInstance, + 10*time.Millisecond, + func(serverURL string) (catalog.Client, error) { + pingClient.SetServerURL(serverURL) + return pingClient, nil + }, + ) + + stopCh = make(chan struct{}) + go func() { + defer GinkgoRecover() + err := heartbeatComponent.Start(stopCh) + Expect(err).ToNot(HaveOccurred()) + }() + }) + + AfterEach(func() { + close(stopCh) + }) + + It("should connect to a leader once we have a leader in the catalog", func() { + // given + instances := []catalog.Instance{ + { + Id: "instance-leader", + Address: "192.168.0.1", + InterCpPort: 1234, + Leader: true, + }, + } + + // when + _, err := c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + + // then + Eventually(func(g Gomega) { + received := pingClient.Received() + g.Expect(received).ToNot(BeNil()) + g.Expect(received.InstanceId).To(Equal(currentInstance.Id)) + g.Expect(received.Address).To(Equal(currentInstance.Address)) + g.Expect(received.InterCpPort).To(Equal(uint32(currentInstance.InterCpPort))) + }, "10s", "100ms").Should(Succeed()) + }) + + It("should reconnect to a leader when there is a leader change", func() { + // given + instances := []catalog.Instance{ + { + Id: "instance-leader", + Address: "192.168.0.1", + InterCpPort: 1234, + Leader: true, + }, + } + _, err := c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + Eventually(func(g Gomega) { + received := pingClient.Received() + g.Expect(received).ToNot(BeNil()) + g.Expect(pingClient.ServerURL()).To(Equal("grpcs://192.168.0.1:1234")) + }, "10s", "100ms").Should(Succeed()) + + // when + pingClient.SetLeaderResponse(false) + + // and + instances = []catalog.Instance{ + { + Id: "instance-leader-2", + Address: "192.168.0.2", + InterCpPort: 1234, + Leader: true, + }, + } + _, err = c.Replace(context.Background(), instances) + Expect(err).ToNot(HaveOccurred()) + + // then + Eventually(func(g Gomega) { + received := pingClient.Received() + g.Expect(received).ToNot(BeNil()) + g.Expect(pingClient.ServerURL()).To(Equal("grpcs://192.168.0.2:1234")) + }, "10s", "100ms").Should(Succeed()) + }) + + It("should not send heartbeat if the instance is a leader", func() { + // given + instance := currentInstance + instance.Leader = true + _, err := c.Replace(context.Background(), []catalog.Instance{instance}) + Expect(err).ToNot(HaveOccurred()) + + // then + Consistently(func(g Gomega) { + g.Expect(pingClient.ServerURL()).To(BeEmpty()) + g.Expect(pingClient.Received()).To(BeNil()) + }, "1s", "100ms").Should(Succeed()) + }) +}) + +type staticPingClient struct { + received *system_proto.PingRequest + serverURL string + leader bool + sync.Mutex +} + +var _ catalog.Client = &staticPingClient{} + +func (s *staticPingClient) Close() error { + return nil +} + +func (s *staticPingClient) Ping(ctx context.Context, in *system_proto.PingRequest, opts ...grpc.CallOption) (*system_proto.PingResponse, error) { + s.Lock() + defer s.Unlock() + s.received = in + return &system_proto.PingResponse{ + Leader: s.leader, + }, nil +} + +func (s *staticPingClient) Received() *system_proto.PingRequest { + s.Lock() + defer s.Unlock() + return s.received +} + +func (s *staticPingClient) SetLeaderResponse(leader bool) { + s.Lock() + defer s.Unlock() + s.leader = leader +} + +func (s *staticPingClient) SetServerURL(serverURL string) { + s.Lock() + defer s.Unlock() + s.serverURL = serverURL +} + +func (s *staticPingClient) ServerURL() string { + s.Lock() + defer s.Unlock() + return s.serverURL +} diff --git a/pkg/intercp/catalog/heartbeats.go b/pkg/intercp/catalog/heartbeats.go new file mode 100644 index 000000000000..f2aceda0a162 --- /dev/null +++ b/pkg/intercp/catalog/heartbeats.go @@ -0,0 +1,40 @@ +package catalog + +import ( + "sync" +) + +type Heartbeats struct { + instances map[Instance]struct{} + sync.Mutex +} + +func NewHeartbeats() *Heartbeats { + return &Heartbeats{ + instances: map[Instance]struct{}{}, + } +} + +func (h *Heartbeats) ResetAndCollect() []Instance { + h.Lock() + currentInstances := h.instances + h.instances = map[Instance]struct{}{} + h.Unlock() + var instances []Instance + for k := range currentInstances { + instances = append(instances, k) + } + return instances +} + +func (h *Heartbeats) Add(instance Instance) { + h.Lock() + h.instances[instance] = struct{}{} + h.Unlock() +} + +func (h *Heartbeats) Remove(instance Instance) { + h.Lock() + delete(h.instances, instance) + h.Unlock() +} diff --git a/pkg/intercp/catalog/heartbeats_test.go b/pkg/intercp/catalog/heartbeats_test.go new file mode 100644 index 000000000000..1eae4e12c4e5 --- /dev/null +++ b/pkg/intercp/catalog/heartbeats_test.go @@ -0,0 +1,39 @@ +package catalog_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kumahq/kuma/pkg/intercp/catalog" +) + +var _ = Describe("Heartbeats", func() { + + var heartbeats *catalog.Heartbeats + + BeforeEach(func() { + heartbeats = catalog.NewHeartbeats() + }) + + It("should remove heartbeats on collection", func() { + // given + instance := catalog.Instance{ + Id: "instance-1", + } + heartbeats.Add(instance) + + // when + instances := heartbeats.ResetAndCollect() + + // then + Expect(instances).To(HaveLen(1)) + Expect(instances[0]).To(Equal(instance)) + + // when + instances = heartbeats.ResetAndCollect() + + // then + Expect(instances).To(BeEmpty()) + }) + +}) diff --git a/pkg/intercp/catalog/server.go b/pkg/intercp/catalog/server.go new file mode 100644 index 000000000000..7e532285e4f0 --- /dev/null +++ b/pkg/intercp/catalog/server.go @@ -0,0 +1,45 @@ +package catalog + +import ( + "context" + + system_proto "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/runtime/component" +) + +var serverLog = core.Log.WithName("intercp").WithName("catalog").WithName("server") + +type server struct { + heartbeats *Heartbeats + leaderInfo component.LeaderInfo + + system_proto.UnimplementedInterCpPingServiceServer +} + +var _ system_proto.InterCpPingServiceServer = &server{} + +func NewServer(heartbeats *Heartbeats, leaderInfo component.LeaderInfo) system_proto.InterCpPingServiceServer { + return &server{ + heartbeats: heartbeats, + leaderInfo: leaderInfo, + } +} + +func (s *server) Ping(_ context.Context, request *system_proto.PingRequest) (*system_proto.PingResponse, error) { + serverLog.Info("received ping", "instanceID", request.InstanceId, "address", request.Address, "ready", request.Ready) + instance := Instance{ + Id: request.InstanceId, + Address: request.Address, + InterCpPort: uint16(request.InterCpPort), + Leader: false, + } + if request.Ready { + s.heartbeats.Add(instance) + } else { + s.heartbeats.Remove(instance) + } + return &system_proto.PingResponse{ + Leader: s.leaderInfo.IsLeader(), + }, nil +} diff --git a/pkg/intercp/catalog/server_test.go b/pkg/intercp/catalog/server_test.go new file mode 100644 index 000000000000..f1aae2255bd5 --- /dev/null +++ b/pkg/intercp/catalog/server_test.go @@ -0,0 +1,91 @@ +package catalog_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "google.golang.org/protobuf/proto" + + system_proto "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/intercp/catalog" +) + +var _ = Describe("Server", func() { + + var heartbeats *catalog.Heartbeats + var leaderInfo *staticLeaderInfo + var server system_proto.InterCpPingServiceServer + + BeforeEach(func() { + heartbeats = catalog.NewHeartbeats() + leaderInfo = &staticLeaderInfo{} + server = catalog.NewServer(heartbeats, leaderInfo) + }) + + request := &system_proto.PingRequest{ + InstanceId: "instance-1", + Address: "192.168.0.1", + InterCpPort: 1234, + Ready: true, + } + + It("should add instance to heartbeats", func() { + // when + _, err := server.Ping(context.Background(), request) + + // then + Expect(err).ToNot(HaveOccurred()) + instances := heartbeats.ResetAndCollect() + Expect(instances).To(HaveLen(1)) + Expect(instances[0].Id).To(Equal(request.InstanceId)) + Expect(instances[0].Address).To(Equal(request.Address)) + Expect(instances[0].InterCpPort).To(Equal(uint16(request.InterCpPort))) + Expect(instances[0].Leader).To(BeFalse()) + }) + + It("should remove instance when it's not ready", func() { + // given + _, err := server.Ping(context.Background(), request) + Expect(err).ToNot(HaveOccurred()) + unhealthyReq := proto.Clone(request).(*system_proto.PingRequest) + unhealthyReq.Ready = false + + // when + _, err = server.Ping(context.Background(), unhealthyReq) + + // then + Expect(err).ToNot(HaveOccurred()) + instances := heartbeats.ResetAndCollect() + Expect(instances).To(BeEmpty()) + }) + + It("should respond with leader when instance is a leader", func() { + // when + leader, err := server.Ping(context.Background(), request) + + // then + Expect(err).ToNot(HaveOccurred()) + Expect(leader.Leader).To(BeFalse()) + }) + + It("should respond with non-leader when instance is a follower", func() { + // given + leaderInfo.leader = true + + // when + leader, err := server.Ping(context.Background(), request) + + // then + Expect(err).ToNot(HaveOccurred()) + Expect(leader.Leader).To(BeTrue()) + }) +}) + +type staticLeaderInfo struct { + leader bool +} + +func (s *staticLeaderInfo) IsLeader() bool { + return s.leader +} diff --git a/pkg/intercp/catalog/writer.go b/pkg/intercp/catalog/writer.go new file mode 100644 index 000000000000..13aa0644d01b --- /dev/null +++ b/pkg/intercp/catalog/writer.go @@ -0,0 +1,65 @@ +package catalog + +import ( + "context" + "time" + + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/core/user" +) + +var writerLog = core.Log.WithName("intercp").WithName("catalog").WithName("writer") + +type catalogWriter struct { + catalog Catalog + heartbeats *Heartbeats + instance Instance + interval time.Duration +} + +var _ component.Component = &catalogWriter{} + +func NewWriter(catalog Catalog, heartbeats *Heartbeats, instance Instance, interval time.Duration) component.Component { + leaderInstance := instance + leaderInstance.Leader = true + return &catalogWriter{ + catalog: catalog, + heartbeats: heartbeats, + instance: leaderInstance, + interval: interval, + } +} + +func (r *catalogWriter) Start(stop <-chan struct{}) error { + heartbeatLog.Info("starting catalog writer") + ctx := user.Ctx(context.Background(), user.ControlPlane) + writerLog.Info("replacing a leader in the catalog") + if err := r.catalog.ReplaceLeader(ctx, r.instance); err != nil { + writerLog.Error(err, "could not replace leader") // continue, it will be replaced in ticker anyways + } + ticker := time.NewTicker(r.interval) + for { + select { + case <-ticker.C: + instances := r.heartbeats.ResetAndCollect() + instances = append(instances, r.instance) + updated, err := r.catalog.Replace(ctx, instances) + if err != nil { + writerLog.Error(err, "could not update catalog") + continue + } + if updated { + writerLog.Info("instances catalog updated", "instances", instances) + } else { + writerLog.V(1).Info("no need to update instances, because the catalog is the same", "instances", instances) + } + case <-stop: + return nil + } + } +} + +func (r *catalogWriter) NeedLeaderElection() bool { + return true +} diff --git a/pkg/intercp/catalog/writer_test.go b/pkg/intercp/catalog/writer_test.go new file mode 100644 index 000000000000..45e38b89afdb --- /dev/null +++ b/pkg/intercp/catalog/writer_test.go @@ -0,0 +1,95 @@ +package catalog_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/intercp/catalog" + "github.com/kumahq/kuma/pkg/plugins/resources/memory" +) + +var _ = Describe("Writer", func() { + + var c catalog.Catalog + var closeCh chan struct{} + var heartbeatCancelFunc context.CancelFunc + + leader := catalog.Instance{ + Id: "instance-2", + Address: "192.168.0.2", + InterCpPort: 1234, + Leader: true, + } + + instance := catalog.Instance{ + Id: "instance-1", + Address: "192.168.0.1", + InterCpPort: 1234, + } + + BeforeEach(func() { + store := memory.NewStore() + resManager := manager.NewResourceManager(store) + c = catalog.NewConfigCatalog(resManager) + heartbeats := catalog.NewHeartbeats() + closeCh = make(chan struct{}) + writer := catalog.NewWriter(c, heartbeats, leader, 100*time.Millisecond) + go func() { + defer GinkgoRecover() + Expect(writer.Start(closeCh)).To(Succeed()) + }() + + ctx, fn := context.WithCancel(context.Background()) + heartbeatCancelFunc = fn + go func() { + t := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-t.C: + heartbeats.Add(instance) + case <-ctx.Done(): + return + } + } + }() + }) + + AfterEach(func() { + close(closeCh) + heartbeatCancelFunc() + }) + + It("should write to catalog", func() { + Eventually(func(g Gomega) { + instances, err := c.Instances(context.Background()) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(instances).To(HaveLen(2)) + g.Expect(instances[0]).To(Equal(instance)) + g.Expect(instances[1]).To(Equal(leader)) + }, "10s", "100ms").Should(Succeed()) + }) + + It("should remove instance from the catalog once hearth-beating stop", func() { + // given 2 instances in catalog + Eventually(func(g Gomega) { + instances, err := c.Instances(context.Background()) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(instances).To(HaveLen(2)) + }, "10s", "100ms").Should(Succeed()) + + // when + heartbeatCancelFunc() + + // then + Eventually(func(g Gomega) { + instances, err := c.Instances(context.Background()) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(instances).To(HaveLen(1)) + g.Expect(instances[0]).To(Equal(leader)) + }, "10s", "100ms").Should(Succeed()) + }) +}) diff --git a/pkg/intercp/client/client.go b/pkg/intercp/client/client.go new file mode 100644 index 000000000000..c327643ac24b --- /dev/null +++ b/pkg/intercp/client/client.go @@ -0,0 +1,43 @@ +package client + +import ( + "crypto/tls" + "crypto/x509" + "net/url" + + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type TLSConfig struct { + CaCert x509.Certificate + ClientCert tls.Certificate +} + +func New(serverURL string, tlsCfg *TLSConfig) (*grpc.ClientConn, error) { + url, err := url.Parse(serverURL) + if err != nil { + return nil, err + } + var dialOpts []grpc.DialOption + switch url.Scheme { + case "grpc": // not used in production + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + case "grpcs": + tlsConfig := &tls.Config{} + if tlsCfg != nil { + cp := x509.NewCertPool() + cp.AddCert(&tlsCfg.CaCert) + tlsConfig.RootCAs = cp + tlsConfig.Certificates = []tls.Certificate{tlsCfg.ClientCert} + } else { + tlsConfig.InsecureSkipVerify = true + } + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + default: + return nil, errors.Errorf("unsupported scheme %q. Use one of %s", url.Scheme, []string{"grpc", "grpcs"}) + } + return grpc.Dial(url.Host, dialOpts...) +} diff --git a/pkg/intercp/components.go b/pkg/intercp/components.go new file mode 100644 index 000000000000..7e363b916353 --- /dev/null +++ b/pkg/intercp/components.go @@ -0,0 +1,119 @@ +package intercp + +import ( + "context" + "crypto/tls" + "crypto/x509" + "time" + + "github.com/pkg/errors" + "github.com/sethvargo/go-retry" + + "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/runtime" + "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/core/user" + "github.com/kumahq/kuma/pkg/intercp/catalog" + "github.com/kumahq/kuma/pkg/intercp/client" + "github.com/kumahq/kuma/pkg/intercp/server" + intercp_tls "github.com/kumahq/kuma/pkg/intercp/tls" +) + +var log = core.Log.WithName("inter-cp") + +func Setup(rt runtime.Runtime) error { + cfg := rt.Config().InterCp + defaults := &intercp_tls.DefaultsComponent{ + ResManager: rt.ResourceManager(), + Log: log.WithName("defaults"), + } + + heartbeats := catalog.NewHeartbeats() + c := catalog.NewConfigCatalog(rt.ResourceManager()) + + instance := catalog.Instance{ + Id: rt.GetInstanceId(), + Address: cfg.Catalog.InstanceAddress, + InterCpPort: cfg.Server.Port, + } + + ctx := user.Ctx(context.Background(), user.ControlPlane) + registerComponent := component.ComponentFunc(func(stop <-chan struct{}) error { + certs, err := generateCerts(ctx, rt.ReadOnlyResourceManager(), cfg.Catalog.InstanceAddress) + if err != nil { + return errors.Wrap(err, "could not generate certificates to start inter-cp server") + } + + interCpServer, err := server.New(cfg.Server, rt.Metrics(), certs.server, certs.ca) + if err != nil { + return errors.Wrap(err, "could not start inter-cp server") + } + v1alpha1.RegisterInterCpPingServiceServer(interCpServer.GrpcServer(), catalog.NewServer(heartbeats, rt.LeaderInfo())) + + clientTLSConfig := client.TLSConfig{ + CaCert: certs.ca, + ClientCert: certs.client, + } + return rt.Add( + interCpServer, + catalog.NewHeartbeatComponent(c, instance, cfg.Catalog.HeartbeatInterval.Duration, func(serverURL string) (catalog.Client, error) { + conn, err := client.New(serverURL, &clientTLSConfig) + if err != nil { + return nil, errors.Wrap(err, "could not create inter-cp client") + } + return catalog.NewGRPCClient(conn), nil + }), + ) + }) + + return rt.Add( + defaults, + catalog.NewWriter(c, heartbeats, instance, cfg.Catalog.WriterInterval.Duration), + registerComponent, + ) +} + +type interCpCerts struct { + ca x509.Certificate + server tls.Certificate + client tls.Certificate +} + +func generateCerts(ctx context.Context, resManager manager.ReadOnlyResourceManager, instanceId string) (interCpCerts, error) { + backoff := retry.WithMaxRetries(300, retry.NewConstant(1*time.Second)) + var ca tls.Certificate + // we need to retry because the CA may not be created yet + err := retry.Do(ctx, backoff, func(ctx context.Context) error { + loadedCa, err := intercp_tls.LoadCA(ctx, resManager) + if err != nil { + return retry.RetryableError(err) + } + ca = loadedCa + return nil + }) + if err != nil { + return interCpCerts{}, err + } + if len(ca.Certificate) != 1 { + return interCpCerts{}, errors.New("there should be exactly one certificate") + } + caCert, err := x509.ParseCertificate(ca.Certificate[0]) + if err != nil { + return interCpCerts{}, err + } + serverCert, err := intercp_tls.GenerateServerCert(ca, instanceId) + if err != nil { + return interCpCerts{}, err + } + clientCert, err := intercp_tls.GenerateClientCert(ca, instanceId) + if err != nil { + return interCpCerts{}, err + } + return interCpCerts{ + ca: *caCert, + server: serverCert, + client: clientCert, + }, nil +} diff --git a/pkg/intercp/server/server.go b/pkg/intercp/server/server.go new file mode 100644 index 000000000000..33bf28223cec --- /dev/null +++ b/pkg/intercp/server/server.go @@ -0,0 +1,118 @@ +package server + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "net" + "net/http" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + + "github.com/kumahq/kuma/pkg/config/intercp" + config_types "github.com/kumahq/kuma/pkg/config/types" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/metrics" +) + +var log = core.Log.WithName("intercp-server") + +const ( + grpcMaxConcurrentStreams = 1000000 + grpcKeepAliveTime = 15 * time.Second +) + +type InterCpServer struct { + config intercp.InterCpServerConfig + grpcServer *grpc.Server +} + +var _ component.Component = &InterCpServer{} + +func New( + config intercp.InterCpServerConfig, + metrics metrics.Metrics, + certificate tls.Certificate, + caCert x509.Certificate, +) (*InterCpServer, error) { + grpcOptions := []grpc.ServerOption{ + grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams), + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: grpcKeepAliveTime, + Timeout: grpcKeepAliveTime, + }), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: grpcKeepAliveTime, + PermitWithoutStream: true, + }), + } + + caPool := x509.NewCertPool() + caPool.AddCert(&caCert) + tlsCfg := &tls.Config{ + Certificates: []tls.Certificate{certificate}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: caPool, + } + var err error + if tlsCfg.MinVersion, err = config_types.TLSVersion(config.TlsMinVersion); err != nil { + return nil, err + } + if tlsCfg.MaxVersion, err = config_types.TLSVersion(config.TlsMaxVersion); err != nil { + return nil, err + } + if tlsCfg.CipherSuites, err = config_types.TLSCiphers(config.TlsCipherSuites); err != nil { + return nil, err + } + + grpcOptions = append(grpcOptions, grpc.Creds(credentials.NewTLS(tlsCfg))) + grpcOptions = append(grpcOptions, metrics.GRPCServerInterceptors()...) + grpcServer := grpc.NewServer(grpcOptions...) + + return &InterCpServer{ + config: config, + grpcServer: grpcServer, + }, nil +} + +func (d *InterCpServer) Start(stop <-chan struct{}) error { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", d.config.Port)) + if err != nil { + return err + } + + errChan := make(chan error) + go func() { + defer close(errChan) + if err := d.grpcServer.Serve(lis); err != nil { + if err != http.ErrServerClosed { + log.Error(err, "terminated with an error") + errChan <- err + return + } + } + log.Info("terminated normally") + }() + log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true) + + select { + case <-stop: + log.Info("stopping") + d.grpcServer.GracefulStop() + return nil + case err := <-errChan: + return err + } +} + +func (d *InterCpServer) NeedLeaderElection() bool { + return false +} + +func (d *InterCpServer) GrpcServer() *grpc.Server { + return d.grpcServer +} diff --git a/pkg/intercp/tls/defaults.go b/pkg/intercp/tls/defaults.go new file mode 100644 index 000000000000..e3ac816b4fb8 --- /dev/null +++ b/pkg/intercp/tls/defaults.go @@ -0,0 +1,62 @@ +package tls + +import ( + "context" + "time" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + "github.com/sethvargo/go-retry" + + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/core/user" +) + +type DefaultsComponent struct { + ResManager manager.ResourceManager + Log logr.Logger +} + +var _ component.Component = &DefaultsComponent{} + +func (e *DefaultsComponent) Start(stop <-chan struct{}) error { + ctx, cancelFn := context.WithCancel(user.Ctx(context.Background(), user.ControlPlane)) + go func() { + <-stop + cancelFn() + }() + return retry.Do(ctx, retry.WithMaxDuration(10*time.Minute, retry.NewConstant(5*time.Second)), func(ctx context.Context) error { + if err := e.ensureInterCpCaExist(ctx); err != nil { + e.Log.V(1).Info("could not ensure that Inter CP CA exists. Retrying.", "err", err) + return retry.RetryableError(err) + } + return nil + }) +} + +func (e DefaultsComponent) NeedLeaderElection() bool { + return true +} + +func (e *DefaultsComponent) ensureInterCpCaExist(ctx context.Context) error { + _, err := LoadCA(ctx, e.ResManager) + if err == nil { + e.Log.V(1).Info("Inter CP CA already exists. Skip creating Envoy Admin CA.") + return nil + } + if !store.IsResourceNotFound(err) { + return errors.Wrap(err, "error while loading admin client certificate") + } + e.Log.V(1).Info("trying to create Inter CP CA") + pair, err := GenerateCA() + if err != nil { + return errors.Wrap(err, "could not generate admin client certificate") + } + if err := CreateCA(ctx, *pair, e.ResManager); err != nil { + return errors.Wrap(err, "could not create admin client certificate") + } + e.Log.Info("Inter CP CA created") + return nil +} diff --git a/pkg/intercp/tls/defaults_test.go b/pkg/intercp/tls/defaults_test.go new file mode 100644 index 000000000000..ae1bb5024410 --- /dev/null +++ b/pkg/intercp/tls/defaults_test.go @@ -0,0 +1,57 @@ +package tls_test + +import ( + "context" + + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kumahq/kuma/pkg/core/resources/manager" + intercp_tls "github.com/kumahq/kuma/pkg/intercp/tls" + "github.com/kumahq/kuma/pkg/plugins/resources/memory" +) + +var _ = Describe("TLS", func() { + + var ch chan struct{} + var defaults intercp_tls.DefaultsComponent + var resManager manager.ResourceManager + + BeforeEach(func() { + resManager = manager.NewResourceManager(memory.NewStore()) + defaults = intercp_tls.DefaultsComponent{ + ResManager: resManager, + Log: logr.Discard(), + } + ch = make(chan struct{}) + }) + + AfterEach(func() { + close(ch) + }) + + It("should generate default CA", func() { + // when + Expect(defaults.Start(ch)).To(Succeed()) + + // then + _, err := intercp_tls.LoadCA(context.Background(), resManager) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should ignore creating CA when there is one already in place", func() { + // given + Expect(defaults.Start(ch)).To(Succeed()) + ca1, err := intercp_tls.LoadCA(context.Background(), resManager) + Expect(err).ToNot(HaveOccurred()) + + // when + Expect(defaults.Start(ch)).To(Succeed()) + + // then + ca2, err := intercp_tls.LoadCA(context.Background(), resManager) + Expect(err).ToNot(HaveOccurred()) + Expect(ca1).To(Equal(ca2)) + }) +}) diff --git a/pkg/intercp/tls/pki.go b/pkg/intercp/tls/pki.go new file mode 100644 index 000000000000..c03f0aba2885 --- /dev/null +++ b/pkg/intercp/tls/pki.go @@ -0,0 +1,75 @@ +package tls + +import ( + "context" + "crypto/rsa" + tls "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + + "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/resources/model" + "github.com/kumahq/kuma/pkg/core/resources/store" + util_tls "github.com/kumahq/kuma/pkg/tls" +) + +var GlobalSecretKey = model.ResourceKey{ + Name: "inter-cp-ca", +} + +func GenerateCA() (*util_tls.KeyPair, error) { + subject := pkix.Name{ + Organization: []string{"Kuma"}, + OrganizationalUnit: []string{"Mesh"}, + CommonName: "Control Plane Intercommunication CA", + } + return util_tls.GenerateCA(util_tls.DefaultKeyType, subject) +} + +func LoadCA(ctx context.Context, resManager manager.ReadOnlyResourceManager) (tls.Certificate, error) { + globalSecret := system.NewGlobalSecretResource() + if err := resManager.Get(ctx, globalSecret, store.GetBy(GlobalSecretKey)); err != nil { + return tls.Certificate{}, err + } + bytes := globalSecret.Spec.GetData().GetValue() + certBlock, rest := pem.Decode(bytes) + keyBlock, _ := pem.Decode(rest) + return tls.X509KeyPair(pem.EncodeToMemory(certBlock), pem.EncodeToMemory(keyBlock)) +} + +func CreateCA(ctx context.Context, keyPair util_tls.KeyPair, resManager manager.ResourceManager) error { + bytes := append(keyPair.CertPEM, keyPair.KeyPEM...) + globalSecret := system.NewGlobalSecretResource() + globalSecret.Spec.Data = &wrapperspb.BytesValue{ + Value: bytes, + } + return resManager.Create(ctx, globalSecret, store.CreateBy(GlobalSecretKey)) +} + +func GenerateClientCert(ca tls.Certificate, ip string) (tls.Certificate, error) { + rootCert, err := x509.ParseCertificate(ca.Certificate[0]) + if err != nil { + return tls.Certificate{}, err + } + pair, err := util_tls.NewCert(*rootCert, ca.PrivateKey.(*rsa.PrivateKey), ip, util_tls.ClientCertType, util_tls.DefaultKeyType, ip) + if err != nil { + return tls.Certificate{}, err + } + return tls.X509KeyPair(pair.CertPEM, pair.KeyPEM) +} + +func GenerateServerCert(ca tls.Certificate, ip string) (tls.Certificate, error) { + rootCert, err := x509.ParseCertificate(ca.Certificate[0]) + if err != nil { + return tls.Certificate{}, err + } + pair, err := util_tls.NewCert(*rootCert, ca.PrivateKey.(*rsa.PrivateKey), ip, util_tls.ServerCertType, util_tls.DefaultKeyType, ip) + if err != nil { + return tls.Certificate{}, err + } + return tls.X509KeyPair(pair.CertPEM, pair.KeyPEM) +} diff --git a/pkg/intercp/tls/pki_test.go b/pkg/intercp/tls/pki_test.go new file mode 100644 index 000000000000..7c95c8f8371e --- /dev/null +++ b/pkg/intercp/tls/pki_test.go @@ -0,0 +1,53 @@ +package tls_test + +import ( + tls "crypto/tls" + "crypto/x509" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + intercp_tls "github.com/kumahq/kuma/pkg/intercp/tls" +) + +var _ = Describe("PKI", func() { + + var caCert tls.Certificate + + BeforeAll(func() { + pair, err := intercp_tls.GenerateCA() + Expect(err).ToNot(HaveOccurred()) + caCert, err = tls.X509KeyPair(pair.CertPEM, pair.KeyPEM) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should generate server cert", func() { + // given + ip := "192.168.0.1" + + // when + cert, err := intercp_tls.GenerateServerCert(caCert, ip) + + // then + Expect(err).ToNot(HaveOccurred()) + clientCert, err := x509.ParseCertificate(cert.Certificate[0]) + Expect(err).ToNot(HaveOccurred()) + Expect(clientCert.IPAddresses[0].String()).To(Equal(ip)) + Expect(clientCert.ExtKeyUsage).To(ContainElement(x509.ExtKeyUsageServerAuth)) + }) + + It("should generate client cert", func() { + // given + ip := "192.168.0.1" + + // when + cert, err := intercp_tls.GenerateClientCert(caCert, ip) + + // then + Expect(err).ToNot(HaveOccurred()) + clientCert, err := x509.ParseCertificate(cert.Certificate[0]) + Expect(err).ToNot(HaveOccurred()) + Expect(clientCert.IPAddresses[0].String()).To(Equal(ip)) + Expect(clientCert.ExtKeyUsage).To(ContainElement(x509.ExtKeyUsageClientAuth)) + }) +}, Ordered) diff --git a/pkg/intercp/tls/tls_suite_test.go b/pkg/intercp/tls/tls_suite_test.go new file mode 100644 index 000000000000..61696ce408c6 --- /dev/null +++ b/pkg/intercp/tls/tls_suite_test.go @@ -0,0 +1,11 @@ +package tls_test + +import ( + "testing" + + "github.com/kumahq/kuma/pkg/test" +) + +func TestTLS(t *testing.T) { + test.RunSpecs(t, "InterCP TLS Suite") +} diff --git a/pkg/util/net/ips.go b/pkg/util/net/ips.go index 60425dec2272..10c8e19eadde 100644 --- a/pkg/util/net/ips.go +++ b/pkg/util/net/ips.go @@ -8,8 +8,14 @@ import ( "github.com/pkg/errors" ) +type AddressPredicate = func(address *net.IPNet) bool + +func NonLoopback(address *net.IPNet) bool { + return !address.IP.IsLoopback() +} + // GetAllIPs returns all IPs (IPv4 and IPv6) from the all network interfaces on the machine -func GetAllIPs() ([]string, error) { +func GetAllIPs(predicates ...AddressPredicate) ([]string, error) { addrs, err := net.InterfaceAddrs() if err != nil { return nil, errors.Wrap(err, "could not list network interfaces") @@ -17,7 +23,16 @@ func GetAllIPs() ([]string, error) { var result []string for _, address := range addrs { if ipnet, ok := address.(*net.IPNet); ok { - result = append(result, ipnet.IP.String()) + matchedPredicate := true + for _, predicate := range predicates { + if !predicate(ipnet) { + matchedPredicate = false + break + } + } + if matchedPredicate { + result = append(result, ipnet.IP.String()) + } } } sort.Strings(result) // sort so IPv4 are the first elements in the list diff --git a/test/e2e/helm/kuma_helm_deploy_global_zone.go b/test/e2e/helm/kuma_helm_deploy_global_zone.go index cded9f05c7a0..d20c5ba8bda6 100644 --- a/test/e2e/helm/kuma_helm_deploy_global_zone.go +++ b/test/e2e/helm/kuma_helm_deploy_global_zone.go @@ -16,6 +16,7 @@ import ( api_server "github.com/kumahq/kuma/pkg/api-server" "github.com/kumahq/kuma/pkg/config/core" + "github.com/kumahq/kuma/pkg/intercp/catalog" . "github.com/kumahq/kuma/test/framework" "github.com/kumahq/kuma/test/framework/client" "github.com/kumahq/kuma/test/framework/deployments/testserver" @@ -49,6 +50,13 @@ func ZoneAndGlobalWithHelmChart() { Install(Kuma(core.Global, WithInstallationMode(HelmInstallationMode), WithHelmReleaseName(releaseName), + WithCPReplicas(2), + WithHelmOpt("controlPlane.config", ` +interCp: + catalog: + heartbeatInterval: 1s + writerInterval: 3s +`), )). Setup(c1) Expect(err).ToNot(HaveOccurred()) @@ -80,7 +88,7 @@ func ZoneAndGlobalWithHelmChart() { Expect(clusters.DismissCluster()).To(Succeed()) }) - It("Should deploy Zone and Global on 2 clusters", func() { + It("should deploy Zone and Global on 2 clusters", func() { clustersStatus := api_server.Zones{} Eventually(func() (bool, error) { status, response := http_helper.HttpGet(c1.GetTesting(), global.GetGlobalStatusAPI(), nil) @@ -122,4 +130,47 @@ func ZoneAndGlobalWithHelmChart() { g.Expect(err).ToNot(HaveOccurred()) }, "30s", "1s").Should(Succeed()) }) + + Context("Intercommunication CP server catalog on Global CP", func() { + fetchInstances := func() (map[string]struct{}, error) { + out, err := k8s.RunKubectlAndGetOutputE(c1.GetTesting(), c1.GetKubectlOptions(Config.KumaNamespace), "get", "configmap", "cp-catalog", "-o", "jsonpath={.data.config}") + if err != nil { + return nil, err + } + instances := catalog.ConfigInstances{} + if err := json.Unmarshal([]byte(out), &instances); err != nil { + return nil, err + } + m := map[string]struct{}{} + for _, instance := range instances.Instances { + m[instance.Id] = struct{}{} + } + return m, nil + } + + It("should update instances in catalog when we scale CP", func() { + // given + var instances map[string]struct{} + Eventually(func(g Gomega) { + ins, err := fetchInstances() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(ins).To(HaveLen(2)) + instances = ins + }, "30s", "1s").Should(Succeed()) + + // when + _, err := k8s.RunKubectlAndGetOutputE(c1.GetTesting(), c1.GetKubectlOptions(Config.KumaNamespace), "rollout", "restart", "deployment", Config.KumaServiceName) + + // then + Expect(err).ToNot(HaveOccurred()) + Eventually(func(g Gomega) { + ins, err := fetchInstances() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(ins).To(HaveLen(2)) + for instanceID := range instances { // there are no old instances + g.Expect(ins).ToNot(ContainElement(instanceID)) + } + }, "30s", "1s").Should(Succeed()) + }) + }) }