Skip to content

Commit

Permalink
Passthrough OAuth bearer token supplied to Query service through to E…
Browse files Browse the repository at this point in the history
…S storage

Signed-off-by: Ruben Vargas <[email protected]>
  • Loading branch information
rubenvp8510 committed Jun 19, 2019
1 parent f62a0a3 commit 22e1a92
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 53 deletions.
14 changes: 10 additions & 4 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
)

const (
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryTokenPropagation = "query.bearer-token-propagation"
)

// QueryOptions holds configuration for query service
Expand All @@ -39,6 +40,8 @@ type QueryOptions struct {
StaticAssets string
// UIConfig is the path to a configuration file for the UI
UIConfig string
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
}

// AddFlags adds flags for QueryOptions
Expand All @@ -47,6 +50,8 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy")
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format")
flagSet.Bool(queryTokenPropagation, true, "Allow propagation of bearer token to be used by storage plugins")

}

// InitFromViper initializes QueryOptions with properties from viper
Expand All @@ -55,5 +60,6 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper) *QueryOptions {
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
return qOpts
}
11 changes: 7 additions & 4 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"

"github.com/gorilla/handlers"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -80,11 +80,14 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions,

apiHandler.RegisterRoutes(r)
RegisterStaticHandler(r, logger, queryOpts)
compressHandler := handlers.CompressHandler(r)
var handler http.Handler = r
if queryOpts.BearerTokenPropagation {
handler = bearTokenPropagationHandler(logger, r)
}
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

return &http.Server{
Handler: recoveryHandler(compressHandler),
Handler: recoveryHandler(handler),
}
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func TestServer(t *testing.T) {
querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}

server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP}, tracer)
server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP,
BearerTokenPropagation: true}, tracer)
assert.NoError(t, server.Start())

// TODO wait for servers to come up and test http and grpc endpoints
Expand Down
82 changes: 82 additions & 0 deletions cmd/query/app/token_propagation_hander_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"net/http"
"net/http/httptest"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)


func Test_bearTokenPropagationHandler(t *testing.T) {
logger := zap.NewNop()
bearerToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI"

validTokenHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, ok := spanstore.GetBearerToken(ctx)
assert.Equal(t, token, bearerToken)
assert.True(t, ok)
stop.Done()
})
}

emptyHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, _ := spanstore.GetBearerToken(ctx)
assert.Empty(t, token, bearerToken)
stop.Done()
})
}

testCases := []struct {
name string
sendHeader bool
header string
handler func(stop *sync.WaitGroup) http.HandlerFunc
}{
{ name:"Bearer token", sendHeader: true, header: "Bearer " + bearerToken, handler:validTokenHandler},
{ name:"Invalid header",sendHeader: true, header: bearerToken, handler:emptyHandler},
{ name:"No header", sendHeader: false, handler:emptyHandler},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
stop := sync.WaitGroup{}
stop.Add(1)
r := bearTokenPropagationHandler(logger, testCase.handler(&stop))
server := httptest.NewServer(r)
defer server.Close()
req , err := http.NewRequest("GET", server.URL, nil)
assert.Nil(t,err)
if testCase.sendHeader {
req.Header.Add("Authorization", testCase.header)
}
_, err = httpClient.Do(req)
assert.Nil(t, err)
stop.Wait()
})
}

}
43 changes: 43 additions & 0 deletions cmd/query/app/token_propagation_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"net/http"
"strings"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)

func bearTokenPropagationHandler(logger *zap.Logger, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
authHeaderValue := r.Header.Get("Authorization")
if authHeaderValue != "" {
headerValue := strings.Split(authHeaderValue, " ")
token := ""
if len(headerValue) == 2 {
token = headerValue[1]
}
h.ServeHTTP(w, r.WithContext(spanstore.ContextWithBearerToken(ctx, token)))
logger.Warn("Invalid authorization header, skipping bearer token propagation")
} else {
h.ServeHTTP(w, r.WithContext(ctx))
}
})

}
6 changes: 4 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
istorage "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

Expand Down Expand Up @@ -78,7 +79,9 @@ func main() {
}
defer closer.Close()
opentracing.SetGlobalTracer(tracer)

queryOpts := new(app.QueryOptions).InitFromViper(v)
// TODO: Need to figure out set enable/disable propagation on storage plugins.
v.Set(spanstore.StoragePropagationKey, queryOpts.BearerTokenPropagation)
storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
Expand All @@ -98,7 +101,6 @@ func main() {
dependencyReader,
*queryServiceOptions)

queryOpts := new(app.QueryOptions).InitFromViper(v)
server := app.NewServer(svc, queryService, queryOpts, tracer)

if err := server.Start(); err != nil {
Expand Down
71 changes: 43 additions & 28 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,34 @@ import (

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string
Username string
Password string
TokenFilePath string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
Servers []string
Username string
Password string
TokenFilePath string
AllowTokenFromContext bool
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
Expand Down Expand Up @@ -248,7 +250,8 @@ func (c *Configuration) IsEnabled() bool {

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
elastic.SetHealthcheck(!c.AllowTokenFromContext)}
httpClient := &http.Client{
Timeout: c.Timeout,
}
Expand All @@ -271,14 +274,21 @@ func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
}
httpTransport.TLSClientConfig = &tls.Config{RootCAs: ca}
}

token := ""
if c.TokenFilePath != "" {
token, err := loadToken(c.TokenFilePath)
tokenFromFile, err := loadToken(c.TokenFilePath)
if err != nil {
return nil, err
}
token = tokenFromFile
}

if token != "" || c.AllowTokenFromContext {
httpClient.Transport = &tokenAuthTransport{
token: token,
wrapped: httpTransport,
token: token,
allowOverrideFromCtx: c.AllowTokenFromContext,
wrapped: httpTransport,
}
} else {
httpClient.Transport = httpTransport
Expand Down Expand Up @@ -329,12 +339,17 @@ func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {

// TokenAuthTransport
type tokenAuthTransport struct {
token string
wrapped *http.Transport
token string
allowOverrideFromCtx bool
wrapped *http.Transport
}

func (tr *tokenAuthTransport) RoundTrip(r *http.Request) (*http.Response, error) {
r.Header.Set("Authorization", "Bearer "+tr.token)
token := tr.token
if tr.allowOverrideFromCtx {
token, _ = spanstore.GetBearerToken(r.Context())
}
r.Header.Set("Authorization", "Bearer "+token)
return tr.wrapped.RoundTrip(r)
}

Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
Expand Down Expand Up @@ -255,6 +256,8 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias)
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
// TODO: Need to figure out a better way for do this.
cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
}

// GetPrimary returns primary configuration.
Expand Down
8 changes: 3 additions & 5 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
logger: p.Logger,
maxSpanAge: p.MaxSpanAge,
maxNumSpans: p.MaxNumSpans,
serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics
serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex),
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex),
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
Expand Down Expand Up @@ -226,22 +226,20 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
//lint:ignore SA4006 failing to re-assign context is worse than unused variable
span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices")
defer span.Finish()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime)
return s.serviceOperationStorage.getServices(jaegerIndices)
return s.serviceOperationStorage.getServices(ctx, jaegerIndices)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) {
//lint:ignore SA4006 failing to re-assign context is worse than unused variable
span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations")
defer span.Finish()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime)
return s.serviceOperationStorage.getOperations(jaegerIndices, service)
return s.serviceOperationStorage.getOperations(ctx, jaegerIndices, service)
}

func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, error) {
Expand Down
Loading

0 comments on commit 22e1a92

Please sign in to comment.