Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Elasticsearch storage support for adaptive sampling #5158

Merged
merged 63 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
90969fa
some prog
Pushkarm029 Jan 28, 2024
8918fb0
fix needed
Pushkarm029 Jan 31, 2024
72a91a0
fix
Pushkarm029 Feb 2, 2024
4c4681d
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 2, 2024
0fe929d
major fixes
Pushkarm029 Feb 7, 2024
b837338
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 7, 2024
8870297
integration test left
Pushkarm029 Feb 8, 2024
fc39c16
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 8, 2024
dd79413
added integration tests
Pushkarm029 Feb 9, 2024
529043c
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 9, 2024
1391f16
test
Pushkarm029 Feb 9, 2024
eac82cb
added some more cases in integration test
Pushkarm029 Feb 9, 2024
accd51e
fix
Pushkarm029 Feb 9, 2024
df2b025
fix
Pushkarm029 Feb 10, 2024
9b61b64
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 10, 2024
4aff800
fix
Pushkarm029 Feb 10, 2024
6a5bc17
added license
Pushkarm029 Feb 10, 2024
bd85d73
fix
Pushkarm029 Feb 10, 2024
945bd2f
fix
Pushkarm029 Feb 10, 2024
81a5750
test
Pushkarm029 Feb 11, 2024
05d1eab
improve code cov
Pushkarm029 Feb 12, 2024
ad8b048
added tests
Pushkarm029 Feb 12, 2024
7a9d726
fix idl
Pushkarm029 Feb 12, 2024
de180a8
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 12, 2024
b9e472d
fix
Pushkarm029 Feb 13, 2024
03ae595
added require and removed returning error in test
Pushkarm029 Feb 13, 2024
41d74f8
use dbmodel instead of model
Pushkarm029 Feb 14, 2024
e0bb50c
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 14, 2024
7b4fa14
some fixes
Pushkarm029 Feb 15, 2024
230362c
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 15, 2024
b4b6a7c
fix test
Pushkarm029 Feb 15, 2024
65e6299
fix
Pushkarm029 Feb 15, 2024
087660f
fix
Pushkarm029 Feb 17, 2024
d4e4ad2
added cfg MaxSampleTime
Pushkarm029 Feb 17, 2024
e5b0c22
fix
Pushkarm029 Feb 18, 2024
671e1ee
fix
Pushkarm029 Feb 18, 2024
11efac6
fix
Pushkarm029 Feb 18, 2024
69a7998
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 19, 2024
30684a9
added some tests
Pushkarm029 Feb 19, 2024
abc761a
fix lint
Pushkarm029 Feb 19, 2024
ff74735
increased code coverage
Pushkarm029 Feb 20, 2024
445fc79
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 20, 2024
fdbe617
fix
Pushkarm029 Feb 22, 2024
32a7031
fix
Pushkarm029 Feb 22, 2024
5b18405
fix
Pushkarm029 Feb 22, 2024
647e500
fix tests
Pushkarm029 Feb 22, 2024
ca168e1
fix
Pushkarm029 Feb 22, 2024
dfe0230
fix
Pushkarm029 Feb 22, 2024
77c80ea
fix
Pushkarm029 Feb 23, 2024
d1cbf7b
fix tests
Pushkarm029 Feb 23, 2024
c3ce3c0
fix
Pushkarm029 Feb 23, 2024
c5a7567
fix tests
Pushkarm029 Feb 23, 2024
4ed0979
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 23, 2024
92b7fc8
major fix
Pushkarm029 Feb 25, 2024
f12e90b
fix
Pushkarm029 Feb 25, 2024
ed763c5
fix
Pushkarm029 Feb 25, 2024
929e152
fix
Pushkarm029 Feb 25, 2024
702280c
some fix
Pushkarm029 Feb 25, 2024
8822d92
adds adaptiveSampling flag
Pushkarm029 Feb 26, 2024
584a49b
Merge branch 'main' into elasticSearch_Sampling
Pushkarm029 Feb 26, 2024
06d8306
fix
Pushkarm029 Feb 26, 2024
2510e95
fix
Pushkarm029 Feb 26, 2024
851f37c
Merge branch 'main' into elasticSearch_Sampling
yurishkuro Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
"github.com/jaegertracing/jaeger/plugin"
esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore"
"github.com/jaegertracing/jaeger/plugin/storage/es/mappings"
esSampling "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore"
esSpanStore "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -272,6 +274,25 @@
return writer, nil
}

func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return createSamplingStore(f.getPrimaryClient, f.primaryConfig, f.logger)

Check warning on line 278 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L277-L278

Added lines #L277 - L278 were not covered by tests
}

func createSamplingStore(
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
clientFn func() es.Client,
cfg *config.Configuration,
logger *zap.Logger,
) (samplingstore.Store, error) {
store := esSampling.NewSamplingStore(esSampling.SamplingStoreParams{
Client: clientFn,
Logger: logger,
IndexPrefix: cfg.IndexPrefix,
IndexDateLayout: cfg.IndexDateLayoutDependencies,
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
MaxDocCount: cfg.MaxDocCount,
})
return store, nil

Check warning on line 293 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L285-L293

Added lines #L285 - L293 were not covered by tests
}

func createDependencyReader(
clientFn func() es.Client,
cfg *config.Configuration,
Expand Down
189 changes: 189 additions & 0 deletions plugin/storage/es/samplingstore/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright (c) 2024 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package samplingstore

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/olivere/elastic"
)

