From 1c48b130019eb09e0d6b5d0f4a141661bcd60fc1 Mon Sep 17 00:00:00 2001 From: "Marcelo E. Magallon" Date: Fri, 26 Jul 2024 16:08:48 -0600 Subject: [PATCH] Add retries to BBE DNS probe Copy the BBE DNS probe code to modify it in order to add retry logic. We can try to upstream this, that's why this is not adding any metrics yet. This will retry 3 times if there's enough time (at least 15 seconds). No DNS check should actually take more than 5 seconds, so when we see a long timeout, inject the retry logic in there. This experimental DNS prober is behind a feature flag called "experimental-dns-prober". The existing behavior is the default. Signed-off-by: Marcelo E. Magallon --- go.mod | 4 +- internal/adhoc/adhoc.go | 2 +- internal/checks/checks.go | 4 +- internal/feature/feature.go | 5 +- internal/prober/dns/dns.go | 45 +- internal/prober/dns/dns_test.go | 110 +++- .../prober/dns/internal/bbe/config/config.go | 521 ++++++++++++++++++ .../prober/dns/internal/bbe/prober/dns.go | 350 ++++++++++++ .../prober/dns/internal/bbe/prober/utils.go | 145 +++++ internal/prober/prober.go | 19 +- internal/scraper/scraper.go | 21 +- 11 files changed, 1203 insertions(+), 23 deletions(-) create mode 100644 internal/prober/dns/internal/bbe/config/config.go create mode 100644 internal/prober/dns/internal/bbe/prober/dns.go create mode 100644 internal/prober/dns/internal/bbe/prober/utils.go diff --git a/go.mod b/go.mod index 82eeb1d6c..07e9e51a9 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( require ( github.com/KimMachineGun/automemlimit v0.6.1 + github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 github.com/felixge/httpsnoop v1.0.4 github.com/go-kit/log v0.2.1 github.com/go-ping/ping v1.1.0 @@ -38,11 +39,11 @@ require ( github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/spf13/afero v1.11.0 golang.org/x/exp v0.0.0-20240707233637-46b078467d37 + gopkg.in/yaml.v3 v3.0.1 kernel.org/pub/linux/libs/security/libcap/cap v1.2.70 ) require ( - github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/buger/goterm v1.0.4 // indirect @@ -73,7 +74,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect kernel.org/pub/linux/libs/security/libcap/psx v1.2.70 // indirect ) diff --git a/internal/adhoc/adhoc.go b/internal/adhoc/adhoc.go index 4ca6cb3a3..6e29ef17a 100644 --- a/internal/adhoc/adhoc.go +++ b/internal/adhoc/adhoc.go @@ -139,7 +139,7 @@ func NewHandler(opts HandlerOpts) (*Handler, error) { tenantCh: opts.TenantCh, runnerFactory: opts.runnerFactory, grpcAdhocChecksClientFactory: opts.grpcAdhocChecksClientFactory, - proberFactory: prober.NewProberFactory(opts.K6Runner, 0), + proberFactory: prober.NewProberFactory(opts.K6Runner, 0, opts.Features), api: apiInfo{ conn: opts.Conn, }, diff --git a/internal/checks/checks.go b/internal/checks/checks.go index 7413be233..d5c9fdfaf 100644 --- a/internal/checks/checks.go +++ b/internal/checks/checks.go @@ -888,7 +888,9 @@ func (c *Updater) addAndStartScraperWithLock(ctx context.Context, check model.Ch ) scraper, err := c.scraperFactory( - ctx, check, c.publisher, *c.probe, c.logger, + ctx, check, c.publisher, *c.probe, + c.features, + c.logger, metrics, c.k6Runner, c.tenantLimits, c.telemeter, diff --git a/internal/feature/feature.go b/internal/feature/feature.go index 9e2048bc4..06b0f3543 100644 --- a/internal/feature/feature.go +++ b/internal/feature/feature.go @@ -10,8 +10,9 @@ import ( // TODO: this doesn't seem like the right place for this const ( - Traceroute = "traceroute" - K6 = "k6" + Traceroute = "traceroute" + K6 = "k6" + ExperimentalDnsProber = "experimental-dns-prober" ) // ErrInvalidCollection is returned when you try to set a flag in an diff --git a/internal/prober/dns/dns.go b/internal/prober/dns/dns.go index d00449f39..4c2ca1360 100644 --- a/internal/prober/dns/dns.go +++ b/internal/prober/dns/dns.go @@ -6,18 +6,19 @@ import ( "strings" "time" + "github.com/grafana/synthetic-monitoring-agent/internal/prober/dns/internal/bbe/config" + bbeprober "github.com/grafana/synthetic-monitoring-agent/internal/prober/dns/internal/bbe/prober" "github.com/grafana/synthetic-monitoring-agent/internal/prober/logger" sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring" - "github.com/prometheus/blackbox_exporter/config" - bbeprober "github.com/prometheus/blackbox_exporter/prober" "github.com/prometheus/client_golang/prometheus" ) var errUnsupportedCheck = errors.New("unsupported check") type Prober struct { - target string - config config.Module + target string + config config.Module + experimental bool } func NewProber(check sm.Check) (Prober, error) { @@ -34,16 +35,50 @@ func NewProber(check sm.Check) (Prober, error) { }, nil } +func NewExperimentalProber(check sm.Check) (Prober, error) { + p, err := NewProber(check) + if err != nil { + return p, err + } + + p.experimental = true + + return p, nil +} + func (p Prober) Name() string { return "dns" } func (p Prober) Probe(ctx context.Context, target string, registry *prometheus.Registry, logger logger.Logger) bool { + cfg := p.config + + if p.experimental { + const ( + cutoff = 15 * time.Second + retries = 3 + ) + + if deadline, found := ctx.Deadline(); found { + budget := time.Until(deadline) + if budget >= cutoff { + cfg.DNS.Retries = retries + // Split 99% of the budget between three retries. For a + // budget of 15s, this allows for 150 ms per retry for + // other operations. + cfg.DNS.RetryTimeout = budget * 99 / (retries * 100) + } + } + + _ = logger.Log("msg", "probing DNS", "target", target, "retries", cfg.DNS.Retries, "retry_timeout", cfg.DNS.RetryTimeout) + } + // The target of the BBE DNS check is the _DNS server_, while // the target of the SM DNS check is the _query_, so we need // pass the server as the target parameter, and ignore the // _target_ paramater that is passed to this function. - return bbeprober.ProbeDNS(ctx, p.target, p.config, registry, logger) + + return bbeprober.ProbeDNS(ctx, p.target, cfg, registry, logger) } func settingsToModule(settings *sm.DnsSettings, target string) config.Module { diff --git a/internal/prober/dns/dns_test.go b/internal/prober/dns/dns_test.go index 75a679110..71786a0f7 100644 --- a/internal/prober/dns/dns_test.go +++ b/internal/prober/dns/dns_test.go @@ -1,10 +1,22 @@ package dns import ( + "bytes" + "context" + "net" + "os" + "slices" + "strings" + "sync/atomic" "testing" + "time" + "github.com/go-kit/log" + "github.com/grafana/synthetic-monitoring-agent/internal/prober/dns/internal/bbe/config" sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring" - "github.com/prometheus/blackbox_exporter/config" + "github.com/miekg/dns" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/require" ) @@ -157,3 +169,99 @@ func TestSettingsToModule(t *testing.T) { }) } } + +func TestProberRetries(t *testing.T) { + if !slices.Contains(strings.Split(os.Getenv("SM_TEST_RUN"), ","), "TestProberRetries") { + t.Skip("Skipping long test TestProberRetries") + } + + mux := dns.NewServeMux() + var counter atomic.Int32 + mux.Handle(".", dns.HandlerFunc(func(w dns.ResponseWriter, r *dns.Msg) { + answer := dns.Msg{ + MsgHdr: dns.MsgHdr{ + Id: r.Id, + Response: true, + Rcode: dns.RcodeSuccess, + }, + Question: r.Question, + Answer: []dns.RR{ + &dns.A{ + Hdr: dns.RR_Header{ + Name: r.Question[0].Name, + Rrtype: dns.TypeA, + Class: dns.ClassINET, + Ttl: 60, + }, + A: net.ParseIP("1.2.3.4"), + }, + }, + } + + counter.Add(1) + t.Logf("Received request %d: %v", counter.Load(), r) + delay := 10 * time.Second + if counter.Load()%3 == 0 { + delay = 0 + } + t.Log("Answer...") + time.Sleep(delay) + _ = w.WriteMsg(&answer) + })) + addr, err := net.ResolveUDPAddr("udp4", "127.0.0.1:0") + require.NoError(t, err) + l, err := net.ListenUDP("udp4", addr) + require.NoError(t, err) + t.Log(l.LocalAddr().String()) + server := &dns.Server{Addr: ":0", PacketConn: l, Net: "udp", Handler: mux} + go func() { + err := server.ActivateAndServe() + if err != nil { + panic(err) + } + }() + + p, err := NewExperimentalProber(sm.Check{ + Target: "www.grafana.com", + Timeout: 20000, + Settings: sm.CheckSettings{ + Dns: &sm.DnsSettings{ + Server: l.LocalAddr().String(), + RecordType: sm.DnsRecordType_A, + Protocol: sm.DnsProtocol_UDP, + IpVersion: sm.IpVersion_V4, + }, + }, + }) + + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), p.config.Timeout) + t.Cleanup(cancel) + + defer func() { + err := server.ShutdownContext(ctx) + if err != nil { + panic(err) + } + }() + + registry := prometheus.NewPedanticRegistry() + + var buf bytes.Buffer + logger := log.NewLogfmtLogger(&buf) + + t0 := time.Now() + success := p.Probe(ctx, p.target, registry, logger) + t.Log(success, time.Since(t0)) + require.True(t, success) + + mfs, err := registry.Gather() + require.NoError(t, err) + enc := expfmt.NewEncoder(&buf, expfmt.NewFormat(expfmt.TypeTextPlain)) + for _, mf := range mfs { + require.NoError(t, enc.Encode(mf)) + } + + t.Log(buf.String()) +} diff --git a/internal/prober/dns/internal/bbe/config/config.go b/internal/prober/dns/internal/bbe/config/config.go new file mode 100644 index 000000000..331346a21 --- /dev/null +++ b/internal/prober/dns/internal/bbe/config/config.go @@ -0,0 +1,521 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//nolint +package config + +import ( + "errors" + "fmt" + "math" + "net/textproto" + "os" + "regexp" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "time" + + yaml "gopkg.in/yaml.v3" + + "github.com/alecthomas/units" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/miekg/dns" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/config" +) + +var ( + // DefaultModule set default configuration for the Module + DefaultModule = Module{ + HTTP: DefaultHTTPProbe, + TCP: DefaultTCPProbe, + ICMP: DefaultICMPProbe, + DNS: DefaultDNSProbe, + } + + // DefaultHTTPProbe set default value for HTTPProbe + DefaultHTTPProbe = HTTPProbe{ + IPProtocolFallback: true, + HTTPClientConfig: config.DefaultHTTPClientConfig, + } + + // DefaultGRPCProbe set default value for HTTPProbe + DefaultGRPCProbe = GRPCProbe{ + Service: "", + IPProtocolFallback: true, + } + + // DefaultTCPProbe set default value for TCPProbe + DefaultTCPProbe = TCPProbe{ + IPProtocolFallback: true, + } + + // DefaultICMPProbe set default value for ICMPProbe + DefaultICMPTTL = 64 + DefaultICMPProbe = ICMPProbe{ + IPProtocolFallback: true, + TTL: DefaultICMPTTL, + } + + // DefaultDNSProbe set default value for DNSProbe + DefaultDNSProbe = DNSProbe{ + IPProtocolFallback: true, + Recursion: true, + } +) + +type Config struct { + Modules map[string]Module `yaml:"modules"` +} + +type SafeConfig struct { + sync.RWMutex + C *Config + configReloadSuccess prometheus.Gauge + configReloadSeconds prometheus.Gauge +} + +func NewSafeConfig(reg prometheus.Registerer) *SafeConfig { + configReloadSuccess := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "blackbox_exporter", + Name: "config_last_reload_successful", + Help: "Blackbox exporter config loaded successfully.", + }) + + configReloadSeconds := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "blackbox_exporter", + Name: "config_last_reload_success_timestamp_seconds", + Help: "Timestamp of the last successful configuration reload.", + }) + return &SafeConfig{C: &Config{}, configReloadSuccess: configReloadSuccess, configReloadSeconds: configReloadSeconds} +} + +func (sc *SafeConfig) ReloadConfig(confFile string, logger log.Logger) (err error) { + var c = &Config{} + defer func() { + if err != nil { + sc.configReloadSuccess.Set(0) + } else { + sc.configReloadSuccess.Set(1) + sc.configReloadSeconds.SetToCurrentTime() + } + }() + + yamlReader, err := os.Open(confFile) + if err != nil { + return fmt.Errorf("error reading config file: %s", err) + } + defer yamlReader.Close() + decoder := yaml.NewDecoder(yamlReader) + decoder.KnownFields(true) + + if err = decoder.Decode(c); err != nil { + return fmt.Errorf("error parsing config file: %s", err) + } + + for name, module := range c.Modules { + if module.HTTP.NoFollowRedirects != nil { + // Hide the old flag from the /config page. + module.HTTP.NoFollowRedirects = nil + c.Modules[name] = module + if logger != nil { + level.Warn(logger).Log("msg", "no_follow_redirects is deprecated and will be removed in the next release. It is replaced by follow_redirects.", "module", name) + } + } + } + + sc.Lock() + sc.C = c + sc.Unlock() + + return nil +} + +// Regexp encapsulates a regexp.Regexp and makes it YAML marshalable. +type Regexp struct { + *regexp.Regexp + original string +} + +// NewRegexp creates a new anchored Regexp and returns an error if the +// passed-in regular expression does not compile. +func NewRegexp(s string) (Regexp, error) { + regex, err := regexp.Compile(s) + return Regexp{ + Regexp: regex, + original: s, + }, err +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (re *Regexp) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + r, err := NewRegexp(s) + if err != nil { + return fmt.Errorf("\"Could not compile regular expression\" regexp=\"%s\"", s) + } + *re = r + return nil +} + +// MarshalYAML implements the yaml.Marshaler interface. +func (re Regexp) MarshalYAML() (interface{}, error) { + if re.original != "" { + return re.original, nil + } + return nil, nil +} + +// MustNewRegexp works like NewRegexp, but panics if the regular expression does not compile. +func MustNewRegexp(s string) Regexp { + re, err := NewRegexp(s) + if err != nil { + panic(err) + } + return re +} + +type Module struct { + Prober string `yaml:"prober,omitempty"` + Timeout time.Duration `yaml:"timeout,omitempty"` + HTTP HTTPProbe `yaml:"http,omitempty"` + TCP TCPProbe `yaml:"tcp,omitempty"` + ICMP ICMPProbe `yaml:"icmp,omitempty"` + DNS DNSProbe `yaml:"dns,omitempty"` + GRPC GRPCProbe `yaml:"grpc,omitempty"` +} + +type HTTPProbe struct { + // Defaults to 2xx. + ValidStatusCodes []int `yaml:"valid_status_codes,omitempty"` + ValidHTTPVersions []string `yaml:"valid_http_versions,omitempty"` + IPProtocol string `yaml:"preferred_ip_protocol,omitempty"` + IPProtocolFallback bool `yaml:"ip_protocol_fallback,omitempty"` + SkipResolvePhaseWithProxy bool `yaml:"skip_resolve_phase_with_proxy,omitempty"` + NoFollowRedirects *bool `yaml:"no_follow_redirects,omitempty"` + FailIfSSL bool `yaml:"fail_if_ssl,omitempty"` + FailIfNotSSL bool `yaml:"fail_if_not_ssl,omitempty"` + Method string `yaml:"method,omitempty"` + Headers map[string]string `yaml:"headers,omitempty"` + FailIfBodyMatchesRegexp []Regexp `yaml:"fail_if_body_matches_regexp,omitempty"` + FailIfBodyNotMatchesRegexp []Regexp `yaml:"fail_if_body_not_matches_regexp,omitempty"` + FailIfHeaderMatchesRegexp []HeaderMatch `yaml:"fail_if_header_matches,omitempty"` + FailIfHeaderNotMatchesRegexp []HeaderMatch `yaml:"fail_if_header_not_matches,omitempty"` + Body string `yaml:"body,omitempty"` + BodyFile string `yaml:"body_file,omitempty"` + HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,inline"` + Compression string `yaml:"compression,omitempty"` + BodySizeLimit units.Base2Bytes `yaml:"body_size_limit,omitempty"` +} + +type GRPCProbe struct { + Service string `yaml:"service,omitempty"` + TLS bool `yaml:"tls,omitempty"` + TLSConfig config.TLSConfig `yaml:"tls_config,omitempty"` + IPProtocolFallback bool `yaml:"ip_protocol_fallback,omitempty"` + PreferredIPProtocol string `yaml:"preferred_ip_protocol,omitempty"` +} + +type HeaderMatch struct { + Header string `yaml:"header,omitempty"` + Regexp Regexp `yaml:"regexp,omitempty"` + AllowMissing bool `yaml:"allow_missing,omitempty"` +} + +type QueryResponse struct { + Expect Regexp `yaml:"expect,omitempty"` + Send string `yaml:"send,omitempty"` + StartTLS bool `yaml:"starttls,omitempty"` +} + +type TCPProbe struct { + IPProtocol string `yaml:"preferred_ip_protocol,omitempty"` + IPProtocolFallback bool `yaml:"ip_protocol_fallback,omitempty"` + SourceIPAddress string `yaml:"source_ip_address,omitempty"` + QueryResponse []QueryResponse `yaml:"query_response,omitempty"` + TLS bool `yaml:"tls,omitempty"` + TLSConfig config.TLSConfig `yaml:"tls_config,omitempty"` +} + +type ICMPProbe struct { + IPProtocol string `yaml:"preferred_ip_protocol,omitempty"` // Defaults to "ip6". + IPProtocolFallback bool `yaml:"ip_protocol_fallback,omitempty"` + SourceIPAddress string `yaml:"source_ip_address,omitempty"` + PayloadSize int `yaml:"payload_size,omitempty"` + DontFragment bool `yaml:"dont_fragment,omitempty"` + TTL int `yaml:"ttl,omitempty"` +} + +type DNSProbe struct { + IPProtocol string `yaml:"preferred_ip_protocol,omitempty"` + IPProtocolFallback bool `yaml:"ip_protocol_fallback,omitempty"` + DNSOverTLS bool `yaml:"dns_over_tls,omitempty"` + TLSConfig config.TLSConfig `yaml:"tls_config,omitempty"` + SourceIPAddress string `yaml:"source_ip_address,omitempty"` + TransportProtocol string `yaml:"transport_protocol,omitempty"` + QueryClass string `yaml:"query_class,omitempty"` // Defaults to IN. + QueryName string `yaml:"query_name,omitempty"` + QueryType string `yaml:"query_type,omitempty"` // Defaults to ANY. + Recursion bool `yaml:"recursion_desired,omitempty"` // Defaults to true. + ValidRcodes []string `yaml:"valid_rcodes,omitempty"` // Defaults to NOERROR. + ValidateAnswer DNSRRValidator `yaml:"validate_answer_rrs,omitempty"` + ValidateAuthority DNSRRValidator `yaml:"validate_authority_rrs,omitempty"` + ValidateAdditional DNSRRValidator `yaml:"validate_additional_rrs,omitempty"` + Retries int `yaml:"retries,omitempty"` + RetryTimeout time.Duration `yaml:"retry_timeout,omitempty"` +} + +type DNSRRValidator struct { + FailIfMatchesRegexp []string `yaml:"fail_if_matches_regexp,omitempty"` + FailIfAllMatchRegexp []string `yaml:"fail_if_all_match_regexp,omitempty"` + FailIfNotMatchesRegexp []string `yaml:"fail_if_not_matches_regexp,omitempty"` + FailIfNoneMatchesRegexp []string `yaml:"fail_if_none_matches_regexp,omitempty"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain Config + if err := unmarshal((*plain)(s)); err != nil { + return err + } + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *Module) UnmarshalYAML(unmarshal func(interface{}) error) error { + *s = DefaultModule + type plain Module + if err := unmarshal((*plain)(s)); err != nil { + return err + } + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *HTTPProbe) UnmarshalYAML(unmarshal func(interface{}) error) error { + *s = DefaultHTTPProbe + type plain HTTPProbe + if err := unmarshal((*plain)(s)); err != nil { + return err + } + + // BodySizeLimit == 0 means no limit. By leaving it at 0 we + // avoid setting up the limiter. + if s.BodySizeLimit < 0 || s.BodySizeLimit == math.MaxInt64 { + // The implementation behind http.MaxBytesReader tries + // to add 1 to the specified limit causing it to wrap + // around and become negative, and then it tries to use + // that result to index an slice. + s.BodySizeLimit = math.MaxInt64 - 1 + } + + if err := s.HTTPClientConfig.Validate(); err != nil { + return err + } + + if s.NoFollowRedirects != nil { + s.HTTPClientConfig.FollowRedirects = !*s.NoFollowRedirects + } + + if s.Body != "" && s.BodyFile != "" { + return errors.New("setting body and body_file both are not allowed") + } + + for key, value := range s.Headers { + switch textproto.CanonicalMIMEHeaderKey(key) { + case "Accept-Encoding": + if !isCompressionAcceptEncodingValid(s.Compression, value) { + return fmt.Errorf(`invalid configuration "%s: %s", "compression: %s"`, key, value, s.Compression) + } + } + } + + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *GRPCProbe) UnmarshalYAML(unmarshal func(interface{}) error) error { + *s = DefaultGRPCProbe + type plain GRPCProbe + if err := unmarshal((*plain)(s)); err != nil { + return err + } + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *DNSProbe) UnmarshalYAML(unmarshal func(interface{}) error) error { + *s = DefaultDNSProbe + type plain DNSProbe + if err := unmarshal((*plain)(s)); err != nil { + return err + } + if s.QueryName == "" { + return errors.New("query name must be set for DNS module") + } + if s.QueryClass != "" { + if _, ok := dns.StringToClass[s.QueryClass]; !ok { + return fmt.Errorf("query class '%s' is not valid", s.QueryClass) + } + } + if s.QueryType != "" { + if _, ok := dns.StringToType[s.QueryType]; !ok { + return fmt.Errorf("query type '%s' is not valid", s.QueryType) + } + } + + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *TCPProbe) UnmarshalYAML(unmarshal func(interface{}) error) error { + *s = DefaultTCPProbe + type plain TCPProbe + if err := unmarshal((*plain)(s)); err != nil { + return err + } + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *DNSRRValidator) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain DNSRRValidator + if err := unmarshal((*plain)(s)); err != nil { + return err + } + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *ICMPProbe) UnmarshalYAML(unmarshal func(interface{}) error) error { + *s = DefaultICMPProbe + type plain ICMPProbe + if err := unmarshal((*plain)(s)); err != nil { + return err + } + + if runtime.GOOS == "windows" && s.DontFragment { + return errors.New("\"dont_fragment\" is not supported on windows platforms") + } + + if s.TTL < 0 { + return errors.New("\"ttl\" cannot be negative") + } + if s.TTL > 255 { + return errors.New("\"ttl\" cannot exceed 255") + } + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *QueryResponse) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain QueryResponse + if err := unmarshal((*plain)(s)); err != nil { + return err + } + + return nil +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (s *HeaderMatch) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain HeaderMatch + if err := unmarshal((*plain)(s)); err != nil { + return err + } + + if s.Header == "" { + return errors.New("header name must be set for HTTP header matchers") + } + + if s.Regexp.Regexp == nil || s.Regexp.Regexp.String() == "" { + return errors.New("regexp must be set for HTTP header matchers") + } + + return nil +} + +// isCompressionAcceptEncodingValid validates the compression + +// Accept-Encoding combination. +// +// If there's a compression setting, and there's also an accept-encoding +// header, they MUST match, otherwise we end up requesting something +// that doesn't include the specified compression, and that's likely to +// fail, depending on how the server is configured. Testing that the +// server _ignores_ Accept-Encoding, e.g. by not including a particular +// compression in the header but expecting it in the response falls out +// of the scope of the tests we perform. +// +// With that logic, this function validates that if a compression +// algorithm is specified, it's covered by the specified accept encoding +// header. It doesn't need to be the most preferred encoding, but it MUST +// be included in the preferred encodings. +func isCompressionAcceptEncodingValid(encoding, acceptEncoding string) bool { + // unspecified compression + any encoding value is valid + // any compression + no accept encoding is valid + if encoding == "" || acceptEncoding == "" { + return true + } + + type encodingQuality struct { + encoding string + quality float32 + } + + var encodings []encodingQuality + + for _, parts := range strings.Split(acceptEncoding, ",") { + var e encodingQuality + + if idx := strings.LastIndexByte(parts, ';'); idx == -1 { + e.encoding = strings.TrimSpace(parts) + e.quality = 1.0 + } else { + parseQuality := func(str string) float32 { + q, err := strconv.ParseFloat(str, 32) + if err != nil { + return 0 + } + return float32(math.Round(q*1000) / 1000) + } + + e.encoding = strings.TrimSpace(parts[:idx]) + + q := strings.TrimSpace(parts[idx+1:]) + q = strings.TrimPrefix(q, "q=") + e.quality = parseQuality(q) + } + + encodings = append(encodings, e) + } + + sort.SliceStable(encodings, func(i, j int) bool { + return encodings[j].quality < encodings[i].quality + }) + + for _, e := range encodings { + if encoding == e.encoding || e.encoding == "*" { + return e.quality > 0 + } + } + + return false +} diff --git a/internal/prober/dns/internal/bbe/prober/dns.go b/internal/prober/dns/internal/bbe/prober/dns.go new file mode 100644 index 000000000..713385db1 --- /dev/null +++ b/internal/prober/dns/internal/bbe/prober/dns.go @@ -0,0 +1,350 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//nolint +package prober + +import ( + "context" + "errors" + "net" + "regexp" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/miekg/dns" + "github.com/prometheus/client_golang/prometheus" + pconfig "github.com/prometheus/common/config" + + "github.com/grafana/synthetic-monitoring-agent/internal/prober/dns/internal/bbe/config" +) + +// validRRs checks a slice of RRs received from the server against a DNSRRValidator. +func validRRs(rrs *[]dns.RR, v *config.DNSRRValidator, logger log.Logger) bool { + var anyMatch bool = false + var allMatch bool = true + // Fail the probe if there are no RRs of a given type, but a regexp match is required + // (i.e. FailIfNotMatchesRegexp or FailIfNoneMatchesRegexp is set). + if len(*rrs) == 0 && len(v.FailIfNotMatchesRegexp) > 0 { + level.Error(logger).Log("msg", "fail_if_not_matches_regexp specified but no RRs returned") + return false + } + if len(*rrs) == 0 && len(v.FailIfNoneMatchesRegexp) > 0 { + level.Error(logger).Log("msg", "fail_if_none_matches_regexp specified but no RRs returned") + return false + } + for _, rr := range *rrs { + level.Info(logger).Log("msg", "Validating RR", "rr", rr) + for _, re := range v.FailIfMatchesRegexp { + match, err := regexp.MatchString(re, rr.String()) + if err != nil { + level.Error(logger).Log("msg", "Error matching regexp", "regexp", re, "err", err) + return false + } + if match { + level.Error(logger).Log("msg", "At least one RR matched regexp", "regexp", re, "rr", rr) + return false + } + } + for _, re := range v.FailIfAllMatchRegexp { + match, err := regexp.MatchString(re, rr.String()) + if err != nil { + level.Error(logger).Log("msg", "Error matching regexp", "regexp", re, "err", err) + return false + } + if !match { + allMatch = false + } + } + for _, re := range v.FailIfNotMatchesRegexp { + match, err := regexp.MatchString(re, rr.String()) + if err != nil { + level.Error(logger).Log("msg", "Error matching regexp", "regexp", re, "err", err) + return false + } + if !match { + level.Error(logger).Log("msg", "At least one RR did not match regexp", "regexp", re, "rr", rr) + return false + } + } + for _, re := range v.FailIfNoneMatchesRegexp { + match, err := regexp.MatchString(re, rr.String()) + if err != nil { + level.Error(logger).Log("msg", "Error matching regexp", "regexp", re, "err", err) + return false + } + if match { + anyMatch = true + } + } + } + if len(v.FailIfAllMatchRegexp) > 0 && !allMatch { + level.Error(logger).Log("msg", "Not all RRs matched regexp") + return false + } + if len(v.FailIfNoneMatchesRegexp) > 0 && !anyMatch { + level.Error(logger).Log("msg", "None of the RRs did matched any regexp") + return false + } + return true +} + +// validRcode checks rcode in the response against a list of valid rcodes. +func validRcode(rcode int, valid []string, logger log.Logger) bool { + var validRcodes []int + // If no list of valid rcodes is specified, only NOERROR is considered valid. + if valid == nil { + validRcodes = append(validRcodes, dns.StringToRcode["NOERROR"]) + } else { + for _, rcode := range valid { + rc, ok := dns.StringToRcode[rcode] + if !ok { + level.Error(logger).Log("msg", "Invalid rcode", "rcode", rcode, "known_rcode", dns.RcodeToString) + return false + } + validRcodes = append(validRcodes, rc) + } + } + for _, rc := range validRcodes { + if rcode == rc { + level.Info(logger).Log("msg", "Rcode is valid", "rcode", rcode, "string_rcode", dns.RcodeToString[rcode]) + return true + } + } + level.Error(logger).Log("msg", "Rcode is not one of the valid rcodes", "rcode", rcode, "string_rcode", dns.RcodeToString[rcode], "valid_rcodes", validRcodes) + return false +} + +func ProbeDNS(ctx context.Context, target string, module config.Module, registry *prometheus.Registry, logger log.Logger) bool { + success, _ := ProbeDNSWithError(ctx, target, module, registry, logger) + return success +} + +func ProbeDNSWithError(ctx context.Context, target string, module config.Module, registry *prometheus.Registry, logger log.Logger) (bool, error) { + var dialProtocol string + probeDNSDurationGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "probe_dns_duration_seconds", + Help: "Duration of DNS request by phase", + }, []string{"phase"}) + probeDNSAnswerRRSGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_dns_answer_rrs", + Help: "Returns number of entries in the answer resource record list", + }) + probeDNSAuthorityRRSGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_dns_authority_rrs", + Help: "Returns number of entries in the authority resource record list", + }) + probeDNSAdditionalRRSGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_dns_additional_rrs", + Help: "Returns number of entries in the additional resource record list", + }) + probeDNSQuerySucceeded := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_dns_query_succeeded", + Help: "Displays whether or not the query was executed successfully", + }) + + for _, lv := range []string{"resolve", "connect", "request"} { + probeDNSDurationGaugeVec.WithLabelValues(lv) + } + + registry.MustRegister(probeDNSDurationGaugeVec) + registry.MustRegister(probeDNSAnswerRRSGauge) + registry.MustRegister(probeDNSAuthorityRRSGauge) + registry.MustRegister(probeDNSAdditionalRRSGauge) + registry.MustRegister(probeDNSQuerySucceeded) + + qc := uint16(dns.ClassINET) + if module.DNS.QueryClass != "" { + var ok bool + qc, ok = dns.StringToClass[module.DNS.QueryClass] + if !ok { + level.Error(logger).Log("msg", "Invalid query class", "Class seen", module.DNS.QueryClass, "Existing classes", dns.ClassToString) + return false, errors.New("Invalid query class") + } + } + + qt := dns.TypeANY + if module.DNS.QueryType != "" { + var ok bool + qt, ok = dns.StringToType[module.DNS.QueryType] + if !ok { + level.Error(logger).Log("msg", "Invalid query type", "Type seen", module.DNS.QueryType, "Existing types", dns.TypeToString) + return false, errors.New("Invalid query type") + } + } + var probeDNSSOAGauge prometheus.Gauge + + var ip *net.IPAddr + if module.DNS.TransportProtocol == "" { + module.DNS.TransportProtocol = "udp" + } + if !(module.DNS.TransportProtocol == "udp" || module.DNS.TransportProtocol == "tcp") { + level.Error(logger).Log("msg", "Configuration error: Expected transport protocol udp or tcp", "protocol", module.DNS.TransportProtocol) + return false, errors.New("Invalid transport protocol") + } + + targetAddr, port, err := net.SplitHostPort(target) + if err != nil { + // Target only contains host so fallback to default port and set targetAddr as target. + if module.DNS.DNSOverTLS { + port = "853" + } else { + port = "53" + } + targetAddr = target + } + ip, lookupTime, err := chooseProtocol(ctx, module.DNS.IPProtocol, module.DNS.IPProtocolFallback, targetAddr, registry, logger) + if err != nil { + level.Error(logger).Log("msg", "Error resolving address", "err", err) + return false, errors.Join(err, errors.New("Error resolving address")) + } + probeDNSDurationGaugeVec.WithLabelValues("resolve").Add(lookupTime) + targetIP := net.JoinHostPort(ip.String(), port) + + if ip.IP.To4() == nil { + dialProtocol = module.DNS.TransportProtocol + "6" + } else { + dialProtocol = module.DNS.TransportProtocol + "4" + } + + if module.DNS.DNSOverTLS { + if module.DNS.TransportProtocol == "tcp" { + dialProtocol += "-tls" + } else { + level.Error(logger).Log("msg", "Configuration error: Expected transport protocol tcp for DoT", "protocol", module.DNS.TransportProtocol) + return false, errors.New("Invalid transport protocol") + } + } + + client := new(dns.Client) + client.Net = dialProtocol + + if module.DNS.DNSOverTLS { + tlsConfig, err := pconfig.NewTLSConfig(&module.DNS.TLSConfig) + if err != nil { + level.Error(logger).Log("msg", "Failed to create TLS configuration", "err", err) + return false, errors.Join(err, errors.New("Failed to create TLS configuration")) + } + if tlsConfig.ServerName == "" { + // Use target-hostname as default for TLS-servername. + tlsConfig.ServerName = targetAddr + } + + client.TLSConfig = tlsConfig + } + + // Use configured SourceIPAddress. + if len(module.DNS.SourceIPAddress) > 0 { + srcIP := net.ParseIP(module.DNS.SourceIPAddress) + if srcIP == nil { + level.Error(logger).Log("msg", "Error parsing source ip address", "srcIP", module.DNS.SourceIPAddress) + return false, errors.New("Error parsing source ip address") + } + level.Info(logger).Log("msg", "Using local address", "srcIP", srcIP) + client.Dialer = &net.Dialer{} + if module.DNS.TransportProtocol == "tcp" { + client.Dialer.LocalAddr = &net.TCPAddr{IP: srcIP} + } else { + client.Dialer.LocalAddr = &net.UDPAddr{IP: srcIP} + } + } + + for retry := 0; retry <= module.DNS.Retries; retry++ { + msg := new(dns.Msg) + msg.Id = dns.Id() + msg.RecursionDesired = module.DNS.Recursion + msg.Question = make([]dns.Question, 1) + msg.Question[0] = dns.Question{Name: dns.Fqdn(module.DNS.QueryName), Qtype: qt, Qclass: qc} + + level.Info(logger).Log("msg", "Making DNS query", "target", targetIP, "dial_protocol", dialProtocol, "query", module.DNS.QueryName, "type", qt, "class", qc, "retry", retry) + + if module.DNS.RetryTimeout > 0 { + client.Timeout = module.DNS.RetryTimeout + } else { + timeoutDeadline, _ := ctx.Deadline() + client.Timeout = time.Until(timeoutDeadline) + } + + requestStart := time.Now() + response, rtt, err := client.ExchangeContext(ctx, msg, targetIP) + // The rtt value returned from client.Exchange includes only the time to + // exchange messages with the server _after_ the connection is created. + // We compute the connection time as the total time for the operation + // minus the time for the actual request rtt. + probeDNSDurationGaugeVec.WithLabelValues("connect").Set((time.Since(requestStart) - rtt).Seconds()) + probeDNSDurationGaugeVec.WithLabelValues("request").Set(rtt.Seconds()) + + if err != nil { + level.Error(logger).Log("msg", "Error while sending a DNS query", "err", err) + + if errors.Is(err, context.DeadlineExceeded) { + // We ran out of time. The probe was not successful. + return false, err + } + + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + // The request timed out. Retry. + continue + } + + return false, errors.Join(err, errors.New("Error while sending a DNS query")) + } + + level.Info(logger).Log("msg", "Got response", "response", response, "retry", retry) + + probeDNSAnswerRRSGauge.Set(float64(len(response.Answer))) + probeDNSAuthorityRRSGauge.Set(float64(len(response.Ns))) + probeDNSAdditionalRRSGauge.Set(float64(len(response.Extra))) + probeDNSQuerySucceeded.Set(1) + + if qt == dns.TypeSOA { + probeDNSSOAGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_dns_serial", + Help: "Returns the serial number of the zone", + }) + registry.MustRegister(probeDNSSOAGauge) + + for _, a := range response.Answer { + if soa, ok := a.(*dns.SOA); ok { + probeDNSSOAGauge.Set(float64(soa.Serial)) + } + } + } + + if !validRcode(response.Rcode, module.DNS.ValidRcodes, logger) { + return false, errors.New("Invalid rcode") + } + level.Info(logger).Log("msg", "Validating Answer RRs") + if !validRRs(&response.Answer, &module.DNS.ValidateAnswer, logger) { + level.Error(logger).Log("msg", "Answer RRs validation failed") + return false, errors.New("Answer RRs validation failed") + } + level.Info(logger).Log("msg", "Validating Authority RRs") + if !validRRs(&response.Ns, &module.DNS.ValidateAuthority, logger) { + level.Error(logger).Log("msg", "Authority RRs validation failed") + return false, errors.New("Authority RRs validation failed") + } + level.Info(logger).Log("msg", "Validating Additional RRs") + if !validRRs(&response.Extra, &module.DNS.ValidateAdditional, logger) { + level.Error(logger).Log("msg", "Additional RRs validation failed") + return false, errors.New("Additional RRs validation failed") + } + + // We did everything we needed to do. The probe was successful. + return true, nil + } + + return false, errors.New("Too many retries.") +} diff --git a/internal/prober/dns/internal/bbe/prober/utils.go b/internal/prober/dns/internal/bbe/prober/utils.go new file mode 100644 index 000000000..94de66753 --- /dev/null +++ b/internal/prober/dns/internal/bbe/prober/utils.go @@ -0,0 +1,145 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//nolint +package prober + +import ( + "context" + "fmt" + "hash/fnv" + "net" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + + "github.com/prometheus/client_golang/prometheus" +) + +var protocolToGauge = map[string]float64{ + "ip4": 4, + "ip6": 6, +} + +// Returns the IP for the IPProtocol and lookup time. +func chooseProtocol(ctx context.Context, IPProtocol string, fallbackIPProtocol bool, target string, registry *prometheus.Registry, logger log.Logger) (ip *net.IPAddr, lookupTime float64, err error) { + var fallbackProtocol string + probeDNSLookupTimeSeconds := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_dns_lookup_time_seconds", + Help: "Returns the time taken for probe dns lookup in seconds", + }) + + probeIPProtocolGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_ip_protocol", + Help: "Specifies whether probe ip protocol is IP4 or IP6", + }) + + probeIPAddrHash := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "probe_ip_addr_hash", + Help: "Specifies the hash of IP address. It's useful to detect if the IP address changes.", + }) + registry.MustRegister(probeIPProtocolGauge) + registry.MustRegister(probeDNSLookupTimeSeconds) + registry.MustRegister(probeIPAddrHash) + + if IPProtocol == "ip6" || IPProtocol == "" { + IPProtocol = "ip6" + fallbackProtocol = "ip4" + } else { + IPProtocol = "ip4" + fallbackProtocol = "ip6" + } + + level.Info(logger).Log("msg", "Resolving target address", "target", target, "ip_protocol", IPProtocol) + resolveStart := time.Now() + + defer func() { + lookupTime = time.Since(resolveStart).Seconds() + probeDNSLookupTimeSeconds.Add(lookupTime) + }() + + resolver := &net.Resolver{} + if !fallbackIPProtocol { + ips, err := resolver.LookupIP(ctx, IPProtocol, target) + if err == nil { + for _, ip := range ips { + level.Info(logger).Log("msg", "Resolved target address", "target", target, "ip", ip.String()) + probeIPProtocolGauge.Set(protocolToGauge[IPProtocol]) + probeIPAddrHash.Set(ipHash(ip)) + return &net.IPAddr{IP: ip}, lookupTime, nil + } + } + level.Error(logger).Log("msg", "Resolution with IP protocol failed", "target", target, "ip_protocol", IPProtocol, "err", err) + return nil, 0.0, err + } + + ips, err := resolver.LookupIPAddr(ctx, target) + if err != nil { + level.Error(logger).Log("msg", "Resolution with IP protocol failed", "target", target, "err", err) + return nil, 0.0, err + } + + // Return the IP in the requested protocol. + var fallback *net.IPAddr + for _, ip := range ips { + switch IPProtocol { + case "ip4": + if ip.IP.To4() != nil { + level.Info(logger).Log("msg", "Resolved target address", "target", target, "ip", ip.String()) + probeIPProtocolGauge.Set(4) + probeIPAddrHash.Set(ipHash(ip.IP)) + return &ip, lookupTime, nil + } + + // ip4 as fallback + fallback = &ip + + case "ip6": + if ip.IP.To4() == nil { + level.Info(logger).Log("msg", "Resolved target address", "target", target, "ip", ip.String()) + probeIPProtocolGauge.Set(6) + probeIPAddrHash.Set(ipHash(ip.IP)) + return &ip, lookupTime, nil + } + + // ip6 as fallback + fallback = &ip + } + } + + // Unable to find ip and no fallback set. + if fallback == nil || !fallbackIPProtocol { + return nil, 0.0, fmt.Errorf("unable to find ip; no fallback") + } + + // Use fallback ip protocol. + if fallbackProtocol == "ip4" { + probeIPProtocolGauge.Set(4) + } else { + probeIPProtocolGauge.Set(6) + } + probeIPAddrHash.Set(ipHash(fallback.IP)) + level.Info(logger).Log("msg", "Resolved target address", "target", target, "ip", fallback.String()) + return fallback, lookupTime, nil +} + +func ipHash(ip net.IP) float64 { + h := fnv.New32a() + if ip.To4() != nil { + h.Write(ip.To4()) + } else { + h.Write(ip.To16()) + } + return float64(h.Sum32()) +} diff --git a/internal/prober/prober.go b/internal/prober/prober.go index 9a63c609a..41ae8d822 100644 --- a/internal/prober/prober.go +++ b/internal/prober/prober.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/grafana/synthetic-monitoring-agent/internal/error_types" + "github.com/grafana/synthetic-monitoring-agent/internal/feature" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/prober/browser" @@ -39,14 +40,16 @@ type ProberFactory interface { } type proberFactory struct { - runner k6runner.Runner - probeId int64 + runner k6runner.Runner + probeId int64 + features feature.Collection } -func NewProberFactory(runner k6runner.Runner, probeId int64) ProberFactory { +func NewProberFactory(runner k6runner.Runner, probeId int64, features feature.Collection) ProberFactory { return proberFactory{ - runner: runner, - probeId: probeId, + runner: runner, + probeId: probeId, + features: features, } } @@ -68,7 +71,11 @@ func (f proberFactory) New(ctx context.Context, logger zerolog.Logger, check mod target = check.Target case sm.CheckTypeDns: - p, err = dns.NewProber(check.Check) + if f.features.IsSet(feature.ExperimentalDnsProber) { + p, err = dns.NewExperimentalProber(check.Check) + } else { + p, err = dns.NewProber(check.Check) + } target = check.Settings.Dns.Server case sm.CheckTypeTcp: diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index c491e61b5..0d8aa64b4 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -22,6 +22,7 @@ import ( "github.com/rs/zerolog" logproto "github.com/grafana/loki/pkg/push" + "github.com/grafana/synthetic-monitoring-agent/internal/feature" "github.com/grafana/synthetic-monitoring-agent/internal/k6runner" "github.com/grafana/synthetic-monitoring-agent/internal/model" "github.com/grafana/synthetic-monitoring-agent/internal/prober" @@ -77,8 +78,12 @@ type Scraper struct { } type Factory func( - ctx context.Context, check model.Check, publisher pusher.Publisher, probe sm.Probe, logger zerolog.Logger, - metrics Metrics, k6runner k6runner.Runner, labelsLimiter LabelsLimiter, + ctx context.Context, check model.Check, publisher pusher.Publisher, probe sm.Probe, + features feature.Collection, + logger zerolog.Logger, + metrics Metrics, + k6runner k6runner.Runner, + labelsLimiter LabelsLimiter, telemeter *telemetry.Telemeter, ) (*Scraper, error) @@ -107,20 +112,26 @@ func (d *probeData) Tenant() model.GlobalID { func New( ctx context.Context, check model.Check, publisher pusher.Publisher, probe sm.Probe, - logger zerolog.Logger, metrics Metrics, - k6runner k6runner.Runner, labelsLimiter LabelsLimiter, telemeter *telemetry.Telemeter, + features feature.Collection, + logger zerolog.Logger, + metrics Metrics, + k6runner k6runner.Runner, + labelsLimiter LabelsLimiter, + telemeter *telemetry.Telemeter, ) (*Scraper, error) { return NewWithOpts(ctx, check, ScraperOpts{ Probe: probe, Publisher: publisher, Logger: logger, Metrics: metrics, - ProbeFactory: prober.NewProberFactory(k6runner, probe.Id), + ProbeFactory: prober.NewProberFactory(k6runner, probe.Id, features), LabelsLimiter: labelsLimiter, Telemeter: telemeter, }) } +var _ Factory = New + type ScraperOpts struct { Probe sm.Probe Publisher pusher.Publisher