Skip to content

Commit

Permalink
Yuri's approach for swapping client
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <[email protected]>
  • Loading branch information
haanhvu committed Apr 21, 2023
1 parent f1d1a0e commit 6495012
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 53 deletions.
10 changes: 5 additions & 5 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client es.Client
client func() es.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
Expand All @@ -48,7 +48,7 @@ type DependencyStore struct {

// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client es.Client
Client func() es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
Expand Down Expand Up @@ -84,15 +84,15 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D

// CreateTemplates creates index templates.
func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error {
_, err := s.client.CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
_, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
if err != nil {
return err
}
return nil
}

func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
s.client.Index().Index(indexName).Type(dependencyType).
s.client().Index().Index(indexName).Type(dependencyType).
BodyJson(&dbmodel.TimeDependencies{
Timestamp: ts,
Dependencies: dbmodel.FromDomainDependencies(dependencies),
Expand All @@ -102,7 +102,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := s.getReadIndices(endTs, lookback)
searchResult, err := s.client.Search(indices...).
searchResult, err := s.client().Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -51,7 +52,7 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
Expand All @@ -78,7 +79,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
IndexDateLayout: "2006-01-02",
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func createSpanReader(
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.MaxDocCount,
Expand Down Expand Up @@ -204,7 +204,7 @@ func createSpanWriter(
return nil, err
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.IndexPrefix,
Expand Down Expand Up @@ -233,7 +233,7 @@ func createDependencyReader(
cfg *config.Configuration,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
IndexPrefix: cfg.IndexPrefix,
IndexDateLayout: cfg.IndexDateLayoutDependencies,
Expand Down
8 changes: 4 additions & 4 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var (

// SpanReader can query for and load traces from ElasticSearch
type SpanReader struct {
client es.Client
client func() es.Client
logger *zap.Logger
// The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day,
// this will be rounded down to UTC 00:00 of that day.
Expand All @@ -113,7 +113,7 @@ type SpanReader struct {

// SpanReaderParams holds constructor params for NewSpanReader
type SpanReaderParams struct {
Client es.Client
Client func() es.Client
Logger *zap.Logger
MaxSpanAge time.Duration
MaxDocCount int
Expand Down Expand Up @@ -401,7 +401,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
}
// set traceIDs to empty
traceIDs = nil
results, err := s.client.MultiSearch().Add(searchRequests...).Index(indices...).Do(ctx)
results, err := s.client().MultiSearch().Add(searchRequests...).Index(indices...).Do(ctx)
if err != nil {
logErrorToSpan(childSpan, err)
return nil, err
Expand Down Expand Up @@ -563,7 +563,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra
boolQuery := s.buildFindTraceIDsQuery(traceQuery)
jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax, s.spanIndexRolloverFrequency)

searchService := s.client.Search(jaegerIndices...).
searchService := s.client().Search(jaegerIndices...).
Size(0). // set to 0 because we don't want actual documents.
Aggregation(traceIDAggregation, aggregation).
IgnoreUnavailable(true).
Expand Down
27 changes: 14 additions & 13 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel"
Expand Down Expand Up @@ -97,7 +98,7 @@ func withSpanReader(fn func(r *spanReaderTest)) {
logger: logger,
logBuffer: logBuffer,
reader: NewSpanReader(SpanReaderParams{
Client: client,
Client: func() es.Client { return client },
Logger: zap.NewNop(),
MaxSpanAge: 0,
IndexPrefix: "",
Expand All @@ -116,7 +117,7 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) {
logger: logger,
logBuffer: logBuffer,
reader: NewSpanReader(SpanReaderParams{
Client: client,
Client: func() es.Client { return client },
Logger: zap.NewNop(),
MaxSpanAge: 0,
IndexPrefix: "",
Expand Down Expand Up @@ -177,56 +178,56 @@ func TestSpanReaderIndices(t *testing.T) {
}{
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout,
},
indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat},
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", UseReadWriteAliases: true,
},
indices: []string{spanIndex + "read", serviceIndex + "read"},
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout,
},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat},
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", UseReadWriteAliases: true,
},
indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"},
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true,
},
indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix},
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true,
},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix},
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true,
},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix},
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout,
},
indices: []string{
Expand All @@ -240,7 +241,7 @@ func TestSpanReaderIndices(t *testing.T) {
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
},
indices: []string{
Expand All @@ -254,7 +255,7 @@ func TestSpanReaderIndices(t *testing.T) {
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
},
indices: []string{
Expand All @@ -268,7 +269,7 @@ func TestSpanReaderIndices(t *testing.T) {
},
{
params: SpanReaderParams{
Client: client, Logger: logger, MetricsFactory: metricsFactory,
Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
},
indices: []string{
Expand Down
10 changes: 5 additions & 5 deletions plugin/storage/es/spanstore/service_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ const (

// ServiceOperationStorage stores service to operation pairs.
type ServiceOperationStorage struct {
client es.Client
client func() es.Client
logger *zap.Logger
serviceCache cache.Cache
}

// NewServiceOperationStorage returns a new ServiceOperationStorage.
func NewServiceOperationStorage(
client es.Client,
client func() es.Client,
logger *zap.Logger,
cacheTTL time.Duration,
) *ServiceOperationStorage {
Expand All @@ -72,15 +72,15 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span

cacheKey := hashCode(service)
if !keyInCache(cacheKey, s.serviceCache) {
s.client.Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add()
s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add()
writeCache(cacheKey, s.serviceCache)
}
}

func (s *ServiceOperationStorage) getServices(context context.Context, indices []string, maxDocCount int) ([]string, error) {
serviceAggregation := getServicesAggregation(maxDocCount)

searchService := s.client.Search(indices...).
searchService := s.client().Search(indices...).
Size(0). // set to 0 because we don't want actual documents.
IgnoreUnavailable(true).
Aggregation(servicesAggregation, serviceAggregation)
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices
serviceQuery := elastic.NewTermQuery(serviceName, service)
serviceFilter := getOperationsAggregation(maxDocCount)

searchService := s.client.Search(indices...).
searchService := s.client().Search(indices...).
Size(0).
Query(serviceQuery).
IgnoreUnavailable(true).
Expand Down
12 changes: 6 additions & 6 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type serviceWriter func(string, *dbmodel.Span)

// SpanWriter is a wrapper around elastic.Client
type SpanWriter struct {
client es.Client
client func() es.Client
logger *zap.Logger
writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn
indexCache cache.Cache
Expand All @@ -56,7 +56,7 @@ type SpanWriter struct {

// SpanWriterParams holds constructor parameters for NewSpanWriter
type SpanWriterParams struct {
Client es.Client
Client func() es.Client
Logger *zap.Logger
MetricsFactory metrics.Factory
IndexPrefix string
Expand Down Expand Up @@ -107,11 +107,11 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix
if indexPrefix != "" && !strings.HasSuffix(indexPrefix, "-") {
indexPrefix += "-"
}
_, err := s.client.CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background())
_, err := s.client().CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background())
if err != nil {
return err
}
_, err = s.client.CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background())
_, err = s.client().CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background())
if err != nil {
return err
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {

// Close closes SpanWriter
func (s *SpanWriter) Close() error {
return s.client.Close()
return s.client().Close()
}

func keyInCache(key string, c cache.Cache) bool {
Expand All @@ -175,5 +175,5 @@ func (s *SpanWriter) writeService(indexName string, jsonSpan *dbmodel.Span) {
}

func (s *SpanWriter) writeSpan(indexName string, jsonSpan *dbmodel.Span) {
s.client.Index().Index(indexName).Type(spanType).BodyJson(&jsonSpan).Add()
s.client().Index().Index(indexName).Type(spanType).BodyJson(&jsonSpan).Add()
}
Loading

0 comments on commit 6495012

Please sign in to comment.