From 823693930d018a1c7c8090e2d49f9c61e580c3b3 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Thu, 18 Apr 2024 16:43:25 +0200 Subject: [PATCH] Network metrics: filter by transport protocol (#752) * move transport.Protocol to its own package * protocol filter node * Integrate protocol filter into pipeline * optimized and cleaned up allower/excluder interfacing --- pkg/beyla/network_cfg.go | 5 + pkg/internal/netolly/agent/pipeline.go | 20 ++- pkg/internal/netolly/export/attributes.go | 68 +------ pkg/internal/netolly/flow/protocol_filter.go | 101 +++++++++++ .../netolly/flow/protocol_filter_test.go | 79 +++++++++ .../netolly/flow/transport/transport.go | 167 ++++++++++++++++++ .../manifests/06-beyla-netolly-promexport.yml | 2 + .../06-beyla-netolly-sk-promexport.yml | 2 + .../k8s/manifests/06-beyla-netolly.yml | 2 + .../netolly/k8s_netolly_network_metrics.go | 12 +- 10 files changed, 381 insertions(+), 77 deletions(-) create mode 100644 pkg/internal/netolly/flow/protocol_filter.go create mode 100644 pkg/internal/netolly/flow/protocol_filter_test.go create mode 100644 pkg/internal/netolly/flow/transport/transport.go diff --git a/pkg/beyla/network_cfg.go b/pkg/beyla/network_cfg.go index 4b71379d1..c2e7c0c3d 100644 --- a/pkg/beyla/network_cfg.go +++ b/pkg/beyla/network_cfg.go @@ -64,6 +64,11 @@ type NetworkConfig struct { // If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression, // otherwise it will be matched as a case-sensitive string. ExcludeInterfaces []string `yaml:"exclude_interfaces" env:"BEYLA_NETWORK_EXCLUDE_INTERFACES" envSeparator:","` + // Protocols causes Beyla to drop flows whose transport protocol is not in this list. + Protocols []string `yaml:"protocols" env:"BEYLA_NETWORK_PROTOCOLS" envSeparator:","` + // ExcludeProtocols causes Beyla to drop flows whose transport protocol is in this list. + // If the Protocols list is already defined, ExcludeProtocols has no effect. + ExcludeProtocols []string `yaml:"exclude_protocols" env:"BEYLA_NETWORK_EXCLUDE_PROTOCOLS" envSeparator:","` // CacheMaxFlows specifies how many flows can be accumulated in the accounting cache before // being flushed for its later export. Default value is 5000. // Decrease it if you see the "received message larger than max" error in Beyla logs. diff --git a/pkg/internal/netolly/agent/pipeline.go b/pkg/internal/netolly/agent/pipeline.go index de9e082ba..ead0524ad 100644 --- a/pkg/internal/netolly/agent/pipeline.go +++ b/pkg/internal/netolly/agent/pipeline.go @@ -20,11 +20,12 @@ type FlowsPipeline struct { MapTracer pipe.Start[[]*ebpf.Record] RingBufTracer pipe.Start[[]*ebpf.Record] - Deduper pipe.Middle[[]*ebpf.Record, []*ebpf.Record] - Kubernetes pipe.Middle[[]*ebpf.Record, []*ebpf.Record] - ReverseDNS pipe.Middle[[]*ebpf.Record, []*ebpf.Record] - CIDRs pipe.Middle[[]*ebpf.Record, []*ebpf.Record] - Decorator pipe.Middle[[]*ebpf.Record, []*ebpf.Record] + ProtoFilter pipe.Middle[[]*ebpf.Record, []*ebpf.Record] + Deduper pipe.Middle[[]*ebpf.Record, []*ebpf.Record] + Kubernetes pipe.Middle[[]*ebpf.Record, []*ebpf.Record] + ReverseDNS pipe.Middle[[]*ebpf.Record, []*ebpf.Record] + CIDRs pipe.Middle[[]*ebpf.Record, []*ebpf.Record] + Decorator pipe.Middle[[]*ebpf.Record, []*ebpf.Record] OTEL pipe.Final[[]*ebpf.Record] Prom pipe.Final[[]*ebpf.Record] @@ -33,9 +34,10 @@ type FlowsPipeline struct { // Connect specifies how the pipeline nodes are connected func (fp *FlowsPipeline) Connect() { - fp.MapTracer.SendTo(fp.Deduper) - fp.RingBufTracer.SendTo(fp.Deduper) + fp.MapTracer.SendTo(fp.ProtoFilter) + fp.RingBufTracer.SendTo(fp.ProtoFilter) + fp.ProtoFilter.SendTo(fp.Deduper) fp.Deduper.SendTo(fp.Kubernetes) fp.Kubernetes.SendTo(fp.ReverseDNS) fp.ReverseDNS.SendTo(fp.CIDRs) @@ -48,6 +50,7 @@ func (fp *FlowsPipeline) Connect() { func mapTracer(fp *FlowsPipeline) *pipe.Start[[]*ebpf.Record] { return &fp.MapTracer } func ringBufTracer(fp *FlowsPipeline) *pipe.Start[[]*ebpf.Record] { return &fp.RingBufTracer } +func prtFltr(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.ProtoFilter } func deduper(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.Deduper } func kube(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.Kubernetes } func rdns(fp *FlowsPipeline) *pipe.Middle[[]*ebpf.Record, []*ebpf.Record] { return &fp.ReverseDNS } @@ -79,6 +82,9 @@ func (f *Flows) buildPipeline(ctx context.Context) (*pipe.Runner, error) { // Middle nodes: transforming flow records and passing them to the next stage in the pipeline. // Many of the nodes here are not mandatory. It's decision of each Provider function to decide // whether the node needs to be instantiated or just bypassed. + pipe.AddMiddleProvider(pb, prtFltr, + flow.ProtocolFilterProvider(f.cfg.NetworkFlows.Protocols, f.cfg.NetworkFlows.ExcludeProtocols)) + pipe.AddMiddleProvider(pb, deduper, func() (pipe.MiddleFunc[[]*ebpf.Record, []*ebpf.Record], error) { var deduperExpireTime = f.cfg.NetworkFlows.DeduperFCTTL if deduperExpireTime <= 0 { diff --git a/pkg/internal/netolly/export/attributes.go b/pkg/internal/netolly/export/attributes.go index 12ba96497..3af7d96c6 100644 --- a/pkg/internal/netolly/export/attributes.go +++ b/pkg/internal/netolly/export/attributes.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/grafana/beyla/pkg/internal/netolly/ebpf" + "github.com/grafana/beyla/pkg/internal/netolly/flow/transport" ) // Attribute stores how to expose a metric attribute: its exposed name and how to @@ -50,7 +51,7 @@ func attributeFor(exposedName, internalName string) Attribute { case "beyla.ip": getter = func(r *ebpf.Record) string { return r.Attrs.BeylaIP } case "transport": - getter = func(r *ebpf.Record) string { return l4TransportStr(r.Id.TransportProtocol) } + getter = func(r *ebpf.Record) string { return transport.Protocol(r.Id.TransportProtocol).String() } case "src.address": getter = func(r *ebpf.Record) string { return r.Id.SrcIP().IP().String() } case "dst.address": @@ -83,68 +84,3 @@ func directionStr(direction uint8) string { return "" } } - -// values taken from the list of "Standard well-defined IP protocols" from uapi/linux/in.h -// nolint:cyclop -func l4TransportStr(proto uint8) string { - switch proto { - case 0: - return "IP" - case 1: - return "ICMP" - case 2: - return "IGMP" - case 4: - return "IPIP" - case 6: - return "TCP" - case 8: - return "EGP" - case 12: - return "PUP" - case 17: - return "UDP" - case 22: - return "IDP" - case 29: - return "TP" - case 33: - return "DCCP" - case 41: - return "IPV6" - case 46: - return "RSVP" - case 47: - return "GRE" - case 50: - return "ESP" - case 51: - return "AH" - case 92: - return "MTP" - case 94: - return "BEETPH" - case 98: - return "ENCAP" - case 103: - return "PIM" - case 108: - return "COMP" - case 115: - return "L2TP" - case 132: - return "SCTP" - case 136: - return "UDPLITE" - case 137: - return "MPLS" - case 143: - return "ETHERNET" - case 255: - return "RAW" - // TODO: consider adding an extra byte to TransportProtocol to support this protocol - // case 262: - // return "MPTCP" - } - return strconv.Itoa(int(proto)) -} diff --git a/pkg/internal/netolly/flow/protocol_filter.go b/pkg/internal/netolly/flow/protocol_filter.go new file mode 100644 index 000000000..5895b215b --- /dev/null +++ b/pkg/internal/netolly/flow/protocol_filter.go @@ -0,0 +1,101 @@ +package flow + +import ( + "fmt" + + "github.com/mariomac/pipes/pipe" + + "github.com/grafana/beyla/pkg/internal/netolly/ebpf" + "github.com/grafana/beyla/pkg/internal/netolly/flow/transport" +) + +// ProtocolFilterProvider allows selecting which protocols are going to be instrumented. +// It drops any flow not appearing in the "allowed" list. +// If the Allowed list is empty, it drops any flow appearing in the "excluded" list. +func ProtocolFilterProvider(allowed, excluded []string) pipe.MiddleProvider[[]*ebpf.Record, []*ebpf.Record] { + return func() (pipe.MiddleFunc[[]*ebpf.Record, []*ebpf.Record], error) { + if len(allowed) == 0 && len(excluded) == 0 { + // user did not configure any filter. Ignore this node + return pipe.Bypass[[]*ebpf.Record](), nil + } + pf, err := newFilter(allowed, excluded) + if err != nil { + return nil, err + } + return pf.nodeLoop, nil + } +} + +type protocolFilter struct { + isAllowed func(r *ebpf.Record) bool +} + +func newFilter(allowed, excluded []string) (*protocolFilter, error) { + // if the allowed list has items, only interfaces in that list are allowed + if len(allowed) > 0 { + allow, err := allower(allowed) + if err != nil { + return nil, err + } + return &protocolFilter{isAllowed: allow}, nil + } + // if the allowed list is empty, any interface is allowed except if it matches the exclusion list + exclude, err := excluder(excluded) + if err != nil { + return nil, err + } + return &protocolFilter{isAllowed: exclude}, nil +} + +func (pf *protocolFilter) nodeLoop(in <-chan []*ebpf.Record, out chan<- []*ebpf.Record) { + for records := range in { + if filtered := pf.filter(records); len(filtered) > 0 { + out <- filtered + } + } +} + +func (pf *protocolFilter) filter(input []*ebpf.Record) []*ebpf.Record { + writeIdx := 0 + for readIdx := range input { + if pf.isAllowed(input[readIdx]) { + input[writeIdx] = input[readIdx] + writeIdx++ + } + } + return input[:writeIdx] +} + +func allower(allowed []string) (func(r *ebpf.Record) bool, error) { + allow, err := protocolsMap(allowed) + if err != nil { + return nil, fmt.Errorf("in network protocols: %w", err) + } + return func(r *ebpf.Record) bool { + _, ok := allow[transport.Protocol(r.Id.TransportProtocol)] + return ok + }, nil +} + +func excluder(excluded []string) (func(r *ebpf.Record) bool, error) { + exclude, err := protocolsMap(excluded) + if err != nil { + return nil, fmt.Errorf("in network excluded protocols: %w", err) + } + return func(r *ebpf.Record) bool { + _, excluded := exclude[transport.Protocol(r.Id.TransportProtocol)] + return !excluded + }, nil +} + +func protocolsMap(entries []string) (map[transport.Protocol]struct{}, error) { + protoMap := map[transport.Protocol]struct{}{} + for _, aStr := range entries { + if atp, err := transport.ParseProtocol(aStr); err == nil { + protoMap[atp] = struct{}{} + } else { + return nil, err + } + } + return protoMap, nil +} diff --git a/pkg/internal/netolly/flow/protocol_filter_test.go b/pkg/internal/netolly/flow/protocol_filter_test.go new file mode 100644 index 000000000..14c9ee8b5 --- /dev/null +++ b/pkg/internal/netolly/flow/protocol_filter_test.go @@ -0,0 +1,79 @@ +package flow + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/beyla/pkg/internal/netolly/ebpf" + "github.com/grafana/beyla/pkg/internal/netolly/flow/transport" + "github.com/grafana/beyla/pkg/internal/testutil" +) + +var tcp1 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 1, TransportProtocol: uint8(transport.TCP)}}} +var tcp2 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 2, TransportProtocol: uint8(transport.TCP)}}} +var tcp3 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 3, TransportProtocol: uint8(transport.TCP)}}} +var udp1 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 4, TransportProtocol: uint8(transport.UDP)}}} +var udp2 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 5, TransportProtocol: uint8(transport.UDP)}}} +var icmp1 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 7, TransportProtocol: uint8(transport.ICMP)}}} +var icmp2 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 8, TransportProtocol: uint8(transport.ICMP)}}} +var icmp3 = &ebpf.Record{NetFlowRecordT: ebpf.NetFlowRecordT{Id: ebpf.NetFlowId{SrcPort: 9, TransportProtocol: uint8(transport.ICMP)}}} + +func TestProtocolFilter_Allow(t *testing.T) { + protocolFilter, err := ProtocolFilterProvider([]string{"TCP"}, nil)() + require.NoError(t, err) + input, output := make(chan []*ebpf.Record, 10), make(chan []*ebpf.Record, 10) + defer close(input) + go protocolFilter(input, output) + + input <- []*ebpf.Record{} + input <- []*ebpf.Record{tcp1, tcp2, tcp3} + input <- []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3} + input <- []*ebpf.Record{icmp2, tcp1, udp1, icmp1, tcp2, udp2, tcp3, icmp3} + + filtered := testutil.ReadChannel(t, output, timeout) + assert.Equal(t, []*ebpf.Record{tcp1, tcp2, tcp3}, filtered) + filtered = testutil.ReadChannel(t, output, timeout) + assert.Equal(t, []*ebpf.Record{tcp1, tcp2, tcp3}, filtered) + // no more slices are sent (the second was completely filtered) + select { + case o := <-output: + require.Failf(t, "unexpected flows!", "%v", o) + default: + // ok!! + } +} + +func TestProtocolFilter_Exclude(t *testing.T) { + protocolFilter, err := ProtocolFilterProvider(nil, []string{"TCP"})() + require.NoError(t, err) + input, output := make(chan []*ebpf.Record, 10), make(chan []*ebpf.Record, 10) + defer close(input) + go protocolFilter(input, output) + + input <- []*ebpf.Record{tcp1, tcp2, tcp3} + input <- []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3} + input <- []*ebpf.Record{} + input <- []*ebpf.Record{icmp2, tcp1, udp1, icmp1, tcp2, udp2, tcp3, icmp3} + + filtered := testutil.ReadChannel(t, output, timeout) + assert.Equal(t, []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3}, filtered) + filtered = testutil.ReadChannel(t, output, timeout) + assert.Equal(t, []*ebpf.Record{icmp2, udp1, icmp1, udp2, icmp3}, filtered) + // no more slices are sent (the first was completely filtered) + select { + case o := <-output: + require.Failf(t, "unexpected flows!", "%v", o) + default: + // ok!! + } +} +func TestProtocolFilter_ParsingErrors(t *testing.T) { + _, err := ProtocolFilterProvider([]string{"TCP", "tralara"}, nil)() + assert.Error(t, err) + _, err = ProtocolFilterProvider([]string{"TCP", "tralara"}, []string{"UDP"})() + assert.Error(t, err) + _, err = ProtocolFilterProvider(nil, []string{"TCP", "tralara"})() + assert.Error(t, err) +} diff --git a/pkg/internal/netolly/flow/transport/transport.go b/pkg/internal/netolly/flow/transport/transport.go new file mode 100644 index 000000000..38f2ea41a --- /dev/null +++ b/pkg/internal/netolly/flow/transport/transport.go @@ -0,0 +1,167 @@ +package transport + +import ( + "fmt" + "strconv" + "strings" +) + +// Protocol value stores the L4 transport protocol (TCP, UDP....) according to +// the values taken from the list of "Standard well-defined IP protocols" from uapi/linux/in.h +type Protocol uint8 + +const ( + IP = Protocol(0) + ICMP = Protocol(1) + IGMP = Protocol(2) + IPIP = Protocol(4) + TCP = Protocol(6) + EGP = Protocol(8) + PUP = Protocol(12) + UDP = Protocol(17) + IDP = Protocol(22) + TP = Protocol(29) + DCCP = Protocol(33) + IPV6 = Protocol(41) + RSVP = Protocol(46) + GRE = Protocol(47) + ESP = Protocol(50) + AH = Protocol(51) + MTP = Protocol(92) + BEETPH = Protocol(94) + ENCAP = Protocol(98) + PIM = Protocol(103) + COMP = Protocol(108) + L2TP = Protocol(115) + SCTP = Protocol(132) + UDPLITE = Protocol(136) + MPLS = Protocol(137) + ETHERNET = Protocol(143) + RAW = Protocol(255) + +// TODO: consider adding an extra byte to Protocol to support this protocol +// +// MPTCP = Protocol(262) +) + +// nolint:cyclop +func (p Protocol) String() string { + switch p { + case IP: + return "IP" + case ICMP: + return "ICMP" + case IGMP: + return "IGMP" + case IPIP: + return "IPIP" + case TCP: + return "TCP" + case EGP: + return "EGP" + case PUP: + return "PUP" + case UDP: + return "UDP" + case IDP: + return "IDP" + case TP: + return "TP" + case DCCP: + return "DCCP" + case IPV6: + return "IPV6" + case RSVP: + return "RSVP" + case GRE: + return "GRE" + case ESP: + return "ESP" + case AH: + return "AH" + case MTP: + return "MTP" + case BEETPH: + return "BEETPH" + case ENCAP: + return "ENCAP" + case PIM: + return "PIM" + case COMP: + return "COMP" + case L2TP: + return "L2TP" + case SCTP: + return "SCTP" + case UDPLITE: + return "UDPLITE" + case MPLS: + return "MPLS" + case ETHERNET: + return "ETHERNET" + case RAW: + return "RAW" + } + return strconv.Itoa(int(p)) +} + +// nolint:cyclop +func ParseProtocol(str string) (Protocol, error) { + switch strings.ToUpper(str) { + case "IP": + return IP, nil + case "ICMP": + return ICMP, nil + case "IGMP": + return IGMP, nil + case "IPIP": + return IPIP, nil + case "TCP": + return TCP, nil + case "EGP": + return EGP, nil + case "PUP": + return PUP, nil + case "UDP": + return UDP, nil + case "IDP": + return IDP, nil + case "TP": + return TP, nil + case "DCCP": + return DCCP, nil + case "IPV6": + return IPV6, nil + case "RSVP": + return RSVP, nil + case "GRE": + return GRE, nil + case "ESP": + return ESP, nil + case "AH": + return AH, nil + case "MTP": + return MTP, nil + case "BEETPH": + return BEETPH, nil + case "ENCAP": + return ENCAP, nil + case "PIM": + return PIM, nil + case "COMP": + return COMP, nil + case "L2TP": + return L2TP, nil + case "SCTP": + return SCTP, nil + case "UDPLITE": + return UDPLITE, nil + case "MPLS": + return MPLS, nil + case "ETHERNET": + return ETHERNET, nil + case "RAW": + return RAW, nil + } + return 0, fmt.Errorf("unknown protocol %q", str) +} diff --git a/test/integration/k8s/manifests/06-beyla-netolly-promexport.yml b/test/integration/k8s/manifests/06-beyla-netolly-promexport.yml index 564a4c6e8..73746857b 100644 --- a/test/integration/k8s/manifests/06-beyla-netolly-promexport.yml +++ b/test/integration/k8s/manifests/06-beyla-netolly-promexport.yml @@ -12,6 +12,8 @@ data: prometheus_export: port: 8999 network: + protocols: + - TCP cidrs: # default subnets of Kind Pods and services - 10.244.0.0/16 diff --git a/test/integration/k8s/manifests/06-beyla-netolly-sk-promexport.yml b/test/integration/k8s/manifests/06-beyla-netolly-sk-promexport.yml index 5e58fb00d..3e7ca2703 100644 --- a/test/integration/k8s/manifests/06-beyla-netolly-sk-promexport.yml +++ b/test/integration/k8s/manifests/06-beyla-netolly-sk-promexport.yml @@ -12,6 +12,8 @@ data: prometheus_export: port: 8999 network: + protocols: + - TCP cidrs: # default subnets of Kind Pods and services - 10.244.0.0/16 diff --git a/test/integration/k8s/manifests/06-beyla-netolly.yml b/test/integration/k8s/manifests/06-beyla-netolly.yml index 50fb26508..f2222ffbb 100644 --- a/test/integration/k8s/manifests/06-beyla-netolly.yml +++ b/test/integration/k8s/manifests/06-beyla-netolly.yml @@ -12,6 +12,8 @@ data: otel_metrics_export: endpoint: http://otelcol.default:4317 network: + protocols: + - TCP cidrs: # default subnets of Kind Pods and services - 10.244.0.0/16 diff --git a/test/integration/k8s/netolly/k8s_netolly_network_metrics.go b/test/integration/k8s/netolly/k8s_netolly_network_metrics.go index 6eccfeda4..aba33472e 100644 --- a/test/integration/k8s/netolly/k8s_netolly_network_metrics.go +++ b/test/integration/k8s/netolly/k8s_netolly_network_metrics.go @@ -56,8 +56,6 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assert.Equal(t, "Service", metric["k8s_dst_type"]) assert.Contains(t, podSubnets, metric["src_cidr"], metric) assert.Contains(t, svcSubnets, metric["dst_cidr"], metric) - - assert.Equal(t, "TCP", metric["transport"]) // services don't have host IP or name }) // testing request flows (to testserver as Pod) @@ -91,7 +89,6 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assertIsIP(t, metric["k8s_dst_node_ip"]) assert.Contains(t, podSubnets, metric["src_cidr"], metric) assert.Contains(t, podSubnets, metric["dst_cidr"], metric) - assert.Equal(t, "TCP", metric["transport"]) }) // testing response flows (from testserver Pod) @@ -154,7 +151,6 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assertIsIP(t, metric["k8s_dst_node_ip"]) assert.Contains(t, svcSubnets, metric["src_cidr"], metric) assert.Contains(t, podSubnets, metric["dst_cidr"], metric) - assert.Equal(t, "TCP", metric["transport"]) }) // check that there aren't captured flows if there is no communication @@ -162,6 +158,14 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, require.NoError(t, err) require.Empty(t, results) + // check that only TCP traffic is captured, according to the Protocols configuration option + results, err = pq.Query(`beyla_network_flow_bytes_total`) + require.NoError(t, err) + require.NotEmpty(t, results) + for _, result := range results { + assert.Equal(t, "TCP", result.Metric["transport"]) + } + return ctx }