Skip to content

Commit

Permalink
Replace Thrift-gen with Proto-gen types for sampling strategies (jaeg…
Browse files Browse the repository at this point in the history
…ertracing#4181)

As the official API is now Protobuf, this change avoids unnecessary
conversions to/from Thrift types.

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored and shubbham1215 committed Feb 22, 2023
1 parent 28b325e commit 9da9598
Show file tree
Hide file tree
Showing 25 changed files with 240 additions and 225 deletions.
4 changes: 2 additions & 2 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import (
"github.com/jaegertracing/jaeger/internal/metrics/fork"
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

Expand Down Expand Up @@ -203,7 +203,7 @@ func (fakeCollectorProxy) Close() error {
return nil
}

func (f fakeCollectorProxy) GetSamplingStrategy(_ context.Context, _ string) (*sampling.SamplingStrategyResponse, error) {
func (f fakeCollectorProxy) GetSamplingStrategy(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) {
return nil, errors.New("no peers available")
}

Expand Down
20 changes: 7 additions & 13 deletions cmd/agent/app/configmanager/grpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,28 @@ import (

"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// SamplingManager returns sampling decisions from collector over gRPC.
type SamplingManager struct {
// ConfigManagerProxy returns sampling decisions from collector over gRPC.
type ConfigManagerProxy struct {
client api_v2.SamplingManagerClient
}

// NewConfigManager creates gRPC sampling manager.
func NewConfigManager(conn *grpc.ClientConn) *SamplingManager {
return &SamplingManager{
func NewConfigManager(conn *grpc.ClientConn) *ConfigManagerProxy {
return &ConfigManagerProxy{
client: api_v2.NewSamplingManagerClient(conn),
}
}

// GetSamplingStrategy returns sampling strategies from collector.
func (s *SamplingManager) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
r, err := s.client.GetSamplingStrategy(ctx, &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
if err != nil {
return nil, err
}
return jaeger.ConvertSamplingResponseFromDomain(r)
func (s *ConfigManagerProxy) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
return s.client.GetSamplingStrategy(ctx, &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
}

// GetBaggageRestrictions returns baggage restrictions from collector.
func (s *SamplingManager) GetBaggageRestrictions(_ context.Context, _ string) ([]*baggage.BaggageRestriction, error) {
func (s *ConfigManagerProxy) GetBaggageRestrictions(_ context.Context, _ string) ([]*baggage.BaggageRestriction, error) {
return nil, errors.New("baggage not implemented")
}
3 changes: 1 addition & 2 deletions cmd/agent/app/configmanager/grpc/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

func close(t *testing.T, c io.Closer) {
Expand All @@ -44,7 +43,7 @@ func TestSamplingManager_GetSamplingStrategy(t *testing.T) {
manager := NewConfigManager(conn)
resp, err := manager.GetSamplingStrategy(context.Background(), "any")
require.NoError(t, err)
assert.Equal(t, &sampling.SamplingStrategyResponse{StrategyType: sampling.SamplingStrategyType_PROBABILISTIC}, resp)
assert.Equal(t, &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, resp)
}

func TestSamplingManager_GetSamplingStrategy_error(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions cmd/agent/app/configmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package configmanager
import (
"context"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// TODO this interface could be moved to pkg/clientcfg, along with grpc proxy,
// but not the metrics wrapper (because its metric names are specific to agent).

// ClientConfigManager decides:
// 1) which sampling strategy a given service should be using
// 2) which baggage restrictions a given service should be using.
type ClientConfigManager interface {
GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error)
GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error)
}
4 changes: 2 additions & 2 deletions cmd/agent/app/configmanager/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"context"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// configManagerMetrics holds metrics related to ClientConfigManager
Expand Down Expand Up @@ -51,7 +51,7 @@ func WrapWithMetrics(manager ClientConfigManager, mFactory metrics.Factory) *Man
}

// GetSamplingStrategy returns sampling strategy from server.
func (m *ManagerWithMetrics) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
func (m *ManagerWithMetrics) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
r, err := m.wrapped.GetSamplingStrategy(ctx, serviceName)
if err != nil {
m.metrics.SamplingFailures.Inc(1)
Expand Down
6 changes: 3 additions & 3 deletions cmd/agent/app/configmanager/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import (
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

type noopManager struct{}

func (noopManager) GetSamplingStrategy(_ context.Context, s string) (*sampling.SamplingStrategyResponse, error) {
func (noopManager) GetSamplingStrategy(_ context.Context, s string) (*api_v2.SamplingStrategyResponse, error) {
if s == "failed" {
return nil, errors.New("failed")
}
return &sampling.SamplingStrategyResponse{StrategyType: sampling.SamplingStrategyType_PROBABILISTIC}, nil
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func (noopManager) GetBaggageRestrictions(_ context.Context, s string) ([]*baggage.BaggageRestriction, error) {
Expand Down
64 changes: 29 additions & 35 deletions cmd/all-in-one/all_in_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

const (
Expand All @@ -57,17 +58,13 @@ var httpClient = &http.Client{

func TestAllInOne(t *testing.T) {
// Check if the query service is available
if err := healthCheck(); err != nil {
t.Fatal(err)
}
// Check if the favicon icon is available
if err := faviconCheck(); err != nil {
t.Fatal(err)
}
createTrace(t)
getAPITrace(t)
getSamplingStrategy(t)
getServicesAPIV3(t)
healthCheck(t)

t.Run("Check if the favicon icon is available", faviconCheck)
t.Run("createTrace", createTrace)
t.Run("getAPITrace", getAPITrace)
t.Run("getSamplingStrategy", getSamplingStrategy)
t.Run("getServicesAPIV3", getServicesAPIV3)
}

func createTrace(t *testing.T) {
Expand Down Expand Up @@ -113,43 +110,40 @@ func getSamplingStrategy(t *testing.T) {
req, err := http.NewRequest("GET", getSamplingStrategyURL, nil)
require.NoError(t, err)

var queryResponse sampling.SamplingStrategyResponse
resp, err := httpClient.Do(req)
require.NoError(t, err)

body, _ := io.ReadAll(resp.Body)

err = json.Unmarshal(body, &queryResponse)
var queryResponse api_v2.SamplingStrategyResponse
err = jsonpb.Unmarshal(bytes.NewReader(body), &queryResponse)
require.NoError(t, err)
resp.Body.Close()

assert.NotNil(t, queryResponse.ProbabilisticSampling)
assert.EqualValues(t, 1.0, queryResponse.ProbabilisticSampling.SamplingRate)
}

func healthCheck() error {
println("Health-checking all-in-one...")
for i := 0; i < 10; i++ {
if _, err := http.Get(queryURL); err == nil {
println("Health-check successful")
return nil
}
println("Health-check unsuccessful, waiting 1sec...")
time.Sleep(time.Second)
}
return fmt.Errorf("query service is not ready")
func healthCheck(t *testing.T) {
t.Log("Health-checking all-in-one...")
require.Eventuallyf(
t,
func() bool {
_, err := http.Get(queryURL)
return err == nil
},
10*time.Second,
time.Second,
"expecting query endpoint to be healhty",
)
}

func faviconCheck() error {
println("Checking favicon...")
func faviconCheck(t *testing.T) {
t.Log("Checking favicon...")
resp, err := http.Get(queryURL + "/favicon.ico")
if err == nil && resp.StatusCode == http.StatusOK {
println("Favicon check successful")
return nil
} else {
println("Favicon check failed")
return fmt.Errorf("all-in-one failed to serve favicon icon")
}
require.NoError(t, err)
require.NotNil(t, resp)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

func getServicesAPIV3(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

var _ (io.Closer) = (*Collector)(nil)
Expand Down Expand Up @@ -122,8 +122,8 @@ func TestCollector_StartErrors(t *testing.T) {

type mockStrategyStore struct{}

func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
return &sampling.SamplingStrategyResponse{}, nil
func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{}, nil
}

func TestCollector_PublishOpts(t *testing.T) {
Expand Down
7 changes: 1 addition & 6 deletions cmd/collector/app/sampling/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -36,9 +35,5 @@ func NewGRPCHandler(store strategystore.StrategyStore) GRPCHandler {

// GetSamplingStrategy returns sampling decision from store.
func (s GRPCHandler) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
r, err := s.store.GetSamplingStrategy(ctx, param.GetServiceName())
if err != nil {
return nil, err
}
return jaeger.ConvertSamplingResponseToDomain(r)
return s.store.GetSamplingStrategy(ctx, param.GetServiceName())
}
5 changes: 2 additions & 3 deletions cmd/collector/app/sampling/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ import (
"golang.org/x/net/context"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

type mockSamplingStore struct{}

func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
if serviceName == "error" {
return nil, errors.New("some error")
} else if serviceName == "nil" {
return nil, nil
}
return &sampling.SamplingStrategyResponse{StrategyType: sampling.SamplingStrategyType_PROBABILISTIC}, nil
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func TestNewGRPCHandler(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"context"
"io"

"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// StrategyStore keeps track of service specific sampling strategies.
type StrategyStore interface {
// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error)
}

// Aggregator defines an interface used to aggregate operation throughput.
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

type mockSamplingStore struct{}

func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
return nil, nil
}

Expand Down
12 changes: 6 additions & 6 deletions crossdock/services/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
package services

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/thrift-gen/sampling"
p2json "github.com/jaegertracing/jaeger/model/converter/json"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

var errSamplingRateMissing = errors.New("sampling rate is missing")
Expand Down Expand Up @@ -65,14 +65,14 @@ func (s *agentService) GetSamplingRate(service, operation string) (float64, erro
}
s.logger.Info("Retrieved sampling rates from agent", zap.String("body", string(body)))

var response sampling.SamplingStrategyResponse
if err = json.Unmarshal(body, &response); err != nil {
response, err := p2json.SamplingStrategyResponseFromJSON(body)
if err != nil {
return 0, err
}
return getSamplingRate(operation, &response)
return getSamplingRate(operation, response)
}

func getSamplingRate(operation string, response *sampling.SamplingStrategyResponse) (float64, error) {
func getSamplingRate(operation string, response *api_v2.SamplingStrategyResponse) (float64, error) {
if response.OperationSampling == nil {
return 0, errSamplingRateMissing
}
Expand Down
Loading

0 comments on commit 9da9598

Please sign in to comment.