Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tracemode): Tracemode v1 #4030

Merged
merged 5 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions agent/client/workflow_send_trace_mode_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
)

func (c *Client) SendTraces(ctx context.Context, response *proto.ExportRequest) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification
response.Metadata = telemetry.ExtractMetadataFromContext(ctx)

_, err := client.Export(ctx, response)
if err != nil {
return fmt.Errorf("could not send list traces result request: %w", err)
}

return nil
}
12 changes: 11 additions & 1 deletion agent/collector/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package collector
import (
"slices"
"sync"
"time"

gocache "github.com/Code-Hex/go-generics-cache"
"go.opentelemetry.io/otel/trace"
Expand All @@ -14,6 +15,7 @@ type TraceCache interface {
Append(string, []*v1.Span)
RemoveSpans(string, []string)
Exists(string) bool
Keys() []string
}

type traceCache struct {
Expand All @@ -30,6 +32,14 @@ func (c *traceCache) Get(traceID string) ([]*v1.Span, bool) {
return c.internalCache.Get(traceID)
}

// List implements TraceCache.
func (c *traceCache) Keys() []string {
c.mutex.Lock()
defer c.mutex.Unlock()

return c.internalCache.Keys()
}

// Append implements TraceCache.
func (c *traceCache) Append(traceID string, spans []*v1.Span) {
c.mutex.Lock()
Expand All @@ -42,7 +52,7 @@ func (c *traceCache) Append(traceID string, spans []*v1.Span) {
spans = append(existingTraces, spans...)

c.internalCache.Set(traceID, spans)
c.receivedSpans.Set(traceID, currentNumberSpans)
c.receivedSpans.Set(traceID, currentNumberSpans, gocache.WithExpiration(time.Minute*10))
}

func (c *traceCache) RemoveSpans(traceID string, spanID []string) {
Expand Down
14 changes: 13 additions & 1 deletion agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func WithTraceCache(traceCache TraceCache) CollectorOption {
}
}

func WithTraceMode(traceMode bool) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.traceMode = traceMode
}
}

func WithStartRemoteServer(startRemoteServer bool) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.startRemoteServer = startRemoteServer
Expand All @@ -57,6 +63,12 @@ func WithSensor(sensor sensors.Sensor) CollectorOption {
}
}

