From 24b9afd5095b5e0db7511cd04b1d54e9a3a65b85 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 12:44:30 -0400 Subject: [PATCH 01/11] Add improve version CLI cmd. --- x-pack/elastic-agent/control.proto | 2 +- x-pack/elastic-agent/pkg/agent/cmd/run.go | 8 + .../pkg/agent/control/client/client.go | 6 +- .../pkg/agent/control/proto/control.pb.go | 142 +++++++++--------- .../pkg/agent/control/server/listener.go | 8 + .../agent/control/server/listener_windows.go | 4 + .../pkg/agent/control/server/server.go | 8 +- .../elastic-agent/pkg/basecmd/version/cmd.go | 85 +++++++++-- .../pkg/basecmd/version/cmd_test.go | 85 ++++++++++- x-pack/elastic-agent/pkg/release/version.go | 35 +++++ 10 files changed, 286 insertions(+), 97 deletions(-) diff --git a/x-pack/elastic-agent/control.proto b/x-pack/elastic-agent/control.proto index a7ff22e5157..0c5645faab9 100644 --- a/x-pack/elastic-agent/control.proto +++ b/x-pack/elastic-agent/control.proto @@ -104,7 +104,7 @@ message StatusResponse { repeated ApplicationStatus applications = 3; } -service ElasticAgent { +service ElasticAgentControl { // Fetches the currently running version of the Elastic Agent. rpc Version(Empty) returns (VersionResponse); diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 1c3c81b9ff7..d99fd3b18f4 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" @@ -82,6 +83,13 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { rexLogger := logger.Named("reexec") rex := reexec.Manager(rexLogger, execPath) + // start the control listener + control := server.New(logger.Named("control")) + if err := control.Start(); err != nil { + return err + } + defer control.Stop() + app, err := application.New(logger, pathConfigFile) if err != nil { return err diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index bcd8eccdb82..d614318b5eb 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -8,11 +8,11 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" ) @@ -81,7 +81,7 @@ type client struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - client proto.ElasticAgentClient + client proto.ElasticAgentControlClient cfgLock sync.RWMutex obsLock sync.RWMutex } @@ -98,7 +98,7 @@ func (c *client) Start(ctx context.Context) error { if err != nil { return err } - c.client = proto.NewElasticAgentClient(conn) + c.client = proto.NewElasticAgentControlClient(conn) return nil } diff --git a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go index 58df5e28f19..a0e2e710f0c 100644 --- a/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go +++ b/x-pack/elastic-agent/pkg/agent/control/proto/control.pb.go @@ -662,24 +662,24 @@ var file_control_proto_rawDesc = []byte{ 0x10, 0x06, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, 0x07, 0x2a, 0x28, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, - 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x32, 0xd9, 0x01, 0x0a, 0x0c, 0x45, - 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, - 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, - 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, - 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, + 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x32, 0xe0, 0x01, 0x0a, 0x13, 0x45, + 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x72, + 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x12, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, + 0x67, 0x72, 0x61, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, + 0x1d, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, + 0x6f, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0xf8, 0x01, + 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -713,14 +713,14 @@ var file_control_proto_depIdxs = []int32{ 0, // 2: proto.ApplicationStatus.status:type_name -> proto.Status 0, // 3: proto.StatusResponse.status:type_name -> proto.Status 7, // 4: proto.StatusResponse.applications:type_name -> proto.ApplicationStatus - 2, // 5: proto.ElasticAgent.Version:input_type -> proto.Empty - 2, // 6: proto.ElasticAgent.Status:input_type -> proto.Empty - 2, // 7: proto.ElasticAgent.Restart:input_type -> proto.Empty - 5, // 8: proto.ElasticAgent.Upgrade:input_type -> proto.UpgradeRequest - 3, // 9: proto.ElasticAgent.Version:output_type -> proto.VersionResponse - 8, // 10: proto.ElasticAgent.Status:output_type -> proto.StatusResponse - 4, // 11: proto.ElasticAgent.Restart:output_type -> proto.RestartResponse - 6, // 12: proto.ElasticAgent.Upgrade:output_type -> proto.UpgradeResponse + 2, // 5: proto.ElasticAgentControl.Version:input_type -> proto.Empty + 2, // 6: proto.ElasticAgentControl.Status:input_type -> proto.Empty + 2, // 7: proto.ElasticAgentControl.Restart:input_type -> proto.Empty + 5, // 8: proto.ElasticAgentControl.Upgrade:input_type -> proto.UpgradeRequest + 3, // 9: proto.ElasticAgentControl.Version:output_type -> proto.VersionResponse + 8, // 10: proto.ElasticAgentControl.Status:output_type -> proto.StatusResponse + 4, // 11: proto.ElasticAgentControl.Restart:output_type -> proto.RestartResponse + 6, // 12: proto.ElasticAgentControl.Upgrade:output_type -> proto.UpgradeResponse 9, // [9:13] is the sub-list for method output_type 5, // [5:9] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name @@ -848,10 +848,10 @@ var _ grpc.ClientConnInterface // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 -// ElasticAgentClient is the client API for ElasticAgent service. +// ElasticAgentControlClient is the client API for ElasticAgentControl service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type ElasticAgentClient interface { +type ElasticAgentControlClient interface { // Fetches the currently running version of the Elastic Agent. Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) // Fetches the currently status of the Elastic Agent. @@ -862,52 +862,52 @@ type ElasticAgentClient interface { Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) } -type elasticAgentClient struct { +type elasticAgentControlClient struct { cc grpc.ClientConnInterface } -func NewElasticAgentClient(cc grpc.ClientConnInterface) ElasticAgentClient { - return &elasticAgentClient{cc} +func NewElasticAgentControlClient(cc grpc.ClientConnInterface) ElasticAgentControlClient { + return &elasticAgentControlClient{cc} } -func (c *elasticAgentClient) Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) { +func (c *elasticAgentControlClient) Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) { out := new(VersionResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Version", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Version", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *elasticAgentClient) Status(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*StatusResponse, error) { +func (c *elasticAgentControlClient) Status(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*StatusResponse, error) { out := new(StatusResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Status", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Status", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *elasticAgentClient) Restart(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*RestartResponse, error) { +func (c *elasticAgentControlClient) Restart(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*RestartResponse, error) { out := new(RestartResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Restart", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Restart", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *elasticAgentClient) Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) { +func (c *elasticAgentControlClient) Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (*UpgradeResponse, error) { out := new(UpgradeResponse) - err := c.cc.Invoke(ctx, "/proto.ElasticAgent/Upgrade", in, out, opts...) + err := c.cc.Invoke(ctx, "/proto.ElasticAgentControl/Upgrade", in, out, opts...) if err != nil { return nil, err } return out, nil } -// ElasticAgentServer is the server API for ElasticAgent service. -type ElasticAgentServer interface { +// ElasticAgentControlServer is the server API for ElasticAgentControl service. +type ElasticAgentControlServer interface { // Fetches the currently running version of the Elastic Agent. Version(context.Context, *Empty) (*VersionResponse, error) // Fetches the currently status of the Elastic Agent. @@ -918,118 +918,118 @@ type ElasticAgentServer interface { Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) } -// UnimplementedElasticAgentServer can be embedded to have forward compatible implementations. -type UnimplementedElasticAgentServer struct { +// UnimplementedElasticAgentControlServer can be embedded to have forward compatible implementations. +type UnimplementedElasticAgentControlServer struct { } -func (*UnimplementedElasticAgentServer) Version(context.Context, *Empty) (*VersionResponse, error) { +func (*UnimplementedElasticAgentControlServer) Version(context.Context, *Empty) (*VersionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Version not implemented") } -func (*UnimplementedElasticAgentServer) Status(context.Context, *Empty) (*StatusResponse, error) { +func (*UnimplementedElasticAgentControlServer) Status(context.Context, *Empty) (*StatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") } -func (*UnimplementedElasticAgentServer) Restart(context.Context, *Empty) (*RestartResponse, error) { +func (*UnimplementedElasticAgentControlServer) Restart(context.Context, *Empty) (*RestartResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Restart not implemented") } -func (*UnimplementedElasticAgentServer) Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) { +func (*UnimplementedElasticAgentControlServer) Upgrade(context.Context, *UpgradeRequest) (*UpgradeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Upgrade not implemented") } -func RegisterElasticAgentServer(s *grpc.Server, srv ElasticAgentServer) { - s.RegisterService(&_ElasticAgent_serviceDesc, srv) +func RegisterElasticAgentControlServer(s *grpc.Server, srv ElasticAgentControlServer) { + s.RegisterService(&_ElasticAgentControl_serviceDesc, srv) } -func _ElasticAgent_Version_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Version_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Version(ctx, in) + return srv.(ElasticAgentControlServer).Version(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Version", + FullMethod: "/proto.ElasticAgentControl/Version", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Version(ctx, req.(*Empty)) + return srv.(ElasticAgentControlServer).Version(ctx, req.(*Empty)) } return interceptor(ctx, in, info, handler) } -func _ElasticAgent_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Status(ctx, in) + return srv.(ElasticAgentControlServer).Status(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Status", + FullMethod: "/proto.ElasticAgentControl/Status", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Status(ctx, req.(*Empty)) + return srv.(ElasticAgentControlServer).Status(ctx, req.(*Empty)) } return interceptor(ctx, in, info, handler) } -func _ElasticAgent_Restart_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Restart_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Restart(ctx, in) + return srv.(ElasticAgentControlServer).Restart(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Restart", + FullMethod: "/proto.ElasticAgentControl/Restart", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Restart(ctx, req.(*Empty)) + return srv.(ElasticAgentControlServer).Restart(ctx, req.(*Empty)) } return interceptor(ctx, in, info, handler) } -func _ElasticAgent_Upgrade_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ElasticAgentControl_Upgrade_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(UpgradeRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ElasticAgentServer).Upgrade(ctx, in) + return srv.(ElasticAgentControlServer).Upgrade(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.ElasticAgent/Upgrade", + FullMethod: "/proto.ElasticAgentControl/Upgrade", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ElasticAgentServer).Upgrade(ctx, req.(*UpgradeRequest)) + return srv.(ElasticAgentControlServer).Upgrade(ctx, req.(*UpgradeRequest)) } return interceptor(ctx, in, info, handler) } -var _ElasticAgent_serviceDesc = grpc.ServiceDesc{ - ServiceName: "proto.ElasticAgent", - HandlerType: (*ElasticAgentServer)(nil), +var _ElasticAgentControl_serviceDesc = grpc.ServiceDesc{ + ServiceName: "proto.ElasticAgentControl", + HandlerType: (*ElasticAgentControlServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Version", - Handler: _ElasticAgent_Version_Handler, + Handler: _ElasticAgentControl_Version_Handler, }, { MethodName: "Status", - Handler: _ElasticAgent_Status_Handler, + Handler: _ElasticAgentControl_Status_Handler, }, { MethodName: "Restart", - Handler: _ElasticAgent_Restart_Handler, + Handler: _ElasticAgentControl_Restart_Handler, }, { MethodName: "Upgrade", - Handler: _ElasticAgent_Upgrade_Handler, + Handler: _ElasticAgentControl_Upgrade_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go index 2dd5d54a46f..989a95fd0bd 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -17,6 +17,9 @@ import ( func createListener() (net.Listener, error) { path := strings.TrimPrefix(control.Address(), "unix://") + if _, err := os.Stat(path); !os.IsNotExist(err) { + os.Remove(path) + } dir := filepath.Dir(path) if _, err := os.Stat(dir); os.IsNotExist(err) { err = os.MkdirAll(dir, 0755) @@ -36,3 +39,8 @@ func createListener() (net.Listener, error) { } return lis, err } + +func cleanupListener() { + path := strings.TrimPrefix(control.Address(), "unix://") + os.Remove(path) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go index d2d2866b98a..bf9819c529d 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go @@ -27,3 +27,7 @@ func createListener() (net.Listener, error) { } return npipe.NewListener(control.Address(), sd) } + +func cleanupListener() { + // nothing to do on windows +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index c9a750808fc..c45b3f35953 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -8,13 +8,12 @@ import ( "context" "net" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" - "google.golang.org/grpc" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) // Server is the daemon side of the control protocol. @@ -44,7 +43,7 @@ func (s *Server) Start() error { } s.listener = lis s.server = grpc.NewServer() - proto.RegisterElasticAgentServer(s.server, s) + proto.RegisterElasticAgentControlServer(s.server, s) // start serving GRPC connections go func() { @@ -63,6 +62,7 @@ func (s *Server) Stop() { s.server.Stop() s.server = nil s.listener = nil + cleanupListener() } } diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go index 0bf25438e80..ba12555444c 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go @@ -5,32 +5,93 @@ package version import ( + "context" "fmt" "github.com/spf13/cobra" + "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) +// Output returns the output when `--yaml` is used. +type Output struct { + Binary *release.VersionInfo `yaml:"binary` + Daemon *release.VersionInfo `yaml:"daemon,omitempty""` +} + // NewCommandWithArgs returns a new version command. func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { - return &cobra.Command{ + cmd := &cobra.Command{ Use: "version", Short: "Display the version of the elastic-agent.", - Run: func(_ *cobra.Command, _ []string) { - version := release.Version() - if release.Snapshot() { - version = version + "-SNAPSHOT" + Run: func(cmd *cobra.Command, _ []string) { + var daemon *release.VersionInfo + var daemonError error + + binary := release.Info() + binaryOnly, _ := cmd.Flags().GetBool("binary-only") + if !binaryOnly { + c := client.New() + daemonError = c.Start(context.Background()) + if daemonError == nil { + var version client.Version + version, daemonError = c.Version(context.Background()) + if daemonError == nil { + daemon = &release.VersionInfo{ + Version: version.Version, + Commit: version.Commit, + BuildTime: version.BuildTime, + Snapshot: version.Snapshot, + } + } + } + } + if daemonError != nil { + fmt.Fprintf(streams.Err, "Failed talking to running daemon: %s\n", daemonError) + } + + outputYaml, _ := cmd.Flags().GetBool("yaml") + if outputYaml { + p := Output{ + Binary: &binary, + Daemon: daemon, + } + out, err := yaml.Marshal(p) + if err != nil { + fmt.Fprintf(streams.Err, "Failed to render YAML: %s\n", err) + } + fmt.Fprintf(streams.Out, "%s", out) + return } - fmt.Fprintf( - streams.Out, - "Agent version is %s (build: %s at %s)\n", - version, - release.Commit(), - release.BuildTime(), - ) + if !binaryOnly { + mismatch := false + str := "" + if daemon != nil { + str = daemon.String() + mismatch = isMismatch(&binary, daemon) + } + if mismatch { + fmt.Fprintf(streams.Err, "WARN: Then running daemon of Elastic Agent does not match this version.\n") + } + fmt.Fprintf(streams.Out, "Daemon: %s\n", str) + } + fmt.Fprintf(streams.Out, "Binary: %s\n", binary.String()) }, } + + cmd.Flags().Bool("binary-only", false, "Version of current binary only") + cmd.Flags().Bool("yaml", false, "Output information in YAML format") + + return cmd +} + +func isMismatch(a *release.VersionInfo, b *release.VersionInfo) bool { + if a.Commit != "unknown" && b.Commit != "unknown" { + return a.Commit != b.Commit + } + return a.Version != b.Version || a.BuildTime != b.BuildTime || a.Snapshot != b.Snapshot } diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go index 111d174608f..fa479fe5879 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go @@ -10,17 +10,90 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) -func TestCmd(t *testing.T) { +func TestCmdBinaryOnly(t *testing.T) { + streams, _, out, _ := cli.NewTestingIOStreams() + cmd := NewCommandWithArgs(streams) + cmd.Flags().Set("binary-only", "true") + cmd.Execute() + version, err := ioutil.ReadAll(out) + + require.NoError(t, err) + assert.True(t, strings.Contains(string(version), "Binary: ")) + assert.False(t, strings.Contains(string(version), "Daemon: ")) +} + +func TestCmdBinaryOnlyYAML(t *testing.T) { + streams, _, out, _ := cli.NewTestingIOStreams() + cmd := NewCommandWithArgs(streams) + cmd.Flags().Set("binary-only", "true") + cmd.Flags().Set("yaml", "true") + cmd.Execute() + version, err := ioutil.ReadAll(out) + + require.NoError(t, err) + + var output Output + err = yaml.Unmarshal(version, &output) + require.NoError(t, err) + + assert.Nil(t, output.Daemon) + assert.Equal(t, release.Info(), *output.Binary) +} + +func TestCmdDaemon(t *testing.T) { + srv := server.New(newErrorLogger(t)) + require.NoError(t, srv.Start()) + defer srv.Stop() + + streams, _, out, _ := cli.NewTestingIOStreams() + cmd := NewCommandWithArgs(streams) + cmd.Execute() + version, err := ioutil.ReadAll(out) + + require.NoError(t, err) + assert.True(t, strings.Contains(string(version), "Binary: ")) + assert.True(t, strings.Contains(string(version), "Daemon: ")) +} + +func TestCmdDaemonYAML(t *testing.T) { + srv := server.New(newErrorLogger(t)) + require.NoError(t, srv.Start()) + defer srv.Stop() + streams, _, out, _ := cli.NewTestingIOStreams() - NewCommandWithArgs(streams).Execute() + cmd := NewCommandWithArgs(streams) + cmd.Flags().Set("yaml", "true") + cmd.Execute() version, err := ioutil.ReadAll(out) - if !assert.NoError(t, err) { - return - } - assert.True(t, strings.Contains(string(version), "Agent version is")) + require.NoError(t, err) + + var output Output + err = yaml.Unmarshal(version, &output) + require.NoError(t, err) + + assert.Equal(t, release.Info(), *output.Daemon) + assert.Equal(t, release.Info(), *output.Binary) +} + +func newErrorLogger(t *testing.T) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + + log, err := logger.NewFromConfig("", loggerCfg) + require.NoError(t, err) + return log } diff --git a/x-pack/elastic-agent/pkg/release/version.go b/x-pack/elastic-agent/pkg/release/version.go index 7c139d943a9..542ea829417 100644 --- a/x-pack/elastic-agent/pkg/release/version.go +++ b/x-pack/elastic-agent/pkg/release/version.go @@ -6,6 +6,7 @@ package release import ( "strconv" + "strings" "time" libbeatVersion "github.com/elastic/beats/v7/libbeat/version" @@ -34,3 +35,37 @@ func Snapshot() bool { val, err := strconv.ParseBool(snapshot) return err == nil && val } + +// VersionInfo is structure used by `version --yaml`. +type VersionInfo struct { + Version string `yaml:"version"` + Commit string `yaml:"commit"` + BuildTime time.Time `yaml:"build_time"` + Snapshot bool `yaml:"snapshot"` +} + +// Info returns current version information. +func Info() VersionInfo { + return VersionInfo{ + Version: Version(), + Commit: Commit(), + BuildTime: BuildTime(), + Snapshot: Snapshot(), + } +} + +// String returns the string format for the version informaiton. +func (v *VersionInfo) String() string { + var sb strings.Builder + + sb.WriteString(v.Version) + if v.Snapshot { + sb.WriteString("-SNAPSHOT") + } + sb.WriteString(" (build: ") + sb.WriteString(v.Commit) + sb.WriteString(" at ") + sb.WriteString(v.BuildTime.Format("2006-01-02 15:04:05 -0700 MST")) + sb.WriteString(")") + return sb.String() +} From d659548ca08ce812b35695591d499403b64cbba7 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 13:08:52 -0400 Subject: [PATCH 02/11] Add new restart cmd. Perform restart at end of enroll. --- .../pkg/agent/application/reexec/manager.go | 9 +++-- x-pack/elastic-agent/pkg/agent/cmd/enroll.go | 24 ++++++++++++- x-pack/elastic-agent/pkg/agent/cmd/run.go | 2 +- .../pkg/agent/control/server/server.go | 8 +++-- x-pack/elastic-agent/pkg/basecmd/cmd.go | 2 ++ .../elastic-agent/pkg/basecmd/restart/cmd.go | 36 +++++++++++++++++++ .../elastic-agent/pkg/basecmd/version/cmd.go | 2 ++ 7 files changed, 76 insertions(+), 7 deletions(-) create mode 100644 x-pack/elastic-agent/pkg/basecmd/restart/cmd.go diff --git a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go index 33de754a27d..ef300705ff0 100644 --- a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go +++ b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go @@ -30,14 +30,19 @@ type ExecManager interface { ShutdownComplete() } -// Manager returns the global reexec manager. -func Manager(log *logger.Logger, exec string) ExecManager { +// NewManager returns the global reexec manager. +func NewManager(log *logger.Logger, exec string) ExecManager { execSingletonOnce.Do(func() { execSingleton = newManager(log, exec) }) return execSingleton } +// Manager returns the global reexec manager. +func Manager() ExecManager { + return execSingleton +} + type manager struct { logger *logger.Logger exec string diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go index 311ad31e63b..47be8f82e68 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go @@ -5,11 +5,14 @@ package cmd import ( + "context" "fmt" "math/rand" "os" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/spf13/cobra" "github.com/elastic/beats/v7/libbeat/common/backoff" @@ -45,6 +48,7 @@ func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStr cmd.Flags().BoolP("force", "f", false, "Force overwrite the current and do not prompt for confirmation") cmd.Flags().BoolP("insecure", "i", false, "Allow insecure connection to Kibana") cmd.Flags().StringP("staging", "", "", "Configures agent to download artifacts from a staging build") + cmd.Flags().Bool("no-restart", false, "Skip restarting the currently running daemon") return cmd } @@ -144,7 +148,25 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args return errors.New(err, "fail to enroll") } - fmt.Fprintln(streams.Out, "Successfully enrolled the Agent.") + fmt.Fprintln(streams.Out, "Successfully enrolled the Elastic Agent.") + + // skip restarting + noRestart, _ := cmd.Flags().GetBool("no-restart") + if noRestart { + return nil + } + + daemon := client.New() + err = daemon.Start(context.Background()) + if err == nil { + defer daemon.Stop() + err = daemon.Restart(context.Background()) + if err == nil { + fmt.Fprintln(streams.Out, "Successfully triggered restart on running Elastic Agent.") + return nil + } + } + fmt.Fprintln(streams.Out, "Elastic Agent might not be running; unable to trigger restart") return nil } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index d99fd3b18f4..b1c5c127951 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -81,7 +81,7 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { return err } rexLogger := logger.Named("reexec") - rex := reexec.Manager(rexLogger, execPath) + rex := reexec.NewManager(rexLogger, execPath) // start the control listener control := server.New(logger.Named("control")) diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index c45b3f35953..7b719a7d26a 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -8,6 +8,8 @@ import ( "context" "net" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec" + "google.golang.org/grpc" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" @@ -88,10 +90,10 @@ func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusRespons // Restart performs re-exec. func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.RestartResponse, error) { - // not implemented + rex := reexec.Manager() + rex.ReExec() return &proto.RestartResponse{ - Status: proto.ActionStatus_FAILURE, - Error: "not implemented", + Status: proto.ActionStatus_SUCCESS, }, nil } diff --git a/x-pack/elastic-agent/pkg/basecmd/cmd.go b/x-pack/elastic-agent/pkg/basecmd/cmd.go index 9b957916fb1..b30b540d472 100644 --- a/x-pack/elastic-agent/pkg/basecmd/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/cmd.go @@ -7,6 +7,7 @@ package basecmd import ( "github.com/spf13/cobra" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/basecmd/restart" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/basecmd/version" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" ) @@ -14,6 +15,7 @@ import ( // NewDefaultCommandsWithArgs returns a list of default commands to executes. func NewDefaultCommandsWithArgs(args []string, streams *cli.IOStreams) []*cobra.Command { return []*cobra.Command{ + restart.NewCommandWithArgs(streams), version.NewCommandWithArgs(streams), } } diff --git a/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go new file mode 100644 index 00000000000..a2f2491376e --- /dev/null +++ b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package restart + +import ( + "context" + + "github.com/spf13/cobra" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" +) + +// NewCommandWithArgs returns a new version command. +func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { + return &cobra.Command{ + Use: "restart", + Short: "Restart the currently running Elastic Agent daemon", + RunE: func(cmd *cobra.Command, _ []string) error { + c := client.New() + err := c.Start(context.Background()) + if err != nil { + return errors.New(err, "Failed talking to running daemon") + } + defer c.Stop() + err = c.Restart(context.Background()) + if err != nil { + return errors.New(err, "Failed trigger restart of daemon") + } + return nil + }, + } +} diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go index ba12555444c..43c995b18a7 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go @@ -37,6 +37,8 @@ func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { c := client.New() daemonError = c.Start(context.Background()) if daemonError == nil { + defer c.Stop() + var version client.Version version, daemonError = c.Version(context.Background()) if daemonError == nil { From 9121ac55eed6af2c06220b875f0c7a68db149c67 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 13:10:46 -0400 Subject: [PATCH 03/11] Fix yaml annotations on version struct. --- x-pack/elastic-agent/pkg/basecmd/version/cmd.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go index 43c995b18a7..2b844d19da9 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go @@ -18,8 +18,8 @@ import ( // Output returns the output when `--yaml` is used. type Output struct { - Binary *release.VersionInfo `yaml:"binary` - Daemon *release.VersionInfo `yaml:"daemon,omitempty""` + Binary *release.VersionInfo `yaml:"binary"` + Daemon *release.VersionInfo `yaml:"daemon,omitempty"` } // NewCommandWithArgs returns a new version command. From 23202503326c26434f975edcb4c02bb218f8d0ab Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 13:17:07 -0400 Subject: [PATCH 04/11] Fix control.Address on Windows. --- x-pack/elastic-agent/pkg/agent/control/addr_windows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go index 1123eec941b..acca6924cb4 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -15,7 +15,7 @@ import ( // Address returns the address to connect to Elastic Agent daemon. func Address() string { - data = paths.Data() + data := paths.Data() // entire string cannot be longer than 256 characters, this forces the // length to always be 87 characters (but unique per data path) return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) From 8e24d0240b7ad1a311b7f609c75bc49406e9dfd3 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 13:18:09 -0400 Subject: [PATCH 05/11] Fix control.Address on Windows. --- x-pack/elastic-agent/pkg/agent/control/addr_windows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go index acca6924cb4..fa98fbe1015 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -18,5 +18,5 @@ func Address() string { data := paths.Data() // entire string cannot be longer than 256 characters, this forces the // length to always be 87 characters (but unique per data path) - return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) + return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256([]byte(data))) } From 93d6fb1d8431a33503ac58fd2b21d62ca145ac93 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 13:19:24 -0400 Subject: [PATCH 06/11] Fix windows dialer. --- x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go index 58b36c18043..c061753d327 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go @@ -22,5 +22,5 @@ func dialContext(ctx context.Context) (*grpc.ClientConn, error) { } func dialer(ctx context.Context, addr string) (net.Conn, error) { - return npipe.DialContext(arr)(ctx, "", "") + return npipe.DialContext(addr)(ctx, "", "") } From 85e0d89e0c4ffa46c29740fed651aa35c1805fd9 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 13:21:18 -0400 Subject: [PATCH 07/11] Fix control.Address on Windows. --- x-pack/elastic-agent/pkg/agent/control/addr_windows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go index fa98fbe1015..bf2e164fbae 100644 --- a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -18,5 +18,5 @@ func Address() string { data := paths.Data() // entire string cannot be longer than 256 characters, this forces the // length to always be 87 characters (but unique per data path) - return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256([]byte(data))) + return fmt.Sprintf(`\\.\pipe\elastic-agent-%x`, sha256.Sum256([]byte(data))) } From ac69b870758e24a85dfc726cb5890871575fe526 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 13:43:54 -0400 Subject: [PATCH 08/11] Add to CHANGELOG. --- x-pack/elastic-agent/CHANGELOG.asciidoc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index e0e925847aa..f4ad19cc290 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -99,3 +99,6 @@ - Add --staging option to enroll command {pull}20026[20026] - Add `event.dataset` to all events {pull}20076[20076] - Prepare packaging for endpoint and asc files {pull}20186[20186] +- Improved version CLI {pull}20359[20359] +- Enroll CLI now restarts running daemon {pull}20359[20359] +- Add restart CLI cmd {pull}20359[20359] From a7b0d3c9b8454dc3c8209317d4c61b0bc89717ff Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 3 Aug 2020 10:25:57 -0400 Subject: [PATCH 09/11] Review cleanups. --- .../pkg/agent/application/reexec/manager.go | 22 +------------------ x-pack/elastic-agent/pkg/agent/cmd/enroll.go | 4 ++-- x-pack/elastic-agent/pkg/agent/cmd/run.go | 2 +- .../pkg/agent/control/client/client.go | 16 +++++++------- .../pkg/agent/control/control_test.go | 6 ++--- .../pkg/agent/control/server/listener.go | 17 ++++++++++---- .../agent/control/server/listener_windows.go | 5 +++-- .../pkg/agent/control/server/server.go | 11 +++++----- .../elastic-agent/pkg/basecmd/restart/cmd.go | 7 +++--- .../elastic-agent/pkg/basecmd/version/cmd.go | 4 ++-- .../pkg/basecmd/version/cmd_test.go | 4 ++-- 11 files changed, 45 insertions(+), 53 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go index ef300705ff0..70d34d83a45 100644 --- a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go +++ b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go @@ -5,16 +5,9 @@ package reexec import ( - "sync" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) -var ( - execSingleton ExecManager - execSingletonOnce sync.Once -) - // ExecManager is the interface that the global reexec manager implements. type ExecManager interface { // ReExec asynchronously re-executes command in the same PID and memory address @@ -30,19 +23,6 @@ type ExecManager interface { ShutdownComplete() } -// NewManager returns the global reexec manager. -func NewManager(log *logger.Logger, exec string) ExecManager { - execSingletonOnce.Do(func() { - execSingleton = newManager(log, exec) - }) - return execSingleton -} - -// Manager returns the global reexec manager. -func Manager() ExecManager { - return execSingleton -} - type manager struct { logger *logger.Logger exec string @@ -51,7 +31,7 @@ type manager struct { complete chan bool } -func newManager(log *logger.Logger, exec string) *manager { +func NewManager(log *logger.Logger, exec string) ExecManager { return &manager{ logger: log, exec: exec, diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go index 47be8f82e68..b34e0236782 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go @@ -157,9 +157,9 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args } daemon := client.New() - err = daemon.Start(context.Background()) + err = daemon.Connect(context.Background()) if err == nil { - defer daemon.Stop() + defer daemon.Disconnect() err = daemon.Restart(context.Background()) if err == nil { fmt.Fprintln(streams.Out, "Successfully triggered restart on running Elastic Agent.") diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index b1c5c127951..f502ef9cf49 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -84,7 +84,7 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { rex := reexec.NewManager(rexLogger, execPath) // start the control listener - control := server.New(logger.Named("control")) + control := server.New(logger.Named("control"), rex) if err := control.Start(); err != nil { return err } diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go index d614318b5eb..5e55fce9349 100644 --- a/x-pack/elastic-agent/pkg/agent/control/client/client.go +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -62,10 +62,10 @@ type AgentStatus struct { // Client communicates to Elastic Agent through the control protocol. type Client interface { - // Start starts the client. - Start(ctx context.Context) error - // Stop stops the client. - Stop() + // Connect connects to the running Elastic Agent. + Connect(ctx context.Context) error + // Disconnect disconnects from the running Elastic Agent. + Disconnect() // Version returns the current version of the running agent. Version(ctx context.Context) (Version, error) // Status returns the current status of the running agent. @@ -91,8 +91,8 @@ func New() Client { return &client{} } -// Start starts the connection to Elastic Agent. -func (c *client) Start(ctx context.Context) error { +// Connect connects to the running Elastic Agent. +func (c *client) Connect(ctx context.Context) error { c.ctx, c.cancel = context.WithCancel(ctx) conn, err := dialContext(ctx) if err != nil { @@ -102,8 +102,8 @@ func (c *client) Start(ctx context.Context) error { return nil } -// Stop stops the connection to Elastic Agent. -func (c *client) Stop() { +// Disconnect disconnects from the running Elastic Agent. +func (c *client) Disconnect() { if c.cancel != nil { c.cancel() c.wg.Wait() diff --git a/x-pack/elastic-agent/pkg/agent/control/control_test.go b/x-pack/elastic-agent/pkg/agent/control/control_test.go index 13d32420258..9454179ae60 100644 --- a/x-pack/elastic-agent/pkg/agent/control/control_test.go +++ b/x-pack/elastic-agent/pkg/agent/control/control_test.go @@ -20,15 +20,15 @@ import ( ) func TestServerClient_Version(t *testing.T) { - srv := server.New(newErrorLogger(t)) + srv := server.New(newErrorLogger(t), nil) err := srv.Start() require.NoError(t, err) defer srv.Stop() c := client.New() - err = c.Start(context.Background()) + err = c.Connect(context.Background()) require.NoError(t, err) - defer c.Stop() + defer c.Disconnect() ver, err := c.Version(context.Background()) require.NoError(t, err) diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go index 989a95fd0bd..bf03f54e2da 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -7,18 +7,25 @@ package server import ( + "fmt" + "net" "os" "path/filepath" "strings" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) -func createListener() (net.Listener, error) { +func createListener(log *logger.Logger) (net.Listener, error) { path := strings.TrimPrefix(control.Address(), "unix://") if _, err := os.Stat(path); !os.IsNotExist(err) { - os.Remove(path) + err = os.Remove(path) + if err != nil { + log.Errorf("%s", errors.New(err, fmt.Sprintf("Failed to cleanup %s", path), errors.TypeFilesystem, errors.M("path", path))) + } } dir := filepath.Dir(path) if _, err := os.Stat(dir); os.IsNotExist(err) { @@ -40,7 +47,9 @@ func createListener() (net.Listener, error) { return lis, err } -func cleanupListener() { +func cleanupListener(log *logger.Logger) { path := strings.TrimPrefix(control.Address(), "unix://") - os.Remove(path) + if err := os.Remove(path); err != nil { + log.Errorf("%s", errors.New(err, fmt.Sprintf("Failed to cleanup %s", path), errors.TypeFilesystem, errors.M("path", path))) + } } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go index bf9819c529d..f98c32bcee3 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go @@ -13,10 +13,11 @@ import ( "github.com/elastic/beats/v7/libbeat/api/npipe" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) // createListener creates a named pipe listener on Windows -func createListener() (net.Listener, error) { +func createListener(_ *logger.Logger) (net.Listener, error) { u, err := user.Current() if err != nil { return nil, err @@ -28,6 +29,6 @@ func createListener() (net.Listener, error) { return npipe.NewListener(control.Address(), sd) } -func cleanupListener() { +func cleanupListener(_ *logger.Logger) { // nothing to do on windows } diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 7b719a7d26a..faa7982c814 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -21,14 +21,16 @@ import ( // Server is the daemon side of the control protocol. type Server struct { logger *logger.Logger + rex reexec.ExecManager listener net.Listener server *grpc.Server } // New creates a new control protocol server. -func New(log *logger.Logger) *Server { +func New(log *logger.Logger, rex reexec.ExecManager) *Server { return &Server{ logger: log, + rex: rex, } } @@ -39,7 +41,7 @@ func (s *Server) Start() error { return nil } - lis, err := createListener() + lis, err := createListener(s.logger) if err != nil { return err } @@ -64,7 +66,7 @@ func (s *Server) Stop() { s.server.Stop() s.server = nil s.listener = nil - cleanupListener() + cleanupListener(s.logger) } } @@ -90,8 +92,7 @@ func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusRespons // Restart performs re-exec. func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.RestartResponse, error) { - rex := reexec.Manager() - rex.ReExec() + s.rex.ReExec() return &proto.RestartResponse{ Status: proto.ActionStatus_SUCCESS, }, nil diff --git a/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go index a2f2491376e..9fcaa9d443f 100644 --- a/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/cobra" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" @@ -21,11 +22,11 @@ func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { Short: "Restart the currently running Elastic Agent daemon", RunE: func(cmd *cobra.Command, _ []string) error { c := client.New() - err := c.Start(context.Background()) + err := c.Connect(context.Background()) if err != nil { - return errors.New(err, "Failed talking to running daemon") + return errors.New(err, "Failed talking to running daemon", errors.TypeNetwork, errors.M("socket", control.Address())) } - defer c.Stop() + defer c.Disconnect() err = c.Restart(context.Background()) if err != nil { return errors.New(err, "Failed trigger restart of daemon") diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go index 2b844d19da9..b4e602759cb 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd.go @@ -35,9 +35,9 @@ func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { binaryOnly, _ := cmd.Flags().GetBool("binary-only") if !binaryOnly { c := client.New() - daemonError = c.Start(context.Background()) + daemonError = c.Connect(context.Background()) if daemonError == nil { - defer c.Stop() + defer c.Disconnect() var version client.Version version, daemonError = c.Version(context.Background()) diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go index fa479fe5879..119809338d6 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go @@ -52,7 +52,7 @@ func TestCmdBinaryOnlyYAML(t *testing.T) { } func TestCmdDaemon(t *testing.T) { - srv := server.New(newErrorLogger(t)) + srv := server.New(newErrorLogger(t), nil) require.NoError(t, srv.Start()) defer srv.Stop() @@ -67,7 +67,7 @@ func TestCmdDaemon(t *testing.T) { } func TestCmdDaemonYAML(t *testing.T) { - srv := server.New(newErrorLogger(t)) + srv := server.New(newErrorLogger(t), nil) require.NoError(t, srv.Start()) defer srv.Stop() From a3342535e24257f38f2c4353e81d9103b61688d5 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 3 Aug 2020 10:27:21 -0400 Subject: [PATCH 10/11] Fix go vet. --- x-pack/elastic-agent/pkg/agent/application/reexec/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go index 70d34d83a45..4662b1c6230 100644 --- a/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go +++ b/x-pack/elastic-agent/pkg/agent/application/reexec/manager.go @@ -31,6 +31,7 @@ type manager struct { complete chan bool } +// NewManager returns the reexec manager. func NewManager(log *logger.Logger, exec string) ExecManager { return &manager{ logger: log, From a8d00aa85e903f2bc6469ef0f7c652c4fdb92b96 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 3 Aug 2020 10:32:16 -0400 Subject: [PATCH 11/11] Update talking to communicating. --- x-pack/elastic-agent/pkg/basecmd/restart/cmd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go index 9fcaa9d443f..ebb3bf6effd 100644 --- a/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go +++ b/x-pack/elastic-agent/pkg/basecmd/restart/cmd.go @@ -24,7 +24,7 @@ func NewCommandWithArgs(streams *cli.IOStreams) *cobra.Command { c := client.New() err := c.Connect(context.Background()) if err != nil { - return errors.New(err, "Failed talking to running daemon", errors.TypeNetwork, errors.M("socket", control.Address())) + return errors.New(err, "Failed communicating to running daemon", errors.TypeNetwork, errors.M("socket", control.Address())) } defer c.Disconnect() err = c.Restart(context.Background())