Skip to content

Commit

Permalink
Passthrough OAuth bearer token supplied to Query service through to E…
Browse files Browse the repository at this point in the history
…S storage

Signed-off-by: Ruben Vargas <[email protected]>
  • Loading branch information
rubenvp8510 committed Jun 19, 2019
1 parent f62a0a3 commit e48b633
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 51 deletions.
14 changes: 10 additions & 4 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
)

const (
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryTokenPropagation = "query.bearer-token-propagation"
)

// QueryOptions holds configuration for query service
Expand All @@ -39,6 +40,8 @@ type QueryOptions struct {
StaticAssets string
// UIConfig is the path to a configuration file for the UI
UIConfig string
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
}

// AddFlags adds flags for QueryOptions
Expand All @@ -47,6 +50,8 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy")
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format")
flagSet.Bool(queryTokenPropagation, true, "Allow propagation of bearer token to be used by storage plugins")

}

// InitFromViper initializes QueryOptions with properties from viper
Expand All @@ -55,5 +60,6 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper) *QueryOptions {
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
return qOpts
}
26 changes: 23 additions & 3 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Server runs HTTP, Mux and a grpc server
Expand Down Expand Up @@ -80,14 +81,33 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions,

apiHandler.RegisterRoutes(r)
RegisterStaticHandler(r, logger, queryOpts)
compressHandler := handlers.CompressHandler(r)
var handler http.Handler = r
if queryOpts.BearerTokenPropagation {
handler = bearTokenPropagationHandler(logger, r)
}
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

return &http.Server{
Handler: recoveryHandler(compressHandler),
Handler: recoveryHandler(handler),
}
}

func bearTokenPropagationHandler(logger *zap.Logger, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
authHeaderValue := r.Header.Get("Authorization")
if authHeaderValue != "" {
bearerToken := strings.Split(authHeaderValue, " ")
if len(bearerToken) == 2 {
h.ServeHTTP(w, r.WithContext(spanstore.ContextWithBearerToken(ctx, bearerToken[1])))
}
logger.Warn("Invalid authorization header, skipping bearer token propagation")
} else {
h.ServeHTTP(w, r.WithContext(ctx))
}
})
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", s.queryOptions.Port))
Expand Down
6 changes: 4 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
istorage "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

Expand Down Expand Up @@ -78,7 +79,9 @@ func main() {
}
defer closer.Close()
opentracing.SetGlobalTracer(tracer)

queryOpts := new(app.QueryOptions).InitFromViper(v)
// TODO: Need to figure out set enable/disable propagation on storage plugins.
v.Set(spanstore.StoragePropagationKey, queryOpts.BearerTokenPropagation)
storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
Expand All @@ -98,7 +101,6 @@ func main() {
dependencyReader,
*queryServiceOptions)

queryOpts := new(app.QueryOptions).InitFromViper(v)
server := app.NewServer(svc, queryService, queryOpts, tracer)

if err := server.Start(); err != nil {
Expand Down
71 changes: 43 additions & 28 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,34 @@ import (

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string
Username string
Password string
TokenFilePath string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
Servers []string
Username string
Password string
TokenFilePath string
AllowTokenFromContext bool
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
Expand Down Expand Up @@ -248,7 +250,8 @@ func (c *Configuration) IsEnabled() bool {

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
elastic.SetHealthcheck(!c.AllowTokenFromContext)}
httpClient := &http.Client{
Timeout: c.Timeout,
}
Expand All @@ -271,14 +274,21 @@ func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
}
httpTransport.TLSClientConfig = &tls.Config{RootCAs: ca}
}

token := ""
if c.TokenFilePath != "" {
token, err := loadToken(c.TokenFilePath)
tokenFromFile, err := loadToken(c.TokenFilePath)
if err != nil {
return nil, err
}
token = tokenFromFile
}

if token != "" || c.AllowTokenFromContext {
httpClient.Transport = &tokenAuthTransport{
token: token,
wrapped: httpTransport,
token: token,
allowOverrideFromCtx: c.AllowTokenFromContext,
wrapped: httpTransport,
}
} else {
httpClient.Transport = httpTransport
Expand Down Expand Up @@ -329,12 +339,17 @@ func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {

// TokenAuthTransport
type tokenAuthTransport struct {
token string
wrapped *http.Transport
token string
allowOverrideFromCtx bool
wrapped *http.Transport
}

func (tr *tokenAuthTransport) RoundTrip(r *http.Request) (*http.Response, error) {
r.Header.Set("Authorization", "Bearer "+tr.token)
token := tr.token
if tr.allowOverrideFromCtx {
token, _ = spanstore.GetBearerToken(r.Context())
}
r.Header.Set("Authorization", "Bearer "+token)
return tr.wrapped.RoundTrip(r)
}

Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
Expand Down Expand Up @@ -255,6 +256,8 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias)
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
// TODO: Need to figure out a better way for do this.
cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
}

