[circuit_breaking_drop_reporting] xds: report drops by circuit breaking
Those drops are reported to the load store with an empty category (""). When reported
via LRS, they are counted only in total_drops, not in the per-category drop stats.
menghanl committed Jan 22, 2021
1 parent 2c42474 commit 3e508a3
Showing 5 changed files with 57 additions and 12 deletions.
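
The accounting rule this commit introduces can be boiled down to a small illustrative helper (a sketch only; the type and names below are made up and are not the gRPC-Go API): every drop increments the total, but only drops with a non-empty category appear in the per-category map.

// dropStats is a toy aggregator that mirrors the rule added by this commit:
// uncategorized drops (category == "") count toward the total only.
type dropStats struct {
	totalDrops uint64
	drops      map[string]uint64 // per-category drop counts
}

func (s *dropStats) recordDrop(category string) {
	s.totalDrops++
	if category == "" {
		// Circuit-breaking drops carry no category; they show up in
		// totalDrops but never in the per-category map.
		return
	}
	if s.drops == nil {
		s.drops = make(map[string]uint64)
	}
	s.drops[category]++
}
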
5 changes: 5 additions & 0 deletions xds/internal/balancer/edsbalancer/eds_impl.go
@@ -515,6 +515,11 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
if d.counter != nil {
if err := d.counter.StartRequest(); err != nil {
// Drops caused by circuit breaking are reported with an empty category.
// They are counted only in total drops, not per category.
if d.loadStore != nil {
d.loadStore.CallDropped("")
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
}
pr, err := d.p.Pick(info)
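
For orientation, the drop above fires when counter.StartRequest() returns an error, i.e. when the configured maximum number of concurrent requests is already in flight. A rough, hypothetical sketch of such a max-requests gate (not the actual xds client counter; names and details here are assumptions) might look like:

import (
	"fmt"
	"sync/atomic"
)

// maxRequestsGate is an illustrative stand-in for the service requests
// counter the picker consults.
type maxRequestsGate struct {
	max     uint32
	current uint32 // number of in-flight requests, accessed atomically
}

// StartRequest reserves a slot, or fails if the limit is already reached.
func (g *maxRequestsGate) StartRequest() error {
	if atomic.AddUint32(&g.current, 1) > g.max {
		atomic.AddUint32(&g.current, ^uint32(0)) // roll back the reservation
		return fmt.Errorf("max requests %d exceeded", g.max)
	}
	return nil
}

// EndRequest releases a slot when the RPC finishes.
func (g *maxRequestsGate) EndRequest() {
	atomic.AddUint32(&g.current, ^uint32(0))
}
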
47 changes: 41 additions & 6 deletions xds/internal/balancer/edsbalancer/eds_impl_test.go
@@ -142,7 +142,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}

// The same locality, different drop rate, dropping 50%.
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build()))

@@ -746,6 +746,10 @@ func (s) TestDropPicker(t *testing.T) {
}

func (s) TestEDS_LoadReport(t *testing.T) {
origCircuitBreakingSupport := env.CircuitBreakingSupport
env.CircuitBreakingSupport = true
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()

// We create an xdsClientWrapper with a dummy xdsClientInterface which only
// implements the LoadStore() method to return the underlying load.Store to
// be used.
@@ -758,10 +762,19 @@ func (s) TestEDS_LoadReport(t *testing.T) {
edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState

const (
testServiceName = "test-service"
cbMaxRequests = 20
)
var maxRequestsTemp uint32 = cbMaxRequests
client.SetMaxRequests(testServiceName, &maxRequestsTemp)
edsb.updateServiceRequestsCounter(testServiceName)

backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)

const testDropCategory = "test-drop"
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{testDropCategory: 50})
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
@@ -788,20 +801,42 @@ func (s) TestEDS_LoadReport(t *testing.T) {
// the picks on sc1 should show up as inProgress.
locality1JSON, _ := locality1.ToString()
locality2JSON, _ := locality2.ToString()
const (
rpcCount = 100
// 50% will be dropped with category testDropCategory.
dropWithCategory = rpcCount / 2
// Of the remaining RPCs, only cbMaxRequests are allowed by circuit
// breaking; the rest are dropped by CB.
dropWithCB = rpcCount - dropWithCategory - cbMaxRequests

rpcInProgress = cbMaxRequests / 2 // 50% of the allowed RPCs will never be done.
rpcSucceeded = cbMaxRequests / 2 // 50% of the allowed RPCs will succeed.
)
wantStoreData := []*load.Data{{
Cluster: testClusterNames[0],
Service: "",
LocalityStats: map[string]load.LocalityData{
locality1JSON: {RequestStats: load.RequestData{InProgress: 5}},
locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}},
locality1JSON: {RequestStats: load.RequestData{InProgress: rpcInProgress}},
locality2JSON: {RequestStats: load.RequestData{Succeeded: rpcSucceeded}},
},
TotalDrops: dropWithCategory + dropWithCB,
Drops: map[string]uint64{
testDropCategory: dropWithCategory,
},
}}
for i := 0; i < 10; i++ {

var rpcsToBeDone []balancer.PickResult
// Run the picks. Picks that land on sc1 are left in progress; the others
// are collected and marked done after the loop.
for i := 0; i < rpcCount; i++ {
scst, _ := p1.Pick(balancer.PickInfo{})
if scst.Done != nil && scst.SubConn != sc1 {
scst.Done(balancer.DoneInfo{})
rpcsToBeDone = append(rpcsToBeDone, scst)
}
}
// Call Done on the collected (non-sc1) picks; they are reported as succeeded.
for _, scst := range rpcsToBeDone {
scst.Done(balancer.DoneInfo{})
}

gotStoreData := loadStore.Stats(testClusterNames[0:1])
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
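
Restating the expected numbers above in concrete terms (everything follows from the test's constants; nothing new is assumed): of the 100 picks, 50 are dropped by the "test-drop" category, 20 of the remaining 50 get past circuit breaking, and 30 are dropped uncategorized; of the 20 allowed picks, the 10 on sc1 stay in progress and the 10 on sc2 are marked done and count as succeeded.

// Worked out with the test's constants:
const (
	rpcCount         = 100
	cbMaxRequests    = 20
	dropWithCategory = rpcCount / 2                                // 50 dropped as "test-drop"
	dropWithCB       = rpcCount - dropWithCategory - cbMaxRequests // 30 dropped with category ""
	totalDrops       = dropWithCategory + dropWithCB               // 80 reported in TotalDrops
	rpcInProgress    = cbMaxRequests / 2                           // 10 picks on sc1, never done
	rpcSucceeded     = cbMaxRequests / 2                           // 10 picks on sc2, done
)
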
7 changes: 6 additions & 1 deletion xds/internal/client/load/store.go
@@ -283,7 +283,12 @@ func (ls *perClusterStore) stats() *Data {
return true
}
sd.TotalDrops += d
sd.Drops[key.(string)] = d
keyStr := key.(string)
if keyStr != "" {
// Skip drops without a category: they are counted in total_drops but
// not per category. One example is drops caused by circuit breaking.
sd.Drops[keyStr] = d
}
return true
})
ls.localityRPCCount.Range(func(key, val interface{}) bool {
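
A rough usage sketch of the store with this change in place (the NewStore/PerCluster/CallDropped/Stats surface is assumed from this package at this revision, and the cluster and category names are illustrative; "fmt" is assumed imported):

store := load.NewStore()
pc := store.PerCluster("cluster-a", "")
pc.CallDropped("lb-drop") // categorized: counted in TotalDrops and in Drops["lb-drop"]
pc.CallDropped("")        // circuit-breaking style: counted in TotalDrops only

for _, d := range store.Stats([]string{"cluster-a"}) {
	fmt.Printf("total=%d perCategory=%v\n", d.TotalDrops, d.Drops)
	// With this change: total=2 perCategory=map[lb-drop:1]
}
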
3 changes: 2 additions & 1 deletion xds/internal/client/load/store_test.go
@@ -47,9 +47,10 @@ func TestDrops(t *testing.T) {
drops = map[string]int{
dropCategories[0]: 30,
dropCategories[1]: 40,
"": 10,
}
wantStoreData = &Data{
TotalDrops: 70,
TotalDrops: 80,
Drops: map[string]uint64{
dropCategories[0]: 30,
dropCategories[1]: 40,
7 changes: 3 additions & 4 deletions xds/internal/testutils/protos.go
@@ -18,7 +18,6 @@
package testutils

import (
"fmt"
"net"
"strconv"

@@ -59,11 +58,11 @@ type ClusterLoadAssignmentBuilder struct {
}

// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder.
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents map[string]uint32) *ClusterLoadAssignmentBuilder {
var drops []*v2xdspb.ClusterLoadAssignment_Policy_DropOverload
for i, d := range dropPercents {
for n, d := range dropPercents {
drops = append(drops, &v2xdspb.ClusterLoadAssignment_Policy_DropOverload{
Category: fmt.Sprintf("test-drop-%d", i),
Category: n,
DropPercentage: &v2typepb.FractionalPercent{
Numerator: d,
Denominator: v2typepb.FractionalPercent_HUNDRED,
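
With the new map-based signature, test callers name drop categories explicitly instead of relying on the old generated "test-drop-%d" names; for example (category and address values below are illustrative):

clab := testutils.NewClusterLoadAssignmentBuilder("test-cluster", map[string]uint32{
	"lb-drop":       50, // drop 50% of requests under category "lb-drop"
	"throttle-drop": 10, // drop 10% under category "throttle-drop"
})
clab.AddLocality("zone-a", 1, 0, []string{"1.2.3.4:8080"}, nil)
cla := clab.Build()
_ = cla // pass to handleEDSResponse / parse helpers as in the tests above
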
