Skip to content

Commit

Permalink
stable-2.14.8 change notes (#11870)
Browse files Browse the repository at this point in the history
* destination: Fix GetProfile endpoint buffering (#11815)

In 71635cb and 357a1d3 we updated the endpoint and profile translators
to prevent backpressure from stalling the informer tasks. This change
updates the endpoint profile translator with the same fix, so that
updates are buffered and can detect when a gRPC stream is stalled.

Furthermore, the update method is updated to use a protobuf-aware
comparison method instead of using serialization and string comparison.

A test is added for the endpoint profile translator, since none existed
previously.

* stable-2.14.8

This stable release fixes an issue in the control plane where discovery for pod
IP addresses could hang indefinitely ([#11815]).

---------

Co-authored-by: Oliver Gould <[email protected]>
  • Loading branch information
alpeb and olix0r authored Jan 3, 2024
1 parent 5902ad5 commit 3af6563
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 47 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changes

## stable-2.14.8

This stable release fixes an issue in the control plane where discovery for pod
IP addresses could hang indefinitely ([#11815]).

[#11815]: https://github.com/linkerd/linkerd2/pull/11815

## stable-2.14.7

This stable release fixes two bugs in the Linkerd control plane.
Expand Down
2 changes: 1 addition & 1 deletion charts/linkerd-control-plane/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies:
- name: partials
version: 0.1.0
repository: file://../partials
version: 1.16.8
version: 1.16.9
icon: https://linkerd.io/images/logo-only-200h.png
maintainers:
- name: Linkerd authors
Expand Down
2 changes: 1 addition & 1 deletion charts/linkerd-control-plane/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Linkerd gives you observability, reliability, and security
for your microservices — with no code change required.

![Version: 1.16.8](https://img.shields.io/badge/Version-1.16.8-informational?style=flat-square)
![Version: 1.16.9](https://img.shields.io/badge/Version-1.16.9-informational?style=flat-square)
![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square)
![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square)

Expand Down
102 changes: 86 additions & 16 deletions controller/api/destination/endpoint_profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,122 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type endpointProfileTranslator struct {
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
defaultOpaquePorts map[uint32]struct{}
stream pb.Destination_GetProfileServer
lastMessage string

stream pb.Destination_GetProfileServer
endStream chan struct{}

updates chan *watcher.Address
stop chan struct{}

current *pb.DestinationProfile

k8sAPI *k8s.API
metadataAPI *k8s.MetadataAPI
log *logging.Entry
}

// newEndpointProfileTranslator translates pod updates and protocol updates to
// endpointProfileUpdatesQueueOverflowCounter is a prometheus counter that is incremented
// whenever the profile updates queue overflows, i.e. whenever Update finds the
// updates channel full and aborts the stream.
//
// We omit ip and port labels because they are high cardinality.
var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter(
prometheus.CounterOpts{
Name: "endpoint_profile_updates_queue_overflow",
Help: "A counter incremented whenever the endpoint profile updates queue overflows",
},
)

// newEndpointProfileTranslator translates pod updates and profile updates to
// DestinationProfiles for endpoints
func newEndpointProfileTranslator(
enableH2Upgrade bool,
controllerNS,
identityTrustDomain string,
defaultOpaquePorts map[uint32]struct{},
log *logging.Entry,
stream pb.Destination_GetProfileServer,
k8sAPI *k8s.API,
metadataAPI *k8s.MetadataAPI,
stream pb.Destination_GetProfileServer,
endStream chan struct{},
log *logging.Entry,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,
stream: stream,
endStream: endStream,
updates: make(chan *watcher.Address, updateQueueCapacity),
stop: make(chan struct{}),
k8sAPI: k8sAPI,
metadataAPI: metadataAPI,
log: log.WithField("component", "endpoint-profile-translator"),
}
}

// Update sends a DestinationProfile message into the stream, if the same
// message hasn't been sent already. If it has, false is returned.
func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, error) {
// Start launches the background goroutine that drains the
// endpointProfileTranslator's internal update queue. Each queued address is
// handed to update, which sends to the grpc stream as appropriate; the
// goroutine exits once the stop channel is closed. Because the goroutine
// calls the non-thread-safe Send, Start must not be called more than once.
func (ept *endpointProfileTranslator) Start() {
	drain := func() {
		for {
			select {
			case <-ept.stop:
				// Stop was called; end the goroutine.
				return
			case addr := <-ept.updates:
				ept.update(addr)
			}
		}
	}
	go drain()
}

// Stop terminates the goroutine started by Start by closing the stop channel.
//
// NOTE(review): closing an already-closed channel panics, so Stop presumably
// must be called at most once per translator — confirm against callers.
func (ept *endpointProfileTranslator) Stop() {
close(ept.stop)
}

// Update enqueues an address update to be translated into a
// DestinationProfile. It never blocks: if the queue has no capacity the
// stream is considered stalled, the overflow counter is incremented and the
// endStream channel is closed. A non-nil error is returned whenever the
// update could not be enqueued.
func (ept *endpointProfileTranslator) Update(address *watcher.Address) error {
	// Fast path: there is room in the queue.
	select {
	case ept.updates <- address:
		return nil
	default:
	}

	// The queue is full. If endStream has already been closed there is
	// nothing more to do than report it.
	select {
	case <-ept.endStream:
		return fmt.Errorf("profile update stream closed")
	default:
	}

	// The stream has fallen too far behind and should be closed.
	endpointProfileUpdatesQueueOverflowCounter.Inc()
	close(ept.endStream)
	return fmt.Errorf("profile update queue full; aborting stream")
}

func (ept *endpointProfileTranslator) update(address *watcher.Address) {
opaquePorts := watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
endpoint, err := ept.createEndpoint(*address, opaquePorts)
if err != nil {
return false, fmt.Errorf("failed to create endpoint: %w", err)
ept.log.Errorf("Failed to create endpoint for %s:%d: %s",
address.IP, address.Port, err)
return
}

// The protocol for an endpoint should only be updated if there is a pod,
Expand All @@ -79,17 +148,18 @@ func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, er
Endpoint: endpoint,
OpaqueProtocol: address.OpaqueProtocol,
}
msg := profile.String()
if msg == ept.lastMessage {
return false, nil
if proto.Equal(profile, ept.current) {
ept.log.Debugf("Ignoring redundant profile update: %+v", profile)
return
}
ept.lastMessage = msg
ept.log.Debugf("sending protocol update: %+v", profile)

ept.log.Debugf("Sending profile update: %+v", profile)
if err := ept.stream.Send(profile); err != nil {
return false, fmt.Errorf("failed to send protocol update: %w", err)
ept.log.Errorf("failed to send profile update: %s", err)
return
}

return true, nil
ept.current = profile
}

func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
Expand Down
127 changes: 127 additions & 0 deletions controller/api/destination/endpoint_profile_translator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package destination

import (
"testing"
"time"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
consts "github.com/linkerd/linkerd2/pkg/k8s"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestEndpointProfileTranslator exercises the endpointProfileTranslator's
// queueing behavior: that enqueued updates are sent on the stream, that a
// repeated update is not re-sent, and that overflowing the update queue
// closes the endStream channel.
func TestEndpointProfileTranslator(t *testing.T) {
	// logging.SetLevel(logging.TraceLevel)
	// defer logging.SetLevel(logging.PanicLevel)

	// The same IP:port, once without and once with an associated pod whose
	// annotation marks port 8080 as opaque.
	addr := &watcher.Address{
		IP:   "10.10.11.11",
		Port: 8080,
	}
	podAddr := &watcher.Address{
		IP:   "10.10.11.11",
		Port: 8080,
		Pod: &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Annotations: map[string]string{
					consts.ProxyOpaquePortsAnnotation: "8080",
				},
			},
		},
	}

	t.Run("Sends update", func(t *testing.T) {
		mockGetProfileServer := &mockDestinationGetProfileServer{
			profilesReceived: make(chan *pb.DestinationProfile, 1),
		}
		log := logging.WithField("test", t.Name())
		translator := newEndpointProfileTranslator(
			true, "cluster", "identity", make(map[uint32]struct{}),
			nil, nil,
			mockGetProfileServer,
			nil,
			log,
		)
		translator.Start()
		defer translator.Stop()

		// The first update must be enqueued and sent on the stream.
		if err := translator.Update(addr); err != nil {
			t.Fatalf("Failed to enqueue update: %s", err)
		}
		select {
		case p := <-mockGetProfileServer.profilesReceived:
			log.Debugf("Received update: %v", p)
		case <-time.After(1 * time.Second):
			t.Fatal("No update received")
		}

		// Re-enqueueing the same address must succeed but must not produce a
		// second message on the stream.
		if err := translator.Update(addr); err != nil {
			t.Fatalf("Failed to enqueue duplicate update: %s", err)
		}
		select {
		case p := <-mockGetProfileServer.profilesReceived:
			t.Fatalf("Duplicate update sent: %v", p)
		case <-time.After(1 * time.Second):
		}

		if err := translator.Update(podAddr); err != nil {
			t.Fatalf("Failed to enqueue pod update: %s", err)
		}
		select {
		case p := <-mockGetProfileServer.profilesReceived:
			log.Debugf("Received update: %v", p)
		case <-time.After(1 * time.Second):
			// NOTE(review): a timeout is tolerated here, so this step does
			// not assert that the pod update was actually sent — confirm
			// whether this should fail the test.
		}
	})

	t.Run("Handles overflow", func(t *testing.T) {
		mockGetProfileServer := &mockDestinationGetProfileServer{
			profilesReceived: make(chan *pb.DestinationProfile, 1),
		}
		log := logging.WithField("test", t.Name())
		endStream := make(chan struct{})
		translator := newEndpointProfileTranslator(
			true, "cluster", "identity", make(map[uint32]struct{}),
			nil, nil,
			mockGetProfileServer,
			endStream,
			log,
		)
		translator.Start()
		defer translator.Stop()

		// Alternate between the two addresses so that consecutive updates
		// differ, without ever reading from the mock stream.
		for i := 0; i < updateQueueCapacity/2; i++ {
			if err := translator.Update(podAddr); err != nil {
				t.Fatalf("Failed to enqueue pod update %d: %s", i, err)
			}
			select {
			case <-endStream:
				t.Fatal("Stream ended prematurely")
			default:
			}

			if err := translator.Update(addr); err != nil {
				t.Fatalf("Failed to enqueue update %d: %s", i, err)
			}
			select {
			case <-endStream:
				t.Fatal("Stream ended prematurely")
			default:
			}
		}

		// One more update must overflow the queue and end the stream.
		if err := translator.Update(podAddr); err == nil {
			t.Fatal("Expected update to fail")
		}
		select {
		case <-endStream:
		default:
			t.Fatal("Stream should have ended")
		}

		// XXX We should assert that endpointProfileUpdatesQueueOverflowCounter
		// == 1 but we can't read counter values.
	})
}
13 changes: 10 additions & 3 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,16 +484,21 @@ func (s *server) subscribeToEndpointProfile(
log *logging.Entry,
stream pb.Destination_GetProfileServer,
) error {
canceled := stream.Context().Done()
streamEnd := make(chan struct{})
translator := newEndpointProfileTranslator(
s.enableH2Upgrade,
s.controllerNS,
s.identityTrustDomain,
s.defaultOpaquePorts,
log,
stream,
s.k8sAPI,
s.metadataAPI,
stream,
streamEnd,
log,
)
translator.Start()
defer translator.Stop()

var err error
ip, err = s.pods.Subscribe(service, hostname, ip, port, translator)
Expand All @@ -504,8 +509,10 @@ func (s *server) subscribeToEndpointProfile(

select {
case <-s.shutdown:
case <-stream.Context().Done():
case <-canceled:
s.log.Debugf("Cancelled")
case <-streamEnd:
log.Errorf("GetProfile %s:%d stream aborted", ip, port)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ spec:
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
}
log := logging.WithField("test", t.Name())
logging.SetLevel(logging.TraceLevel)
// logging.SetLevel(logging.TraceLevel)
defaultOpaquePorts := map[uint32]struct{}{
25: {},
443: {},
Expand Down
Loading

0 comments on commit 3af6563

Please sign in to comment.