From b6e45e7b9ba4ce0e5d1707be2b7868c12ddf49b0 Mon Sep 17 00:00:00 2001 From: Karen Schoener Date: Tue, 17 Dec 2024 16:10:46 -0500 Subject: [PATCH 1/2] Add sourceHost and destHost to flow metrics. This change adds sourceHost and destHost to the flow metrics produced by the flow collector. It acomplishes this by adding those labels to the promethus metrics, and changing the aggregation key for metric aggreagation in the flow collector. This approach may change to one that does not require new labels on the prometheus metric. To be discussed. --- pkg/flow/collector.go | 14 +++++++------- pkg/flow/flow_mem_driver.go | 10 ++++++++++ pkg/flow/record.go | 2 ++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/flow/collector.go b/pkg/flow/collector.go index 6188816ac..8e575f1d9 100644 --- a/pkg/flow/collector.go +++ b/pkg/flow/collector.go @@ -75,37 +75,37 @@ func (fc *FlowCollector) NewMetrics(reg prometheus.Registerer) *collectorMetrics Name: "flows_total", Help: "Total Flows", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), octets: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "octets_total", Help: "Total Octets", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), httpReqsMethod: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "http_requests_method_total", Help: "How many HTTP requests processed, partitioned by method", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "method"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "method", "sourceHost", "destHost"}), httpReqsResult: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "http_requests_result_total", Help: "How many HTTP requests processed, partitioned by result code", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "code"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "code", "sourceHost", "destHost"}), activeFlows: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "active_flows", Help: "Number of flows that are currently active, partitioned by source and destination", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), lastAccessed: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "address_last_time_seconds", Help: "The last time the address was served", }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), flowLatency: prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "flow_latency_microseconds", @@ -113,7 +113,7 @@ func (fc *FlowCollector) NewMetrics(reg prometheus.Registerer) *collectorMetrics // 1ms, 2 ms, 5ms, 10ms, 100ms, 1s, 10s Buckets: []float64{1000, 2000, 5000, 10000, 100000, 1000000, 10000000}, }, - []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess"}), + []string{"sourceSite", "destSite", "address", "protocol", "direction", "sourceProcess", "destProcess", "sourceHost", "destHost"}), activeReconcile: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "active_reconciles", diff --git a/pkg/flow/flow_mem_driver.go b/pkg/flow/flow_mem_driver.go index ec40338fd..7d350c323 100644 --- a/pkg/flow/flow_mem_driver.go +++ b/pkg/flow/flow_mem_driver.go @@ -421,11 +421,15 @@ func (fc *FlowCollector) linkFlowPair(flow *FlowRecord) (*FlowPairRecord, bool) fwdLabels["destSite"] = destSiteName + "@_@" + destSiteId fwdLabels["sourceProcess"] = *sourceFlow.ProcessName fwdLabels["destProcess"] = *destFlow.ProcessName + fwdLabels["sourceHost"] = *sourceFlow.SourceHost + fwdLabels["destHost"] = *destFlow.SourceHost delete(fwdLabels, "process") revLabels["sourceSite"] = destSiteName + "@_@" + destSiteId revLabels["destSite"] = sourceSiteName + "@_@" + sourceSiteId revLabels["sourceProcess"] = *destFlow.ProcessName revLabels["destProcess"] = *sourceFlow.ProcessName + revLabels["sourceHost"] = *destFlow.SourceHost + fwdLabels["destHost"] = *sourceFlow.SourceHost delete(revLabels, "process") fp := &FlowPairRecord{ @@ -2292,6 +2296,12 @@ func (fc *FlowCollector) setupFlowMetrics(va *VanAddressRecord, flow *FlowRecord if key.sourceProcess, ok = metricLabel["sourceProcess"]; !ok { return fmt.Errorf("Metric label missing source process key") } + if key.sourceHost, ok = metricLabel["sourceHost"]; !ok { + return fmt.Errorf("Metric label missing source host key") + } + if key.sourceHost, ok = metricLabel["destHost"]; !ok { + return fmt.Errorf("Metric label missing dest host key") + } if key.destSite, ok = metricLabel["destSite"]; !ok { return fmt.Errorf("Metric label missing dest site key") } diff --git a/pkg/flow/record.go b/pkg/flow/record.go index 4ccf883ab..c694caadf 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -324,6 +324,8 @@ type LogEventRecord struct { type metricKey struct { sourceSite string sourceProcess string + sourceHost string + destHost string destSite string destProcess string } From c3bc1bb03974734b07be6c338d9ee47758f12820 Mon Sep 17 00:00:00 2001 From: Karen Schoener Date: Tue, 17 Dec 2024 16:27:47 -0500 Subject: [PATCH 2/2] Add sourceHost and destHost to flow metrics. Fix to populate destHost label correctly in revLabels --- pkg/flow/flow_mem_driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/flow/flow_mem_driver.go b/pkg/flow/flow_mem_driver.go index 7d350c323..b07833d12 100644 --- a/pkg/flow/flow_mem_driver.go +++ b/pkg/flow/flow_mem_driver.go @@ -429,7 +429,7 @@ func (fc *FlowCollector) linkFlowPair(flow *FlowRecord) (*FlowPairRecord, bool) revLabels["sourceProcess"] = *destFlow.ProcessName revLabels["destProcess"] = *sourceFlow.ProcessName revLabels["sourceHost"] = *destFlow.SourceHost - fwdLabels["destHost"] = *sourceFlow.SourceHost + revLabels["destHost"] = *sourceFlow.SourceHost delete(revLabels, "process") fp := &FlowPairRecord{