const (
samplingIndex = "jaeger-sampling-"
throughputType = "throughput-sampling"
probabilitiesType = "probabilities-sampling"
indexPrefixSeparator = "-"
)

type SamplingStore struct {
client func() es.Client
logger *zap.Logger
samplingIndexPrefix string
indexDateLayout string
maxDocCount int
}

type SamplingStoreParams struct {
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
Client func() es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
MaxDocCount int
}

type TimeThroughput struct {
Timestamp time.Time `json:"timestamp"`
Throughput []*model.Throughput `json:"dependencies"`
}

type TimeProbabilitiesAndQPS struct {
Timestamp time.Time `json:"timestamp"`
Throughput ProbabilitiesAndQPS `json:"dependencies"`
}

type ProbabilitiesAndQPS struct {
Hostname string
Probabilities model.ServiceOperationProbabilities
QPS model.ServiceOperationQPS
}

func NewSamplingStore(p SamplingStoreParams) *SamplingStore {
return &SamplingStore{
client: p.Client,
logger: p.Logger,
samplingIndexPrefix: prefixIndexName(p.IndexPrefix, samplingIndex),
indexDateLayout: p.IndexDateLayout,
maxDocCount: p.MaxDocCount,
}
}

func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
ts := time.Now()
writeIndexName := indexWithDate(s.samplingIndexPrefix, s.indexDateLayout, ts)
s.writeThroughput(writeIndexName, ts, throughput)
return nil
}

func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS,
) error {
ts := time.Now()
writeIndexName := indexWithDate(s.samplingIndexPrefix, s.indexDateLayout, ts)
val := ProbabilitiesAndQPS{
Hostname: hostname,
Probabilities: probabilities,
QPS: qps,
}
s.writeProbabilitiesAndQPS(writeIndexName, ts, val)
return nil
}

func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time, dependencies ProbabilitiesAndQPS) {
s.client().Index().Index(indexName).Type(probabilitiesType).
BodyJson(&TimeProbabilitiesAndQPS{
Timestamp: ts,
Throughput: dependencies,
}).Add()
}

func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string {
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
return indexNamePrefix + date.UTC().Format(indexDateLayout)
}

func (s *SamplingStore) writeThroughput(indexName string, ts time.Time, dependencies []*model.Throughput) {
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
s.client().Index().Index(indexName).Type(throughputType).
BodyJson(&TimeThroughput{
Timestamp: ts,
Throughput: dependencies,
}).Add()
}

func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
ctx := context.Background()
indices := s.getReadIndices(start, end)
searchResult, err := s.client().Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(start, end)).
IgnoreUnavailable(true).
Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to search for throughputs: %w", err)
}
var retSamples []*model.Throughput
hits := searchResult.Hits.Hits
for _, hit := range hits {
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
source := hit.Source
var tToD TimeThroughput
if err := json.Unmarshal(*source, &tToD); err != nil {
return nil, errors.New("unmarshalling ElasticSearch documents failed12312")
}
retSamples = append(retSamples, tToD.Throughput...)

Check warning on line 141 in plugin/storage/es/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/samplingstore/storage.go#L122-L141

Added lines #L122 - L141 were not covered by tests
}
return retSamples, nil

Check warning on line 143 in plugin/storage/es/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/samplingstore/storage.go#L143

Added line #L143 was not covered by tests
}

func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
ctx := context.Background()
searchResult, err := s.client().Search().
Size(1).
IgnoreUnavailable(true).
Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to search for Latest Probabilities: %w", err)
}
if len(searchResult.Hits.Hits) == 0 {
return nil, nil
}
hit := searchResult.Hits.Hits[0]
var unMarshalProbabilities ProbabilitiesAndQPS
err = json.Unmarshal(*hit.Source, &unMarshalProbabilities)
if err != nil {
return nil, err
}

Check warning on line 163 in plugin/storage/es/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/samplingstore/storage.go#L146-L163

Added lines #L146 - L163 were not covered by tests

return unMarshalProbabilities.Probabilities, nil

Check warning on line 165 in plugin/storage/es/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/samplingstore/storage.go#L165

Added line #L165 was not covered by tests
}

func buildTSQuery(start, end time.Time) elastic.Query {
return elastic.NewRangeQuery("timestamp").Gte(start).Lte(end)

Check warning on line 169 in plugin/storage/es/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/samplingstore/storage.go#L168-L169

Added lines #L168 - L169 were not covered by tests
}

func (s *SamplingStore) getReadIndices(start, end time.Time) []string {
var indices []string
firstIndex := indexWithDate(s.samplingIndexPrefix, s.indexDateLayout, start)
currentIndex := indexWithDate(s.samplingIndexPrefix, s.indexDateLayout, end)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
end = end.Add(-24 * time.Hour)
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
currentIndex = indexWithDate(s.samplingIndexPrefix, s.indexDateLayout, end)
}
return append(indices, firstIndex)

Check warning on line 181 in plugin/storage/es/samplingstore/storage.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/samplingstore/storage.go#L172-L181

Added lines #L172 - L181 were not covered by tests
}

func prefixIndexName(prefix, index string) string {
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
if prefix != "" {
return prefix + indexPrefixSeparator + index
}
return index
}
Loading
Loading