From 511e89d5196fa23206dcc0035a5f26d6bb0cd4d6 Mon Sep 17 00:00:00 2001 From: Bernard Kim Date: Tue, 17 Nov 2020 11:51:02 -0800 Subject: [PATCH 1/4] Bump planet --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index fb9f2ecbe2..f2247f07a5 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,7 @@ RELEASE_OUT ?= TELEPORT_TAG = 3.2.16 # TELEPORT_REPOTAG adapts TELEPORT_TAG to the teleport tagging scheme TELEPORT_REPOTAG := v$(TELEPORT_TAG) -PLANET_TAG := 7.1.18-$(K8S_VER_SUFFIX) +PLANET_TAG := 7.1.18-$(K8S_VER_SUFFIX)-2-g61c4dddb PLANET_BRANCH := $(PLANET_TAG) K8S_APP_TAG := $(GRAVITY_TAG) TELEKUBE_APP_TAG := $(GRAVITY_TAG) From edf3003e1d00a7ae77e6db7ff88eb51c7b45f480 Mon Sep 17 00:00:00 2001 From: Bernard Kim Date: Tue, 17 Nov 2020 13:33:43 -0800 Subject: [PATCH 2/4] Revendor satellite --- go.mod | 2 +- go.sum | 3 + .../gravitational/satellite/agent/agent.go | 144 ++-- .../satellite/agent/health/health.go | 8 + .../gravitational/satellite/agent/server.go | 2 +- .../satellite/lib/kubernetes/constants.go | 6 +- .../satellite/lib/membership/serf.go | 82 +-- .../satellite/lib/nethealth/client.go | 146 ++++ .../satellite/lib/nethealth/client_mock.go | 57 ++ .../satellite/lib/nethealth/nethealth.go | 661 ++++++++++++++++++ .../satellite/monitoring/checkers.go | 13 - .../satellite/monitoring/defaults_linux.go | 6 - .../satellite/monitoring/nethealth.go | 92 +-- .../satellite/monitoring/ping.go | 295 -------- .../satellite/monitoring/system_pods.go | 24 +- .../satellite/monitoring/timedrift.go | 80 ++- vendor/golang.org/x/net/icmp/dstunreach.go | 59 ++ vendor/golang.org/x/net/icmp/echo.go | 173 +++++ vendor/golang.org/x/net/icmp/endpoint.go | 113 +++ vendor/golang.org/x/net/icmp/extension.go | 170 +++++ vendor/golang.org/x/net/icmp/helper_posix.go | 75 ++ vendor/golang.org/x/net/icmp/interface.go | 322 +++++++++ vendor/golang.org/x/net/icmp/ipv4.go | 69 ++ vendor/golang.org/x/net/icmp/ipv6.go | 23 + vendor/golang.org/x/net/icmp/listen_posix.go | 103 +++ vendor/golang.org/x/net/icmp/listen_stub.go | 33 + vendor/golang.org/x/net/icmp/message.go | 162 +++++ vendor/golang.org/x/net/icmp/messagebody.go | 52 ++ vendor/golang.org/x/net/icmp/mpls.go | 77 ++ vendor/golang.org/x/net/icmp/multipart.go | 129 ++++ vendor/golang.org/x/net/icmp/packettoobig.go | 43 ++ vendor/golang.org/x/net/icmp/paramprob.go | 72 ++ vendor/golang.org/x/net/icmp/sys_freebsd.go | 11 + vendor/golang.org/x/net/icmp/timeexceeded.go | 57 ++ vendor/modules.txt | 4 +- 35 files changed, 2828 insertions(+), 540 deletions(-) create mode 100644 vendor/github.com/gravitational/satellite/lib/nethealth/client.go create mode 100644 vendor/github.com/gravitational/satellite/lib/nethealth/client_mock.go create mode 100644 vendor/github.com/gravitational/satellite/lib/nethealth/nethealth.go delete mode 100644 vendor/github.com/gravitational/satellite/monitoring/ping.go create mode 100644 vendor/golang.org/x/net/icmp/dstunreach.go create mode 100644 vendor/golang.org/x/net/icmp/echo.go create mode 100644 vendor/golang.org/x/net/icmp/endpoint.go create mode 100644 vendor/golang.org/x/net/icmp/extension.go create mode 100644 vendor/golang.org/x/net/icmp/helper_posix.go create mode 100644 vendor/golang.org/x/net/icmp/interface.go create mode 100644 vendor/golang.org/x/net/icmp/ipv4.go create mode 100644 vendor/golang.org/x/net/icmp/ipv6.go create mode 100644 vendor/golang.org/x/net/icmp/listen_posix.go create mode 100644 vendor/golang.org/x/net/icmp/listen_stub.go create mode 100644 vendor/golang.org/x/net/icmp/message.go create mode 100644 vendor/golang.org/x/net/icmp/messagebody.go create mode 100644 vendor/golang.org/x/net/icmp/mpls.go create mode 100644 vendor/golang.org/x/net/icmp/multipart.go create mode 100644 vendor/golang.org/x/net/icmp/packettoobig.go create mode 100644 vendor/golang.org/x/net/icmp/paramprob.go create mode 100644 vendor/golang.org/x/net/icmp/sys_freebsd.go create mode 100644 vendor/golang.org/x/net/icmp/timeexceeded.go diff --git a/go.mod b/go.mod index 5b161a182e..8336bfa858 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/gravitational/oxy v0.0.0-20180629203109-e4a7e35311e6 // indirect github.com/gravitational/rigging v0.0.0-20191021212636-83b2e9505286 github.com/gravitational/roundtrip v1.0.0 - github.com/gravitational/satellite v0.0.9-0.20200826203500-ad8030ab3ddb + github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e github.com/gravitational/tail v1.0.1 github.com/gravitational/teleport v3.2.15-0.20201005225507-eccdab5226f7+incompatible github.com/gravitational/trace v1.1.11 diff --git a/go.sum b/go.sum index 7a2ebee24a..b09604faef 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,7 @@ github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEg github.com/Azure/go-autorest/autorest/date v0.1.0 h1:YGrhWfrgtFs84+h0o46rJrlmsZtyZRg470CqAXTZaGM= github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA= github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= +github.com/Azure/go-autorest/autorest/mocks v0.2.0 h1:Ww5g4zThfD/6cLb4z6xxgeyDa7QDkizMkJKe0ysZXp0= github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= github.com/Azure/go-autorest/autorest/to v0.2.0/go.mod h1:GunWKJp1AEqgMaGLV+iocmRAJWqST1wQYhyyjXJ3SJc= github.com/Azure/go-autorest/autorest/validation v0.1.0/go.mod h1:Ha3z/SqBeaalWQvokg3NZAlQTalVMtOIAs1aGK7G6u8= @@ -583,6 +584,8 @@ github.com/gravitational/satellite v0.0.9-0.20200825155435-feccb39a021d h1:FBTLB github.com/gravitational/satellite v0.0.9-0.20200825155435-feccb39a021d/go.mod h1:gqyBdtaefi/t7Mw//N/eoC4c3YriZZssmOiZ6NPvuek= github.com/gravitational/satellite v0.0.9-0.20200826203500-ad8030ab3ddb h1:XF4a8LC+T3e59ZczOkJopPL57p2/3zVmlfVcrneIOdc= github.com/gravitational/satellite v0.0.9-0.20200826203500-ad8030ab3ddb/go.mod h1:gqyBdtaefi/t7Mw//N/eoC4c3YriZZssmOiZ6NPvuek= +github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e h1:KCJf3/32nvnzGCLUkwAP4HxFENI+5EMB038hlypGZpg= +github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e/go.mod h1:OcgAUN5zzYAEiNPcjQ/MS2+YAMX6jTpPrbP6zIg29GI= github.com/gravitational/tail v1.0.1 h1:Yv5nh+zV0yHQ9D9kXGEOFpUoA+Ti5uXHwOGioKSIeng= github.com/gravitational/tail v1.0.1/go.mod h1:3aRU+xPwNCaXykBn4jhiXsSJXX7jalcZtvCYGckNbPw= github.com/gravitational/teleport v3.2.15-0.20200110233851-f4445fa60013+incompatible h1:zXwIbEof+bTDqbYW97fZe7BWEuHa7ccrNe2PcMt8kYU= diff --git a/vendor/github.com/gravitational/satellite/agent/agent.go b/vendor/github.com/gravitational/satellite/agent/agent.go index 2107bada8c..981760a4f6 100644 --- a/vendor/github.com/gravitational/satellite/agent/agent.go +++ b/vendor/github.com/gravitational/satellite/agent/agent.go @@ -136,9 +136,6 @@ type agent struct { // Config is the agent configuration. Config - // ClusterMembership provides access to cluster membership service. - ClusterMembership membership.ClusterMembership - // ClusterTimeline keeps track of all timeline events in the cluster. This // timeline is only used by members that have the role 'master'. ClusterTimeline history.Timeline @@ -180,6 +177,9 @@ type agent struct { cancel context.CancelFunc // g manages the internal agent's processes g ctxgroup.Group + + // newSerfClientFunc is used to create a serf client on demand. + newSerfClientFunc func() (membership.ClusterMembership, error) } // New creates an instance of an agent based on configuration options given in config. @@ -188,13 +188,6 @@ func New(config *Config) (*agent, error) { return nil, trace.Wrap(err) } - serfClient, err := initSerfClient(config.SerfConfig, config.Tags) - if err != nil { - return nil, trace.Wrap(err, "failed to initialize serf client") - } - - // TODO: do we need to initialize metrics listener in constructor? - // Move to Start? metricsListener, err := net.Listen("tcp", config.MetricsAddr) if err != nil { return nil, trace.Wrap(err, "failed to bind on %v to serve metrics", config.MetricsAddr) @@ -234,7 +227,6 @@ func New(config *Config) (*agent, error) { g := ctxgroup.WithContext(ctx) agent := &agent{ Config: *config, - ClusterMembership: serfClient, ClusterTimeline: clusterTimeline, LocalTimeline: localTimeline, dialRPC: client.DefaultDialRPC(config.CAFile, config.CertFile, config.KeyFile), @@ -246,6 +238,7 @@ func New(config *Config) (*agent, error) { cancel: cancel, g: g, } + agent.newSerfClientFunc = agent.newSerfClient agent.rpc, err = newRPCServer(agent, config.CAFile, config.CertFile, config.KeyFile, config.RPCAddrs) if err != nil { @@ -254,19 +247,6 @@ func New(config *Config) (*agent, error) { return agent, nil } -// initSerfClient initializes a new serf client and modifies the client with -// the provided tags. -func initSerfClient(config serf.Config, tags map[string]string) (*membership.RetryingClient, error) { - client, err := membership.NewSerfClient(config) - if err != nil { - return nil, trace.Wrap(err, "failed to connect to serf") - } - if err = client.UpdateTags(tags, nil); err != nil { - return nil, trace.Wrap(err, "failed to update serf agent tags") - } - return client, nil -} - // initTimeline initializes a new sqlite timeline. fileName specifies the // SQLite database file name. func initTimeline(config sqlite.Config, fileName string) (history.Timeline, error) { @@ -296,33 +276,41 @@ func (r *agent) Start() error { } // IsMember returns true if this agent is a member of the serf cluster -func (r *agent) IsMember() bool { - members, err := r.ClusterMembership.Members() +func (r *agent) IsMember() (ok bool, err error) { + client, err := r.newSerfClientFunc() if err != nil { - log.WithError(err).Warn("Failed to retrieve members.") - return false + return false, trace.Wrap(err) + } + defer client.Close() + members, err := client.Members() + if err != nil { + return false, trace.Wrap(err, "failed to retrieve members") } // if we're the only one, consider that we're not in the cluster yet // (cause more often than not there are more than 1 member) if len(members) == 1 && members[0].Name() == r.Name { - return false + return false, nil } for _, member := range members { if member.Name() == r.Name { - return true + return true, nil } } - return false + return false, nil } // Join attempts to join a serf cluster identified by peers. func (r *agent) Join(peers []string) error { + client, err := r.newSerfClientFunc() + if err != nil { + return trace.Wrap(err) + } + defer client.Close() noReplay := false - numJoined, err := r.ClusterMembership.Join(peers, noReplay) + numJoined, err := client.Join(peers, noReplay) if err != nil { return trace.Wrap(err) } - log.Infof("Joined %d nodes.", numJoined) return nil } @@ -335,9 +323,6 @@ func (r *agent) Close() (err error) { if err := r.g.Wait(); err != nil && !utils.IsContextCanceledError(err) { errors = append(errors, err) } - if err := r.ClusterMembership.Close(); err != nil { - errors = append(errors, err) - } return trace.NewAggregate(errors...) } @@ -557,34 +542,54 @@ func (r *agent) statusUpdateLoop(ctx context.Context) error { func (r *agent) updateStatus(ctx context.Context) error { ctxStatus, cancel := context.WithTimeout(ctx, r.statusQueryReplyTimeout) defer cancel() - status, err := r.collectStatus(ctxStatus) - if err != nil { - return trace.Wrap(err, "error collecting system status") - } - if status == nil { - return nil - } + status := r.collectStatus(ctxStatus) if err := r.Cache.UpdateStatus(status); err != nil { return trace.Wrap(err, "error updating system status in cache") } return nil } +func (r *agent) defaultUnknownStatus() *pb.NodeStatus { + return &pb.NodeStatus{ + Name: r.Name, + MemberStatus: &pb.MemberStatus{ + Name: r.Name, + }, + } +} + // collectStatus obtains the cluster status by querying statuses of // known cluster members. -func (r *agent) collectStatus(ctx context.Context) (systemStatus *pb.SystemStatus, err error) { +func (r *agent) collectStatus(ctx context.Context) *pb.SystemStatus { ctx, cancel := context.WithTimeout(ctx, StatusUpdateTimeout) defer cancel() - systemStatus = &pb.SystemStatus{ - Status: pb.SystemStatus_Unknown, - Timestamp: pb.NewTimeToProto(r.Clock.Now()), + client, err := r.newSerfClientFunc() + if err != nil { + log.WithError(err).Error("Failed to create serf client.") + r.setLocalStatus(r.defaultUnknownStatus()) + return &pb.SystemStatus{ + Status: pb.SystemStatus_Degraded, + Timestamp: pb.NewTimeToProto(r.Clock.Now()), + Summary: fmt.Sprintf("failed to create serf client: %v", err), + } } + defer client.Close() - members, err := r.ClusterMembership.Members() + members, err := client.Members() if err != nil { - log.WithError(err).Warn("Failed to query serf members.") - return nil, trace.Wrap(err, "failed to query serf members") + log.WithError(err).Error("Failed to query serf members.") + r.setLocalStatus(r.defaultUnknownStatus()) + return &pb.SystemStatus{ + Status: pb.SystemStatus_Degraded, + Timestamp: pb.NewTimeToProto(r.Clock.Now()), + Summary: fmt.Sprintf("failed to query serf members: %v", err), + } + } + + systemStatus := &pb.SystemStatus{ + Status: pb.SystemStatus_Unknown, + Timestamp: pb.NewTimeToProto(r.Clock.Now()), } log.Debugf("Started collecting statuses from members %v.", members) @@ -595,7 +600,7 @@ func (r *agent) collectStatus(ctx context.Context) (systemStatus *pb.SystemStatu statusCh := make(chan *statusResponse, len(members)) for _, member := range members { if r.Name == member.Name() { - go r.getLocalStatus(ctxNode, statusCh) + go r.getLocalStatus(ctxNode, statusCh, client) } else { go r.getStatusFrom(ctxNode, member, statusCh) } @@ -623,12 +628,12 @@ L: setSystemStatus(systemStatus, members) - return systemStatus, nil + return systemStatus } // collectLocalStatus executes monitoring tests on the local node. -func (r *agent) collectLocalStatus(ctx context.Context) (status *pb.NodeStatus, err error) { - local, err := r.ClusterMembership.FindMember(r.Name) +func (r *agent) collectLocalStatus(ctx context.Context, client membership.ClusterMembership) (status *pb.NodeStatus, err error) { + local, err := client.FindMember(r.Name) if err != nil { return nil, trace.Wrap(err, "failed to query local serf member") } @@ -646,7 +651,7 @@ func (r *agent) collectLocalStatus(ctx context.Context) (status *pb.NodeStatus, return status, trace.Wrap(err, "failed to record local timeline events") } - if err := r.notifyMasters(ctx); err != nil { + if err := r.notifyMasters(ctx, client); err != nil { return status, trace.Wrap(err, "failed to notify master nodes of local timeline events") } @@ -654,15 +659,14 @@ func (r *agent) collectLocalStatus(ctx context.Context) (status *pb.NodeStatus, } // getLocalStatus obtains local node status. -func (r *agent) getLocalStatus(ctx context.Context, respc chan<- *statusResponse) { +func (r *agent) getLocalStatus(ctx context.Context, respc chan<- *statusResponse, client membership.ClusterMembership) { // TODO: restructure code so that local member is not needed here. - local, err := r.ClusterMembership.FindMember(r.Name) + local, err := client.FindMember(r.Name) if err != nil { respc <- &statusResponse{err: err} return } - - status, err := r.collectLocalStatus(ctx) + status, err := r.collectLocalStatus(ctx, client) resp := &statusResponse{ NodeStatus: status, member: local, @@ -675,8 +679,8 @@ func (r *agent) getLocalStatus(ctx context.Context, respc chan<- *statusResponse } // notifyMasters pushes new timeline events to all master nodes in the cluster. -func (r *agent) notifyMasters(ctx context.Context) error { - members, err := r.ClusterMembership.Members() +func (r *agent) notifyMasters(ctx context.Context, client membership.ClusterMembership) error { + members, err := client.Members() if err != nil { return trace.Wrap(err) } @@ -762,6 +766,24 @@ func (r *agent) recentLocalStatus() *pb.NodeStatus { return r.localStatus } +func (r *agent) setLocalStatus(status *pb.NodeStatus) { + r.Lock() + defer r.Unlock() + r.localStatus = status +} + +// newSerfClient creates a new instance of the serf client. +// +// It is responsibility of the caller to close the returned client. +func (r *agent) newSerfClient() (membership.ClusterMembership, error) { + client, err := membership.NewSerfClient(r.Config.SerfConfig) + if err != nil { + return nil, trace.Wrap(err, "failed to connect to serf agent: %#v", + r.Config.SerfConfig) + } + return client, nil +} + // filterByTimestamp filters out events that occurred before the provided // timestamp. func filterByTimestamp(events []*pb.TimelineEvent, timestamp time.Time) (filtered []*pb.TimelineEvent) { diff --git a/vendor/github.com/gravitational/satellite/agent/health/health.go b/vendor/github.com/gravitational/satellite/agent/health/health.go index 2b308a222b..2bcfab9eb9 100644 --- a/vendor/github.com/gravitational/satellite/agent/health/health.go +++ b/vendor/github.com/gravitational/satellite/agent/health/health.go @@ -106,3 +106,11 @@ func (r Probes) Status() pb.NodeStatus_Type { } return result } + +// ByDetail implements sort.Interface. +// Enables probes to be sorted by detail. +type ByDetail Probes + +func (r ByDetail) Len() int { return len(r) } +func (r ByDetail) Swap(i, j int) { r[i], r[j] = r[j], r[i] } +func (r ByDetail) Less(i, j int) bool { return r[i].Detail < r[j].Detail } diff --git a/vendor/github.com/gravitational/satellite/agent/server.go b/vendor/github.com/gravitational/satellite/agent/server.go index 795857edcf..11a180153d 100644 --- a/vendor/github.com/gravitational/satellite/agent/server.go +++ b/vendor/github.com/gravitational/satellite/agent/server.go @@ -365,7 +365,7 @@ type Agent interface { // RecordLocalEvents records the events into the local timeline. RecordLocalEvents(ctx context.Context, events []*pb.TimelineEvent) error // IsMember returns whether this agent is already a member of serf cluster - IsMember() bool + IsMember() (ok bool, err error) // GetConfig returns the agent configuration. GetConfig() Config // CheckerRepository allows to add checks to the agent. diff --git a/vendor/github.com/gravitational/satellite/lib/kubernetes/constants.go b/vendor/github.com/gravitational/satellite/lib/kubernetes/constants.go index b5dd846944..f1d73baaae 100644 --- a/vendor/github.com/gravitational/satellite/lib/kubernetes/constants.go +++ b/vendor/github.com/gravitational/satellite/lib/kubernetes/constants.go @@ -16,5 +16,7 @@ limitations under the License. package kubernetes -// AllNamespaces can be used to query pods in all namespaces. -const AllNamespaces = "" +const ( + // AllNamespaces can be used to query pods in all namespaces. + AllNamespaces = "" +) diff --git a/vendor/github.com/gravitational/satellite/lib/membership/serf.go b/vendor/github.com/gravitational/satellite/lib/membership/serf.go index 4397e53930..bcd29cc042 100644 --- a/vendor/github.com/gravitational/satellite/lib/membership/serf.go +++ b/vendor/github.com/gravitational/satellite/lib/membership/serf.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net" - "sync" "github.com/gravitational/satellite/lib/rpc" "github.com/gravitational/satellite/lib/rpc/client" @@ -30,35 +29,24 @@ import ( "github.com/hashicorp/serf/coordinate" ) -// RetryingClient is an rpc client used to make requests to a serf agent. -// Attempts to reconnect to agent if connection is lost. -type RetryingClient struct { - sync.RWMutex +// Client is an rpc client used to make requests to a serf agent. +type Client struct { client *serf.RPCClient - config serf.Config } // NewSerfClient returns a new serf client for the specified configuration. -// The client will attempt to reconnect if it detects that the connection to the -// serf agent has been lost. -func NewSerfClient(config serf.Config) (*RetryingClient, error) { - client, err := reinit(config) +func NewSerfClient(config serf.Config) (*Client, error) { + client, err := serf.ClientFromConfig(&config) if err != nil { return nil, trace.Wrap(err) } - return &RetryingClient{ + return &Client{ client: client, - config: config, }, nil } // Members lists members of the serf cluster. -func (r *RetryingClient) Members() ([]ClusterMember, error) { - if err := r.reinit(); err != nil { - return nil, trace.Wrap(err) - } - r.RLock() - defer r.RUnlock() +func (r *Client) Members() ([]ClusterMember, error) { members, err := r.client.Members() if err != nil { return nil, trace.Wrap(err) @@ -77,7 +65,7 @@ func (r *RetryingClient) Members() ([]ClusterMember, error) { } // FindMember finds serf member with the specified name. -func (r *RetryingClient) FindMember(name string) (member ClusterMember, err error) { +func (r *Client) FindMember(name string) (member ClusterMember, err error) { members, err := r.Members() if err != nil { return member, trace.Wrap(err) @@ -91,41 +79,24 @@ func (r *RetryingClient) FindMember(name string) (member ClusterMember, err erro } // Stop cancels the serf event delivery and removes the subscription. -func (r *RetryingClient) Stop(handle serf.StreamHandle) error { - if err := r.reinit(); err != nil { - return trace.Wrap(err) - } - r.RLock() - defer r.RUnlock() +func (r *Client) Stop(handle serf.StreamHandle) error { return r.client.Stop(handle) } // Join attempts to join an existing serf cluster identified by peers. // Replay controls if previous user events are replayed once this node has joined the cluster. // Returns the number of nodes joined -func (r *RetryingClient) Join(peers []string, replay bool) (int, error) { - if err := r.reinit(); err != nil { - return 0, trace.Wrap(err) - } - r.RLock() - defer r.RUnlock() +func (r *Client) Join(peers []string, replay bool) (int, error) { return r.client.Join(peers, replay) } // UpdateTags will modify the tags on a running serf agent -func (r *RetryingClient) UpdateTags(tags map[string]string, delTags []string) error { - if err := r.reinit(); err != nil { - return trace.Wrap(err) - } - r.RLock() - defer r.RUnlock() +func (r *Client) UpdateTags(tags map[string]string, delTags []string) error { return r.client.UpdateTags(tags, delTags) } // Close closes the client -func (r *RetryingClient) Close() error { - r.RLock() - defer r.RUnlock() +func (r *Client) Close() error { if r.client.IsClosed() { return nil } @@ -133,38 +104,10 @@ func (r *RetryingClient) Close() error { } // GetCoordinate returns the Serf Coordinate for a specific node -func (r *RetryingClient) GetCoordinate(node string) (*coordinate.Coordinate, error) { - if err := r.reinit(); err != nil { - return nil, trace.Wrap(err) - } - r.RLock() - defer r.RUnlock() +func (r *Client) GetCoordinate(node string) (*coordinate.Coordinate, error) { return r.client.GetCoordinate(node) } -func (r *RetryingClient) reinit() (err error) { - r.Lock() - defer r.Unlock() - client := r.client - if !client.IsClosed() { - return nil - } - client, err = reinit(r.config) - if err != nil { - return trace.Wrap(err) - } - r.client = client - return nil -} - -func reinit(clientConfig serf.Config) (*serf.RPCClient, error) { - client, err := serf.ClientFromConfig(&clientConfig) - if err != nil { - return nil, trace.Wrap(err) - } - return client, nil -} - // filterLeft filters out members that have left the serf cluster func filterLeft(members []serf.Member) (result []serf.Member) { result = make([]serf.Member, 0, len(members)) @@ -185,7 +128,6 @@ type SerfMember struct { // Dial attempts to create client connection to the serf member. func (r SerfMember) Dial(ctx context.Context, caFile, certFile, keyFile string) (client.Client, error) { - config := client.Config{ Address: fmt.Sprintf("%s:%d", r.Member.Addr.String(), rpc.Port), CAFile: caFile, diff --git a/vendor/github.com/gravitational/satellite/lib/nethealth/client.go b/vendor/github.com/gravitational/satellite/lib/nethealth/client.go new file mode 100644 index 0000000000..5291dd180f --- /dev/null +++ b/vendor/github.com/gravitational/satellite/lib/nethealth/client.go @@ -0,0 +1,146 @@ +/* +Copyright 2020 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nethealth + +import ( + "bytes" + "context" + "io/ioutil" + "net" + "net/http" + + "github.com/gravitational/trace" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/sirupsen/logrus" +) + +// Client provides nethealth client interface. Client can be used to query +// exposed nethealth metrics. +type Client struct { + // socketPath specifies nethealth socket path. + socketPath string + // FieldLogger is used for logging. + logrus.FieldLogger +} + +// NewClient constructs a new Client with the provided socket path. +func NewClient(socketPath string) *Client { + return &Client{ + socketPath: socketPath, + FieldLogger: logrus.WithField(trace.Component, "nethealth-client"), + } +} + +// LatencySummariesMilli returns the latency summary for each peer. The latency +// values represent milliseconds. +func (r *Client) LatencySummariesMilli(ctx context.Context) (map[string]*dto.Summary, error) { + const labelLatencySummary = "nethealth_echo_latency_summary_milli" + + metrics, err := r.metrics(ctx) + if err != nil { + return nil, trace.Wrap(err, "failed to retrieve metrics") + } + + summaries, err := parseSummaries(metrics, labelLatencySummary) + if err != nil { + return nil, trace.Wrap(err, "failed to parse latency summaries") + } + + return summaries, nil +} + +// parseSummaries parses the metrics and returns the summaries for the specified +// label. Returns NotFound if the label does not exist. +func parseSummaries(metricFamilies map[string]*dto.MetricFamily, label string) (map[string]*dto.Summary, error) { + metricFamily, ok := metricFamilies[label] + if !ok { + return nil, trace.NotFound("%s metrics not found", label) + } + + summaries := make(map[string]*dto.Summary) + for _, m := range metricFamily.GetMetric() { + peerName, err := getPeerName(m.GetLabel()) + if err != nil { + logrus.WithError(err).Warn("Failed to get peer name.") + continue + } + summaries[peerName] = m.GetSummary() + } + + return summaries, nil +} + +// metrics fetches the nethealth metrics and returns the metric families. +func (r *Client) metrics(ctx context.Context) (res map[string]*dto.MetricFamily, err error) { + var dialer net.Dialer + client := http.Client{ + Transport: &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + return dialer.DialContext(ctx, "unix", r.socketPath) + }, + }, + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unix/metrics", nil) + if err != nil { + return nil, trace.Wrap(err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, trace.Wrap(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, trace.BadParameter("unexpected response from %s: %v", r.socketPath, resp.Status) + } + + buffer, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + + metricFamilies, err := parseMetrics(buffer) + if err != nil { + r.WithField("nethealth-metrics", string(buffer)).Debug("Failed to parse nethealth metrics.") + return nil, trace.Wrap(err, "failed to parse nethealth metrics") + } + + return metricFamilies, nil +} + +// parseMetrics parses the metrics and returns the metric families. +func parseMetrics(metrics []byte) (map[string]*dto.MetricFamily, error) { + var parser expfmt.TextParser + metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(metrics)) + if err != nil { + return nil, trace.Wrap(err, "failed to parse text to MetricFamilies") + } + return metricFamilies, nil +} + +// getPeerName extracts the 'peer_name' value from the provided labels. +func getPeerName(labels []*dto.LabelPair) (peer string, err error) { + for _, label := range labels { + if LabelPeerName == label.GetName() { + return label.GetValue(), nil + } + } + return "", trace.NotFound("unable to find %s label", LabelPeerName) +} diff --git a/vendor/github.com/gravitational/satellite/lib/nethealth/client_mock.go b/vendor/github.com/gravitational/satellite/lib/nethealth/client_mock.go new file mode 100644 index 0000000000..6a82fd7401 --- /dev/null +++ b/vendor/github.com/gravitational/satellite/lib/nethealth/client_mock.go @@ -0,0 +1,57 @@ +/* +Copyright 2020 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nethealth + +import ( + "context" + + "github.com/gravitational/trace" + dto "github.com/prometheus/client_model/go" +) + +// MockClient is a mock implementation of the nethealth Client. Instead of +// reading metrics from a live Prometheus metrics endpoint, metrics are +// pre-loaded and stored in the MockClient. +type MockClient struct { + // textMetrics stores the current text formatted metrics. + textMetrics string +} + +// NewMockClient constructs a new MockClient with the provided metrics. +func NewMockClient(metrics string) *MockClient { + return &MockClient{ + textMetrics: metrics, + } +} + +// LatencySummariesMilli returns the latency summary for each peer. The latency +// values represent milliseconds. +func (r *MockClient) LatencySummariesMilli(_ context.Context) (map[string]*dto.Summary, error) { + const labelLatencySummary = "nethealth_echo_latency_summary_milli" + + metricFamilies, err := parseMetrics([]byte(r.textMetrics)) + if err != nil { + return nil, trace.Wrap(err, "failed to parse metrics") + } + + summaries, err := parseSummaries(metricFamilies, labelLatencySummary) + if err != nil { + return nil, trace.Wrap(err, "failed to parse prometheus summaries") + } + + return summaries, nil +} diff --git a/vendor/github.com/gravitational/satellite/lib/nethealth/nethealth.go b/vendor/github.com/gravitational/satellite/lib/nethealth/nethealth.go new file mode 100644 index 0000000000..a73c3f127d --- /dev/null +++ b/vendor/github.com/gravitational/satellite/lib/nethealth/nethealth.go @@ -0,0 +1,661 @@ +/* +Copyright 2019 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package nethealth implements a daemonset that when deployed to a kubernetes cluster, will locate and send ICMP echos +// (pings) to the nethealth pod on every other node in the cluster. This will give an indication into whether the +// overlay network is functional for pod -> pod communications, and also record packet loss on the network. +package nethealth + +import ( + "fmt" + "net" + "net/http" + "os" + "reflect" + "sort" + "time" + + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + + "github.com/gravitational/trace" + "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const ( + // heartbeatInterval is the duration between sending heartbeats to each peer. Any heartbeat that takes more + // than one interval to respond will also be considered timed out. + heartbeatInterval = 1 * time.Second + + // resyncInterval is the duration between full resyncs of local state with kubernetes. If a node is deleted it + // may not be detected until the full resync completes. + resyncInterval = 15 * time.Minute + + // dnsDiscoveryInterval is the duration of time for doing DNS based service discovery for pod changes. This is a + // lightweight test for whether there is a change to the nethealth pods within the cluster. + dnsDiscoveryInterval = 10 * time.Second + + // Default selector to use for finding nethealth pods + DefaultSelector = "k8s-app=nethealth" + + // DefaultServiceDiscoveryQuery is the default name to query for service discovery changes + DefaultServiceDiscoveryQuery = "any.nethealth" + + // RxQueueSize is the size of queued ping responses to process + // Main processing occurs in a single goroutine, so we need a large enough processing queue to hold onto all ping + // responses while the routine is working on other operations. + // 2000 is chosen as double the maximum supported cluster size (1k) + RxQueueSize = 2000 + + // DefaultNethealthSocket is the default location of a unix domain socket that contains the prometheus metrics + DefaultNethealthSocket = "/run/nethealth/nethealth.sock" + + // LabelNodeName specifies metrics label mapped to node name + LabelNodeName = "node_name" + // LabelPeerName specifies metrics label mapped to peer node name + LabelPeerName = "peer_name" +) + +const ( + // Init is peer state that we've found the node but don't know anything about it yet. + Init = "init" + // Up is a peer state that the peer is currently reachable + Up = "up" + // Timeout is a peer state that the peer is currently timing out to pings + Timeout = "timeout" +) + +type Config struct { + // PrometheusSocket is the path to a unix socket that can be used to retrieve the prometheus metrics + PrometheusSocket string + + // PrometheusPort is the port to bind to for serving prometheus metrics + PrometheusPort uint32 + + // Namespace is the kubernetes namespace to monitor for other nethealth instances + Namespace string + // NodeName is the node this instance is running on + NodeName string + // Selector is a kubernetes selector to find all the nethealth pods in the configured namespace + Selector string + // ServiceDiscoveryQuery is a DNS name that will be used for lightweight service discovery checks. A query to + // any..default.svc.cluster.local will return a list of pods for the service. If the list of pods + // changes we know to resync with the kubernetes API. This method uses significantly less resources than running a + // kubernetes watcher on the API. Defaults to any.nethealth which will utilize the search path from resolv.conf. + ServiceDiscoveryQuery string +} + +// New creates a new server to ping each peer. +func (c Config) New() (*Server, error) { + promPeerRTT := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "nethealth", + Subsystem: "echo", + Name: "duration_seconds", + Help: "The round trip time to reach the peer", + Buckets: []float64{ + 0.0001, // 0.1 ms + 0.0002, // 0.2 ms + 0.0003, // 0.3 ms + 0.0004, // 0.4 ms + 0.0005, // 0.5 ms + 0.0006, // 0.6 ms + 0.0007, // 0.7 ms + 0.0008, // 0.8 ms + 0.0009, // 0.9 ms + 0.001, // 1ms + 0.0015, // 1.5ms + 0.002, // 2ms + 0.003, // 3ms + 0.004, // 4ms + 0.005, // 5ms + 0.01, // 10ms + 0.02, // 20ms + 0.04, // 40ms + 0.08, // 80ms + }, + }, []string{LabelNodeName, LabelPeerName}) + promPeerRTTSummary := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "nethealth", + Subsystem: "echo", + Name: "latency_summary_milli", + Help: "The round trip time between peers in milliseconds", + MaxAge: 30 * time.Second, + AgeBuckets: 5, + Objectives: map[float64]float64{ + 0.1: 0.09, // 10th percentile + 0.2: 0.08, // ... + 0.3: 0.07, + 0.4: 0.06, + 0.5: 0.05, + 0.6: 0.04, + 0.7: 0.03, + 0.8: 0.02, + 0.9: 0.01, + 0.95: 0.005, + 0.99: 0.001, // 99th percentile + }, + }, []string{LabelNodeName, LabelPeerName}) + promPeerTimeout := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "nethealth", + Subsystem: "echo", + Name: "timeout_total", + Help: "The number of echo requests that have timed out", + }, []string{LabelNodeName, LabelPeerName}) + promPeerRequest := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "nethealth", + Subsystem: "echo", + Name: "request_total", + Help: "The number of echo requests that have been sent", + }, []string{LabelNodeName, LabelPeerName}) + + prometheus.MustRegister( + promPeerRTT, + promPeerRTTSummary, + promPeerTimeout, + promPeerRequest, + ) + + selector := DefaultSelector + if c.Selector != "" { + selector = c.Selector + } + + labelSelector, err := labels.Parse(selector) + if err != nil { + return nil, trace.Wrap(err) + } + + if c.ServiceDiscoveryQuery == "" { + c.ServiceDiscoveryQuery = DefaultServiceDiscoveryQuery + } + + return &Server{ + config: c, + FieldLogger: logrus.WithField(trace.Component, "nethealth"), + promPeerRTT: promPeerRTT, + promPeerRTTSummary: promPeerRTTSummary, + promPeerTimeout: promPeerTimeout, + promPeerRequest: promPeerRequest, + selector: labelSelector, + triggerResync: make(chan bool, 1), + rxMessage: make(chan messageWrapper, RxQueueSize), + peers: make(map[string]*peer), + addrToPeer: make(map[string]string), + }, nil +} + +// Server is an instance of nethealth that is running on each node responsible for sending and responding to heartbeats. +type Server struct { + logrus.FieldLogger + + config Config + clock clockwork.Clock + conn *icmp.PacketConn + httpServer *http.Server + selector labels.Selector + + // rxMessage is a processing queue of received echo responses + rxMessage chan messageWrapper + triggerResync chan bool + + peers map[string]*peer + addrToPeer map[string]string + + client kubernetes.Interface + + promPeerRTT *prometheus.HistogramVec + promPeerRTTSummary *prometheus.SummaryVec + promPeerTimeout *prometheus.CounterVec + promPeerRequest *prometheus.CounterVec +} + +type peer struct { + name string + addr net.Addr + echoCounter int + echoTime time.Time + echoTimeout bool + + status string + lastStatusChange time.Time +} + +type messageWrapper struct { + message *icmp.Message + rxTime time.Time + peerAddr net.Addr +} + +// Start sets up the server and begins normal operation +func (s *Server) Start() error { + config, err := rest.InClusterConfig() + if err != nil { + return trace.Wrap(err) + } + s.client, err = kubernetes.NewForConfig(config) + if err != nil { + return trace.Wrap(err) + } + + s.conn, err = icmp.ListenPacket("ip4:icmp", "0.0.0.0") + if err != nil { + return trace.Wrap(err) + } + + s.clock = clockwork.NewRealClock() + go s.loop() + go s.loopServiceDiscovery() + go s.serve() + + mux := http.ServeMux{} + mux.Handle("/metrics", promhttp.Handler()) + s.httpServer = &http.Server{Addr: fmt.Sprint(":", s.config.PrometheusPort), Handler: &mux} + go func() { + if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed { + s.Fatalf("ListenAndServe(): %s", err) + } + }() + + if s.config.PrometheusSocket != "" { + _ = os.Remove(s.config.PrometheusSocket) + + unixListener, err := net.Listen("unix", s.config.PrometheusSocket) + if err != nil { + return trace.Wrap(err) + } + + go func() { + if err := s.httpServer.Serve(unixListener); err != http.ErrServerClosed { + s.Fatalf("Unix Listen(): %s", err) + } + }() + } + + s.Info("Started nethealth with config:") + s.Info(" PrometheusSocket: ", s.config.PrometheusSocket) + s.Info(" PrometheusPort: ", s.config.PrometheusPort) + s.Info(" Namespace: ", s.config.Namespace) + s.Info(" NodeName: ", s.config.NodeName) + s.Info(" Selector: ", s.selector) + s.Info(" ServiceDiscoveryQuery: ", s.config.ServiceDiscoveryQuery) + + return nil +} + +// loop is the main processing loop for sending/receiving heartbeats. +func (s *Server) loop() { + heartbeatTicker := s.clock.NewTicker(heartbeatInterval) + defer heartbeatTicker.Stop() + + resyncTicker := s.clock.NewTicker(resyncInterval) + defer resyncTicker.Stop() + + for { + select { + // + // Re-sync cluster peers + // + case <-resyncTicker.Chan(): + err := s.resyncPeerList() + if err != nil { + s.WithError(err).Error("Unexpected error re-syncing the list of peer nodes.") + } + + err = s.resyncNethealthPods() + if err != nil { + s.WithError(err).Error("Unexpected error re-syncing the list of peer pods.") + } + case <-s.triggerResync: + err := s.resyncPeerList() + if err != nil { + s.WithError(err).Error("Unexpected error re-syncing the list of peer nodes.") + } + + err = s.resyncNethealthPods() + if err != nil { + s.WithError(err).Error("Unexpected error re-syncing the list of peer pods.") + } + + // + // Send a heartbeat to each peer we know about + // Check for peers that are timing out / down + // + case <-heartbeatTicker.Chan(): + s.checkTimeouts() + for _, peer := range s.peers { + s.sendHeartbeat(peer) + } + + // + // Rx heartbeats responses from peers + // + case rx := <-s.rxMessage: + err := s.processAck(rx) + if err != nil { + s.WithFields(logrus.Fields{ + logrus.ErrorKey: err, + "peer_addr": rx.peerAddr, + "rx_time": rx.rxTime, + "message": rx.message, + }).Error("Error processing icmp message.") + } + } + } +} + +// loopServiceDiscovery uses cluster-dns service discovery as a lightweight check for pod changes +// and will trigger a resync if the cluster DNS service discovery changes +func (s *Server) loopServiceDiscovery() { + s.Info("Starting DNS service discovery for nethealth pod.") + ticker := s.clock.NewTicker(dnsDiscoveryInterval) + defer ticker.Stop() + query := s.config.ServiceDiscoveryQuery + + previousNames := []string{} + + for { + <-ticker.Chan() + + s.Debugf("Querying %v for service discovery", query) + names, err := net.LookupHost(query) + if err != nil { + s.WithError(err).WithField("query", query).Error("Error querying service discovery.") + continue + } + + sort.Strings(names) + if reflect.DeepEqual(names, previousNames) { + continue + } + previousNames = names + s.Info("Triggering peer resync due to service discovery change") + + select { + case s.triggerResync <- true: + default: + // Don't block + } + } +} + +// resyncPeerList contacts the kubernetes API to sync the list of kubernetes nodes +func (s *Server) resyncPeerList() error { + nodes, err := s.client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return trace.Wrap(err) + } + + peerMap := make(map[string]bool) + for _, node := range nodes.Items { + // Don't add our own node as a peer + if node.Name == s.config.NodeName { + continue + } + + peerMap[node.Name] = true + if _, ok := s.peers[node.Name]; !ok { + s.peers[node.Name] = &peer{ + name: node.Name, + lastStatusChange: s.clock.Now(), + addr: &net.IPAddr{}, + } + s.WithField("peer", node.Name).Info("Adding peer.") + // Initialize the peer so it shows up in prometheus with a 0 count + s.promPeerTimeout.WithLabelValues(s.config.NodeName, node.Name).Add(0) + s.promPeerRequest.WithLabelValues(s.config.NodeName, node.Name).Add(0) + } + } + + // check for peers that have been deleted + for key := range s.peers { + if _, ok := peerMap[key]; !ok { + s.WithField("peer", key).Info("Deleting peer.") + delete(s.peers, key) + s.promPeerRTT.DeleteLabelValues(s.config.NodeName, key) + s.promPeerRTTSummary.DeleteLabelValues(s.config.NodeName, key) + s.promPeerRequest.DeleteLabelValues(s.config.NodeName, key) + s.promPeerTimeout.DeleteLabelValues(s.config.NodeName, key) + } + } + + return nil +} + +// resyncNethealthPods contacts the kubernetes API to sync the list of pods running the nethealth daemon +func (s *Server) resyncNethealthPods() error { + list, err := s.client.CoreV1().Pods(s.config.Namespace).List(metav1.ListOptions{ + LabelSelector: s.selector.String(), + }) + if err != nil { + return trace.Wrap(err) + } + + for _, pod := range list.Items { + // skip our own pod + if pod.Spec.NodeName == s.config.NodeName { + continue + } + + // skip if the peer object can't be located + if peer, ok := s.peers[pod.Spec.NodeName]; !ok { + continue + } else { + newAddr := &net.IPAddr{ + IP: net.ParseIP(pod.Status.PodIP), + } + + if peer.addr.String() != newAddr.String() { + s.WithFields(logrus.Fields{ + "peer": peer.name, + "new_peer_addr": newAddr, + "old_peer_addr": peer.addr, + }).Info("Updating peer pod IP address.") + peer.addr = newAddr + s.addrToPeer[peer.addr.String()] = pod.Spec.NodeName + } + } + } + + // Free entries in the lookup table that no longer point to a valid object + for key, value := range s.addrToPeer { + if _, ok := s.peers[value]; !ok { + delete(s.addrToPeer, key) + } + } + + return nil +} + +// serve monitors for incoming icmp messages +func (s *Server) serve() { + buf := make([]byte, 256) + + for { + n, peerAddr, err := s.conn.ReadFrom(buf) + rxTime := s.clock.Now() + log := s.WithFields(logrus.Fields{ + "peer_addr": peerAddr, + "node": s.config.NodeName, + "length": n, + }) + if err != nil { + log.WithError(err).Error("Error in udp socket read.") + continue + } + + // The ICMP package doesn't export the protocol numbers + // 1 - ICMP + // 58 - ICMPv6 + // https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml + msg, err := icmp.ParseMessage(1, buf[:n]) + if err != nil { + log.WithError(err).Error("Error parsing icmp message.") + continue + } + + select { + case s.rxMessage <- messageWrapper{ + message: msg, + rxTime: rxTime, + peerAddr: peerAddr, + }: + default: + // Don't block + log.Warn("Dropped icmp message due to full rxMessage queue") + } + } +} + +func (s *Server) lookupPeer(addr string) (*peer, error) { + peerName, ok := s.addrToPeer[addr] + if !ok { + return nil, trace.BadParameter("address not found in address table").AddField("address", addr) + } + + p, ok := s.peers[peerName] + if !ok { + return nil, trace.BadParameter("peer not found in peer table").AddField("peer_name", peerName) + } + return p, nil +} + +// processAck processes a received ICMP Ack message +func (s *Server) processAck(e messageWrapper) error { + switch e.message.Type { + case ipv4.ICMPTypeEchoReply: + // ok + case ipv4.ICMPTypeEcho: + // nothing to do with echo requests + return nil + default: + //unexpected / unknown + return trace.BadParameter("received unexpected icmp message type").AddField("type", e.message.Type) + } + + switch pkt := e.message.Body.(type) { + case *icmp.Echo: + peer, err := s.lookupPeer(e.peerAddr.String()) + if err != nil { + return trace.Wrap(err) + } + if uint16(pkt.Seq) != uint16(peer.echoCounter) { + return trace.BadParameter("response sequence doesn't match latest request."). + AddField("expected", uint16(peer.echoCounter)). + AddField("received", uint16(pkt.Seq)) + } + + rtt := e.rxTime.Sub(peer.echoTime) + s.promPeerRTT.WithLabelValues(s.config.NodeName, peer.name).Observe(rtt.Seconds()) + s.promPeerRTTSummary.WithLabelValues(s.config.NodeName, peer.name).Observe(float64(rtt.Milliseconds())) + s.updatePeerStatus(peer, Up) + peer.echoTimeout = false + + s.WithFields(logrus.Fields{ + "peer_name": peer.name, + "peer_addr": peer.addr, + "counter": peer.echoCounter, + "seq": uint16(peer.echoCounter), + "rtt": rtt, + }).Debug("Ack.") + default: + s.WithFields(logrus.Fields{ + "peer_addr": e.peerAddr.String(), + }).Warn("Unexpected icmp message") + } + return nil +} + +func (s *Server) sendHeartbeat(peer *peer) { + peer.echoCounter++ + log := s.WithFields(logrus.Fields{ + "peer_name": peer.name, + "peer_addr": peer.addr, + "id": peer.echoCounter, + }) + + // If we don't know the pod IP address of the peer, we still want to generate a timeout, but not actually send + // a heartbeat + peer.echoTimeout = true + if peer.addr == nil || peer.addr.String() == "" || peer.addr.String() == "0.0.0.0" { + return + } + + msg := icmp.Message{ + Type: ipv4.ICMPTypeEcho, + Code: 0, + Body: &icmp.Echo{ + ID: 1, + Seq: peer.echoCounter, + }, + } + buf, err := msg.Marshal(nil) + if err != nil { + log.WithError(err).Warn("Failed to marshal ping.") + return + } + + peer.echoTime = s.clock.Now() + _, err = s.conn.WriteTo(buf, peer.addr) + if err != nil { + log.WithError(err).Warn("Failed to send ping.") + return + } + s.promPeerRequest.WithLabelValues(s.config.NodeName, peer.name).Inc() + + log.Debug("Sent echo request.") +} + +// checkTimeouts iterates over each peer, and checks whether our last heartbeat has timed out +func (s *Server) checkTimeouts() { + s.Debug("checking for timeouts") + for _, peer := range s.peers { + // if the echoTimeout flag is set, it means we didn't receive a response to our last request + if peer.echoTimeout { + s.WithFields(logrus.Fields{ + "peer_name": peer.name, + "peer_addr": peer.addr, + "id": peer.echoCounter, + }).Debug("echo timeout") + s.promPeerTimeout.WithLabelValues(s.config.NodeName, peer.name).Inc() + s.updatePeerStatus(peer, Timeout) + } + } +} + +func (s *Server) updatePeerStatus(peer *peer, status string) { + if peer.status == status { + return + } + + s.WithFields(logrus.Fields{ + "peer_name": peer.name, + "peer_addr": peer.addr, + "duration": s.clock.Now().Sub(peer.lastStatusChange), + "old_status": peer.status, + "new_status": status, + }).Info("Peer status changed.") + + peer.status = status + peer.lastStatusChange = s.clock.Now() + +} diff --git a/vendor/github.com/gravitational/satellite/monitoring/checkers.go b/vendor/github.com/gravitational/satellite/monitoring/checkers.go index 63da1d8f02..a2b5c4d5e3 100644 --- a/vendor/github.com/gravitational/satellite/monitoring/checkers.go +++ b/vendor/github.com/gravitational/satellite/monitoring/checkers.go @@ -59,19 +59,6 @@ func NodesStatusHealth(config KubeConfig, nodesReadyThreshold int) health.Checke return NewNodesStatusChecker(config, nodesReadyThreshold) } -// PingHealth creates a checker that monitors ping values between Master nodes -// and other nodes -func PingHealth(serfRPCAddr, serfMemberName string) (c health.Checker, err error) { - c, err = NewPingChecker(PingCheckerConfig{ - SerfRPCAddr: serfRPCAddr, - SerfMemberName: serfMemberName, - }) - if err != nil { - return nil, trace.Wrap(err) - } - return c, nil -} - // TimeDriftHealth creates a checker that monitors time difference between cluster nodes. func TimeDriftHealth(config TimeDriftCheckerConfig) (c health.Checker, err error) { c, err = NewTimeDriftChecker(config) diff --git a/vendor/github.com/gravitational/satellite/monitoring/defaults_linux.go b/vendor/github.com/gravitational/satellite/monitoring/defaults_linux.go index 05a38b2834..7306dafbfd 100644 --- a/vendor/github.com/gravitational/satellite/monitoring/defaults_linux.go +++ b/vendor/github.com/gravitational/satellite/monitoring/defaults_linux.go @@ -113,12 +113,6 @@ func DefaultBootConfigParams() health.Checker { BootConfigParam{Name: "CONFIG_VETH"}, BootConfigParam{Name: "CONFIG_BRIDGE"}, BootConfigParam{Name: "CONFIG_BRIDGE_NETFILTER"}, - BootConfigParam{ - // https://cateee.net/lkddb/web-lkddb/NF_NAT_IPV4.html - // CONFIG_NF_NAT_IPV4 has been removed as of kernel 5.1 - Name: "CONFIG_NF_NAT_IPV4", - KernelConstraint: KernelVersionLessThan(KernelVersion{Release: 5, Major: 1}), - }, BootConfigParam{Name: "CONFIG_IP_NF_FILTER"}, BootConfigParam{Name: "CONFIG_IP_NF_TARGET_MASQUERADE"}, BootConfigParam{Name: "CONFIG_NETFILTER_XT_MATCH_ADDRTYPE"}, diff --git a/vendor/github.com/gravitational/satellite/monitoring/nethealth.go b/vendor/github.com/gravitational/satellite/monitoring/nethealth.go index 8dc7828ea5..5d85325b39 100644 --- a/vendor/github.com/gravitational/satellite/monitoring/nethealth.go +++ b/vendor/github.com/gravitational/satellite/monitoring/nethealth.go @@ -20,23 +20,24 @@ import ( "bytes" "context" "fmt" + "io/ioutil" "math" - "net/url" + "net" + "net/http" "sync" "time" "github.com/gravitational/satellite/agent" "github.com/gravitational/satellite/agent/health" pb "github.com/gravitational/satellite/agent/proto/agentpb" + "github.com/gravitational/satellite/lib/nethealth" "github.com/gravitational/satellite/utils" - "github.com/gravitational/roundtrip" "github.com/gravitational/trace" "github.com/gravitational/ttlmap/v2" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" ) @@ -45,8 +46,8 @@ import ( type NethealthConfig struct { // NodeName specifies the kubernetes name of this node. NodeName string - // NethealthPort specifies the port that nethealth is listening on. - NethealthPort int + // NethealthSocketPath specifies the location of the unix-socket nethealth is listening on. + NethealthSocketPath string // NetStatsInterval specifies the duration to store net stats. NetStatsInterval time.Duration // KubeConfig specifies kubernetes access information. @@ -63,8 +64,8 @@ func (c *NethealthConfig) CheckAndSetDefaults() error { if c.KubeConfig == nil { errors = append(errors, trace.BadParameter("kubernetes access config must be provided")) } - if c.NethealthPort == 0 { - c.NethealthPort = defaultNethealthPort + if c.NethealthSocketPath == "" { + c.NethealthSocketPath = nethealth.DefaultNethealthSocket } if c.NetStatsInterval == time.Duration(0) { c.NetStatsInterval = defaultNetStatsInterval @@ -128,16 +129,7 @@ func (c *nethealthChecker) check(ctx context.Context, reporter health.Reporter) return nil } - addr, err := c.getNethealthAddr() - if trace.IsNotFound(err) { - log.Debug("Nethealth pod was not found.") - return nil // pod was not found, log and treat gracefully - } - if err != nil { - return trace.Wrap(err) // received unexpected error, maybe network-related, will add error probe above - } - - resp, err := fetchNethealthMetrics(ctx, addr) + resp, err := c.fetchNethealthMetrics(ctx) if err != nil { return trace.Wrap(err, "failed to fetch nethealth metrics") } @@ -174,33 +166,6 @@ func (c *nethealthChecker) getPeers() (peers []string, err error) { return peers, nil } -// getNethealthAddr returns the address of the local nethealth pod. -func (c *nethealthChecker) getNethealthAddr() (addr string, err error) { - opts := metav1.ListOptions{ - LabelSelector: nethealthLabelSelector.String(), - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String(), - Limit: 1, - } - pods, err := c.Client.CoreV1().Pods(nethealthNamespace).List(opts) - if err != nil { - return addr, utils.ConvertError(err) // this will convert error to a proper trace error, e.g. trace.NotFound - } - - if len(pods.Items) < 1 { - return addr, trace.NotFound("nethealth pod not found on local node %s", c.NodeName) - } - - pod := pods.Items[0] - if pod.Status.Phase != corev1.PodRunning { - return addr, trace.NotFound("unable to find running local nethealth pod") - } - if pod.Status.PodIP == "" { - return addr, trace.NotFound("local nethealth pod IP has not been assigned yet") - } - - return fmt.Sprintf("http://%s:%d", pod.Status.PodIP, c.NethealthPort), nil -} - // updateStats updates netStats with new incoming data. // Returns the list of updated peers. func (c *nethealthChecker) updateStats(incoming map[string]networkData) (updated []string, err error) { @@ -329,14 +294,16 @@ func nethealthFailureProbe(name, peer string, packetLoss float64) *pb.Probe { } } -// fetchNethealthMetrics collects the network metrics from the nethealth pod -// specified by addr. Returns the resp as an array of bytes. -func fetchNethealthMetrics(ctx context.Context, addr string) ([]byte, error) { - client, err := roundtrip.NewClient(addr, "") - if err != nil { - return nil, trace.Wrap(err, "failed to connect to nethealth service at %s.", addr) +// fetchNethealthMetrics collects the network metrics from the nethealth pod. +// Returns the response as an array of bytes. +func (c *nethealthChecker) fetchNethealthMetrics(ctx context.Context) (res []byte, err error) { + client := http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", c.NethealthSocketPath) + }, + }, } - // The two relevant metrics exposed by nethealth are 'nethealth_echo_request_total' and // 'nethealth_echo_timeout_total'. We expect a pair of request/timeout metrics per peer. // Example metrics received from nethealth may look something like the output below: @@ -349,11 +316,27 @@ func fetchNethealthMetrics(ctx context.Context, addr string) ([]byte, error) { // # TYPE nethealth_echo_timeout_total counter // nethealth_echo_timeout_total{node_name="10.128.0.96",peer_name="10.128.0.70"} 37 // nethealth_echo_timeout_total{node_name="10.128.0.96",peer_name="10.128.0.97"} 0 - resp, err := client.Get(ctx, client.Endpoint("metrics"), url.Values{}) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unix/metrics", nil) + if err != nil { + return nil, trace.Wrap(err) + } + + resp, err := client.Do(req) if err != nil { return nil, trace.Wrap(err) } - return resp.Bytes(), nil + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + buffer, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + + return buffer, nil + } + + return nil, trace.BadParameter("unexpected response from %s: %v", c.NethealthSocketPath, resp.Status) } // parseMetrics parses the provided data and returns the structured network @@ -519,9 +502,6 @@ const ( // echoTimeoutLabel defines the metric family label for the echo timeout counter. echoTimeoutLabel = "nethealth_echo_timeout_total" - // defaultNethealthPort defines the default nethealth port. - defaultNethealthPort = 9801 - // defaultNetStatsInterval defines the default interval duration for the netStats. defaultNetStatsInterval = 5 * time.Minute diff --git a/vendor/github.com/gravitational/satellite/monitoring/ping.go b/vendor/github.com/gravitational/satellite/monitoring/ping.go deleted file mode 100644 index cbf0bfa96e..0000000000 --- a/vendor/github.com/gravitational/satellite/monitoring/ping.go +++ /dev/null @@ -1,295 +0,0 @@ -/* -Copyright 2019 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package monitoring - -import ( - "context" - "fmt" - "strings" - "sync" - "time" - - "github.com/gravitational/satellite/agent" - "github.com/gravitational/satellite/agent/health" - pb "github.com/gravitational/satellite/agent/proto/agentpb" - - "github.com/codahale/hdrhistogram" - "github.com/gravitational/trace" - "github.com/gravitational/ttlmap/v2" - serf "github.com/hashicorp/serf/client" - log "github.com/sirupsen/logrus" -) - -// TODO: latencyThreshold should be configurable -// TODO: latency stats should be sent to metrics - -const ( - // pingCheckerID specifies the check name - pingCheckerID = "ping-checker" - // latencyStatsTTLSeconds specifies how long check results will be kept before being dropped - latencyStatsTTLSeconds = 3600 // 1 hour - // latencyStatsCapacity sets the number of TTLMaps that can be stored; this will be the size of the cluster -1 - latencyStatsCapacity = 1000 - // latencyStatsSlidingWindowSize specifies the number of retained check results - latencyStatsSlidingWindowSize = 20 - // pingMinimum sets the minimum value that can be recorded - pingMinimum = 0 * time.Second - // pingMaximum sets the maximum value that can be recorded - pingMaximum = 10 * time.Second - // pingSignificantFigures specifies how many decimals should be recorded - pingSignificantFigures = 3 - // latencyThreshold sets the RTT threshold - latencyThreshold = 15 * time.Millisecond - // latencyQuantile sets the quantile used while checking Histograms against RTT results - latencyQuantile = 95.0 -) - -// pingChecker is a checker that verifies that ping times (RTT) between nodes in -// the cluster are within a predefined threshold -type pingChecker struct { - self serf.Member - serfClient agent.SerfClient - serfMemberName string - latencyStats ttlmap.TTLMap - mux sync.Mutex - logger log.FieldLogger -} - -// PingCheckerConfig is used to store all the configuration related to the current check -type PingCheckerConfig struct { - // SerfRPCAddr is the address used by the Serf RPC client to communicate - // with the Serf cluster - SerfRPCAddr string - // SerfMemberName is the name assigned to this node in Serf - SerfMemberName string - // NewSerfClient is an optional Serf Client function that can be used instead - // of the default one. If not specified it will fallback to the default one - NewSerfClient agent.NewSerfClientFunc -} - -// CheckAndSetDefaults validates that this configuration is correct and sets -// value defaults where necessary -func (c *PingCheckerConfig) CheckAndSetDefaults() error { - if c.SerfMemberName == "" { - return trace.BadParameter("serf member name can't be empty") - } - if c.NewSerfClient == nil { - c.NewSerfClient = agent.NewSerfClient - } - return nil -} - -// NewPingChecker returns a checker that verifies accessibility of nodes in the cluster by exchanging ping requests -func NewPingChecker(conf PingCheckerConfig) (c health.Checker, err error) { - err = conf.CheckAndSetDefaults() - if err != nil { - return nil, trace.Wrap(err) - } - - latencyTTLMap := ttlmap.NewTTLMap(latencyStatsCapacity) - - client, err := conf.NewSerfClient(serf.Config{ - Addr: conf.SerfRPCAddr, - }) - if err != nil { - return nil, trace.Wrap(err) - } - - self, err := client.FindMember(conf.SerfMemberName) - if err != nil { - return nil, trace.Wrap(err) - } - - return &pingChecker{ - self: *self, - serfClient: client, - serfMemberName: conf.SerfMemberName, - latencyStats: *latencyTTLMap, - logger: log.WithField(trace.Component, pingCheckerID), - }, nil -} - -// Name returns the checker name -// Implements health.Checker -func (c *pingChecker) Name() string { - return pingCheckerID -} - -// Check verifies that all nodes' ping with Master Nodes is lower than the -// desired threshold -// Implements health.Checker -func (c *pingChecker) Check(ctx context.Context, r health.Reporter) { - if err := c.check(ctx, r); err != nil { - c.logger.WithError(err).Debug("Failed to verify ping latency.") - return - } - if r.NumProbes() == 0 { - r.Add(NewSuccessProbe(c.Name())) - } -} - -// check runs the actual system status verification code and returns an error -// in case issues arise in the process -func (c *pingChecker) check(_ context.Context, r health.Reporter) (err error) { - client := c.serfClient - - nodes, err := client.Members() - if err != nil { - return trace.Wrap(err) - } - - if err = c.checkNodesRTT(nodes, client, r); err != nil { - return trace.Wrap(err) - } - - return nil -} - -// checkNodesRTT implements the bulk of the logic by checking the ping time -// between this node (self) and the other Serf Cluster member nodes -func (c *pingChecker) checkNodesRTT(nodes []serf.Member, client agent.SerfClient, - reporter health.Reporter) (err error) { - // ping each other node and raise a warning in case the results are over - // a specified threshold - for _, node := range nodes { - // skipping nodes that are not alive (failed, removed, etc..) - if strings.ToLower(node.Status) != strings.ToLower(pb.MemberStatus_Alive.String()) { - c.logger.Debugf("skipping node %s because status is %q", node.Name, node.Status) - continue - } - // skip pinging self - if c.self.Addr.String() == node.Addr.String() { - c.logger.Debugf("skipping analyzing self node (%s)", node.Name) - continue - } - c.logger.Debugf("node %s status %s", node.Name, node.Status) - - rttNanoSec, err := c.calculateRTT(client, c.self, node) - if err != nil { - return trace.Wrap(err) - } - - latencies, err := c.saveLatencyStats(rttNanoSec, node) - if err != nil { - return trace.Wrap(err) - } - - latencyHistogram, err := c.buildLatencyHistogram(node.Name, latencies) - if err != nil { - return trace.Wrap(err) - } - - latency95 := time.Duration(latencyHistogram.ValueAtQuantile(latencyQuantile)) - - c.logger.Debugf("%s <-ping-> %s = %s [latest]", c.self.Name, node.Name, time.Duration(rttNanoSec)) - c.logger.Debugf("%s <-ping-> %s = %s [%.2f percentile]", c.self.Name, node.Name, latency95, latencyQuantile) - - if latency95 >= latencyThreshold { - c.logger.Warningf("%s <-ping-> %s = slow ping detected. Value %s over threshold %s", - c.self.Name, node.Name, latency95, latencyThreshold) - reporter.Add(c.failureProbe(node.Name, latency95)) - } else { - c.logger.Debugf("%s <-ping-> %s = ping okay. Value %s within threshold %s", - c.self.Name, node.Name, latency95, latencyThreshold) - } - } - - return nil -} - -// buildLatencyHistogram maps latencies to a HDRHistrogram -func (c *pingChecker) buildLatencyHistogram(nodeName string, latencies []int64) (latencyHDR *hdrhistogram.Histogram, err error) { - latencyHDR = hdrhistogram.New(pingMinimum.Nanoseconds(), - pingMaximum.Nanoseconds(), pingSignificantFigures) - - for _, v := range latencies { - err := latencyHDR.RecordValue(v) - if err != nil { - return nil, trace.Wrap(err) - } - } - - return latencyHDR, nil -} - -// saveLatencyStats is used to store ping values in HDR Histograms in memory -func (c *pingChecker) saveLatencyStats(pingLatency int64, node serf.Member) (latencies []int64, err error) { - c.mux.Lock() - defer c.mux.Unlock() - - if value, exists := c.latencyStats.Get(node.Name); exists { - var ok bool - if latencies, ok = value.([]int64); !ok { - return nil, trace.BadParameter("couldn't parse node latency as []int64 on %s", c.serfMemberName) - } - } - - if len(latencies) >= latencyStatsSlidingWindowSize { - // keep the slice within the sliding window size - // slidingWindowSize is -1 because another element will be added a few lines below - latencies = latencies[1:latencyStatsSlidingWindowSize] - } - - latencies = append(latencies, pingLatency) - c.logger.Debugf("%d recorded ping values for node %s => %v", len(latencies), node.Name, latencies) - - err = c.latencyStats.Set(node.Name, latencies, latencyStatsTTLSeconds) - if err != nil { - return nil, trace.Wrap(err) - } - - return latencies, nil -} - -// calculateRTT calculates and returns the latency time (in nanoseconds) between two Serf Cluster members -func (c *pingChecker) calculateRTT(serfClient agent.SerfClient, self, node serf.Member) (rttNanos int64, err error) { - selfCoord, err := serfClient.GetCoordinate(self.Name) - if err != nil { - return 0, trace.Wrap(err) - } - if selfCoord == nil { - return 0, trace.NotFound("could not find a coordinate for node %s", self.Name) - } - - otherNodeCoord, err := serfClient.GetCoordinate(node.Name) - if err != nil { - return 0, trace.Wrap(err) - } - if otherNodeCoord == nil { - return 0, trace.NotFound("could not find a coordinate for node %s", node.Name) - } - - latency := selfCoord.DistanceTo(otherNodeCoord).Nanoseconds() - c.logger.Debugf("self {%v,%v,%v,%v} === %v ===> other {%v,%v,%v,%v}", - selfCoord.Vec, selfCoord.Error, selfCoord.Height, selfCoord.Adjustment, - latency, - otherNodeCoord.Vec, otherNodeCoord.Error, otherNodeCoord.Height, otherNodeCoord.Adjustment) - return latency, nil -} - -// failureProbe constructs a new probe that represents a failed ping check -// against the specified node. -func (c *pingChecker) failureProbe(node string, latency time.Duration) *pb.Probe { - return &pb.Probe{ - Checker: c.Name(), - Detail: fmt.Sprintf("ping between %s and %s is higher than the allowed threshold of %s", - c.self.Name, node, latencyThreshold), - Error: fmt.Sprintf("ping latency at %s", latency), - Status: pb.Probe_Failed, - Severity: pb.Probe_Warning, - } -} diff --git a/vendor/github.com/gravitational/satellite/monitoring/system_pods.go b/vendor/github.com/gravitational/satellite/monitoring/system_pods.go index 1eac53405d..bbb576e6ad 100644 --- a/vendor/github.com/gravitational/satellite/monitoring/system_pods.go +++ b/vendor/github.com/gravitational/satellite/monitoring/system_pods.go @@ -22,31 +22,26 @@ import ( "github.com/gravitational/satellite/agent/health" pb "github.com/gravitational/satellite/agent/proto/agentpb" - "github.com/gravitational/satellite/lib/kubernetes" "github.com/gravitational/satellite/utils" "github.com/gravitational/trace" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" ) // SystemPodsConfig specifies configuration for a system pods checker. type SystemPodsConfig struct { - // NodeName specifies the kubernetes name of this node. - NodeName string // KubeConfig specifies kubernetes access configuration. *KubeConfig + // Namespaces specifies the list of namespaces to query for critical pods. + Namespaces []string } // checkAndSetDefaults validates that this configuration is correct and sets // value defaults where necessary. func (r *SystemPodsConfig) checkAndSetDefaults() error { var errors []error - if r.NodeName == "" { - errors = append(errors, trace.BadParameter("node name must be provided")) - } if r.KubeConfig == nil { errors = append(errors, trace.BadParameter("kubernetes access config must be provided")) } @@ -104,17 +99,20 @@ func (r *systemPodsChecker) check(ctx context.Context, reporter health.Reporter) // getPods returns a list of the local pods that have the // `gravitational.io/critical-pod` label. -func (r *systemPodsChecker) getPods() ([]corev1.Pod, error) { +func (r *systemPodsChecker) getPods() (pods []corev1.Pod, err error) { opts := metav1.ListOptions{ LabelSelector: systemPodsSelector.String(), - FieldSelector: fields.OneTermEqualSelector("spec.nodeName", r.NodeName).String(), } - pods, err := r.Client.CoreV1().Pods(kubernetes.AllNamespaces).List(opts) - if err != nil { - return nil, utils.ConvertError(err) + + for _, namespace := range r.Namespaces { + podList, err := r.Client.CoreV1().Pods(namespace).List(opts) + if err != nil { + return pods, utils.ConvertError(err) + } + pods = append(pods, podList.Items...) } - return pods.Items, nil + return pods, nil } // verifyPods verifies the pods are in a valid state. Reports a failed probe for diff --git a/vendor/github.com/gravitational/satellite/monitoring/timedrift.go b/vendor/github.com/gravitational/satellite/monitoring/timedrift.go index 1c5be4bb7b..b7930d419a 100644 --- a/vendor/github.com/gravitational/satellite/monitoring/timedrift.go +++ b/vendor/github.com/gravitational/satellite/monitoring/timedrift.go @@ -40,6 +40,14 @@ const ( // timeDriftThreshold sets the default threshold of the acceptable time // difference between nodes. timeDriftThreshold = 300 * time.Millisecond + + // timeDriftCheckTimeout drops time checks where the RPC call to the remote server take too long to respond. + // If the client or server is busy and the request takes too long to be processed, this will cause an inaccurate + // comparison of the current time. + timeDriftCheckTimeout = 100 * time.Millisecond + + // parallelRoutines indicates how many parallel queries we should run to peer nodes + parallelRoutines = 20 ) // timeDriftChecker is a checker that verifies that the time difference between @@ -133,16 +141,39 @@ func (c *timeDriftChecker) check(ctx context.Context, r health.Reporter) (err er if err != nil { return trace.Wrap(err) } + + nodesC := make(chan serf.Member, len(nodes)) for _, node := range nodes { - drift, err := c.getTimeDrift(ctx, node) - if err != nil { - log.WithError(err).Debug("Failed to get time drift.") - continue - } - if isDriftHigh(drift) { - r.Add(c.failureProbe(node, drift)) - } + nodesC <- node + } + close(nodesC) + + var mutex sync.Mutex + + var wg sync.WaitGroup + + wg.Add(parallelRoutines) + + for i := 0; i < parallelRoutines; i++ { + go func() { + for node := range nodesC { + drift, err := c.getTimeDrift(ctx, node) + if err != nil { + log.WithError(err).Debug("Failed to get time drift.") + continue + } + + if isDriftHigh(drift) { + mutex.Lock() + r.Add(c.failureProbe(node, drift)) + mutex.Unlock() + } + } + wg.Done() + }() } + + wg.Wait() return nil } @@ -179,11 +210,15 @@ func (c *timeDriftChecker) getTimeDrift(ctx context.Context, node serf.Member) ( return 0, trace.Wrap(err) } - // Obtain this node's local timestamp. - t1Start := c.Clock.Now().UTC() + queryStart := c.Clock.Now().UTC() + + // if the RPC call takes a long duration it will result in an inaccurate comparison. Timeout the RPC + // call to reduce false positives on a slow server. + ctx, cancel := context.WithTimeout(ctx, timeDriftCheckTimeout) + defer cancel() // Send "time" request to the specified node. - t2Response, err := agentClient.Time(ctx, &pb.TimeRequest{}) + peerResponse, err := agentClient.Time(ctx, &pb.TimeRequest{}) if err != nil { // If the agent we're making request to is of an older version, // it may not support Time() method yet. This can happen, e.g., @@ -195,18 +230,21 @@ func (c *timeDriftChecker) getTimeDrift(ctx context.Context, node serf.Member) ( return 0, trace.Wrap(err) } - // Calculate how much time has elapsed since T1Start. This value will - // roughly be the request roundtrip time, so the latency b/w the nodes - // is half that. - latency := c.Clock.Now().UTC().Sub(t1Start) / 2 + queryEnd := c.Clock.Now().UTC() + + // The request / response will take some time to perform over the network + // Use an adjustment of half the RTT time under the assumption that the request / response consume + // equal delays. + latencyAdjustment := queryEnd.Sub(queryStart) / 2 - // Finally calculate the time drift between this and the specified node - // using formula: T2 - T1Start - Latency. - t2 := t2Response.GetTimestamp().ToTime() - drift := t2.Sub(t1Start) - latency + adjustedPeerTime := peerResponse.GetTimestamp().ToTime().Add(latencyAdjustment) - c.WithField("node", node.Name).Debugf("T1Start: %v; T2: %v; Latency: %v; Drift: %v.", - t1Start, t2, latency, drift) + // drift is relative to the current nodes time. + // if peer time > node time, return a positive duration + // if peer time < node time, return a negative duration + drift := adjustedPeerTime.Sub(queryEnd) + c.WithField("node", node.Name).Debugf("queryStart: %v; queryEnd: %v; peerTime: %v; adjustedPeerTime: %v drift: %v.", + queryStart, queryEnd, peerResponse.GetTimestamp().ToTime(), adjustedPeerTime, drift) return drift, nil } diff --git a/vendor/golang.org/x/net/icmp/dstunreach.go b/vendor/golang.org/x/net/icmp/dstunreach.go new file mode 100644 index 0000000000..8615cf54a4 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/dstunreach.go @@ -0,0 +1,59 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "golang.org/x/net/internal/iana" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +// A DstUnreach represents an ICMP destination unreachable message +// body. +type DstUnreach struct { + Data []byte // data, known as original datagram field + Extensions []Extension // extensions +} + +// Len implements the Len method of MessageBody interface. +func (p *DstUnreach) Len(proto int) int { + if p == nil { + return 0 + } + l, _ := multipartMessageBodyDataLen(proto, true, p.Data, p.Extensions) + return l +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *DstUnreach) Marshal(proto int) ([]byte, error) { + var typ Type + switch proto { + case iana.ProtocolICMP: + typ = ipv4.ICMPTypeDestinationUnreachable + case iana.ProtocolIPv6ICMP: + typ = ipv6.ICMPTypeDestinationUnreachable + default: + return nil, errInvalidProtocol + } + if !validExtensions(typ, p.Extensions) { + return nil, errInvalidExtension + } + return marshalMultipartMessageBody(proto, true, p.Data, p.Extensions) +} + +// parseDstUnreach parses b as an ICMP destination unreachable message +// body. +func parseDstUnreach(proto int, typ Type, b []byte) (MessageBody, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + p := &DstUnreach{} + var err error + p.Data, p.Extensions, err = parseMultipartMessageBody(proto, typ, b) + if err != nil { + return nil, err + } + return p, nil +} diff --git a/vendor/golang.org/x/net/icmp/echo.go b/vendor/golang.org/x/net/icmp/echo.go new file mode 100644 index 0000000000..b591864278 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/echo.go @@ -0,0 +1,173 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "encoding/binary" + + "golang.org/x/net/internal/iana" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +// An Echo represents an ICMP echo request or reply message body. +type Echo struct { + ID int // identifier + Seq int // sequence number + Data []byte // data +} + +// Len implements the Len method of MessageBody interface. +func (p *Echo) Len(proto int) int { + if p == nil { + return 0 + } + return 4 + len(p.Data) +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *Echo) Marshal(proto int) ([]byte, error) { + b := make([]byte, 4+len(p.Data)) + binary.BigEndian.PutUint16(b[:2], uint16(p.ID)) + binary.BigEndian.PutUint16(b[2:4], uint16(p.Seq)) + copy(b[4:], p.Data) + return b, nil +} + +// parseEcho parses b as an ICMP echo request or reply message body. +func parseEcho(proto int, _ Type, b []byte) (MessageBody, error) { + bodyLen := len(b) + if bodyLen < 4 { + return nil, errMessageTooShort + } + p := &Echo{ID: int(binary.BigEndian.Uint16(b[:2])), Seq: int(binary.BigEndian.Uint16(b[2:4]))} + if bodyLen > 4 { + p.Data = make([]byte, bodyLen-4) + copy(p.Data, b[4:]) + } + return p, nil +} + +// An ExtendedEchoRequest represents an ICMP extended echo request +// message body. +type ExtendedEchoRequest struct { + ID int // identifier + Seq int // sequence number + Local bool // must be true when identifying by name or index + Extensions []Extension // extensions +} + +// Len implements the Len method of MessageBody interface. +func (p *ExtendedEchoRequest) Len(proto int) int { + if p == nil { + return 0 + } + l, _ := multipartMessageBodyDataLen(proto, false, nil, p.Extensions) + return l +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *ExtendedEchoRequest) Marshal(proto int) ([]byte, error) { + var typ Type + switch proto { + case iana.ProtocolICMP: + typ = ipv4.ICMPTypeExtendedEchoRequest + case iana.ProtocolIPv6ICMP: + typ = ipv6.ICMPTypeExtendedEchoRequest + default: + return nil, errInvalidProtocol + } + if !validExtensions(typ, p.Extensions) { + return nil, errInvalidExtension + } + b, err := marshalMultipartMessageBody(proto, false, nil, p.Extensions) + if err != nil { + return nil, err + } + binary.BigEndian.PutUint16(b[:2], uint16(p.ID)) + b[2] = byte(p.Seq) + if p.Local { + b[3] |= 0x01 + } + return b, nil +} + +// parseExtendedEchoRequest parses b as an ICMP extended echo request +// message body. +func parseExtendedEchoRequest(proto int, typ Type, b []byte) (MessageBody, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + p := &ExtendedEchoRequest{ID: int(binary.BigEndian.Uint16(b[:2])), Seq: int(b[2])} + if b[3]&0x01 != 0 { + p.Local = true + } + var err error + _, p.Extensions, err = parseMultipartMessageBody(proto, typ, b) + if err != nil { + return nil, err + } + return p, nil +} + +// An ExtendedEchoReply represents an ICMP extended echo reply message +// body. +type ExtendedEchoReply struct { + ID int // identifier + Seq int // sequence number + State int // 3-bit state working together with Message.Code + Active bool // probed interface is active + IPv4 bool // probed interface runs IPv4 + IPv6 bool // probed interface runs IPv6 +} + +// Len implements the Len method of MessageBody interface. +func (p *ExtendedEchoReply) Len(proto int) int { + if p == nil { + return 0 + } + return 4 +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *ExtendedEchoReply) Marshal(proto int) ([]byte, error) { + b := make([]byte, 4) + binary.BigEndian.PutUint16(b[:2], uint16(p.ID)) + b[2] = byte(p.Seq) + b[3] = byte(p.State<<5) & 0xe0 + if p.Active { + b[3] |= 0x04 + } + if p.IPv4 { + b[3] |= 0x02 + } + if p.IPv6 { + b[3] |= 0x01 + } + return b, nil +} + +// parseExtendedEchoReply parses b as an ICMP extended echo reply +// message body. +func parseExtendedEchoReply(proto int, _ Type, b []byte) (MessageBody, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + p := &ExtendedEchoReply{ + ID: int(binary.BigEndian.Uint16(b[:2])), + Seq: int(b[2]), + State: int(b[3]) >> 5, + } + if b[3]&0x04 != 0 { + p.Active = true + } + if b[3]&0x02 != 0 { + p.IPv4 = true + } + if b[3]&0x01 != 0 { + p.IPv6 = true + } + return p, nil +} diff --git a/vendor/golang.org/x/net/icmp/endpoint.go b/vendor/golang.org/x/net/icmp/endpoint.go new file mode 100644 index 0000000000..4841bdd2b3 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/endpoint.go @@ -0,0 +1,113 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "net" + "runtime" + "time" + + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +var _ net.PacketConn = &PacketConn{} + +// A PacketConn represents a packet network endpoint that uses either +// ICMPv4 or ICMPv6. +type PacketConn struct { + c net.PacketConn + p4 *ipv4.PacketConn + p6 *ipv6.PacketConn +} + +func (c *PacketConn) ok() bool { return c != nil && c.c != nil } + +// IPv4PacketConn returns the ipv4.PacketConn of c. +// It returns nil when c is not created as the endpoint for ICMPv4. +func (c *PacketConn) IPv4PacketConn() *ipv4.PacketConn { + if !c.ok() { + return nil + } + return c.p4 +} + +// IPv6PacketConn returns the ipv6.PacketConn of c. +// It returns nil when c is not created as the endpoint for ICMPv6. +func (c *PacketConn) IPv6PacketConn() *ipv6.PacketConn { + if !c.ok() { + return nil + } + return c.p6 +} + +// ReadFrom reads an ICMP message from the connection. +func (c *PacketConn) ReadFrom(b []byte) (int, net.Addr, error) { + if !c.ok() { + return 0, nil, errInvalidConn + } + // Please be informed that ipv4.NewPacketConn enables + // IP_STRIPHDR option by default on Darwin. + // See golang.org/issue/9395 for further information. + if runtime.GOOS == "darwin" && c.p4 != nil { + n, _, peer, err := c.p4.ReadFrom(b) + return n, peer, err + } + return c.c.ReadFrom(b) +} + +// WriteTo writes the ICMP message b to dst. +// The provided dst must be net.UDPAddr when c is a non-privileged +// datagram-oriented ICMP endpoint. +// Otherwise it must be net.IPAddr. +func (c *PacketConn) WriteTo(b []byte, dst net.Addr) (int, error) { + if !c.ok() { + return 0, errInvalidConn + } + return c.c.WriteTo(b, dst) +} + +// Close closes the endpoint. +func (c *PacketConn) Close() error { + if !c.ok() { + return errInvalidConn + } + return c.c.Close() +} + +// LocalAddr returns the local network address. +func (c *PacketConn) LocalAddr() net.Addr { + if !c.ok() { + return nil + } + return c.c.LocalAddr() +} + +// SetDeadline sets the read and write deadlines associated with the +// endpoint. +func (c *PacketConn) SetDeadline(t time.Time) error { + if !c.ok() { + return errInvalidConn + } + return c.c.SetDeadline(t) +} + +// SetReadDeadline sets the read deadline associated with the +// endpoint. +func (c *PacketConn) SetReadDeadline(t time.Time) error { + if !c.ok() { + return errInvalidConn + } + return c.c.SetReadDeadline(t) +} + +// SetWriteDeadline sets the write deadline associated with the +// endpoint. +func (c *PacketConn) SetWriteDeadline(t time.Time) error { + if !c.ok() { + return errInvalidConn + } + return c.c.SetWriteDeadline(t) +} diff --git a/vendor/golang.org/x/net/icmp/extension.go b/vendor/golang.org/x/net/icmp/extension.go new file mode 100644 index 0000000000..eeb85c3fc0 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/extension.go @@ -0,0 +1,170 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "encoding/binary" + + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +// An Extension represents an ICMP extension. +type Extension interface { + // Len returns the length of ICMP extension. + // The provided proto must be either the ICMPv4 or ICMPv6 + // protocol number. + Len(proto int) int + + // Marshal returns the binary encoding of ICMP extension. + // The provided proto must be either the ICMPv4 or ICMPv6 + // protocol number. + Marshal(proto int) ([]byte, error) +} + +const extensionVersion = 2 + +func validExtensionHeader(b []byte) bool { + v := int(b[0]&0xf0) >> 4 + s := binary.BigEndian.Uint16(b[2:4]) + if s != 0 { + s = checksum(b) + } + if v != extensionVersion || s != 0 { + return false + } + return true +} + +// parseExtensions parses b as a list of ICMP extensions. +// The length attribute l must be the length attribute field in +// received icmp messages. +// +// It will return a list of ICMP extensions and an adjusted length +// attribute that represents the length of the padded original +// datagram field. Otherwise, it returns an error. +func parseExtensions(typ Type, b []byte, l int) ([]Extension, int, error) { + // Still a lot of non-RFC 4884 compliant implementations are + // out there. Set the length attribute l to 128 when it looks + // inappropriate for backwards compatibility. + // + // A minimal extension at least requires 8 octets; 4 octets + // for an extension header, and 4 octets for a single object + // header. + // + // See RFC 4884 for further information. + switch typ { + case ipv4.ICMPTypeExtendedEchoRequest, ipv6.ICMPTypeExtendedEchoRequest: + if len(b) < 8 || !validExtensionHeader(b) { + return nil, -1, errNoExtension + } + l = 0 + default: + if 128 > l || l+8 > len(b) { + l = 128 + } + if l+8 > len(b) { + return nil, -1, errNoExtension + } + if !validExtensionHeader(b[l:]) { + if l == 128 { + return nil, -1, errNoExtension + } + l = 128 + if !validExtensionHeader(b[l:]) { + return nil, -1, errNoExtension + } + } + } + var exts []Extension + for b = b[l+4:]; len(b) >= 4; { + ol := int(binary.BigEndian.Uint16(b[:2])) + if 4 > ol || ol > len(b) { + break + } + switch b[2] { + case classMPLSLabelStack: + ext, err := parseMPLSLabelStack(b[:ol]) + if err != nil { + return nil, -1, err + } + exts = append(exts, ext) + case classInterfaceInfo: + ext, err := parseInterfaceInfo(b[:ol]) + if err != nil { + return nil, -1, err + } + exts = append(exts, ext) + case classInterfaceIdent: + ext, err := parseInterfaceIdent(b[:ol]) + if err != nil { + return nil, -1, err + } + exts = append(exts, ext) + default: + ext := &RawExtension{Data: make([]byte, ol)} + copy(ext.Data, b[:ol]) + exts = append(exts, ext) + } + b = b[ol:] + } + return exts, l, nil +} + +func validExtensions(typ Type, exts []Extension) bool { + switch typ { + case ipv4.ICMPTypeDestinationUnreachable, ipv4.ICMPTypeTimeExceeded, ipv4.ICMPTypeParameterProblem, + ipv6.ICMPTypeDestinationUnreachable, ipv6.ICMPTypeTimeExceeded: + for i := range exts { + switch exts[i].(type) { + case *MPLSLabelStack, *InterfaceInfo, *RawExtension: + default: + return false + } + } + return true + case ipv4.ICMPTypeExtendedEchoRequest, ipv6.ICMPTypeExtendedEchoRequest: + var n int + for i := range exts { + switch exts[i].(type) { + case *InterfaceIdent: + n++ + case *RawExtension: + default: + return false + } + } + // Not a single InterfaceIdent object or a combo of + // RawExtension and InterfaceIdent objects is not + // allowed. + if n == 1 && len(exts) > 1 { + return false + } + return true + default: + return false + } +} + +// A RawExtension represents a raw extension. +// +// A raw extension is excluded from message processing and can be used +// to construct applications such as protocol conformance testing. +type RawExtension struct { + Data []byte // data +} + +// Len implements the Len method of Extension interface. +func (p *RawExtension) Len(proto int) int { + if p == nil { + return 0 + } + return len(p.Data) +} + +// Marshal implements the Marshal method of Extension interface. +func (p *RawExtension) Marshal(proto int) ([]byte, error) { + return p.Data, nil +} diff --git a/vendor/golang.org/x/net/icmp/helper_posix.go b/vendor/golang.org/x/net/icmp/helper_posix.go new file mode 100644 index 0000000000..75e7557570 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/helper_posix.go @@ -0,0 +1,75 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris windows + +package icmp + +import ( + "net" + "strconv" + "syscall" +) + +func sockaddr(family int, address string) (syscall.Sockaddr, error) { + switch family { + case syscall.AF_INET: + a, err := net.ResolveIPAddr("ip4", address) + if err != nil { + return nil, err + } + if len(a.IP) == 0 { + a.IP = net.IPv4zero + } + if a.IP = a.IP.To4(); a.IP == nil { + return nil, net.InvalidAddrError("non-ipv4 address") + } + sa := &syscall.SockaddrInet4{} + copy(sa.Addr[:], a.IP) + return sa, nil + case syscall.AF_INET6: + a, err := net.ResolveIPAddr("ip6", address) + if err != nil { + return nil, err + } + if len(a.IP) == 0 { + a.IP = net.IPv6unspecified + } + if a.IP.Equal(net.IPv4zero) { + a.IP = net.IPv6unspecified + } + if a.IP = a.IP.To16(); a.IP == nil || a.IP.To4() != nil { + return nil, net.InvalidAddrError("non-ipv6 address") + } + sa := &syscall.SockaddrInet6{ZoneId: zoneToUint32(a.Zone)} + copy(sa.Addr[:], a.IP) + return sa, nil + default: + return nil, net.InvalidAddrError("unexpected family") + } +} + +func zoneToUint32(zone string) uint32 { + if zone == "" { + return 0 + } + if ifi, err := net.InterfaceByName(zone); err == nil { + return uint32(ifi.Index) + } + n, err := strconv.Atoi(zone) + if err != nil { + return 0 + } + return uint32(n) +} + +func last(s string, b byte) int { + i := len(s) + for i--; i >= 0; i-- { + if s[i] == b { + break + } + } + return i +} diff --git a/vendor/golang.org/x/net/icmp/interface.go b/vendor/golang.org/x/net/icmp/interface.go new file mode 100644 index 0000000000..b3dd72fb0a --- /dev/null +++ b/vendor/golang.org/x/net/icmp/interface.go @@ -0,0 +1,322 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "encoding/binary" + "net" + "strings" + + "golang.org/x/net/internal/iana" +) + +const ( + classInterfaceInfo = 2 +) + +const ( + attrMTU = 1 << iota + attrName + attrIPAddr + attrIfIndex +) + +// An InterfaceInfo represents interface and next-hop identification. +type InterfaceInfo struct { + Class int // extension object class number + Type int // extension object sub-type + Interface *net.Interface + Addr *net.IPAddr +} + +func (ifi *InterfaceInfo) nameLen() int { + if len(ifi.Interface.Name) > 63 { + return 64 + } + l := 1 + len(ifi.Interface.Name) + return (l + 3) &^ 3 +} + +func (ifi *InterfaceInfo) attrsAndLen(proto int) (attrs, l int) { + l = 4 + if ifi.Interface != nil && ifi.Interface.Index > 0 { + attrs |= attrIfIndex + l += 4 + if len(ifi.Interface.Name) > 0 { + attrs |= attrName + l += ifi.nameLen() + } + if ifi.Interface.MTU > 0 { + attrs |= attrMTU + l += 4 + } + } + if ifi.Addr != nil { + switch proto { + case iana.ProtocolICMP: + if ifi.Addr.IP.To4() != nil { + attrs |= attrIPAddr + l += 4 + net.IPv4len + } + case iana.ProtocolIPv6ICMP: + if ifi.Addr.IP.To16() != nil && ifi.Addr.IP.To4() == nil { + attrs |= attrIPAddr + l += 4 + net.IPv6len + } + } + } + return +} + +// Len implements the Len method of Extension interface. +func (ifi *InterfaceInfo) Len(proto int) int { + _, l := ifi.attrsAndLen(proto) + return l +} + +// Marshal implements the Marshal method of Extension interface. +func (ifi *InterfaceInfo) Marshal(proto int) ([]byte, error) { + attrs, l := ifi.attrsAndLen(proto) + b := make([]byte, l) + if err := ifi.marshal(proto, b, attrs, l); err != nil { + return nil, err + } + return b, nil +} + +func (ifi *InterfaceInfo) marshal(proto int, b []byte, attrs, l int) error { + binary.BigEndian.PutUint16(b[:2], uint16(l)) + b[2], b[3] = classInterfaceInfo, byte(ifi.Type) + for b = b[4:]; len(b) > 0 && attrs != 0; { + switch { + case attrs&attrIfIndex != 0: + b = ifi.marshalIfIndex(proto, b) + attrs &^= attrIfIndex + case attrs&attrIPAddr != 0: + b = ifi.marshalIPAddr(proto, b) + attrs &^= attrIPAddr + case attrs&attrName != 0: + b = ifi.marshalName(proto, b) + attrs &^= attrName + case attrs&attrMTU != 0: + b = ifi.marshalMTU(proto, b) + attrs &^= attrMTU + } + } + return nil +} + +func (ifi *InterfaceInfo) marshalIfIndex(proto int, b []byte) []byte { + binary.BigEndian.PutUint32(b[:4], uint32(ifi.Interface.Index)) + return b[4:] +} + +func (ifi *InterfaceInfo) parseIfIndex(b []byte) ([]byte, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + ifi.Interface.Index = int(binary.BigEndian.Uint32(b[:4])) + return b[4:], nil +} + +func (ifi *InterfaceInfo) marshalIPAddr(proto int, b []byte) []byte { + switch proto { + case iana.ProtocolICMP: + binary.BigEndian.PutUint16(b[:2], uint16(iana.AddrFamilyIPv4)) + copy(b[4:4+net.IPv4len], ifi.Addr.IP.To4()) + b = b[4+net.IPv4len:] + case iana.ProtocolIPv6ICMP: + binary.BigEndian.PutUint16(b[:2], uint16(iana.AddrFamilyIPv6)) + copy(b[4:4+net.IPv6len], ifi.Addr.IP.To16()) + b = b[4+net.IPv6len:] + } + return b +} + +func (ifi *InterfaceInfo) parseIPAddr(b []byte) ([]byte, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + afi := int(binary.BigEndian.Uint16(b[:2])) + b = b[4:] + switch afi { + case iana.AddrFamilyIPv4: + if len(b) < net.IPv4len { + return nil, errMessageTooShort + } + ifi.Addr.IP = make(net.IP, net.IPv4len) + copy(ifi.Addr.IP, b[:net.IPv4len]) + b = b[net.IPv4len:] + case iana.AddrFamilyIPv6: + if len(b) < net.IPv6len { + return nil, errMessageTooShort + } + ifi.Addr.IP = make(net.IP, net.IPv6len) + copy(ifi.Addr.IP, b[:net.IPv6len]) + b = b[net.IPv6len:] + } + return b, nil +} + +func (ifi *InterfaceInfo) marshalName(proto int, b []byte) []byte { + l := byte(ifi.nameLen()) + b[0] = l + copy(b[1:], []byte(ifi.Interface.Name)) + return b[l:] +} + +func (ifi *InterfaceInfo) parseName(b []byte) ([]byte, error) { + if 4 > len(b) || len(b) < int(b[0]) { + return nil, errMessageTooShort + } + l := int(b[0]) + if l%4 != 0 || 4 > l || l > 64 { + return nil, errInvalidExtension + } + var name [63]byte + copy(name[:], b[1:l]) + ifi.Interface.Name = strings.Trim(string(name[:]), "\000") + return b[l:], nil +} + +func (ifi *InterfaceInfo) marshalMTU(proto int, b []byte) []byte { + binary.BigEndian.PutUint32(b[:4], uint32(ifi.Interface.MTU)) + return b[4:] +} + +func (ifi *InterfaceInfo) parseMTU(b []byte) ([]byte, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + ifi.Interface.MTU = int(binary.BigEndian.Uint32(b[:4])) + return b[4:], nil +} + +func parseInterfaceInfo(b []byte) (Extension, error) { + ifi := &InterfaceInfo{ + Class: int(b[2]), + Type: int(b[3]), + } + if ifi.Type&(attrIfIndex|attrName|attrMTU) != 0 { + ifi.Interface = &net.Interface{} + } + if ifi.Type&attrIPAddr != 0 { + ifi.Addr = &net.IPAddr{} + } + attrs := ifi.Type & (attrIfIndex | attrIPAddr | attrName | attrMTU) + for b = b[4:]; len(b) > 0 && attrs != 0; { + var err error + switch { + case attrs&attrIfIndex != 0: + b, err = ifi.parseIfIndex(b) + attrs &^= attrIfIndex + case attrs&attrIPAddr != 0: + b, err = ifi.parseIPAddr(b) + attrs &^= attrIPAddr + case attrs&attrName != 0: + b, err = ifi.parseName(b) + attrs &^= attrName + case attrs&attrMTU != 0: + b, err = ifi.parseMTU(b) + attrs &^= attrMTU + } + if err != nil { + return nil, err + } + } + if ifi.Interface != nil && ifi.Interface.Name != "" && ifi.Addr != nil && ifi.Addr.IP.To16() != nil && ifi.Addr.IP.To4() == nil { + ifi.Addr.Zone = ifi.Interface.Name + } + return ifi, nil +} + +const ( + classInterfaceIdent = 3 + typeInterfaceByName = 1 + typeInterfaceByIndex = 2 + typeInterfaceByAddress = 3 +) + +// An InterfaceIdent represents interface identification. +type InterfaceIdent struct { + Class int // extension object class number + Type int // extension object sub-type + Name string // interface name + Index int // interface index + AFI int // address family identifier; see address family numbers in IANA registry + Addr []byte // address +} + +// Len implements the Len method of Extension interface. +func (ifi *InterfaceIdent) Len(_ int) int { + switch ifi.Type { + case typeInterfaceByName: + l := len(ifi.Name) + if l > 255 { + l = 255 + } + return 4 + (l+3)&^3 + case typeInterfaceByIndex: + return 4 + 4 + case typeInterfaceByAddress: + return 4 + 4 + (len(ifi.Addr)+3)&^3 + default: + return 4 + } +} + +// Marshal implements the Marshal method of Extension interface. +func (ifi *InterfaceIdent) Marshal(proto int) ([]byte, error) { + b := make([]byte, ifi.Len(proto)) + if err := ifi.marshal(proto, b); err != nil { + return nil, err + } + return b, nil +} + +func (ifi *InterfaceIdent) marshal(proto int, b []byte) error { + l := ifi.Len(proto) + binary.BigEndian.PutUint16(b[:2], uint16(l)) + b[2], b[3] = classInterfaceIdent, byte(ifi.Type) + switch ifi.Type { + case typeInterfaceByName: + copy(b[4:], ifi.Name) + case typeInterfaceByIndex: + binary.BigEndian.PutUint32(b[4:4+4], uint32(ifi.Index)) + case typeInterfaceByAddress: + binary.BigEndian.PutUint16(b[4:4+2], uint16(ifi.AFI)) + b[4+2] = byte(len(ifi.Addr)) + copy(b[4+4:], ifi.Addr) + } + return nil +} + +func parseInterfaceIdent(b []byte) (Extension, error) { + ifi := &InterfaceIdent{ + Class: int(b[2]), + Type: int(b[3]), + } + switch ifi.Type { + case typeInterfaceByName: + ifi.Name = strings.Trim(string(b[4:]), "\x00") + case typeInterfaceByIndex: + if len(b[4:]) < 4 { + return nil, errInvalidExtension + } + ifi.Index = int(binary.BigEndian.Uint32(b[4 : 4+4])) + case typeInterfaceByAddress: + if len(b[4:]) < 4 { + return nil, errInvalidExtension + } + ifi.AFI = int(binary.BigEndian.Uint16(b[4 : 4+2])) + l := int(b[4+2]) + if len(b[4+4:]) < l { + return nil, errInvalidExtension + } + ifi.Addr = make([]byte, l) + copy(ifi.Addr, b[4+4:]) + } + return ifi, nil +} diff --git a/vendor/golang.org/x/net/icmp/ipv4.go b/vendor/golang.org/x/net/icmp/ipv4.go new file mode 100644 index 0000000000..c4629240ca --- /dev/null +++ b/vendor/golang.org/x/net/icmp/ipv4.go @@ -0,0 +1,69 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "encoding/binary" + "net" + "runtime" + + "golang.org/x/net/internal/socket" + "golang.org/x/net/ipv4" +) + +// freebsdVersion is set in sys_freebsd.go. +// See http://www.freebsd.org/doc/en/books/porters-handbook/freebsd-versions.html. +var freebsdVersion uint32 + +// ParseIPv4Header returns the IPv4 header of the IPv4 packet that +// triggered an ICMP error message. +// This is found in the Data field of the ICMP error message body. +// +// The provided b must be in the format used by a raw ICMP socket on +// the local system. +// This may differ from the wire format, and the format used by a raw +// IP socket, depending on the system. +// +// To parse an IPv6 header, use ipv6.ParseHeader. +func ParseIPv4Header(b []byte) (*ipv4.Header, error) { + if len(b) < ipv4.HeaderLen { + return nil, errHeaderTooShort + } + hdrlen := int(b[0]&0x0f) << 2 + if hdrlen > len(b) { + return nil, errBufferTooShort + } + h := &ipv4.Header{ + Version: int(b[0] >> 4), + Len: hdrlen, + TOS: int(b[1]), + ID: int(binary.BigEndian.Uint16(b[4:6])), + FragOff: int(binary.BigEndian.Uint16(b[6:8])), + TTL: int(b[8]), + Protocol: int(b[9]), + Checksum: int(binary.BigEndian.Uint16(b[10:12])), + Src: net.IPv4(b[12], b[13], b[14], b[15]), + Dst: net.IPv4(b[16], b[17], b[18], b[19]), + } + switch runtime.GOOS { + case "darwin": + h.TotalLen = int(socket.NativeEndian.Uint16(b[2:4])) + case "freebsd": + if freebsdVersion >= 1000000 { + h.TotalLen = int(binary.BigEndian.Uint16(b[2:4])) + } else { + h.TotalLen = int(socket.NativeEndian.Uint16(b[2:4])) + } + default: + h.TotalLen = int(binary.BigEndian.Uint16(b[2:4])) + } + h.Flags = ipv4.HeaderFlags(h.FragOff&0xe000) >> 13 + h.FragOff = h.FragOff & 0x1fff + if hdrlen-ipv4.HeaderLen > 0 { + h.Options = make([]byte, hdrlen-ipv4.HeaderLen) + copy(h.Options, b[ipv4.HeaderLen:]) + } + return h, nil +} diff --git a/vendor/golang.org/x/net/icmp/ipv6.go b/vendor/golang.org/x/net/icmp/ipv6.go new file mode 100644 index 0000000000..2e8cfeb131 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/ipv6.go @@ -0,0 +1,23 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "net" + + "golang.org/x/net/internal/iana" +) + +const ipv6PseudoHeaderLen = 2*net.IPv6len + 8 + +// IPv6PseudoHeader returns an IPv6 pseudo header for checksum +// calculation. +func IPv6PseudoHeader(src, dst net.IP) []byte { + b := make([]byte, ipv6PseudoHeaderLen) + copy(b, src.To16()) + copy(b[net.IPv6len:], dst.To16()) + b[len(b)-1] = byte(iana.ProtocolIPv6ICMP) + return b +} diff --git a/vendor/golang.org/x/net/icmp/listen_posix.go b/vendor/golang.org/x/net/icmp/listen_posix.go new file mode 100644 index 0000000000..f0f1f2ffab --- /dev/null +++ b/vendor/golang.org/x/net/icmp/listen_posix.go @@ -0,0 +1,103 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris windows + +package icmp + +import ( + "net" + "os" + "runtime" + "syscall" + + "golang.org/x/net/internal/iana" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +const sysIP_STRIPHDR = 0x17 // for now only darwin supports this option + +// ListenPacket listens for incoming ICMP packets addressed to +// address. See net.Dial for the syntax of address. +// +// For non-privileged datagram-oriented ICMP endpoints, network must +// be "udp4" or "udp6". The endpoint allows to read, write a few +// limited ICMP messages such as echo request and echo reply. +// Currently only Darwin and Linux support this. +// +// Examples: +// ListenPacket("udp4", "192.168.0.1") +// ListenPacket("udp4", "0.0.0.0") +// ListenPacket("udp6", "fe80::1%en0") +// ListenPacket("udp6", "::") +// +// For privileged raw ICMP endpoints, network must be "ip4" or "ip6" +// followed by a colon and an ICMP protocol number or name. +// +// Examples: +// ListenPacket("ip4:icmp", "192.168.0.1") +// ListenPacket("ip4:1", "0.0.0.0") +// ListenPacket("ip6:ipv6-icmp", "fe80::1%en0") +// ListenPacket("ip6:58", "::") +func ListenPacket(network, address string) (*PacketConn, error) { + var family, proto int + switch network { + case "udp4": + family, proto = syscall.AF_INET, iana.ProtocolICMP + case "udp6": + family, proto = syscall.AF_INET6, iana.ProtocolIPv6ICMP + default: + i := last(network, ':') + if i < 0 { + i = len(network) + } + switch network[:i] { + case "ip4": + proto = iana.ProtocolICMP + case "ip6": + proto = iana.ProtocolIPv6ICMP + } + } + var cerr error + var c net.PacketConn + switch family { + case syscall.AF_INET, syscall.AF_INET6: + s, err := syscall.Socket(family, syscall.SOCK_DGRAM, proto) + if err != nil { + return nil, os.NewSyscallError("socket", err) + } + if runtime.GOOS == "darwin" && family == syscall.AF_INET { + if err := syscall.SetsockoptInt(s, iana.ProtocolIP, sysIP_STRIPHDR, 1); err != nil { + syscall.Close(s) + return nil, os.NewSyscallError("setsockopt", err) + } + } + sa, err := sockaddr(family, address) + if err != nil { + syscall.Close(s) + return nil, err + } + if err := syscall.Bind(s, sa); err != nil { + syscall.Close(s) + return nil, os.NewSyscallError("bind", err) + } + f := os.NewFile(uintptr(s), "datagram-oriented icmp") + c, cerr = net.FilePacketConn(f) + f.Close() + default: + c, cerr = net.ListenPacket(network, address) + } + if cerr != nil { + return nil, cerr + } + switch proto { + case iana.ProtocolICMP: + return &PacketConn{c: c, p4: ipv4.NewPacketConn(c)}, nil + case iana.ProtocolIPv6ICMP: + return &PacketConn{c: c, p6: ipv6.NewPacketConn(c)}, nil + default: + return &PacketConn{c: c}, nil + } +} diff --git a/vendor/golang.org/x/net/icmp/listen_stub.go b/vendor/golang.org/x/net/icmp/listen_stub.go new file mode 100644 index 0000000000..3acd91dc5c --- /dev/null +++ b/vendor/golang.org/x/net/icmp/listen_stub.go @@ -0,0 +1,33 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !aix,!darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!solaris,!windows + +package icmp + +// ListenPacket listens for incoming ICMP packets addressed to +// address. See net.Dial for the syntax of address. +// +// For non-privileged datagram-oriented ICMP endpoints, network must +// be "udp4" or "udp6". The endpoint allows to read, write a few +// limited ICMP messages such as echo request and echo reply. +// Currently only Darwin and Linux support this. +// +// Examples: +// ListenPacket("udp4", "192.168.0.1") +// ListenPacket("udp4", "0.0.0.0") +// ListenPacket("udp6", "fe80::1%en0") +// ListenPacket("udp6", "::") +// +// For privileged raw ICMP endpoints, network must be "ip4" or "ip6" +// followed by a colon and an ICMP protocol number or name. +// +// Examples: +// ListenPacket("ip4:icmp", "192.168.0.1") +// ListenPacket("ip4:1", "0.0.0.0") +// ListenPacket("ip6:ipv6-icmp", "fe80::1%en0") +// ListenPacket("ip6:58", "::") +func ListenPacket(network, address string) (*PacketConn, error) { + return nil, errNotImplemented +} diff --git a/vendor/golang.org/x/net/icmp/message.go b/vendor/golang.org/x/net/icmp/message.go new file mode 100644 index 0000000000..40db65d0cd --- /dev/null +++ b/vendor/golang.org/x/net/icmp/message.go @@ -0,0 +1,162 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package icmp provides basic functions for the manipulation of +// messages used in the Internet Control Message Protocols, +// ICMPv4 and ICMPv6. +// +// ICMPv4 and ICMPv6 are defined in RFC 792 and RFC 4443. +// Multi-part message support for ICMP is defined in RFC 4884. +// ICMP extensions for MPLS are defined in RFC 4950. +// ICMP extensions for interface and next-hop identification are +// defined in RFC 5837. +// PROBE: A utility for probing interfaces is defined in RFC 8335. +package icmp // import "golang.org/x/net/icmp" + +import ( + "encoding/binary" + "errors" + "net" + "runtime" + + "golang.org/x/net/internal/iana" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +// BUG(mikio): This package is not implemented on JS, NaCl and Plan 9. + +var ( + errInvalidConn = errors.New("invalid connection") + errInvalidProtocol = errors.New("invalid protocol") + errMessageTooShort = errors.New("message too short") + errHeaderTooShort = errors.New("header too short") + errBufferTooShort = errors.New("buffer too short") + errInvalidBody = errors.New("invalid body") + errNoExtension = errors.New("no extension") + errInvalidExtension = errors.New("invalid extension") + errNotImplemented = errors.New("not implemented on " + runtime.GOOS + "/" + runtime.GOARCH) +) + +func checksum(b []byte) uint16 { + csumcv := len(b) - 1 // checksum coverage + s := uint32(0) + for i := 0; i < csumcv; i += 2 { + s += uint32(b[i+1])<<8 | uint32(b[i]) + } + if csumcv&1 == 0 { + s += uint32(b[csumcv]) + } + s = s>>16 + s&0xffff + s = s + s>>16 + return ^uint16(s) +} + +// A Type represents an ICMP message type. +type Type interface { + Protocol() int +} + +// A Message represents an ICMP message. +type Message struct { + Type Type // type, either ipv4.ICMPType or ipv6.ICMPType + Code int // code + Checksum int // checksum + Body MessageBody // body +} + +// Marshal returns the binary encoding of the ICMP message m. +// +// For an ICMPv4 message, the returned message always contains the +// calculated checksum field. +// +// For an ICMPv6 message, the returned message contains the calculated +// checksum field when psh is not nil, otherwise the kernel will +// compute the checksum field during the message transmission. +// When psh is not nil, it must be the pseudo header for IPv6. +func (m *Message) Marshal(psh []byte) ([]byte, error) { + var mtype byte + switch typ := m.Type.(type) { + case ipv4.ICMPType: + mtype = byte(typ) + case ipv6.ICMPType: + mtype = byte(typ) + default: + return nil, errInvalidProtocol + } + b := []byte{mtype, byte(m.Code), 0, 0} + proto := m.Type.Protocol() + if proto == iana.ProtocolIPv6ICMP && psh != nil { + b = append(psh, b...) + } + if m.Body != nil && m.Body.Len(proto) != 0 { + mb, err := m.Body.Marshal(proto) + if err != nil { + return nil, err + } + b = append(b, mb...) + } + if proto == iana.ProtocolIPv6ICMP { + if psh == nil { // cannot calculate checksum here + return b, nil + } + off, l := 2*net.IPv6len, len(b)-len(psh) + binary.BigEndian.PutUint32(b[off:off+4], uint32(l)) + } + s := checksum(b) + // Place checksum back in header; using ^= avoids the + // assumption the checksum bytes are zero. + b[len(psh)+2] ^= byte(s) + b[len(psh)+3] ^= byte(s >> 8) + return b[len(psh):], nil +} + +var parseFns = map[Type]func(int, Type, []byte) (MessageBody, error){ + ipv4.ICMPTypeDestinationUnreachable: parseDstUnreach, + ipv4.ICMPTypeTimeExceeded: parseTimeExceeded, + ipv4.ICMPTypeParameterProblem: parseParamProb, + + ipv4.ICMPTypeEcho: parseEcho, + ipv4.ICMPTypeEchoReply: parseEcho, + ipv4.ICMPTypeExtendedEchoRequest: parseExtendedEchoRequest, + ipv4.ICMPTypeExtendedEchoReply: parseExtendedEchoReply, + + ipv6.ICMPTypeDestinationUnreachable: parseDstUnreach, + ipv6.ICMPTypePacketTooBig: parsePacketTooBig, + ipv6.ICMPTypeTimeExceeded: parseTimeExceeded, + ipv6.ICMPTypeParameterProblem: parseParamProb, + + ipv6.ICMPTypeEchoRequest: parseEcho, + ipv6.ICMPTypeEchoReply: parseEcho, + ipv6.ICMPTypeExtendedEchoRequest: parseExtendedEchoRequest, + ipv6.ICMPTypeExtendedEchoReply: parseExtendedEchoReply, +} + +// ParseMessage parses b as an ICMP message. +// The provided proto must be either the ICMPv4 or ICMPv6 protocol +// number. +func ParseMessage(proto int, b []byte) (*Message, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + var err error + m := &Message{Code: int(b[1]), Checksum: int(binary.BigEndian.Uint16(b[2:4]))} + switch proto { + case iana.ProtocolICMP: + m.Type = ipv4.ICMPType(b[0]) + case iana.ProtocolIPv6ICMP: + m.Type = ipv6.ICMPType(b[0]) + default: + return nil, errInvalidProtocol + } + if fn, ok := parseFns[m.Type]; !ok { + m.Body, err = parseRawBody(proto, b[4:]) + } else { + m.Body, err = fn(proto, m.Type, b[4:]) + } + if err != nil { + return nil, err + } + return m, nil +} diff --git a/vendor/golang.org/x/net/icmp/messagebody.go b/vendor/golang.org/x/net/icmp/messagebody.go new file mode 100644 index 0000000000..e2d9bfa01b --- /dev/null +++ b/vendor/golang.org/x/net/icmp/messagebody.go @@ -0,0 +1,52 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +// A MessageBody represents an ICMP message body. +type MessageBody interface { + // Len returns the length of ICMP message body. + // The provided proto must be either the ICMPv4 or ICMPv6 + // protocol number. + Len(proto int) int + + // Marshal returns the binary encoding of ICMP message body. + // The provided proto must be either the ICMPv4 or ICMPv6 + // protocol number. + Marshal(proto int) ([]byte, error) +} + +// A RawBody represents a raw message body. +// +// A raw message body is excluded from message processing and can be +// used to construct applications such as protocol conformance +// testing. +type RawBody struct { + Data []byte // data +} + +// Len implements the Len method of MessageBody interface. +func (p *RawBody) Len(proto int) int { + if p == nil { + return 0 + } + return len(p.Data) +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *RawBody) Marshal(proto int) ([]byte, error) { + return p.Data, nil +} + +// parseRawBody parses b as an ICMP message body. +func parseRawBody(proto int, b []byte) (MessageBody, error) { + p := &RawBody{Data: make([]byte, len(b))} + copy(p.Data, b) + return p, nil +} + +// A DefaultMessageBody represents the default message body. +// +// Deprecated: Use RawBody instead. +type DefaultMessageBody = RawBody diff --git a/vendor/golang.org/x/net/icmp/mpls.go b/vendor/golang.org/x/net/icmp/mpls.go new file mode 100644 index 0000000000..f9f4841bce --- /dev/null +++ b/vendor/golang.org/x/net/icmp/mpls.go @@ -0,0 +1,77 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import "encoding/binary" + +// MPLSLabel represents an MPLS label stack entry. +type MPLSLabel struct { + Label int // label value + TC int // traffic class; formerly experimental use + S bool // bottom of stack + TTL int // time to live +} + +const ( + classMPLSLabelStack = 1 + typeIncomingMPLSLabelStack = 1 +) + +// MPLSLabelStack represents an MPLS label stack. +type MPLSLabelStack struct { + Class int // extension object class number + Type int // extension object sub-type + Labels []MPLSLabel +} + +// Len implements the Len method of Extension interface. +func (ls *MPLSLabelStack) Len(proto int) int { + return 4 + (4 * len(ls.Labels)) +} + +// Marshal implements the Marshal method of Extension interface. +func (ls *MPLSLabelStack) Marshal(proto int) ([]byte, error) { + b := make([]byte, ls.Len(proto)) + if err := ls.marshal(proto, b); err != nil { + return nil, err + } + return b, nil +} + +func (ls *MPLSLabelStack) marshal(proto int, b []byte) error { + l := ls.Len(proto) + binary.BigEndian.PutUint16(b[:2], uint16(l)) + b[2], b[3] = classMPLSLabelStack, typeIncomingMPLSLabelStack + off := 4 + for _, ll := range ls.Labels { + b[off], b[off+1], b[off+2] = byte(ll.Label>>12), byte(ll.Label>>4&0xff), byte(ll.Label<<4&0xf0) + b[off+2] |= byte(ll.TC << 1 & 0x0e) + if ll.S { + b[off+2] |= 0x1 + } + b[off+3] = byte(ll.TTL) + off += 4 + } + return nil +} + +func parseMPLSLabelStack(b []byte) (Extension, error) { + ls := &MPLSLabelStack{ + Class: int(b[2]), + Type: int(b[3]), + } + for b = b[4:]; len(b) >= 4; b = b[4:] { + ll := MPLSLabel{ + Label: int(b[0])<<12 | int(b[1])<<4 | int(b[2])>>4, + TC: int(b[2]&0x0e) >> 1, + TTL: int(b[3]), + } + if b[2]&0x1 != 0 { + ll.S = true + } + ls.Labels = append(ls.Labels, ll) + } + return ls, nil +} diff --git a/vendor/golang.org/x/net/icmp/multipart.go b/vendor/golang.org/x/net/icmp/multipart.go new file mode 100644 index 0000000000..5f36675594 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/multipart.go @@ -0,0 +1,129 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import "golang.org/x/net/internal/iana" + +// multipartMessageBodyDataLen takes b as an original datagram and +// exts as extensions, and returns a required length for message body +// and a required length for a padded original datagram in wire +// format. +func multipartMessageBodyDataLen(proto int, withOrigDgram bool, b []byte, exts []Extension) (bodyLen, dataLen int) { + bodyLen = 4 // length of leading octets + var extLen int + var rawExt bool // raw extension may contain an empty object + for _, ext := range exts { + extLen += ext.Len(proto) + if _, ok := ext.(*RawExtension); ok { + rawExt = true + } + } + if extLen > 0 && withOrigDgram { + dataLen = multipartMessageOrigDatagramLen(proto, b) + } else { + dataLen = len(b) + } + if extLen > 0 || rawExt { + bodyLen += 4 // length of extension header + } + bodyLen += dataLen + extLen + return bodyLen, dataLen +} + +// multipartMessageOrigDatagramLen takes b as an original datagram, +// and returns a required length for a padded orignal datagram in wire +// format. +func multipartMessageOrigDatagramLen(proto int, b []byte) int { + roundup := func(b []byte, align int) int { + // According to RFC 4884, the padded original datagram + // field must contain at least 128 octets. + if len(b) < 128 { + return 128 + } + r := len(b) + return (r + align - 1) &^ (align - 1) + } + switch proto { + case iana.ProtocolICMP: + return roundup(b, 4) + case iana.ProtocolIPv6ICMP: + return roundup(b, 8) + default: + return len(b) + } +} + +// marshalMultipartMessageBody takes data as an original datagram and +// exts as extesnsions, and returns a binary encoding of message body. +// It can be used for non-multipart message bodies when exts is nil. +func marshalMultipartMessageBody(proto int, withOrigDgram bool, data []byte, exts []Extension) ([]byte, error) { + bodyLen, dataLen := multipartMessageBodyDataLen(proto, withOrigDgram, data, exts) + b := make([]byte, bodyLen) + copy(b[4:], data) + if len(exts) > 0 { + b[4+dataLen] = byte(extensionVersion << 4) + off := 4 + dataLen + 4 // leading octets, data, extension header + for _, ext := range exts { + switch ext := ext.(type) { + case *MPLSLabelStack: + if err := ext.marshal(proto, b[off:]); err != nil { + return nil, err + } + off += ext.Len(proto) + case *InterfaceInfo: + attrs, l := ext.attrsAndLen(proto) + if err := ext.marshal(proto, b[off:], attrs, l); err != nil { + return nil, err + } + off += ext.Len(proto) + case *InterfaceIdent: + if err := ext.marshal(proto, b[off:]); err != nil { + return nil, err + } + off += ext.Len(proto) + case *RawExtension: + copy(b[off:], ext.Data) + off += ext.Len(proto) + } + } + s := checksum(b[4+dataLen:]) + b[4+dataLen+2] ^= byte(s) + b[4+dataLen+3] ^= byte(s >> 8) + if withOrigDgram { + switch proto { + case iana.ProtocolICMP: + b[1] = byte(dataLen / 4) + case iana.ProtocolIPv6ICMP: + b[0] = byte(dataLen / 8) + } + } + } + return b, nil +} + +// parseMultipartMessageBody parses b as either a non-multipart +// message body or a multipart message body. +func parseMultipartMessageBody(proto int, typ Type, b []byte) ([]byte, []Extension, error) { + var l int + switch proto { + case iana.ProtocolICMP: + l = 4 * int(b[1]) + case iana.ProtocolIPv6ICMP: + l = 8 * int(b[0]) + } + if len(b) == 4 { + return nil, nil, nil + } + exts, l, err := parseExtensions(typ, b[4:], l) + if err != nil { + l = len(b) - 4 + } + var data []byte + if l > 0 { + data = make([]byte, l) + copy(data, b[4:]) + } + return data, exts, nil +} diff --git a/vendor/golang.org/x/net/icmp/packettoobig.go b/vendor/golang.org/x/net/icmp/packettoobig.go new file mode 100644 index 0000000000..afbf24f1ba --- /dev/null +++ b/vendor/golang.org/x/net/icmp/packettoobig.go @@ -0,0 +1,43 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import "encoding/binary" + +// A PacketTooBig represents an ICMP packet too big message body. +type PacketTooBig struct { + MTU int // maximum transmission unit of the nexthop link + Data []byte // data, known as original datagram field +} + +// Len implements the Len method of MessageBody interface. +func (p *PacketTooBig) Len(proto int) int { + if p == nil { + return 0 + } + return 4 + len(p.Data) +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *PacketTooBig) Marshal(proto int) ([]byte, error) { + b := make([]byte, 4+len(p.Data)) + binary.BigEndian.PutUint32(b[:4], uint32(p.MTU)) + copy(b[4:], p.Data) + return b, nil +} + +// parsePacketTooBig parses b as an ICMP packet too big message body. +func parsePacketTooBig(proto int, _ Type, b []byte) (MessageBody, error) { + bodyLen := len(b) + if bodyLen < 4 { + return nil, errMessageTooShort + } + p := &PacketTooBig{MTU: int(binary.BigEndian.Uint32(b[:4]))} + if bodyLen > 4 { + p.Data = make([]byte, bodyLen-4) + copy(p.Data, b[4:]) + } + return p, nil +} diff --git a/vendor/golang.org/x/net/icmp/paramprob.go b/vendor/golang.org/x/net/icmp/paramprob.go new file mode 100644 index 0000000000..f16fd33ec2 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/paramprob.go @@ -0,0 +1,72 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "encoding/binary" + + "golang.org/x/net/internal/iana" + "golang.org/x/net/ipv4" +) + +// A ParamProb represents an ICMP parameter problem message body. +type ParamProb struct { + Pointer uintptr // offset within the data where the error was detected + Data []byte // data, known as original datagram field + Extensions []Extension // extensions +} + +// Len implements the Len method of MessageBody interface. +func (p *ParamProb) Len(proto int) int { + if p == nil { + return 0 + } + l, _ := multipartMessageBodyDataLen(proto, true, p.Data, p.Extensions) + return l +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *ParamProb) Marshal(proto int) ([]byte, error) { + switch proto { + case iana.ProtocolICMP: + if !validExtensions(ipv4.ICMPTypeParameterProblem, p.Extensions) { + return nil, errInvalidExtension + } + b, err := marshalMultipartMessageBody(proto, true, p.Data, p.Extensions) + if err != nil { + return nil, err + } + b[0] = byte(p.Pointer) + return b, nil + case iana.ProtocolIPv6ICMP: + b := make([]byte, p.Len(proto)) + binary.BigEndian.PutUint32(b[:4], uint32(p.Pointer)) + copy(b[4:], p.Data) + return b, nil + default: + return nil, errInvalidProtocol + } +} + +// parseParamProb parses b as an ICMP parameter problem message body. +func parseParamProb(proto int, typ Type, b []byte) (MessageBody, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + p := &ParamProb{} + if proto == iana.ProtocolIPv6ICMP { + p.Pointer = uintptr(binary.BigEndian.Uint32(b[:4])) + p.Data = make([]byte, len(b)-4) + copy(p.Data, b[4:]) + return p, nil + } + p.Pointer = uintptr(b[0]) + var err error + p.Data, p.Extensions, err = parseMultipartMessageBody(proto, typ, b) + if err != nil { + return nil, err + } + return p, nil +} diff --git a/vendor/golang.org/x/net/icmp/sys_freebsd.go b/vendor/golang.org/x/net/icmp/sys_freebsd.go new file mode 100644 index 0000000000..c75f3ddaa7 --- /dev/null +++ b/vendor/golang.org/x/net/icmp/sys_freebsd.go @@ -0,0 +1,11 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import "syscall" + +func init() { + freebsdVersion, _ = syscall.SysctlUint32("kern.osreldate") +} diff --git a/vendor/golang.org/x/net/icmp/timeexceeded.go b/vendor/golang.org/x/net/icmp/timeexceeded.go new file mode 100644 index 0000000000..ffa986fdea --- /dev/null +++ b/vendor/golang.org/x/net/icmp/timeexceeded.go @@ -0,0 +1,57 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package icmp + +import ( + "golang.org/x/net/internal/iana" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +// A TimeExceeded represents an ICMP time exceeded message body. +type TimeExceeded struct { + Data []byte // data, known as original datagram field + Extensions []Extension // extensions +} + +// Len implements the Len method of MessageBody interface. +func (p *TimeExceeded) Len(proto int) int { + if p == nil { + return 0 + } + l, _ := multipartMessageBodyDataLen(proto, true, p.Data, p.Extensions) + return l +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *TimeExceeded) Marshal(proto int) ([]byte, error) { + var typ Type + switch proto { + case iana.ProtocolICMP: + typ = ipv4.ICMPTypeTimeExceeded + case iana.ProtocolIPv6ICMP: + typ = ipv6.ICMPTypeTimeExceeded + default: + return nil, errInvalidProtocol + } + if !validExtensions(typ, p.Extensions) { + return nil, errInvalidExtension + } + return marshalMultipartMessageBody(proto, true, p.Data, p.Extensions) +} + +// parseTimeExceeded parses b as an ICMP time exceeded message body. +func parseTimeExceeded(proto int, typ Type, b []byte) (MessageBody, error) { + if len(b) < 4 { + return nil, errMessageTooShort + } + p := &TimeExceeded{} + var err error + p.Data, p.Extensions, err = parseMultipartMessageBody(proto, typ, b) + if err != nil { + return nil, err + } + return p, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c1543db630..b4f058cf37 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -405,7 +405,7 @@ github.com/gravitational/oxy/utils github.com/gravitational/rigging # github.com/gravitational/roundtrip v1.0.0 github.com/gravitational/roundtrip -# github.com/gravitational/satellite v0.0.9-0.20200826203500-ad8030ab3ddb +# github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e github.com/gravitational/satellite/agent github.com/gravitational/satellite/agent/cache github.com/gravitational/satellite/agent/health @@ -416,6 +416,7 @@ github.com/gravitational/satellite/lib/history github.com/gravitational/satellite/lib/history/sqlite github.com/gravitational/satellite/lib/kubernetes github.com/gravitational/satellite/lib/membership +github.com/gravitational/satellite/lib/nethealth github.com/gravitational/satellite/lib/rpc github.com/gravitational/satellite/lib/rpc/client github.com/gravitational/satellite/monitoring @@ -752,6 +753,7 @@ golang.org/x/net/context/ctxhttp golang.org/x/net/http/httpguts golang.org/x/net/http2 golang.org/x/net/http2/hpack +golang.org/x/net/icmp golang.org/x/net/idna golang.org/x/net/internal/iana golang.org/x/net/internal/socket From 8a33036e4ea85ffc1b61110d81b252b3fc370052 Mon Sep 17 00:00:00 2001 From: Walt Della Date: Thu, 19 Nov 2020 01:21:47 +0000 Subject: [PATCH 3/4] Add RHEL 8.3 install testing. --- assets/robotest/Makefile | 2 +- assets/robotest/config/pr.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/assets/robotest/Makefile b/assets/robotest/Makefile index 4d647445f6..b46c05492f 100644 --- a/assets/robotest/Makefile +++ b/assets/robotest/Makefile @@ -30,7 +30,7 @@ ROBOTEST_CONFIG_SCRIPT = $(TOP)/config/$(ROBOTEST_CONFIG).sh # End variables expected to be set outside this Makefile. # Everything below is Robotest specific. -ROBOTEST_VERSION ?= 2.1.0 +ROBOTEST_VERSION ?= 2.2.0 ROBOTEST_DOCKER_IMAGE ?= quay.io/gravitational/robotest-suite:$(ROBOTEST_VERSION) # ROBOTEST_BUILDDIR is the root of all robotest build artifacts for this build diff --git a/assets/robotest/config/pr.sh b/assets/robotest/config/pr.sh index 8aac15c33d..7a3e37a52a 100755 --- a/assets/robotest/config/pr.sh +++ b/assets/robotest/config/pr.sh @@ -61,7 +61,7 @@ EOF function build_install_suite { local suite='' - local test_os="centos:7" + local test_os="redhat:8.3" local cluster_size='"flavor":"three","nodes":3,"role":"node"' suite+=$(cat < Date: Thu, 19 Nov 2020 11:38:48 -0800 Subject: [PATCH 4/4] Bump planet 7.1.19; Bump satellite 7.1.9 --- Makefile | 2 +- go.mod | 2 +- go.sum | 2 ++ vendor/modules.txt | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index f2247f07a5..bb4b5dc732 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,7 @@ RELEASE_OUT ?= TELEPORT_TAG = 3.2.16 # TELEPORT_REPOTAG adapts TELEPORT_TAG to the teleport tagging scheme TELEPORT_REPOTAG := v$(TELEPORT_TAG) -PLANET_TAG := 7.1.18-$(K8S_VER_SUFFIX)-2-g61c4dddb +PLANET_TAG := 7.1.19-$(K8S_VER_SUFFIX) PLANET_BRANCH := $(PLANET_TAG) K8S_APP_TAG := $(GRAVITY_TAG) TELEKUBE_APP_TAG := $(GRAVITY_TAG) diff --git a/go.mod b/go.mod index 8336bfa858..2febae4807 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/gravitational/oxy v0.0.0-20180629203109-e4a7e35311e6 // indirect github.com/gravitational/rigging v0.0.0-20191021212636-83b2e9505286 github.com/gravitational/roundtrip v1.0.0 - github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e + github.com/gravitational/satellite v0.0.9-0.20201119182158-fe29d7b2c010 github.com/gravitational/tail v1.0.1 github.com/gravitational/teleport v3.2.15-0.20201005225507-eccdab5226f7+incompatible github.com/gravitational/trace v1.1.11 diff --git a/go.sum b/go.sum index b09604faef..bc81a63a97 100644 --- a/go.sum +++ b/go.sum @@ -586,6 +586,8 @@ github.com/gravitational/satellite v0.0.9-0.20200826203500-ad8030ab3ddb h1:XF4a8 github.com/gravitational/satellite v0.0.9-0.20200826203500-ad8030ab3ddb/go.mod h1:gqyBdtaefi/t7Mw//N/eoC4c3YriZZssmOiZ6NPvuek= github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e h1:KCJf3/32nvnzGCLUkwAP4HxFENI+5EMB038hlypGZpg= github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e/go.mod h1:OcgAUN5zzYAEiNPcjQ/MS2+YAMX6jTpPrbP6zIg29GI= +github.com/gravitational/satellite v0.0.9-0.20201119182158-fe29d7b2c010 h1:s2gtOgBX9uAQ33MY4dlQ5p2BHeYFnaNAfieMjeSd95o= +github.com/gravitational/satellite v0.0.9-0.20201119182158-fe29d7b2c010/go.mod h1:OcgAUN5zzYAEiNPcjQ/MS2+YAMX6jTpPrbP6zIg29GI= github.com/gravitational/tail v1.0.1 h1:Yv5nh+zV0yHQ9D9kXGEOFpUoA+Ti5uXHwOGioKSIeng= github.com/gravitational/tail v1.0.1/go.mod h1:3aRU+xPwNCaXykBn4jhiXsSJXX7jalcZtvCYGckNbPw= github.com/gravitational/teleport v3.2.15-0.20200110233851-f4445fa60013+incompatible h1:zXwIbEof+bTDqbYW97fZe7BWEuHa7ccrNe2PcMt8kYU= diff --git a/vendor/modules.txt b/vendor/modules.txt index b4f058cf37..e7c2e8fa70 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -405,7 +405,7 @@ github.com/gravitational/oxy/utils github.com/gravitational/rigging # github.com/gravitational/roundtrip v1.0.0 github.com/gravitational/roundtrip -# github.com/gravitational/satellite v0.0.9-0.20201117211559-1d546b9eca4e +# github.com/gravitational/satellite v0.0.9-0.20201119182158-fe29d7b2c010 github.com/gravitational/satellite/agent github.com/gravitational/satellite/agent/cache github.com/gravitational/satellite/agent/health