From 89da54a6862088f72764de168b02e31f31e20d7e Mon Sep 17 00:00:00 2001 From: ajssmith Date: Thu, 30 Nov 2023 11:05:06 -0500 Subject: [PATCH] fix connector reconcile --- pkg/flow/flow_mem_driver.go | 24 ++-- pkg/flow/flow_mem_driver_test.go | 195 ++++++++++++++++++++++++++++--- 2 files changed, 195 insertions(+), 24 deletions(-) diff --git a/pkg/flow/flow_mem_driver.go b/pkg/flow/flow_mem_driver.go index 76fed1671..dfd9aa882 100644 --- a/pkg/flow/flow_mem_driver.go +++ b/pkg/flow/flow_mem_driver.go @@ -2436,18 +2436,24 @@ func (fc *FlowCollector) reconcileConnectorRecords() error { } for _, process := range fc.Processes { if siteId == process.Parent { - if process.SourceHost != nil { + if process.SourceHost != nil && matchHost != nil { if *matchHost == *process.SourceHost { - connector.ProcessId = &process.Identity - connector.Target = process.Name - process.connector = &connector.Identity - process.ProcessBinding = &Bound - fc.updateNetworkStatus() - log.Printf("COLLECTOR: Connector %s/%s associated to process %s\n", connector.Identity, *connector.Address, *process.Name) - delete(fc.connectorsToReconcile, connId) found = true - break } + } else if process.HostName != nil { + if *process.HostName == *connector.DestHost { + found = true + } + } + if found { + connector.ProcessId = &process.Identity + connector.Target = process.Name + process.connector = &connector.Identity + process.ProcessBinding = &Bound + fc.updateNetworkStatus() + log.Printf("COLLECTOR: Connector %s/%s associated to process %s\n", connector.Identity, *connector.Address, *process.Name) + delete(fc.connectorsToReconcile, connId) + break } } } diff --git a/pkg/flow/flow_mem_driver_test.go b/pkg/flow/flow_mem_driver_test.go index a6eb2f6ce..e6a328194 100644 --- a/pkg/flow/flow_mem_driver_test.go +++ b/pkg/flow/flow_mem_driver_test.go @@ -12,9 +12,12 @@ import ( ) func TestRecordGraphWithMetrics(t *testing.T) { - name := "skupper-site" - namespace := "skupper-public" + name := "skupper-site1" + namespace := "skupper-public1" provider := "aws" + name2 := "skupper-site2" + namespace2 := "skupper-public2" + provider2 := "gke" address := "tcp-go-echo" address2 := "redis-cart" address3 := "mongo" @@ -31,6 +34,8 @@ func TestRecordGraphWithMetrics(t *testing.T) { connectorProcess1 := "process:1" connectorProcess2 := "process:2" processConnector := "connector:0" + destHost1 := "10.20.30.40" + destHost2 := "host.cloud.com" sites := []SiteRecord{ { @@ -43,6 +48,16 @@ func TestRecordGraphWithMetrics(t *testing.T) { NameSpace: &namespace, Provider: &provider, }, + { + Base: Base{ + RecType: recordNames[Site], + Identity: "site:1", + StartTime: uint64(time.Now().UnixNano()) / uint64(time.Microsecond), + }, + Name: &name2, + NameSpace: &namespace2, + Provider: &provider2, + }, } routers := []RouterRecord{ { @@ -126,9 +141,10 @@ func TestRecordGraphWithMetrics(t *testing.T) { Parent: "site:0", StartTime: uint64(time.Now().UnixNano()) / uint64(time.Microsecond), }, - Name: &processName2, - GroupName: &groupName1, - connector: &processConnector, + Name: &processName2, + GroupName: &groupName1, + connector: &processConnector, + SourceHost: &destHost1, }, { Base: Base{ @@ -139,6 +155,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { }, Name: &processName1, GroupName: &groupName1, + HostName: &destHost2, }, } listeners := []ListenerRecord{ @@ -149,8 +166,9 @@ func TestRecordGraphWithMetrics(t *testing.T) { Parent: "router:0", StartTime: uint64(time.Now().UnixNano()) / uint64(time.Microsecond), }, - Address: &address, - Protocol: &protocol, + Address: &address, + AddressId: &address, + Protocol: &protocol, }, { Base: Base{ @@ -159,7 +177,9 @@ func TestRecordGraphWithMetrics(t *testing.T) { Parent: "router:1", StartTime: uint64(time.Now().UnixNano()) / uint64(time.Microsecond), }, - Protocol: &protocol, + Protocol: &protocol, + Address: &address, + AddressId: &address, }, { Base: Base{ @@ -183,6 +203,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { Address: &address3, Protocol: &protocol, ProcessId: &connectorProcess1, + DestHost: &destHost1, }, { Base: Base{ @@ -203,6 +224,19 @@ func TestRecordGraphWithMetrics(t *testing.T) { Protocol: &protocol, Address: &address4, ProcessId: &connectorProcess2, + DestHost: &destHost2, + }, + { + Base: Base{ + RecType: recordNames[Connector], + Identity: "connector:2", + Parent: "router:0", + StartTime: uint64(time.Now().UnixNano()) / uint64(time.Microsecond), + }, + Protocol: &protocol, + Address: &address, + ProcessId: &connectorProcess1, + DestHost: &destHost1, }, } flows := []FlowRecord{ @@ -227,6 +261,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { Trace: &flowTrace, ProcessName: &processName2, Process: &connectorProcess1, + SourceHost: &destHost1, }, { Base: Base{ @@ -377,14 +412,29 @@ func TestRecordGraphWithMetrics(t *testing.T) { } if f.Trace != nil { trace := fc.annotateFlowTrace(&f) - assert.Equal(t, *trace, "skupper-site@skupper-site") + assert.Equal(t, *trace, "skupper-site1@skupper-site1") } } fc.reconcileConnectorRecords() fc.reconcileFlowRecords() + var addressId string + for _, address := range fc.VanAddresses { + if address.Name == "tcp-go-echo" { + addressId = address.Identity + break + } + } + + var processGroupId string + for x, _ := range fc.ProcessGroups { + processGroupId = x + break + } + type test struct { + desc string recordType int method string url string @@ -396,6 +446,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { testTable := []test{ { + desc: "Get Site list", recordType: Site, method: "Get", url: "/", @@ -405,6 +456,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(sites), }, { + desc: "Get Site item", recordType: Site, method: "Get", url: "/", @@ -414,6 +466,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Site routers", recordType: Site, method: "Get", url: "/", @@ -423,6 +476,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(routers), }, { + desc: "Get Site links", recordType: Site, method: "Get", url: "/", @@ -432,6 +486,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(links), }, { + desc: "Get Site hosts", recordType: Site, method: "Get", url: "/", @@ -441,6 +496,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(hosts), }, { + desc: "Get Site processes", recordType: Site, method: "Get", url: "/", @@ -450,6 +506,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(processes), }, { + desc: "Get Host list", recordType: Host, method: "Get", url: "/", @@ -459,6 +516,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(hosts), }, { + desc: "Get Host item", recordType: Host, method: "Get", url: "/", @@ -468,6 +526,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Router list", recordType: Router, method: "Get", url: "/", @@ -477,6 +536,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(routers), }, { + desc: "Get Router item", recordType: Router, method: "Get", url: "/", @@ -486,6 +546,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Router flows", recordType: Router, method: "Get", url: "/", @@ -495,6 +556,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 6, }, { + desc: "Get Router links", recordType: Router, method: "Get", url: "/", @@ -504,6 +566,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Router listeners", recordType: Router, method: "Get", url: "/", @@ -513,15 +576,17 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Router connectors", recordType: Router, method: "Get", url: "/", params: map[string]string{"sortBy": "identity.asc"}, vars: map[string]string{"id": "router:0"}, name: "connectors", - responseSize: 1, + responseSize: 2, }, { + desc: "Get Link list", recordType: Link, method: "Get", url: "/", @@ -531,6 +596,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(links), }, { + desc: "Get Link item", recordType: Link, method: "Get", url: "/", @@ -540,6 +606,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Listener list", recordType: Listener, method: "Get", url: "/", @@ -549,6 +616,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(listeners), }, { + desc: "Get Listener item", recordType: Listener, method: "Get", url: "/", @@ -558,6 +626,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Listener flows", recordType: Listener, method: "Get", url: "/", @@ -567,15 +636,17 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 3, }, { + desc: "Get Connector list", recordType: Connector, method: "Get", url: "/", params: map[string]string{"sortBy": "identity.asc"}, vars: map[string]string{}, name: "list", - responseSize: 2, + responseSize: 3, }, { + desc: "Get Connector item", recordType: Connector, method: "Get", url: "/", @@ -585,6 +656,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Connector flows", recordType: Connector, method: "Get", url: "/", @@ -594,6 +666,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 3, }, { + desc: "Get Connector process", recordType: Connector, method: "Get", url: "/", @@ -603,6 +676,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Address list", recordType: Address, method: "Get", url: "/", @@ -612,15 +686,57 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(fc.VanAddresses), }, { - recordType: Connector, + desc: "Get Address item", + recordType: Address, method: "Get", url: "/", params: map[string]string{"sortBy": "identity.asc"}, - vars: map[string]string{"id": "connector:0"}, + vars: map[string]string{"id": addressId}, name: "item", responseSize: 1, }, { + desc: "Get Address flows", + recordType: Address, + method: "Get", + url: "/", + params: map[string]string{"sortBy": "identity.asc"}, + vars: map[string]string{"id": addressId}, + name: "flows", + responseSize: 3, + }, + { + desc: "Get Address flowpairs", + recordType: Address, + method: "Get", + url: "/", + params: map[string]string{"sortBy": "identity.asc"}, + vars: map[string]string{"id": addressId}, + name: "flowpairs", + responseSize: 3, + }, + { + desc: "Get Address listeners", + recordType: Address, + method: "Get", + url: "/", + params: map[string]string{"sortBy": "identity.asc"}, + vars: map[string]string{"id": addressId}, + name: "listeners", + responseSize: 2, + }, + { + desc: "Get Address connectors", + recordType: Address, + method: "Get", + url: "/", + params: map[string]string{"sortBy": "identity.asc"}, + vars: map[string]string{"id": addressId}, + name: "connectors", + responseSize: 1, + }, + { + desc: "Get Flow list", recordType: Flow, method: "Get", url: "/", @@ -630,6 +746,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(fc.Flows), }, { + desc: "Get Flow item", recordType: Flow, method: "Get", url: "/", @@ -639,6 +756,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Flow process", recordType: Flow, method: "Get", url: "/", @@ -648,6 +766,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Flowpair list", recordType: FlowPair, method: "Get", url: "/", @@ -657,6 +776,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(fc.FlowPairs), }, { + desc: "Get Flowpair item", recordType: FlowPair, method: "Get", url: "/", @@ -666,6 +786,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Process list", recordType: Process, method: "Get", url: "/", @@ -675,6 +796,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: len(processes), }, { + desc: "Get Process item", recordType: Process, method: "Get", url: "/", @@ -684,6 +806,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Process flows", recordType: Process, method: "Get", url: "/", @@ -693,15 +816,17 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 3, }, { + desc: "Get Process addresses", recordType: Process, method: "Get", url: "/", params: map[string]string{"sortBy": "identity.asc"}, vars: map[string]string{"id": "process:1"}, name: "addresses", - responseSize: 1, + responseSize: 2, }, { + desc: "Get Process connector", recordType: Process, method: "Get", url: "/", @@ -711,6 +836,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get ProcessGroup list", recordType: ProcessGroup, method: "Get", url: "/", @@ -720,6 +846,27 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get ProcessGroup item", + recordType: ProcessGroup, + method: "Get", + url: "/", + params: map[string]string{"sortBy": "identity.asc"}, + vars: map[string]string{"id": processGroupId}, + name: "item", + responseSize: 1, + }, + { + desc: "Get ProcessGroup processes", + recordType: ProcessGroup, + method: "Get", + url: "/", + params: map[string]string{"sortBy": "identity.asc"}, + vars: map[string]string{"id": processGroupId}, + name: "processes", + responseSize: 3, + }, + { + desc: "Get SitePair list", recordType: SitePair, method: "Get", url: "/", @@ -729,6 +876,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get SitePair item", recordType: SitePair, method: "Get", url: "/", @@ -738,6 +886,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get ProcessGroupPair list", recordType: ProcessGroupPair, method: "Get", url: "/", @@ -747,6 +896,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get ProcessPair list", recordType: ProcessPair, method: "Get", url: "/", @@ -756,6 +906,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get ProcessPair item", recordType: ProcessPair, method: "Get", url: "/", @@ -765,6 +916,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { responseSize: 1, }, { + desc: "Get Collector list", recordType: Collector, method: "Get", url: "/", @@ -773,8 +925,21 @@ func TestRecordGraphWithMetrics(t *testing.T) { name: "list", responseSize: 1, }, + { + desc: "Get Collector item", + recordType: Collector, + method: "Get", + url: "/", + params: map[string]string{"sortBy": "identity.asc"}, + vars: map[string]string{}, + name: "item", + responseSize: 1, + }, } + fc.reconcileConnectorRecords() + fc.reconcileFlowRecords() + var payload Payload for _, test := range testTable { req, _ := http.NewRequest(test.method, test.url, nil) @@ -788,7 +953,7 @@ func TestRecordGraphWithMetrics(t *testing.T) { assert.Assert(t, err) err = json.Unmarshal([]byte(*resp), &payload) assert.Assert(t, err) - assert.Equal(t, test.responseSize, payload.Count) + assert.Equal(t, test.responseSize, payload.Count, test.desc) } endTime := uint64(time.Now().UnixNano()) / uint64(time.Microsecond)