Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Remove CN from *envoy.Proxy #4773

Merged
merged 11 commits into from
Jun 7, 2022
2 changes: 2 additions & 0 deletions cmd/osm-healthcheck/osm-healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func TestGetHealthcheckHander(t *testing.T) {
//#nosec G307
defer listener.Close()

// required to avoid https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
test := test
go func() {
conn, err := listener.Accept()
assert.Nil(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/catalog/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ func TestListAllowedUpstreamEndpointsForService(t *testing.T) {
}
pod.Status.PodIPs = podIps
pod.Spec.ServiceAccountName = sa.Name
_, err := kubeClient.CoreV1().Pods(tests.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
_, err := kubeClient.CoreV1().Pods(tests.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
assert.Nil(err)
pods = append(pods, &pod)
pods = append(pods, pod)
}
}
mockKubeController.EXPECT().ListPods().Return(pods).AnyTimes()
Expand Down
12 changes: 3 additions & 9 deletions pkg/catalog/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,9 @@ import (

// NewFakeMeshCatalog creates a new struct implementing catalog.MeshCataloger interface used for testing.
func NewFakeMeshCatalog(kubeClient kubernetes.Interface, meshConfigClient configClientset.Interface) *catalog.MeshCatalog {
var (
mockCtrl *gomock.Controller
mockKubeController *k8s.MockController
mockPolicyController *policy.MockController
)

mockCtrl = gomock.NewController(ginkgo.GinkgoT())
mockKubeController = k8s.NewMockController(mockCtrl)
mockPolicyController = policy.NewMockController(mockCtrl)
mockCtrl := gomock.NewController(ginkgo.GinkgoT())
mockKubeController := k8s.NewMockController(mockCtrl)
mockPolicyController := policy.NewMockController(mockCtrl)

meshSpec := smiFake.NewFakeMeshSpecClient()

Expand Down
6 changes: 3 additions & 3 deletions pkg/catalog/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,19 @@ func newFakeMeshCatalogForRoutes(t *testing.T, testParams testParams) *MeshCatal

// Create a bookstoreV1 pod
bookstoreV1Pod := tests.NewPodFixture(tests.BookstoreV1Service.Namespace, tests.BookstoreV1Service.Name, tests.BookstoreServiceAccountName, tests.PodLabels)
if _, err := kubeClient.CoreV1().Pods(tests.BookstoreV1Service.Namespace).Create(context.TODO(), &bookstoreV1Pod, metav1.CreateOptions{}); err != nil {
if _, err := kubeClient.CoreV1().Pods(tests.BookstoreV1Service.Namespace).Create(context.TODO(), bookstoreV1Pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating new pod: %s", err.Error())
}

// Create a bookstoreV2 pod
bookstoreV2Pod := tests.NewPodFixture(tests.BookstoreV2Service.Namespace, tests.BookstoreV2Service.Name, tests.BookstoreV2ServiceAccountName, tests.PodLabels)
if _, err := kubeClient.CoreV1().Pods(tests.BookstoreV2Service.Namespace).Create(context.TODO(), &bookstoreV2Pod, metav1.CreateOptions{}); err != nil {
if _, err := kubeClient.CoreV1().Pods(tests.BookstoreV2Service.Namespace).Create(context.TODO(), bookstoreV2Pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating new pod: %s", err.Error())
}

// Create a bookbuyer pod
bookbuyerPod := tests.NewPodFixture(tests.BookbuyerService.Namespace, tests.BookbuyerService.Name, tests.BookbuyerServiceAccountName, tests.PodLabels)
if _, err := kubeClient.CoreV1().Pods(tests.BookbuyerService.Namespace).Create(context.TODO(), &bookbuyerPod, metav1.CreateOptions{}); err != nil {
if _, err := kubeClient.CoreV1().Pods(tests.BookbuyerService.Namespace).Create(context.TODO(), bookbuyerPod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Error creating new pod: %s", err.Error())
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/catalog/traffictarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ func trafficTargetIdentityToSvcAccount(identitySubject smiAccess.IdentityBinding

// trafficTargetIdentityToServiceIdentity returns an identity of the form <namespace>/<service-account>
func trafficTargetIdentityToServiceIdentity(identitySubject smiAccess.IdentityBindingSubject) identity.ServiceIdentity {
svcAccount := trafficTargetIdentityToSvcAccount(identitySubject)
return identity.GetKubernetesServiceIdentity(svcAccount, identity.ClusterLocalTrustDomain)
return trafficTargetIdentityToSvcAccount(identitySubject).ToServiceIdentity()
}

// trafficTargetIdentitiesToSvcAccounts returns a list of Service Accounts from the given list of identities from a Traffic Target
Expand Down
100 changes: 64 additions & 36 deletions pkg/debugger/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,71 +6,99 @@ import (
"sort"
"time"

"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/envoy"
)

const (
specificProxyQueryKey = "proxy"
proxyConfigQueryKey = "cfg"
uuidQueryKey = "uuid"
steeling marked this conversation as resolved.
Show resolved Hide resolved
proxyConfigQueryKey = "cfg"
)

func (ds DebugConfig) getProxies() http.Handler {
// This function is needed to convert the list of connected proxies to
// the type (map) required by the printProxies function.
listConnected := func() map[certificate.CommonName]time.Time {
proxies := make(map[certificate.CommonName]time.Time)
for _, proxy := range ds.proxyRegistry.ListConnectedProxies() {
proxies[proxy.GetCertificateCommonName()] = (*proxy).GetConnectedAt()
}
return proxies
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
if proxyConfigDump, ok := r.URL.Query()[proxyConfigQueryKey]; ok {
ds.getConfigDump(certificate.CommonName(proxyConfigDump[0]), w)
} else if specificProxy, ok := r.URL.Query()[specificProxyQueryKey]; ok {
ds.getProxy(certificate.CommonName(specificProxy[0]), w)
} else {
printProxies(w, listConnected(), "Connected")
proxyConfigDump := r.URL.Query()[proxyConfigQueryKey]
uuid := r.URL.Query()[uuidQueryKey]

switch {
case len(uuid) == 0:
ds.printProxies(w)
case len(proxyConfigDump) > 0:
ds.getConfigDump(uuid[0], w)
default:
ds.getProxy(uuid[0], w)
}
})
}

func printProxies(w http.ResponseWriter, proxies map[certificate.CommonName]time.Time, category string) {
var commonNames []string
for cn := range proxies {
commonNames = append(commonNames, cn.String())
func (ds DebugConfig) printProxies(w http.ResponseWriter) {
// This function is needed to convert the list of connected proxies to
// the type (map) required by the printProxies function.
proxyMap := ds.proxyRegistry.ListConnectedProxies()
proxies := make([]*envoy.Proxy, 0, len(proxyMap))
for _, proxy := range proxyMap {
proxies = append(proxies, proxy)
}

sort.Strings(commonNames)
sort.Slice(proxies, func(i, j int) bool {
return proxies[i].Identity.String() < proxies[j].Identity.String()
})

_, _ = fmt.Fprintf(w, "<h1>%s Proxies (%d):</h1>", category, len(proxies))
_, _ = fmt.Fprintf(w, "<h1>Connected Proxies (%d):</h1>", len(proxies))
_, _ = fmt.Fprint(w, `<table>`)
_, _ = fmt.Fprint(w, "<tr><td>#</td><td>Envoy's certificate CN</td><td>Connected At</td><td>How long ago</td><td>tools</td></tr>")
for idx, cn := range commonNames {
ts := proxies[certificate.CommonName(cn)]
_, _ = fmt.Fprintf(w, `<tr><td>%d:</td><td>%s</td><td>%+v</td><td>(%+v ago)</td><td><a href="/debug/proxy?%s=%s">certs</a></td><td><a href="/debug/proxy?%s=%s">cfg</a></td></tr>`,
idx, cn, ts, time.Since(ts), specificProxyQueryKey, cn, proxyConfigQueryKey, cn)
_, _ = fmt.Fprint(w, "<tr><td>#</td><td>Envoy's Service Identity</td><td>Envoy's UUID</td><td>Connected At</td><td>How long ago</td><td>tools</td></tr>")
for idx, proxy := range proxies {
ts := proxy.GetConnectedAt()
proxyURL := fmt.Sprintf("/debug/proxy?%s=%s", uuidQueryKey, proxy.UUID)
configDumpURL := fmt.Sprintf("%s&%s=%t", proxyURL, proxyConfigQueryKey, true)
_, _ = fmt.Fprintf(w, `<tr><td>%d:</td><td>%s</td><td>%s</td><td>%+v</td><td>(%+v ago)</td><td><a href="%s">certs</a></td><td><a href="%s">cfg</a></td></tr>`,
idx+1, proxy.Identity, proxy.UUID, ts, time.Since(ts), proxyURL, configDumpURL)
}
_, _ = fmt.Fprint(w, `</table>`)
}

func (ds DebugConfig) getConfigDump(cn certificate.CommonName, w http.ResponseWriter) {
pod, err := envoy.GetPodFromCertificate(cn, ds.kubeController)
func (ds DebugConfig) getConfigDump(uuid string, w http.ResponseWriter) {
proxy := ds.proxyRegistry.GetConnectedProxy(uuid)
if proxy != nil {
msg := fmt.Sprintf("Proxy for UUID %s not found, may have been disconnected", uuid)
log.Error().Msg(msg)
if _, err := w.Write([]byte(msg)); err != nil {
log.Error().Err(err).Msg("Error writing debugger response")
}
return
}
pod, err := ds.kubeController.GetPodForProxy(proxy)
if err != nil {
log.Error().Err(err).Msgf("Error getting Pod from certificate with CN=%s", cn)
msg := fmt.Sprintf("Error getting Pod from proxy %s", proxy.GetName())
log.Error().Err(err).Msg(msg)
if _, err := w.Write([]byte(msg)); err != nil {
log.Error().Err(err).Msg("Error writing debugger response")
}
return
}
w.Header().Set("Content-Type", "application/json")
envoyConfig := ds.getEnvoyConfig(pod, "config_dump")
_, _ = fmt.Fprintf(w, "%s", envoyConfig)
}

func (ds DebugConfig) getProxy(cn certificate.CommonName, w http.ResponseWriter) {
pod, err := envoy.GetPodFromCertificate(cn, ds.kubeController)
func (ds DebugConfig) getProxy(uuid string, w http.ResponseWriter) {
proxy := ds.proxyRegistry.GetConnectedProxy(uuid)
if proxy == nil {
msg := fmt.Sprintf("Proxy for UUID %s not found, may have been disconnected", uuid)
log.Error().Msg(msg)
if _, err := w.Write([]byte(msg)); err != nil {
log.Error().Err(err).Msg("Error writing debugger response")
}
return
}
pod, err := ds.kubeController.GetPodForProxy(proxy)
if err != nil {
log.Error().Err(err).Msgf("Error getting Pod from certificate with CN=%s", cn)
msg := fmt.Sprintf("Error getting Pod from proxy %s", proxy.GetName())
log.Error().Err(err).Msg(msg)
if _, err := w.Write([]byte(msg)); err != nil {
log.Error().Err(err).Msg("Error writing debugger response")
}
return
}
w.Header().Set("Content-Type", "application/json")
envoyConfig := ds.getEnvoyConfig(pod, "certs")
Expand Down
28 changes: 10 additions & 18 deletions pkg/envoy/ads/cache_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
)

// Routine which fulfills listening to proxy broadcasts
Expand Down Expand Up @@ -61,9 +61,6 @@ func (s *Server) allPodUpdater() {
// All verticals use the proxy structure to infer the pod later, so the actual only mandatory
// data for the verticals to be functional is the common name, which links proxy <-> pod
func GetProxyFromPod(pod *v1.Pod) (*envoy.Proxy, error) {
var serviceAccount string
var namespace string

uuidString, uuidFound := pod.Labels[constants.EnvoyUniqueIDLabelName]
if !uuidFound {
return nil, errors.Errorf("UUID not found for pod %s/%s, not a mesh pod", pod.Namespace, pod.Name)
Expand All @@ -73,27 +70,18 @@ func GetProxyFromPod(pod *v1.Pod) (*envoy.Proxy, error) {
return nil, errors.Errorf("Could not parse UUID label into UUID type (%s): %v", uuidString, err)
}

serviceAccount = pod.Spec.ServiceAccountName
namespace = pod.Namespace

// construct CN for this pod/proxy
// TODO: Infer proxy type from Pod
commonName := envoy.NewXDSCertCommonName(proxyUUID, envoy.KindSidecar, serviceAccount, namespace)
tempProxy, err := envoy.NewProxy(certificate.CommonName(commonName), "NoSerial", &net.IPAddr{IP: net.IPv4zero})
sa := pod.Spec.ServiceAccountName
namespace := pod.Namespace

return tempProxy, err
return envoy.NewProxy(envoy.KindSidecar, proxyUUID, identity.New(sa, namespace), &net.IPAddr{IP: net.IPv4zero}), nil
}

// RecordFullSnapshot stores a group of resources as a new Snapshot with a new version in the cache.
// It also runs a consistency check on the snapshot (will warn if there are missing resources referenced in
// the snapshot)
func (s *Server) RecordFullSnapshot(proxy *envoy.Proxy, snapshotResources map[string][]types.Resource) error {
s.configVerMutex.Lock()
s.configVersion[proxy.GetCertificateCommonName().String()]++
s.configVerMutex.Unlock()

snapshot, err := cache.NewSnapshot(
fmt.Sprintf("%d", s.configVersion[proxy.GetCertificateCommonName().String()]),
fmt.Sprintf("%d", s.configVersion[proxy.UUID.String()]),
snapshotResources,
)
if err != nil {
Expand All @@ -104,5 +92,9 @@ func (s *Server) RecordFullSnapshot(proxy *envoy.Proxy, snapshotResources map[st
log.Warn().Err(err).Str("proxy", proxy.String()).Msgf("Snapshot for proxy not consistent")
}

return s.ch.SetSnapshot(context.TODO(), proxy.GetCertificateCommonName().String(), snapshot)
s.configVerMutex.Lock()
defer s.configVerMutex.Unlock()
s.configVersion[proxy.UUID.String()]++

return s.ch.SetSnapshot(context.TODO(), proxy.UUID.String(), snapshot)
}
19 changes: 7 additions & 12 deletions pkg/envoy/ads/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
)

func TestGetProxyFromPod(t *testing.T) {
assert := tassert.New(t)

var (
// Default fixtures for various test variables
podName = "pod"
namespace = "namespace"
serviceAccount = "serviceAccount"
validUUID = uuid.New()
validCommonName = envoy.NewXDSCertCommonName(validUUID, envoy.KindSidecar, serviceAccount, namespace)
podName = "pod"
namespace = "namespace"
serviceAccount = "serviceAccount"
validUUID = uuid.New()
)

testCases := []struct {
Expand All @@ -32,8 +29,7 @@ func TestGetProxyFromPod(t *testing.T) {
pod *v1.Pod

// Output check
expectErr bool
commonName certificate.CommonName
expectErr bool
}{
{
testCaseName: "Pod with no label",
Expand Down Expand Up @@ -80,7 +76,6 @@ func TestGetProxyFromPod(t *testing.T) {
ServiceAccountName: serviceAccount,
},
},
commonName: validCommonName,
},
}

Expand All @@ -90,8 +85,8 @@ func TestGetProxyFromPod(t *testing.T) {
if testCase.expectErr {
assert.Error(err)
} else {
assert.Equal(proxyResult.GetCertificateCommonName(), testCase.commonName,
"%s: Did not return equal common name")
assert.NotNil(proxyResult)
assert.Equal(proxyResult.UUID, validUUID)
}
}
}
1 change: 1 addition & 0 deletions pkg/envoy/ads/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ var errGrpcClosed = errors.New("grpc closed")
var errTooManyConnections = errors.New("too many connections")
var errServiceAccountMismatch = errors.New("service account mismatch in nodeid vs xds certificate common name")
var errUnsuportedXDSRequest = errors.New("Unsupported XDS server connection type")
var errInvalidCertificateCN = errors.New("invalid cn")
9 changes: 1 addition & 8 deletions pkg/envoy/ads/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,5 @@ func (proxyJob *proxyResponseJob) Run() {

// JobName implementation for this job, for logging purposes
func (proxyJob *proxyResponseJob) JobName() string {
return fmt.Sprintf("sendJob-%s", proxyJob.proxy.GetCertificateSerialNumber())
}

// Hash implementation for this job to hash into the worker queues
func (proxyJob *proxyResponseJob) Hash() uint64 {
// Uses proxy hash to always serialize work for the same proxy to the same worker,
// this avoid out-of-order mishandling of envoy updates by multiple workers
return proxyJob.proxy.GetHash()
return fmt.Sprintf("sendJob-%s", proxyJob.proxy.GetName())
}
12 changes: 6 additions & 6 deletions pkg/envoy/ads/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ func xdsPathTimeTrack(startedAt time.Time, typeURI envoy.TypeURI, proxy *envoy.P
Observe(elapsed.Seconds())
}

func (s *Server) trackXDSLog(proxyName string, typeURL envoy.TypeURI) {
func (s *Server) trackXDSLog(proxyID string, typeURL envoy.TypeURI) {
steeling marked this conversation as resolved.
Show resolved Hide resolved
s.withXdsLogMutex(func() {
if _, ok := s.xdsLog[proxyName]; !ok {
s.xdsLog[proxyName] = make(map[envoy.TypeURI][]time.Time)
if _, ok := s.xdsLog[proxyID]; !ok {
s.xdsLog[proxyID] = make(map[envoy.TypeURI][]time.Time)
}

timeSlice, ok := s.xdsLog[proxyName][typeURL]
timeSlice, ok := s.xdsLog[proxyID][typeURL]
if !ok {
s.xdsLog[proxyName][typeURL] = []time.Time{time.Now()}
s.xdsLog[proxyID][typeURL] = []time.Time{time.Now()}
return
}

timeSlice = append(timeSlice, time.Now())
if len(timeSlice) > MaxXdsLogsPerProxy {
timeSlice = timeSlice[1:]
}
s.xdsLog[proxyName][typeURL] = timeSlice
s.xdsLog[proxyID][typeURL] = timeSlice
})
}

Expand Down
Loading