Skip to content

Commit

Permalink
fix(kuma-cp) bug with lost update of Dataplane (#1313)
Browse files Browse the repository at this point in the history
  • Loading branch information
lobkovilya authored Dec 18, 2020
1 parent d24b69d commit d595909
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/core/xds/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type FaultInjectionMap map[mesh_proto.InboundInterface]*mesh_proto.FaultInjectio
type TrafficPermissionMap map[mesh_proto.InboundInterface]*mesh_core.TrafficPermissionResource

type CLACache interface {
GetCLA(ctx context.Context, meshName, service string) (*envoy_api_v2.ClusterLoadAssignment, error)
GetCLA(ctx context.Context, meshName, meshHash, service string) (*envoy_api_v2.ClusterLoadAssignment, error)
}
type Proxy struct {
Id ProxyId
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/watchdog/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type SimpleWatchdog struct {
NewTicker func() *time.Ticker
OnTick func() error
OnError func(error)
OnStop func()
}

func (w *SimpleWatchdog) Start(stop <-chan struct{}) {
Expand All @@ -25,6 +26,9 @@ func (w *SimpleWatchdog) Start(stop <-chan struct{}) {
w.OnError(err)
}
case <-stop:
if w.OnStop != nil {
w.OnStop()
}
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/xds/cache/cla/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func NewCache(
}, nil
}

func (c *Cache) GetCLA(ctx context.Context, meshName, service string) (*envoy_api_v2.ClusterLoadAssignment, error) {
key := fmt.Sprintf("%s:%s", meshName, service)
func (c *Cache) GetCLA(ctx context.Context, meshName, meshHash, service string) (*envoy_api_v2.ClusterLoadAssignment, error) {
key := fmt.Sprintf("%s:%s:%s", meshName, service, meshHash)
value, found := c.cache.Get(key)
if found {
c.metrics.WithLabelValues("get", "hit").Inc()
Expand Down
8 changes: 4 additions & 4 deletions pkg/xds/cache/cla/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var _ = Describe("ClusterLoadAssignment Cache", func() {

It("should cache Get() queries", func() {
By("getting CLA for the first time")
cla, err := claCache.GetCLA(context.Background(), "mesh-0", "backend")
cla, err := claCache.GetCLA(context.Background(), "mesh-0", "", "backend")
Expect(err).ToNot(HaveOccurred())
Expect(countingManager.getQueries).To(Equal(1))
Expect(countingManager.listQueries).To(Equal(2))
Expand All @@ -115,7 +115,7 @@ var _ = Describe("ClusterLoadAssignment Cache", func() {
Expect(js).To(MatchJSON(string(expected)))

By("getting cached CLA")
_, err = claCache.GetCLA(context.Background(), "mesh-0", "backend")
_, err = claCache.GetCLA(context.Background(), "mesh-0", "", "backend")
Expect(err).ToNot(HaveOccurred())
Expect(countingManager.getQueries).To(Equal(1))
Expect(countingManager.listQueries).To(Equal(2))
Expand All @@ -130,7 +130,7 @@ var _ = Describe("ClusterLoadAssignment Cache", func() {

<-time.After(2 * time.Second)

cla, err = claCache.GetCLA(context.Background(), "mesh-0", "backend")
cla, err = claCache.GetCLA(context.Background(), "mesh-0", "", "backend")
Expect(err).ToNot(HaveOccurred())
Expect(countingManager.getQueries).To(Equal(2))
Expect(countingManager.listQueries).To(Equal(4))
Expand All @@ -147,7 +147,7 @@ var _ = Describe("ClusterLoadAssignment Cache", func() {
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
cla, err := claCache.GetCLA(context.Background(), "mesh-0", "backend")
cla, err := claCache.GetCLA(context.Background(), "mesh-0", "", "backend")
Expect(err).ToNot(HaveOccurred())

marshalled, err := json.Marshal(cla) // to imitate Read access to 'cla'
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (c Context) SDSLocation() string {
type MeshContext struct {
Resource *mesh_core.MeshResource
Dataplanes *mesh_core.DataplaneResourceList
Hash string
}

func BuildControlPlaneContext(config kuma_cp.Config) (*ControlPlaneContext, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/xds/generator/outbound_proxy_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (_ OutboundProxyGenerator) generateEDS(ctx xds_context.Context, proxy *mode
// We are not allowed to add endpoints with DNS names through EDS.
if !clusters.Get(clusterName).HasExternalService() {
serviceName := clusters.Tags(clusterName)[0][kuma_mesh.ServiceTag]
loadAssignment, err := proxy.CLACache.GetCLA(context.Background(), ctx.Mesh.Resource.Meta.GetName(), serviceName)
loadAssignment, err := proxy.CLACache.GetCLA(context.Background(), ctx.Mesh.Resource.Meta.GetName(), ctx.Mesh.Hash, serviceName)
if err != nil {
return nil, errors.Wrapf(err, "could not get ClusterLoadAssingment for %s", serviceName)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/xds/generator/proxy_template_profile_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type dummyCLACache struct {
outboundTargets model.EndpointMap
}

func (d *dummyCLACache) GetCLA(ctx context.Context, meshName, service string) (*envoy_api_v2.ClusterLoadAssignment, error) {
func (d *dummyCLACache) GetCLA(ctx context.Context, meshName, meshHash, service string) (*envoy_api_v2.ClusterLoadAssignment, error) {
return endpoints.CreateClusterLoadAssignment(service, d.outboundTargets[service]), nil
}

Expand Down
52 changes: 35 additions & 17 deletions pkg/xds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v2"
"google.golang.org/grpc"

"github.com/kumahq/kuma/pkg/xds/ingress"

dp_server "github.com/kumahq/kuma/pkg/config/dp-server"
core_system "github.com/kumahq/kuma/pkg/core/resources/apis/system"
"github.com/kumahq/kuma/pkg/core/resources/registry"
Expand Down Expand Up @@ -38,7 +40,6 @@ import (
k8s_auth "github.com/kumahq/kuma/pkg/xds/auth/k8s"
universal_auth "github.com/kumahq/kuma/pkg/xds/auth/universal"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/ingress"
xds_sync "github.com/kumahq/kuma/pkg/xds/sync"
xds_template "github.com/kumahq/kuma/pkg/xds/template"
xds_topology "github.com/kumahq/kuma/pkg/xds/topology"
Expand Down Expand Up @@ -220,22 +221,30 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
}()

ctx := context.Background()
dataplane := core_mesh.NewDataplaneResource()
proxyID := xds.FromResourceKey(key)

if err := rt.ReadOnlyResourceManager().Get(ctx, dataplane, core_store.GetBy(key)); err != nil {
// first of all calculate Hash before fetching any resources,
// otherwise we can have lost updates
snapshotHash, err := meshSnapshotCache.GetHash(ctx, proxyID.Mesh)
if err != nil {
return err
}

dataplane := core_mesh.NewDataplaneResource()
if err := rt.ResourceManager().Get(ctx, dataplane, core_store.GetBy(key)); err != nil {
if core_store.IsResourceNotFound(err) {
return reconciler.Clear(&proxyID)
}
return err
}

resolvedDp, err := xds_topology.ResolveAddress(rt.LookupIP(), dataplane)
if err != nil {
return err
}
dataplane = resolvedDp

// hash for Ingress should be calculated based on all dataplanes in all meshes,
// we don't do that now, so just ignore existing `snapshotHash` and always reconcile Ingress
if dataplane.Spec.IsIngress() {
// update Ingress
allMeshDataplanes := &core_mesh.DataplaneResourceList{}
Expand Down Expand Up @@ -267,18 +276,15 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
return ingressReconciler.Reconcile(envoyCtx, &proxy)
}

snapshotHash, err := meshSnapshotCache.GetHash(ctx, dataplane.GetMeta().GetMesh())
if err != nil {
return err
}
// if previous reconciliation was without an error AND current hash is equal to previous hash
// then we don't reconcile
if prevHash != "" && snapshotHash == prevHash {
return nil
}
log.V(1).Info("snapshot hash updated, reconcile", "prev", prevHash, "current", snapshotHash)
prevHash = snapshotHash

mesh := core_mesh.NewMeshResource()
if err := rt.ReadOnlyResourceManager().Get(ctx, mesh, core_store.GetByKey(proxyID.Mesh, core_model.NoMesh)); err != nil {
meshRes := core_mesh.NewMeshResource()
if err := rt.ReadOnlyResourceManager().Get(ctx, meshRes, core_store.GetByKey(proxyID.Mesh, core_model.NoMesh)); err != nil {
return err
}

Expand All @@ -293,8 +299,9 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
envoyCtx := xds_context.Context{
ControlPlane: envoyCpCtx,
Mesh: xds_context.MeshContext{
Resource: mesh,
Resource: meshRes,
Dataplanes: dataplanes,
Hash: snapshotHash,
},
ConnectionInfo: connectionInfoTracker.ConnectionInfo(streamId),
}
Expand All @@ -310,7 +317,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec

// resolve all endpoints that match given selectors
outbound := xds_topology.BuildEndpointMap(
mesh, rt.Config().Multizone.Remote.Zone,
meshRes, rt.Config().Multizone.Remote.Zone,
dataplanes.Items, externalServices.Items, rt.DataSourceLoader())

healthChecks, err := xds_topology.GetHealthChecks(ctx, dataplane, destinations, rt.ReadOnlyResourceManager())
Expand All @@ -329,10 +336,10 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
}
var tracingBackend *mesh_proto.TracingBackend
if trafficTrace != nil {
tracingBackend = mesh.GetTracingBackend(trafficTrace.Spec.GetConf().GetBackend())
tracingBackend = meshRes.GetTracingBackend(trafficTrace.Spec.GetConf().GetBackend())
}

matchedPermissions, err := permissionsMatcher.Match(ctx, dataplane, mesh)
matchedPermissions, err := permissionsMatcher.Match(ctx, dataplane, meshRes)
if err != nil {
return err
}
Expand All @@ -342,7 +349,7 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
return err
}

faultInjection, err := faultInjectionMatcher.Match(ctx, dataplane, mesh)
faultInjection, err := faultInjectionMatcher.Match(ctx, dataplane, meshRes)
if err != nil {
return err
}
Expand All @@ -363,12 +370,23 @@ func DefaultDataplaneSyncTracker(rt core_runtime.Runtime, reconciler, ingressRec
FaultInjections: faultInjection,
CLACache: claCache,
}
return reconciler.Reconcile(envoyCtx, &proxy)
err = reconciler.Reconcile(envoyCtx, &proxy)
if err != nil {
return err
}
prevHash = snapshotHash
return nil
},
OnError: func(err error) {
xdsGenerationsErrors.Inc()
log.Error(err, "OnTick() failed")
},
OnStop: func() {
proxyID := xds.FromResourceKey(key)
if err := reconciler.Clear(&proxyID); err != nil {
log.Error(err, "OnStop() failed")
}
},
}
}), nil
}
Expand Down

0 comments on commit d595909

Please sign in to comment.