// GetPrimary returns primary configuration.
Expand Down
8 changes: 3 additions & 5 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
logger: p.Logger,
maxSpanAge: p.MaxSpanAge,
maxNumSpans: p.MaxNumSpans,
serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics
serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex),
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex),
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
Expand Down Expand Up @@ -226,22 +226,20 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
//lint:ignore SA4006 failing to re-assign context is worse than unused variable
span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices")
defer span.Finish()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime)
return s.serviceOperationStorage.getServices(jaegerIndices)
return s.serviceOperationStorage.getServices(ctx, jaegerIndices)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) {
//lint:ignore SA4006 failing to re-assign context is worse than unused variable
span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations")
defer span.Finish()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime)
return s.serviceOperationStorage.getOperations(jaegerIndices, service)
return s.serviceOperationStorage.getOperations(ctx, jaegerIndices, service)
}

func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, error) {
Expand Down
6 changes: 5 additions & 1 deletion plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -749,7 +750,10 @@ func mockSearchService(r *spanReaderTest) *mock.Call {
searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService)
searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService)
r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService)
return searchService.On("Do", mock.AnythingOfType("*context.emptyCtx"))
return searchService.On("Do", mock.MatchedBy(func(ctx context.Context) bool{
t := reflect.TypeOf(ctx).String()
return t == "*context.valueCtx" || t == "*context.emptyCtx"
}))
}

func TestTraceQueryParameterValidation(t *testing.T) {
Expand Down
11 changes: 4 additions & 7 deletions plugin/storage/es/spanstore/service_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,18 @@ const (

// ServiceOperationStorage stores service to operation pairs.
type ServiceOperationStorage struct {
ctx context.Context
client es.Client
logger *zap.Logger
serviceCache cache.Cache
}

// NewServiceOperationStorage returns a new ServiceOperationStorage.
func NewServiceOperationStorage(
ctx context.Context,
client es.Client,
logger *zap.Logger,
cacheTTL time.Duration,
) *ServiceOperationStorage {
return &ServiceOperationStorage{
ctx: ctx,
client: client,
logger: logger,
serviceCache: cache.NewLRUWithOptions(
Expand All @@ -79,7 +76,7 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span
}
}

func (s *ServiceOperationStorage) getServices(indices []string) ([]string, error) {
func (s *ServiceOperationStorage) getServices(context context.Context, indices []string) ([]string, error) {
serviceAggregation := getServicesAggregation()

searchService := s.client.Search(indices...).
Expand All @@ -88,7 +85,7 @@ func (s *ServiceOperationStorage) getServices(indices []string) ([]string, error
IgnoreUnavailable(true).
Aggregation(servicesAggregation, serviceAggregation)

searchResult, err := searchService.Do(s.ctx)
searchResult, err := searchService.Do(context)
if err != nil {
return nil, errors.Wrap(err, "Search service failed")
}
Expand All @@ -109,7 +106,7 @@ func getServicesAggregation() elastic.Query {
Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838
}

func (s *ServiceOperationStorage) getOperations(indices []string, service string) ([]string, error) {
func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string) ([]string, error) {
serviceQuery := elastic.NewTermQuery(serviceName, service)
serviceFilter := getOperationsAggregation()

Expand All @@ -120,7 +117,7 @@ func (s *ServiceOperationStorage) getOperations(indices []string, service string
IgnoreUnavailable(true).
Aggregation(operationsAggregation, serviceFilter)

searchResult, err := searchService.Do(s.ctx)
searchResult, err := searchService.Do(context)
if err != nil {
return nil, errors.Wrap(err, "Search service failed")
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter {
ctx := context.Background()

// TODO: Configurable TTL
serviceOperationStorage := NewServiceOperationStorage(ctx, p.Client, p.Logger, time.Hour*12)
serviceOperationStorage := NewServiceOperationStorage(p.Client, p.Logger, time.Hour*12)
return &SpanWriter{
ctx: ctx,
client: p.Client,
Expand Down
36 changes: 36 additions & 0 deletions storage/spanstore/token_propagation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spanstore

import "context"

type contextKey string

const bearerToken = contextKey("bearer.token")

// StoragePropagationKey is a key for viper configuration to pass this option to storage plugins.
const StoragePropagationKey = "storage.propagate.token"

// ContextWithBearerToken set bearer token in context
func ContextWithBearerToken(ctx context.Context, token string) context.Context {
return context.WithValue(ctx, bearerToken, token)

}

// GetBearerToken from context, or empty string if there is no token
func GetBearerToken(ctx context.Context) (string, bool) {
val, ok := ctx.Value(bearerToken).(string)
return val, ok
}

0 comments on commit e48b633

Please sign in to comment.