diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 006c43712e93..668ba208621b 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -23,10 +23,11 @@ import ( ) const ( - queryPort = "query.port" - queryBasePath = "query.base-path" - queryStaticFiles = "query.static-files" - queryUIConfig = "query.ui-config" + queryPort = "query.port" + queryBasePath = "query.base-path" + queryStaticFiles = "query.static-files" + queryUIConfig = "query.ui-config" + queryTokenPropagation = "query.bearer-token-propagation" ) // QueryOptions holds configuration for query service @@ -39,6 +40,8 @@ type QueryOptions struct { StaticAssets string // UIConfig is the path to a configuration file for the UI UIConfig string + // BearerTokenPropagation activate/deactivate bearer token propagation to storage + BearerTokenPropagation bool } // AddFlags adds flags for QueryOptions @@ -47,6 +50,8 @@ func AddFlags(flagSet *flag.FlagSet) { flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy") flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI") flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format") + flagSet.Bool(queryTokenPropagation, true, "Allow propagation of bearer token to be used by storage plugins") + } // InitFromViper initializes QueryOptions with properties from viper @@ -55,5 +60,6 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper) *QueryOptions { qOpts.BasePath = v.GetString(queryBasePath) qOpts.StaticAssets = v.GetString(queryStaticFiles) qOpts.UIConfig = v.GetString(queryUIConfig) + qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation) return qOpts } diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index cddcc7bf1689..89aff3aa4db9 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -31,6 +31,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/storage/spanstore" ) // Server runs HTTP, Mux and a grpc server @@ -80,14 +81,33 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, apiHandler.RegisterRoutes(r) RegisterStaticHandler(r, logger, queryOpts) - compressHandler := handlers.CompressHandler(r) + var handler http.Handler = r + if queryOpts.BearerTokenPropagation { + handler = bearTokenPropagationHandler(logger, r) + } + handler = handlers.CompressHandler(handler) recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true) - return &http.Server{ - Handler: recoveryHandler(compressHandler), + Handler: recoveryHandler(handler), } } +func bearTokenPropagationHandler(logger *zap.Logger, h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + authHeaderValue := r.Header.Get("Authorization") + if authHeaderValue != "" { + bearerToken := strings.Split(authHeaderValue, " ") + if len(bearerToken) == 2 { + h.ServeHTTP(w, r.WithContext(spanstore.ContextWithBearerToken(ctx, bearerToken[1]))) + } + logger.Warn("Invalid authorization header, skipping bearer token propagation") + } else { + h.ServeHTTP(w, r.WithContext(ctx)) + } + }) +} + // Start http, GRPC and cmux servers concurrently func (s *Server) Start() error { conn, err := net.Listen("tcp", fmt.Sprintf(":%d", s.queryOptions.Port)) diff --git a/cmd/query/main.go b/cmd/query/main.go index a601d9c7bc28..d9a78794b0ba 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -38,6 +38,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" istorage "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" ) @@ -78,7 +79,9 @@ func main() { } defer closer.Close() opentracing.SetGlobalTracer(tracer) - + queryOpts := new(app.QueryOptions).InitFromViper(v) + // TODO: Need to figure out set enable/disable propagation on storage plugins. + v.Set(spanstore.StoragePropagationKey, queryOpts.BearerTokenPropagation) storageFactory.InitFromViper(v) if err := storageFactory.Initialize(baseFactory, logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) @@ -98,7 +101,6 @@ func main() { dependencyReader, *queryServiceOptions) - queryOpts := new(app.QueryOptions).InitFromViper(v) server := app.NewServer(svc, queryService, queryOpts, tracer) if err := server.Start(); err != nil { diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 84c7a6cfaf13..e20d6e8edfd4 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -32,32 +32,34 @@ import ( "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/wrapper" + "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" ) // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { - Servers []string - Username string - Password string - TokenFilePath string - Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing - MaxNumSpans int // defines maximum number of spans to fetch from storage per query - MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads - NumShards int64 `yaml:"shards"` - NumReplicas int64 `yaml:"replicas"` - Timeout time.Duration `validate:"min=500"` - BulkSize int - BulkWorkers int - BulkActions int - BulkFlushInterval time.Duration - IndexPrefix string - TagsFilePath string - AllTagsAsFields bool - TagDotReplacement string - Enabled bool - TLS TLSConfig - UseReadWriteAliases bool + Servers []string + Username string + Password string + TokenFilePath string + AllowTokenFromContext bool + Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing + MaxNumSpans int // defines maximum number of spans to fetch from storage per query + MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads + NumShards int64 `yaml:"shards"` + NumReplicas int64 `yaml:"replicas"` + Timeout time.Duration `validate:"min=500"` + BulkSize int + BulkWorkers int + BulkActions int + BulkFlushInterval time.Duration + IndexPrefix string + TagsFilePath string + AllTagsAsFields bool + TagDotReplacement string + Enabled bool + TLS TLSConfig + UseReadWriteAliases bool } // TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster @@ -248,7 +250,8 @@ func (c *Configuration) IsEnabled() bool { // getConfigOptions wraps the configs to feed to the ElasticSearch client init func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) { - options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)} + options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer), + elastic.SetHealthcheck(!c.AllowTokenFromContext)} httpClient := &http.Client{ Timeout: c.Timeout, } @@ -271,14 +274,21 @@ func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) { } httpTransport.TLSClientConfig = &tls.Config{RootCAs: ca} } + + token := "" if c.TokenFilePath != "" { - token, err := loadToken(c.TokenFilePath) + tokenFromFile, err := loadToken(c.TokenFilePath) if err != nil { return nil, err } + token = tokenFromFile + } + + if token != "" || c.AllowTokenFromContext { httpClient.Transport = &tokenAuthTransport{ - token: token, - wrapped: httpTransport, + token: token, + allowOverrideFromCtx: c.AllowTokenFromContext, + wrapped: httpTransport, } } else { httpClient.Transport = httpTransport @@ -329,12 +339,17 @@ func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) { // TokenAuthTransport type tokenAuthTransport struct { - token string - wrapped *http.Transport + token string + allowOverrideFromCtx bool + wrapped *http.Transport } func (tr *tokenAuthTransport) RoundTrip(r *http.Request) (*http.Response, error) { - r.Header.Set("Authorization", "Bearer "+tr.token) + token := tr.token + if tr.allowOverrideFromCtx { + token, _ = spanstore.GetBearerToken(r.Context()) + } + r.Header.Set("Authorization", "Bearer "+token) return tr.wrapped.RoundTrip(r) } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 2b168303c50e..33b630130f89 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -22,6 +22,7 @@ import ( "github.com/spf13/viper" "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( @@ -255,6 +256,8 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar) cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias) cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) + // TODO: Need to figure out a better way for do this. + cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey) } // GetPrimary returns primary configuration. diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 494c445383f4..4eeb1b2a43aa 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -125,7 +125,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { logger: p.Logger, maxSpanAge: p.MaxSpanAge, maxNumSpans: p.MaxNumSpans, - serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics + serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), @@ -226,22 +226,20 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S // GetServices returns all services traced by Jaeger, ordered by frequency func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { - //lint:ignore SA4006 failing to re-assign context is worse than unused variable span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - return s.serviceOperationStorage.getServices(jaegerIndices) + return s.serviceOperationStorage.getServices(ctx, jaegerIndices) } // GetOperations returns all operations for a specific service traced by Jaeger func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) { - //lint:ignore SA4006 failing to re-assign context is worse than unused variable span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") defer span.Finish() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) - return s.serviceOperationStorage.getOperations(jaegerIndices, service) + return s.serviceOperationStorage.getOperations(ctx, jaegerIndices, service) } func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, error) { diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 8228b992741b..7fd96dda9cd6 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io/ioutil" + "reflect" "testing" "time" @@ -749,7 +750,10 @@ func mockSearchService(r *spanReaderTest) *mock.Call { searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) - return searchService.On("Do", mock.AnythingOfType("*context.emptyCtx")) + return searchService.On("Do", mock.MatchedBy(func(ctx context.Context) bool{ + t := reflect.TypeOf(ctx).String() + return t == "*context.valueCtx" || t == "*context.emptyCtx" + })) } func TestTraceQueryParameterValidation(t *testing.T) { diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 91ccd088392c..de7baef047d4 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -38,7 +38,6 @@ const ( // ServiceOperationStorage stores service to operation pairs. type ServiceOperationStorage struct { - ctx context.Context client es.Client logger *zap.Logger serviceCache cache.Cache @@ -46,13 +45,11 @@ type ServiceOperationStorage struct { // NewServiceOperationStorage returns a new ServiceOperationStorage. func NewServiceOperationStorage( - ctx context.Context, client es.Client, logger *zap.Logger, cacheTTL time.Duration, ) *ServiceOperationStorage { return &ServiceOperationStorage{ - ctx: ctx, client: client, logger: logger, serviceCache: cache.NewLRUWithOptions( @@ -79,7 +76,7 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span } } -func (s *ServiceOperationStorage) getServices(indices []string) ([]string, error) { +func (s *ServiceOperationStorage) getServices(context context.Context, indices []string) ([]string, error) { serviceAggregation := getServicesAggregation() searchService := s.client.Search(indices...). @@ -88,7 +85,7 @@ func (s *ServiceOperationStorage) getServices(indices []string) ([]string, error IgnoreUnavailable(true). Aggregation(servicesAggregation, serviceAggregation) - searchResult, err := searchService.Do(s.ctx) + searchResult, err := searchService.Do(context) if err != nil { return nil, errors.Wrap(err, "Search service failed") } @@ -109,7 +106,7 @@ func getServicesAggregation() elastic.Query { Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } -func (s *ServiceOperationStorage) getOperations(indices []string, service string) ([]string, error) { +func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string) ([]string, error) { serviceQuery := elastic.NewTermQuery(serviceName, service) serviceFilter := getOperationsAggregation() @@ -120,7 +117,7 @@ func (s *ServiceOperationStorage) getOperations(indices []string, service string IgnoreUnavailable(true). Aggregation(operationsAggregation, serviceFilter) - searchResult, err := searchService.Do(s.ctx) + searchResult, err := searchService.Do(context) if err != nil { return nil, errors.Wrap(err, "Search service failed") } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index b30a99da6d8a..40a2f16aadf4 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -75,7 +75,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { ctx := context.Background() // TODO: Configurable TTL - serviceOperationStorage := NewServiceOperationStorage(ctx, p.Client, p.Logger, time.Hour*12) + serviceOperationStorage := NewServiceOperationStorage(p.Client, p.Logger, time.Hour*12) return &SpanWriter{ ctx: ctx, client: p.Client, diff --git a/storage/spanstore/token_propagation.go b/storage/spanstore/token_propagation.go new file mode 100644 index 000000000000..557f0a2dfea0 --- /dev/null +++ b/storage/spanstore/token_propagation.go @@ -0,0 +1,36 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanstore + +import "context" + +type contextKey string + +const bearerToken = contextKey("bearer.token") + +// StoragePropagationKey is a key for viper configuration to pass this option to storage plugins. +const StoragePropagationKey = "storage.propagate.token" + +// ContextWithBearerToken set bearer token in context +func ContextWithBearerToken(ctx context.Context, token string) context.Context { + return context.WithValue(ctx, bearerToken, token) + +} + +// GetBearerToken from context, or empty string if there is no token +func GetBearerToken(ctx context.Context) (string, bool) { + val, ok := ctx.Value(bearerToken).(string) + return val, ok +}