func WithTraceModeForwarder(traceModeForwarder TraceModeForwarder) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.traceModeForwarder = traceModeForwarder
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand Down Expand Up @@ -108,7 +120,7 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
opt(&ingesterConfig)
}

ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.startRemoteServer)
ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig, ingesterConfig.traceModeForwarder, ingesterConfig.startRemoteServer)
if err != nil {
return nil, fmt.Errorf("could not start local collector: %w", err)
}
Expand Down
77 changes: 54 additions & 23 deletions agent/collector/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"github.com/kubeshop/tracetest/agent/ui/dashboard/events"
"github.com/kubeshop/tracetest/agent/ui/dashboard/sensors"
"github.com/kubeshop/tracetest/server/otlp"
"github.com/kubeshop/tracetest/server/traces"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v11 "go.opentelemetry.io/proto/otlp/common/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
)
Expand All @@ -29,15 +31,20 @@ type ingester interface {
SetSensor(sensors.Sensor)
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (ingester, error) {
type TraceModeForwarder interface {
Export(ctx context.Context, request *pb.ExportTraceServiceRequest) error
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, traceModeForwarder TraceModeForwarder, startRemoteServer bool) (ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
traceModeForwarder: traceModeForwarder,
}

return ingester, nil
Expand All @@ -51,28 +58,31 @@ type Statistics struct {
// forwardIngester forwards all incoming spans to a remote ingester. It also batches those
// spans to reduce network traffic.
type forwardIngester struct {
BatchTimeout time.Duration
RemoteIngester remoteIngesterConfig
mutex sync.Mutex
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor
BatchTimeout time.Duration
RemoteIngester remoteIngesterConfig
mutex sync.Mutex
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor
traceModeForwarder TraceModeForwarder

statistics Statistics

sync.Mutex
}

type remoteIngesterConfig struct {
URL string
Token string
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
URL string
Token string
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
traceMode bool
traceModeForwarder TraceModeForwarder
}

func (i *forwardIngester) Statistics() Statistics {
Expand All @@ -89,6 +99,12 @@ func (i *forwardIngester) SetSensor(sensor sensors.Sensor) {

func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType otlp.RequestType) (*pb.ExportTraceServiceResponse, error) {
go i.ingestSpans(request)
if i.RemoteIngester.traceMode {
err := i.traceModeForwarder.Export(ctx, request)
if err != nil {
i.logger.Error("failed to forward spans to trace mode", zap.Error(err))
}
}

return &pb.ExportTraceServiceResponse{
PartialSuccess: &pb.ExportTracePartialSuccess{
Expand Down Expand Up @@ -136,6 +152,21 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
for _, resourceSpan := range resourceSpans {
for _, scopedSpan := range resourceSpan.ScopeSpans {
for _, span := range scopedSpan.Spans {
if scopedSpan.Scope != nil {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds some of the missing stuff

span.Attributes = append(span.Attributes, &v11.KeyValue{
Key: traces.MetadataServiceName,
Value: &v11.AnyValue{Value: &v11.AnyValue_StringValue{StringValue: scopedSpan.Scope.Name}},
})

// Add attributes from the resource
span.Attributes = append(span.Attributes, scopedSpan.Scope.Attributes...)
}

// Add attributes from the resource
if resourceSpan.Resource != nil {
span.Attributes = append(span.Attributes, resourceSpan.Resource.Attributes...)
}

traceID := trace.TraceID(span.TraceId).String()
spans[traceID] = append(spans[traceID], span)
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
Mode string `mapstructure:"mode"`
Insecure bool `mapstructure:"insecure"`
SkipVerify bool `mapstructure:"skip_verify"`
TraceMode bool `mapstructure:"trace_mode"`

OTLPServer OtlpServer `mapstructure:"otlp_server"`
}
Expand Down
1 change: 1 addition & 0 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Flags struct {
CollectorEndpoint string
Insecure bool
SkipVerify bool
TraceMode bool
}

func (f Flags) AutomatedEnvironmentCanBeInferred() bool {
Expand Down
10 changes: 10 additions & 0 deletions agent/opentelemetry/proto/collector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# OpenTelemetry Collector Proto

This package describes the OpenTelemetry collector protocol.

## Packages

1. `common` package contains the common messages shared between different services.
2. `trace` package contains the Trace Service protos.
3. `metrics` package contains the Metrics Service protos.
4. `logs` package contains the Logs Service protos.
79 changes: 79 additions & 0 deletions agent/opentelemetry/proto/collector/logs/v1/logs_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package opentelemetry.proto.collector.logs.v1;

import "opentelemetry/proto/logs/v1/logs.proto";

option csharp_namespace = "OpenTelemetry.Proto.Collector.Logs.V1";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.collector.logs.v1";
option java_outer_classname = "LogsServiceProto";
option go_package = "go.opentelemetry.io/proto/otlp/collector/logs/v1";

// Service that can be used to push logs between one Application instrumented with
// OpenTelemetry and an collector, or between an collector and a central collector (in this
// case logs are sent/received to/from multiple Applications).
service LogsService {
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
rpc Export(ExportLogsServiceRequest) returns (ExportLogsServiceResponse) {}
}

message ExportLogsServiceRequest {
// An array of ResourceLogs.
// For data coming from a single resource this array will typically contain one
// element. Intermediary nodes (such as OpenTelemetry Collector) that receive
// data from multiple origins typically batch the data before forwarding further and
// in that case this array will contain multiple elements.
repeated opentelemetry.proto.logs.v1.ResourceLogs resource_logs = 1;
}

message ExportLogsServiceResponse {
// The details of a partially successful export request.
//
// If the request is only partially accepted
// (i.e. when the server accepts only parts of the data and rejects the rest)
// the server MUST initialize the `partial_success` field and MUST
// set the `rejected_<signal>` with the number of items it rejected.
//
// Servers MAY also make use of the `partial_success` field to convey
// warnings/suggestions to senders even when the request was fully accepted.
// In such cases, the `rejected_<signal>` MUST have a value of `0` and
// the `error_message` MUST be non-empty.
//
// A `partial_success` message with an empty value (rejected_<signal> = 0 and
// `error_message` = "") is equivalent to it not being set/present. Senders
// SHOULD interpret it the same way as in the full success case.
ExportLogsPartialSuccess partial_success = 1;
}

message ExportLogsPartialSuccess {
// The number of rejected log records.
//
// A `rejected_<signal>` field holding a `0` value indicates that the
// request was fully accepted.
int64 rejected_log_records = 1;

// A developer-facing human-readable message in English. It should be used
// either to explain why the server rejected parts of the data during a partial
// success or to convey warnings/suggestions during a full success. The message
// should offer guidance on how users can address such issues.
//
// error_message is an optional field. An error_message with an empty value
// is equivalent to it not being set.
string error_message = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This is an API configuration to generate an HTTP/JSON -> gRPC gateway for the
# OpenTelemetry service using github.com/grpc-ecosystem/grpc-gateway.
type: google.api.Service
config_version: 3
http:
rules:
- selector: opentelemetry.proto.collector.logs.v1.LogsService.Export
post: /v1/logs
body: "*"
Loading
Loading