From 2076b8bcd39f041106d37de7e4a829f666de9813 Mon Sep 17 00:00:00 2001 From: someshkoli Date: Mon, 30 Aug 2021 20:40:01 +0530 Subject: [PATCH] pull work from #4104 Signed-off-by: someshkoli --- cmd/thanos/config.go | 18 +---- cmd/thanos/sidecar.go | 28 +++++--- go.mod | 3 +- go.sum | 3 +- pkg/extkingpin/flags.go | 8 +++ pkg/http/http.go | 125 ++++++++++++++++++++++++++++++++- pkg/promclient/promclient.go | 10 +-- pkg/reloader/reloader.go | 3 +- test/e2e/e2ethanos/services.go | 61 ++++++++++++++++ test/e2e/query_test.go | 39 ++++++++++ 10 files changed, 264 insertions(+), 34 deletions(-) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 3770389a68..46be3b70d3 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -65,6 +65,7 @@ func (hc *httpConfig) registerFlag(cmd extkingpin.FlagClause) *httpConfig { type prometheusConfig struct { url *url.URL readyTimeout time.Duration + httpClient *extflag.PathOrContent } func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig { @@ -74,22 +75,9 @@ func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusC cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up"). Default("10m").DurationVar(&pc.readyTimeout) - return pc -} + pc.httpClient = extkingpin.RegisterHTTPConfigFlags(cmd) -type connConfig struct { - maxIdleConns int - maxIdleConnsPerHost int -} - -func (cc *connConfig) registerFlag(cmd extkingpin.FlagClause) *connConfig { - cmd.Flag("receive.connection-pool-size", - "Controls the http MaxIdleConns. Default is 0, which is unlimited"). - IntVar(&cc.maxIdleConns) - cmd.Flag("receive.connection-pool-size-per-host", - "Controls the http MaxIdleConnsPerHost"). - Default("100").IntVar(&cc.maxIdleConnsPerHost) - return cc + return pc } type tsdbConfig struct { diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index fee36d6537..d69ce79c80 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -6,7 +6,6 @@ package main import ( "context" "math" - "net/http" "net/url" "sync" "time" @@ -26,7 +25,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/exemplars" - "github.com/thanos-io/thanos/pkg/exthttp" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" thanoshttp "github.com/thanos-io/thanos/pkg/http" @@ -45,7 +43,6 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" - "github.com/thanos-io/thanos/pkg/tracing" ) func registerSidecar(app *extkingpin.App) { @@ -84,6 +81,22 @@ func runSidecar( grpcLogOpts []grpc_logging.Option, tagOpts []tags.Option, ) error { + httpConfContentYaml, err := conf.prometheus.httpClient.Content() + if err != nil { + return errors.Wrap(err, "getting http client config") + } + httpClientConfig, err := thanoshttp.NewClientConfigFromYAML(httpConfContentYaml) + if err != nil { + return errors.Wrap(err, "parsing http config YAML") + } + + httpClient, err := thanoshttp.NewHTTPClient(*httpClientConfig, "thanos-sidecar") + if err != nil { + return errors.Wrap(err, "Improper http client config") + } + + reloader.HTTPClient = *httpClient + var m = &promMetadata{ promURL: conf.prometheus.url, @@ -93,7 +106,7 @@ func runSidecar( maxt: math.MaxInt64, limitMinTime: conf.limitMinTime, - client: promclient.NewWithTracingClient(logger, "thanos-sidecar"), + client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), } confContentYaml, err := conf.objStore.Content() @@ -231,10 +244,7 @@ func runSidecar( }) } { - t := exthttp.NewTransport() - t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost - t.MaxIdleConns = conf.connection.maxIdleConns - c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent) + c := promclient.NewWithTracingClient(logger, httpClient, thanoshttp.ThanosUserAgent) promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { @@ -441,7 +451,6 @@ type sidecarConfig struct { http httpConfig grpc grpcConfig prometheus prometheusConfig - connection connConfig tsdb tsdbConfig reloader reloaderConfig reqLogConfig *extflag.PathOrContent @@ -454,7 +463,6 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { sc.http.registerFlag(cmd) sc.grpc.registerFlag(cmd) sc.prometheus.registerFlag(cmd) - sc.connection.registerFlag(cmd) sc.tsdb.registerFlag(cmd) sc.reloader.registerFlag(cmd) sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) diff --git a/go.mod b/go.mod index 0586501dd0..e44ac67dd6 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/prometheus/alertmanager v0.21.1-0.20210422101724-8176f78a70e1 github.com/prometheus/client_golang v1.10.0 github.com/prometheus/client_model v0.2.0 - github.com/prometheus/common v0.23.0 + github.com/prometheus/common v0.25.0 github.com/prometheus/exporter-toolkit v0.5.1 github.com/prometheus/prometheus v1.8.2-0.20210519120135-d95b0972505f github.com/uber/jaeger-client-go v2.28.0+incompatible @@ -68,6 +68,7 @@ require ( go.uber.org/automaxprocs v1.4.0 go.uber.org/goleak v1.1.10 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a + golang.org/x/net v0.0.0-20210505214959-0714010a04ed golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/text v0.3.6 diff --git a/go.sum b/go.sum index 89e4cc817d..23ebe9b8a1 100644 --- a/go.sum +++ b/go.sum @@ -1129,8 +1129,9 @@ github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16 github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.20.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.21.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/common v0.23.0 h1:GXWvPYuTUenIa+BhOq/x+L/QZzCqASkVRny5KTlPDGM= github.com/prometheus/common v0.23.0/go.mod h1:H6QK/N6XVT42whUeIdI3dp36w49c+/iMDk7UAI2qm7Q= +github.com/prometheus/common v0.25.0 h1:IjJYZJCI8HZYtqA3xYwGyDzSCy1r4CA2GRh+4vdOmtE= +github.com/prometheus/common v0.25.0/go.mod h1:H6QK/N6XVT42whUeIdI3dp36w49c+/iMDk7UAI2qm7Q= github.com/prometheus/exporter-toolkit v0.5.0/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg= github.com/prometheus/exporter-toolkit v0.5.1 h1:9eqgis5er9xN613ZSADjypCJaDGj9ZlcWBvsIHa8/3c= github.com/prometheus/exporter-toolkit v0.5.1/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg= diff --git a/pkg/extkingpin/flags.go b/pkg/extkingpin/flags.go index a7468b83a9..4d94029794 100644 --- a/pkg/extkingpin/flags.go +++ b/pkg/extkingpin/flags.go @@ -98,3 +98,11 @@ func RegisterSelectorRelabelFlags(cmd FlagClause) *extflag.PathOrContent { extflag.WithEnvSubstitution(), ) } + +func RegisterHTTPConfigFlags(cmd FlagClause) *extflag.PathOrContent { + return extflag.RegisterPathOrContent( + cmd, + "prometheus.http-client", + "YAML file or string with http client configs. see Format details : ...", + ) +} diff --git a/pkg/http/http.go b/pkg/http/http.go index be4233cdfb..a076180c08 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -6,18 +6,22 @@ package http import ( "context" + "crypto/tls" "fmt" "net/http" "net/url" "path" "sync" + "time" "github.com/go-kit/kit/log" + "github.com/mwitkow/go-conntrack" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" "github.com/prometheus/prometheus/discovery/file" "github.com/prometheus/prometheus/discovery/targetgroup" + "golang.org/x/net/http2" "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/discovery/cache" @@ -35,6 +39,9 @@ type ClientConfig struct { ProxyURL string `yaml:"proxy_url"` // TLSConfig to use to connect to the targets. TLSConfig TLSConfig `yaml:"tls_config"` + + // TransportConfig for Client transport properties + TransportConfig TransportConfig `yaml:"transport_config"` } // TLSConfig configures TLS connections. @@ -63,6 +70,104 @@ func (b BasicAuth) IsZero() bool { return b.Username == "" && b.Password == "" && b.PasswordFile == "" } +// Transport configures client's transport properties. +type TransportConfig struct { + MaxIdleConns int `yaml:"max_idle_conns"` + MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"` + IdleConnTimeout int `yaml:"idle_conn_timeout"` + ResponseHeaderTimeout int `yaml:"response_header_timeout"` + ExpectContinueTimeout int `yaml:"expect_continue_timeout"` + MaxConnsPerHost int `yaml:"max_conns_per_host"` + DisableCompression bool `yaml:"disable_compression"` + TLSHandshakeTimeout int `yaml:"tls_handshake_timeout"` +} + +var defaultTransportConfig TransportConfig = TransportConfig{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 2, + ResponseHeaderTimeout: 0, + MaxConnsPerHost: 0, + IdleConnTimeout: int(90 * time.Second), + ExpectContinueTimeout: int(10 * time.Second), + DisableCompression: false, + TLSHandshakeTimeout: int(10 * time.Second), +} + +func NewClientConfigFromYAML(cfg []byte) (*ClientConfig, error) { + conf := &ClientConfig{TransportConfig: defaultTransportConfig} + if err := yaml.Unmarshal(cfg, conf); err != nil { + return nil, err + } + return conf, nil +} + +// NewRoundTripperFromConfig returns a new HTTP RoundTripper configured for the +// given http.HTTPClientConfig and http.HTTPClientOption. +func NewRoundTripperFromConfig(cfg config_util.HTTPClientConfig, transportConfig TransportConfig, name string) (http.RoundTripper, error) { + newRT := func(tlsConfig *tls.Config) (http.RoundTripper, error) { + var rt http.RoundTripper = &http.Transport{ + Proxy: http.ProxyURL(cfg.ProxyURL.URL), + MaxIdleConns: transportConfig.MaxIdleConns, + MaxIdleConnsPerHost: transportConfig.MaxIdleConnsPerHost, + MaxConnsPerHost: transportConfig.MaxConnsPerHost, + TLSClientConfig: tlsConfig, + DisableCompression: transportConfig.DisableCompression, + IdleConnTimeout: time.Duration(transportConfig.IdleConnTimeout), + ResponseHeaderTimeout: time.Duration(transportConfig.ResponseHeaderTimeout), + ExpectContinueTimeout: time.Duration(transportConfig.ExpectContinueTimeout), + TLSHandshakeTimeout: time.Duration(transportConfig.TLSHandshakeTimeout), + DialContext: conntrack.NewDialContextFunc( + conntrack.DialWithTracing(), + conntrack.DialWithName(name)), + } + + // HTTP/2 support is golang has many problematic cornercases where + // dead connections would be kept and used in connection pools. + // https://github.com/golang/go/issues/32388 + // https://github.com/golang/go/issues/39337 + // https://github.com/golang/go/issues/39750 + // TODO: Re-Enable HTTP/2 once upstream issue is fixed. + // TODO: use ForceAttemptHTTP2 when we move to Go 1.13+. + err := http2.ConfigureTransport(rt.(*http.Transport)) + if err != nil { + return nil, err + } + + // If a authorization_credentials is provided, create a round tripper that will set the + // Authorization header correctly on each request. + if cfg.Authorization != nil && len(cfg.Authorization.Credentials) > 0 { + rt = config_util.NewAuthorizationCredentialsRoundTripper(cfg.Authorization.Type, cfg.Authorization.Credentials, rt) + } else if cfg.Authorization != nil && len(cfg.Authorization.CredentialsFile) > 0 { + rt = config_util.NewAuthorizationCredentialsFileRoundTripper(cfg.Authorization.Type, cfg.Authorization.CredentialsFile, rt) + } + // Backwards compatibility, be nice with importers who would not have + // called Validate(). + if len(cfg.BearerToken) > 0 { + rt = config_util.NewAuthorizationCredentialsRoundTripper("Bearer", cfg.BearerToken, rt) + } else if len(cfg.BearerTokenFile) > 0 { + rt = config_util.NewAuthorizationCredentialsFileRoundTripper("Bearer", cfg.BearerTokenFile, rt) + } + + if cfg.BasicAuth != nil { + rt = config_util.NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, cfg.BasicAuth.PasswordFile, rt) + } + // Return a new configured RoundTripper. + return rt, nil + } + + tlsConfig, err := config_util.NewTLSConfig(&cfg.TLSConfig) + if err != nil { + return nil, err + } + + if len(cfg.TLSConfig.CAFile) == 0 { + // No need for a RoundTripper that reloads the CA file automatically. + return newRT(tlsConfig) + } + + return config_util.NewTLSRoundTripper(tlsConfig, cfg.TLSConfig.CAFile, newRT) +} + // NewHTTPClient returns a new HTTP client. func NewHTTPClient(cfg ClientConfig, name string) (*http.Client, error) { httpClientConfig := config_util.HTTPClientConfig{ @@ -91,15 +196,31 @@ func NewHTTPClient(cfg ClientConfig, name string) (*http.Client, error) { PasswordFile: cfg.BasicAuth.PasswordFile, } } + + if cfg.BearerToken != "" { + httpClientConfig.BearerToken = config_util.Secret(cfg.BearerToken) + } + + if cfg.BearerTokenFile != "" { + httpClientConfig.BearerTokenFile = cfg.BearerTokenFile + } + if err := httpClientConfig.Validate(); err != nil { return nil, err } - client, err := config_util.NewClientFromConfig(httpClientConfig, name, config_util.WithHTTP2Disabled()) + rt, err := NewRoundTripperFromConfig( + httpClientConfig, + cfg.TransportConfig, + name, + ) if err != nil { return nil, err } - client.Transport = &userAgentRoundTripper{name: ThanosUserAgent, rt: client.Transport} + + rt = &userAgentRoundTripper{name: ThanosUserAgent, rt: rt} + client := &http.Client{Transport: rt} + return client, nil } diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 614bf9df68..83c156e600 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" + thanoshttp "github.com/thanos-io/thanos/pkg/http" "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" @@ -84,18 +85,19 @@ func NewClient(c HTTPClient, logger log.Logger, userAgent string) *Client { // NewDefaultClient returns Client with tracing tripperware. func NewDefaultClient() *Client { + client, _ := thanoshttp.NewHTTPClient(thanoshttp.ClientConfig{}, "") return NewWithTracingClient( log.NewNopLogger(), + client, "", ) } // NewWithTracingClient returns client with tracing tripperware. -func NewWithTracingClient(logger log.Logger, userAgent string) *Client { +func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent string) *Client { + httpClient.Transport = tracing.HTTPTripperware(log.NewNopLogger(), httpClient.Transport) return NewClient( - &http.Client{ - Transport: tracing.HTTPTripperware(log.NewNopLogger(), http.DefaultTransport), - }, + httpClient, logger, userAgent, ) diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 1e6c9fde7d..9d8150f1cb 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -85,6 +85,7 @@ import ( type Reloader struct { logger log.Logger reloadURL *url.URL + HTTPClient http.Client cfgFile string cfgOutputFile string watchInterval time.Duration @@ -413,7 +414,7 @@ func (r *Reloader) triggerReload(ctx context.Context) error { } req = req.WithContext(ctx) - resp, err := http.DefaultClient.Do(req) + resp, err := r.HTTPClient.Do(req) if err != nil { return errors.Wrap(err, "reload request failed") } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 3d83325774..681f15b5e1 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -716,3 +716,64 @@ func NewToolsBucketWeb( return toolsBucketWeb, nil } + +func NewPrometheusAndSidecarWithBasicAuth(sharedDir string, netName string, name string, promConfig, webConfig, promImage string) (*e2e.HTTPService, *Service, error) { + dir := filepath.Join(sharedDir, "data", "prometheus", name) + container := filepath.Join(e2e.ContainerSharedDir, "data", "prometheus", name) + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, nil, errors.Wrap(err, "create prometheus dir") + } + + if err := ioutil.WriteFile(filepath.Join(dir, "prometheus.yml"), []byte(promConfig), 0666); err != nil { + return nil, nil, errors.Wrap(err, "creating prom config failed") + } + + if err := ioutil.WriteFile(filepath.Join(dir, "web-config.yml"), []byte(webConfig), 0666); err != nil { + return nil, nil, errors.Wrap(err, "creating web-config failed") + } + + args := e2e.BuildArgs(map[string]string{ + "--config.file": filepath.Join(container, "prometheus.yml"), + "--storage.tsdb.path": container, + "--storage.tsdb.max-block-duration": "2h", + "--log.level": infoLogLevel, + "--web.listen-address": ":9090", + "--web.config.file": filepath.Join(container, "web-config.yml"), + }) + prom := e2e.NewHTTPService( + fmt.Sprintf("prometheus-%s", name), + promImage, + e2e.NewCommandWithoutEntrypoint("prometheus", args...), + e2e.NewHTTPReadinessProbe(9090, "/-/ready", 200, 200), + 9090, + ) + prom.SetUser(strconv.Itoa(os.Getuid())) + prom.SetBackoff(defaultBackoffConfig) + + args = e2e.BuildArgs(map[string]string{ + "--debug.name": fmt.Sprintf("sidecar-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--prometheus.url": "http://" + prom.NetworkEndpointFor(netName, 9090), + "--tsdb.path": container, + "--log.level": infoLogLevel, + "--prometheus.http-client": ` +basic_auth: + username: test + password: test +`, + }) + sidecar := NewService( + fmt.Sprintf("sidecar-%s", name), + DefaultImage(), + e2e.NewCommand("sidecar", args...), + e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), + 8080, + 9091, + ) + sidecar.SetUser(strconv.Itoa(os.Getuid())) + sidecar.SetBackoff(defaultBackoffConfig) + + return prom, sidecar, nil +} diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index cbae749af5..56c2a40888 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -85,6 +85,14 @@ rule_files: return config } +func defaultWebConfig() string { + // username: test, secret: test(bcrypt hash) + return ` +basic_auth_users: + test: $2y$10$IsC9GG9U61sPCuDwwwcnPuMRyzx62cIcdNRs4SIdKwgWihfX4IC.C +` +} + func sortResults(res model.Vector) { sort.Slice(res, func(i, j int) bool { return res[i].String() < res[j].String() @@ -340,6 +348,37 @@ func TestQueryLabelValues(t *testing.T) { ) } +func TestQueryWithAuthorizedSidecar(t *testing.T) { + t.Parallel() + + s, err := e2e.NewScenario("e2e_test_query_authorized_sidecar") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + prom, sidecar, err := e2ethanos.NewPrometheusAndSidecarWithBasicAuth(s.SharedDir(), s.NetworkName(), "alone", defaultPromConfig("prom-alone", 0, "", ""), defaultWebConfig(), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + _ = s.StartAndWaitReady(prom, sidecar) + + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}).Build() + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "myself", + "prometheus": "prom-alone", + "replica": "0", + }, + }) +} + func checkNetworkRequests(t *testing.T, addr string) { ctx, cancel := chromedp.NewContext(context.Background()) t.Cleanup(cancel)